前言

mllm在最近(2024-08-09, PR112)合并进去了最新的NPU版本,该版本对应的是mllm的NPU相关工作的论文:Empowering 1000 tokens/second on-device LLM prefilling with mllm-NPU。本文将对mllm的NPU工作做解析。本文将首先回顾论文中的主要算法,然后对mllm的实现做自顶向下的分析。NPU相关工作的思路非常的好,文中给出的效果也非常惊艳,但是mllm中的实现实在是有点粗糙了(比如在关键部位传值而非引用,头文件里面无inline的普通函数等),接下来需要进行修整一番。

算法回顾

Empowering 1000 tokens/second on-device LLM prefilling with mllm-NPU
Chunk Graph Algorithm

Empowering 1000 tokens/second on-device LLM prefilling with mllm-NPU

  1. Dynamic Ops是在CPU上运行的(为了精度问题),Static Ops是在NPU上运行的。
  2. QNN目前似乎支持了Dynamic Tensor,但是Chunk的方案对于异构设备应该会带来更多的Overlap上的空间。

$$\text{Attention} = \text{softmax}(\frac{QK^T}{\sqrt{d_k}})V$$

  1. Chunk Sharing Graph方案的总计算量是减少的。

我们不妨计算下CPU/NPU上的矩阵乘法的Flops。假设$Q,K,V$的形状都是$(B, H, S, D)$

W/O Chunk:纯CPU计算

$QK^T$的Flops是$2\times S^2DBH$,Softmax后的结果乘上$V$,Flops是一样的。总的Flops是$4\times S^2 DBH$

W Chunk:Linear分块的计算量是一样的,就是注意力Block不一样

假设Chunk Size是$C$。一个Sequence被分割成$\frac{S}{C}$个Chunk。

第一个Chunk的计算量是:$4\times C^2 DBH$Flops

第二个Chunk的计算量是:$4\times 2 \times C^2DBH$Flops

总的Chunk计算量是:$4 \times C^2DBH \times \sum_{i=1}^{\frac{S}{C}} i = 2\times CSDBH +2 \times S^2DBH$


再来分析下为什么计算量会变小:

实际上,将Attention的输入X分为几个Chunk类似于Masked MatMul的 Early Exit。另:在普通手机上运行,Chunk Graph流水线能不能真的并行起来还存疑(可能会有CPU瓶颈+Layout转换瓶颈),笔者没有运行过。

这么说的原因是在$Q\times K^T$这一步,因为我们需要给上Mask,那么就有很多的元素是被重复计算的。所以在Flash Attention中有根据分块对应的Mask 来做 Early Exit 的操作。

在使用了Chunk以后,$Q$的$\text{Seq}$维度大小变小了,那么每次计算浪费的元素就少了。

自顶向下分析,以Qwen为例

对于各个环节的数据类型和量化方法,本文不做过多分析,可以看文章中的描述。

模型实现

Qwen的模型实现分为两个内容,CPU、NPU。我们先来看最主要的CPUNPU Attention部分的代码:

std::vector<NetTensor *> Qwen_CPUNPUAttention(Context *c, NetTensor *x, NetTensor *res, int embedding_size, int hidden_size, int head_size, int cache_max, string name, int seq, int chunk) {
    x = x->view(1, static_cast<int>(seq / chunk / 32), static_cast<int>(32), hidden_size * head_size);
    auto *q = _LinearINT8({x}, embedding_size, hidden_size * head_size, true, name + ".q_proj");
    auto *k = _LinearINT8({x}, embedding_size, hidden_size * head_size, true, name + ".k_proj");
    auto *v = _LinearINT8({x}, embedding_size, hidden_size * head_size, true, name + ".v_proj");
    q = q->view(1, head_size, seq / chunk, hidden_size);
    k = k->view(1, head_size, seq / chunk, hidden_size);
    v = v->view(1, head_size, seq / chunk, hidden_size);

    q = _Dequantize({q}, true, (string)name + ".q_proj.dequantize");
    k = _Dequantize({k}, true, (string)name + ".k_proj.dequantize");
    v = _Dequantize({v}, true, (string)name + ".v_proj.dequantize");

    v = _Transpose({v}, {0, 2, 3, 1}, (string)name + ".v_proj.transpose");

    auto *m = _MergeOutput({q, k, v, res}, name + ".qkv_merge");

    // --------------------
    _SubgraphBegin(c, MLLM_CPU);
    // --------------------

    auto s = _SplitInput({m}, true, 4, name + ".qkv_split");

    q = s[0];
    k = s[1];
    v = s[2];
    res = s[3];

    q = _RoPE({q}, HFHUBROPE, name + ".q_rope", 1000000, 32768);
    k = _RoPE({k}, HFHUBROPE, name + ".k_rope", 1000000, 32768);

    k = _KVCacheNPU({k}, cache_max, name + ".k_cache");
    v = _KVCacheNPU({v}, cache_max, name + ".v_cache");

    auto *qk = _Matmul({q, k}, false, true, name + ".qk");
    // qk = *qk / std::sqrt(hidden_size);
    // qk = _Causalmask({qk}, name + ".mask");
    qk = _Softmax({qk}, DIMENSION, true, name + ".softmax");

    auto *o = _Matmul({qk, v}, false, false, name + ".qkv");
    o = _Quantize({o}, true, (string)name + ".o_proj.quantize");
    m = _MergeOutput({o, res}, name + ".or_merge");

    // --------------------
    _SubgraphBegin(c);
    // --------------------
    s = _SplitInput({m}, true, 2, name + ".or_split");

    o = s[0];
    res = s[1];

    o = o->view(1, static_cast<int>(seq / chunk / 32), static_cast<int>(32), hidden_size * head_size);
    res = res->view(-1, 1, -1, hidden_size * head_size);
    o = _LinearINT8({o}, hidden_size * head_size, embedding_size, false, name + ".o_proj");
    o = _Dequantize({o}, true, (string)name + ".o_proj.dequantize");

    return {o, res};
}

在代码中,以_SubgraphBegin(c, ...);为分界线完成了论文中算法的主要思路。

_SubgraphBegin(c, MLLM_CPU);以上的部分是NPU上执行的Shared QKV Linear Layer。SubgraphBegin(c, MLLM_CPU);以下的部分是在CPU上执行的$O = \text{softmax}(\text{mask}(\frac{QK^T}{\sqrt{d_k}}))V$的部分。之后的_SubgraphBegin(c);后实现了Shared O Linear Layer。

mllm中的子图的实现方式对于我来说有点奇怪,应该是不能实现sub Graph in sub Graph。是在每次_SubgraphBegin(c);之后,所有新创建的算子都在这张top的子图上。

其他的图构建方式和上述代码类似,这里就不赘述了。

Context,Net.Convert,Executor

我们可以在main_qwen_npu.cpp中看到具体的网络执行流程:

  1. 创建Context
  2. 创建计算图
  3. Convert计算图到Net
  4. 创建Param Loader
  5. 创建Executor

Context

struct Context {
    vector<NetParameter> sub_param_;
    vector<BackendType> sub_backend_;
    BackendType next_backend = MLLM_DEFAULT;
    vector<NetOp *> net_ops;
    std::set<NetTensor *> net_tensors;
    int idx = 0;
    int active_sub = 0;
};

正如其名,用来记录网络参数、backend类型、算子、tensor等全局的参数。

Net.Convert

默认的Net::Convert如下代码所示。它的作用是将网络参数转换为特定后端类型(将NetTensor这个Node抽象转换成实际执行的Tensor抽象),并进行拓扑排序,以构建计算图。

void Net::convert(vector<NetParameter> &param, BackendType backend_type, int threadCount) {
    for (int ii = 0; ii < (int)param.size(); ++ii) {
        auto &sub_param = param[ii];
        vector<string> names = {};
        auto net_in_tensor = sub_param.net_inputs;
        for (const auto &out_t : net_in_tensor) {
            tensors_[out_t->name] = std::make_shared<Tensor>(backends_[backend_type].get());
            tensors_[out_t->name]->setName(out_t->name);
            for (auto &tensor_name : tensor_names_) {
                tensor_name.erase(std::remove(tensor_name.begin(), tensor_name.end(), out_t->name), tensor_name.end());
            }
            names.push_back(out_t->name);
        }

        for (auto *t:sub_param.net_tensors) {
            if(t->in == NULL){
                auto *in_tensor = t;
                tensors_[in_tensor->name] = std::make_shared<Tensor>(backends_[backend_type].get());
                tensors_[in_tensor->name]->setName(in_tensor->name);
                input_names_.push_back(in_tensor->name);
                inputname_graphidx_[in_tensor->name] = ii;
                names.push_back(in_tensor->name);
            }
        }
        tensor_names_.push_back(names);
    }

    for (int i = 0; i < (int)param.size(); ++i) {
        param[i].topologySort();
        shared_ptr<Graph> subg_1;
        subg_1.reset(new Graph( param[i], backends_[backend_type].get(), tensors_, threadCount));
        subGraphs_["G" + std::to_string(i)] = subg_1;
    }
}

Executor

Executor是一个对图执行逻辑的封装,见3.3章节的run

QNNPipelineExecutor::run

首先明确,只有在Prefill阶段才会用到QNN。先来看下Chunk Graph是怎么运行起来的,第一个chunk运行完NPU的部分(也就是图中的绿色部分)后进入CPU部分,然后下一个chunk进入NPU部分,以此反复。目前我猜测,主要瓶颈可能还是在CPU上,导致NPU等待,并行度上不去。

QNN Pipeline run 的代码在QNNExecutor.hpp/.cpp中,函数定义是

void QNNPipelineExecutor::run(Context *ctx, Net *net, vector<shared_ptr<Tensor>> input_tensors) {
    ...
}

我们先看拆分input的代码:

// input will be split into chunks and execute in pipeline
const int chunk_size = 32;
int chunk_num = (input_tensors[0]->sequence() + chunk_size - 1) / chunk_size;
// create a new tensor for each chunk
vector<vector<shared_ptr<Tensor>>> chunked_tensors_list(chunk_num, vector<shared_ptr<Tensor>>(input_tensors.size()));

// split the tensor in chunks
for (int i = 0; i < chunk_num; ++i) {
    // for all inputs in input_tensors
    auto &chunked_tensors = chunked_tensors_list[i];
    for (int j = 0; j < input_tensors.size(); ++j) {
        chunked_tensors[j] = std::make_shared<Tensor>();
        chunked_tensors[j]->setBackend(net->backends()[BackendType::MLLM_CPU].get());
        chunked_tensors[j]->reshape(1, 1, chunk_size, 1);
        chunked_tensors[j]->setName(net->inputNames()[j]);
        // use deepCopyFrom for each chunk to avoid memcpy
        chunked_tensors[j]->deepCopyFrom(input_tensors[j].get(), false, {0, 0, i * chunk_size, 0});
    }
}

作者假设了这里的input_tensors是Seqence长度一致或者是只有一个元素的。代码逻辑简单,就是对每个input_tensors中的tensor拆分成为chunks。

再来看计算图修改inputs的代码:

vector<int> flashGid = {};
for (int tid = 0; tid < net->inputNames().size(); ++tid) {
    auto input_name = net->inputNames()[tid];
    auto input_tensor = chunked_tensors_list[0][tid];
    input_tensor->setName(input_name);
    net->tensors()[input_name] = input_tensor;
    if (std::find(flashGid.begin(), flashGid.end(), net->inGmap()[input_name]) == flashGid.end()) {
        flashGid.push_back(net->inGmap()[input_name]);
    }
}

for (auto Gid : flashGid) {
    net->subGraph()[graphNamingRule(Gid)]->reflashInput(net->tensors());
}

因为切分了Seq为32一个chunk的tensor,这里把每个子图的inputs重新设置为seq=32的tensor。这里的inGmap函数就是直接传值的,可以改为引用。我们可以看到这里使用的是第一个chunk:chunked_tensors_list[0][tid];,这是因为在下文中,会有在子线程内根据线程id对自己的input进行更新的操作,即下文中(i == 0)的scope内部的操作。
图改写完成后,就是offload计算图到对应的Backend:

for (int i = 0; i < (int)net->subGraph().size(); ++i) {
    string name = graphNamingRule(i);
    auto &g = net->subGraph()[name];

    // cast graph to QNNGraph
    // the qnn_graph below is where we cast the Graph to QNNGraph
    auto expectedBackend = ctx->sub_backend_[i];

    if (graphOffloadRule(expectedBackend, i) == MLLM_CPU) {
        g->reshape();
        g->setUpTensors();
    } else if (graphOffloadRule(expectedBackend, i) == MLLM_QNN) {
        auto *qnn_graph = dynamic_cast<QNNGraph *>(g.get());
        g->reshape();
        qnn_graph->setUpTensors(name);
    } else {
        std::cerr << "Backend Not Support" << std::endl;
        exit(1);
    }
}

接下来就是具体执行的代码了,作者在这里写了个chunkExecutionFunction:

std::function<void(int chunk_id)> chunkExecutionFunction = [&](int chunk_id) {
    for (int i = 0; i < (int)net->subGraph().size(); ++i) {
        // make sure chunks execute by order
        while (true) {
            chunk_mutex.lock();
            if (graph_chunk_index[i] == chunk_id) {
                graph_chunk_index[i]++;
                chunk_mutex.unlock();
                break;
            } else {
                chunk_mutex.unlock();
                std::this_thread::yield();
            }
        }

        // make sure current graph is ready for this chunk
        // lock the mutex of mutexes at i
        mutexes[i].lock();

        if (i == 0) {
            // update the input tensor for each chunk
            for (int tid = 0; tid < net->inputNames().size(); ++tid) {
                auto input_name = net->inputNames()[tid];
                auto input_tensor = chunked_tensors_list[chunk_id][tid];
                unordered_map<string, shared_ptr<Tensor>> map;
                map[input_name] = input_tensor;
                string graphName = graphNamingRule(i);
                net->subGraph()[graphName]->reflashInput(map);
            }
        }

        auto expectedBackend = ctx->sub_backend_[i];
        string name = graphNamingRule(i);
        if (graphOffloadRule(expectedBackend, i) == MLLM_CPU) {
            // execute only one cpu graph at a time
            cpu_mutex.lock();
#ifdef DEBUGPRINT
            std::cout << "chunk:" << chunk_id << " execute cpu graph " << i << std::endl;
#endif

            auto &g = net->subGraph()[name];
            if (chunk_id != 0) {
                // cpu graph should reshape and setup for every chunk forward for KVCache op
                g->reshape();
                g->setUpTensors();
            }

            // only get the result at the last graph
            if (i == net->subGraph().size() - 1) {
                chunked_result_list = g->forward();
            } else {
                g->forward();
            }

            // execute only one cpu graph at a time
            cpu_mutex.unlock();
        } else if (graphOffloadRule(expectedBackend, i) == MLLM_QNN) {
#ifdef DEBUGPRINT
            std::cout << "chunk:" << chunk_id << " execute qnn graph " << i << std::endl;
#endif
            auto &g = net->subGraph()[name];
            auto *qnn_graph = dynamic_cast<QNNGraph *>(g.get());
            qnn_graph->forward(name);

            // only get the result at the last graph
            if (i == net->subGraph().size() - 1) {
                chunked_result_list = qnn_graph->forward(name);
            } else {
                qnn_graph->forward(name);
            }
        } else {
            std::cerr << "Backend Not Support" << std::endl;
            exit(1);
        }

        PRINT_MEMORY_USAGE((string("execute graph: ") + std::to_string(i)).c_str());

        // if it is the last graph, move the result to the final result
        if (i == (int)net->subGraph().size() - 1) {
            result_.resize(chunked_result_list.size());
            if (chunk_id == 0) { // reshape the result tensor when first chunk is executed
                for (int tid = 0; tid < chunked_result_list.size(); ++tid) {
                    result_[tid] = std::make_shared<Tensor>();
                    result_[tid]->setBackend(net->backends()[BackendType::MLLM_CPU].get());
                    result_[tid]->reshape(chunked_result_list[tid]->batch(),
                                          chunked_result_list[tid]->head(),
                                          chunk_size * chunk_num,
                                          chunked_result_list[tid]->dimension());
                    result_[tid]->alloc();
                }
            }

            // move the result to the final result
            for (int tid = 0; tid < chunked_result_list.size(); ++tid) {
                auto &result_tensor = chunked_result_list[tid];

                memcpy(result_[tid]->ptrAt<float>(0, 0, chunk_size * chunk_id, 0), result_tensor->hostPtr<float>(), result_tensor->count() * sizeof(float));
            }
        }

        // unlock the mutex of mutexes at i
        mutexes[i].unlock();
    }
};

代码中,开头的while循环是检查现在的chunk是否可以被执行,如果不可以被执行则yield这个线程。接下来如果是第一个子图的话,就是首先更新当前子图需要的chunk inputs。接下来就分别按照需求在CPU/NPU上进行运算。如果是最后的一个子图,就将运算的结果写在一个最终的result中,这个result是没有chunk的大小,写入的时候使用offset来指定写入的位置。

然后会有一个ThreadPool来执行这几个chunck子图:

// wrap the thread pool execution in a function and await the thread pool to finish
std::function executeFunction = [&]() {
    // use thread pool to manage the threads
    ThreadPool thread_pool(4);
    for (int i = 0; i < chunk_num; ++i) {
        thread_pool.enqueue(std::bind(chunkExecutionFunction, i));
    }
};
executeFunction();

QNN Backend

QNN的使用还不熟悉,先学习先。

改进

易用性

  1. 可以改进前端实现和模型执行的流程,对于子图,更希望是torch中的形式,一个Module可以视为一个子图,使用to_device()等函数来标明运行设备。笔者最近在实现一个Eager+Lazy+Static合一的计算图构建模式,可能可以借鉴进去。

  2. 子图Dispatch是手动的,有完整的计算图,还是用自动的Dispatch算法吧,通用性会更好点。

  3. 代码中有多处shared_ptr和裸指针混用问题,继承问题可以用enable_shared_from_this<T>static_ptr_cast来解决的。

性能问题

  1. 我严重怀疑存在CPU瓶颈+Layout/精度转换瓶颈。导致Chunk之间是没法完全并行起来的。这是最主要的问题。

  2. 最好使用细粒度的Thread Dispatch方法,OpenMP的线程可能与当前Chunk Thread Pool中的线程干扰。

  3. NPU构建图的时候需要不少的Memory,导致峰值内存需求大,可能会使用Swapping memory。