前言
mllm在最近(2024-08-09, PR112)合并进去了最新的NPU版本,该版本对应的是mllm的NPU相关工作的论文:Empowering 1000 tokens/second on-device LLM prefilling with mllm-NPU。本文将对mllm的NPU工作做解析。本文将首先回顾论文中的主要算法,然后对mllm的实现做自顶向下的分析。NPU相关工作的思路非常的好,文中给出的效果也非常惊艳,但是mllm中的实现实在是有点粗糙了(比如在关键部位传值而非引用,头文件里面无inline的普通函数等),接下来需要进行修整一番。
算法回顾
- Dynamic Ops是在CPU上运行的(为了精度问题),Static Ops是在NPU上运行的。
- QNN目前似乎支持了Dynamic Tensor,但是Chunk的方案对于异构设备应该会带来更多的Overlap上的空间。
$$\text{Attention} = \text{softmax}(\frac{QK^T}{\sqrt{d_k}})V$$
- Chunk Sharing Graph方案的总计算量是减少的。
我们不妨计算下CPU/NPU上的矩阵乘法的Flops。假设$Q,K,V$的形状都是$(B, H, S, D)$
W/O Chunk:纯CPU计算
$QK^T$的Flops是$2\times S^2DBH$,Softmax后的结果乘上$V$,Flops是一样的。总的Flops是$4\times S^2 DBH$
W Chunk:Linear分块的计算量是一样的,就是注意力Block不一样
假设Chunk Size是$C$。一个Sequence被分割成$\frac{S}{C}$个Chunk。
第一个Chunk的计算量是:$4\times C^2 DBH$Flops
第二个Chunk的计算量是:$4\times 2 \times C^2DBH$Flops
总的Chunk计算量是:$4 \times C^2DBH \times \sum_{i=1}^{\frac{S}{C}} i = 2\times CSDBH +2 \times S^2DBH$
再来分析下为什么计算量会变小:
实际上,将Attention的输入X分为几个Chunk类似于Masked MatMul的 Early Exit。另:在普通手机上运行,Chunk Graph流水线能不能真的并行起来还存疑(可能会有CPU瓶颈+Layout转换瓶颈),笔者没有运行过。
这么说的原因是在$Q\times K^T$这一步,因为我们需要给上Mask,那么就有很多的元素是被重复计算的。所以在Flash Attention中有根据分块对应的Mask 来做 Early Exit 的操作。
在使用了Chunk以后,$Q$的$\text{Seq}$维度大小变小了,那么每次计算浪费的元素就少了。
自顶向下分析,以Qwen为例
对于各个环节的数据类型和量化方法,本文不做过多分析,可以看文章中的描述。
模型实现
Qwen的模型实现分为两个内容,CPU、NPU。我们先来看最主要的CPUNPU Attention部分的代码:
std::vector<NetTensor *> Qwen_CPUNPUAttention(Context *c, NetTensor *x, NetTensor *res, int embedding_size, int hidden_size, int head_size, int cache_max, string name, int seq, int chunk) {
x = x->view(1, static_cast<int>(seq / chunk / 32), static_cast<int>(32), hidden_size * head_size);
auto *q = _LinearINT8({x}, embedding_size, hidden_size * head_size, true, name + ".q_proj");
auto *k = _LinearINT8({x}, embedding_size, hidden_size * head_size, true, name + ".k_proj");
auto *v = _LinearINT8({x}, embedding_size, hidden_size * head_size, true, name + ".v_proj");
q = q->view(1, head_size, seq / chunk, hidden_size);
k = k->view(1, head_size, seq / chunk, hidden_size);
v = v->view(1, head_size, seq / chunk, hidden_size);
q = _Dequantize({q}, true, (string)name + ".q_proj.dequantize");
k = _Dequantize({k}, true, (string)name + ".k_proj.dequantize");
v = _Dequantize({v}, true, (string)name + ".v_proj.dequantize");
v = _Transpose({v}, {0, 2, 3, 1}, (string)name + ".v_proj.transpose");
auto *m = _MergeOutput({q, k, v, res}, name + ".qkv_merge");
// --------------------
_SubgraphBegin(c, MLLM_CPU);
// --------------------
auto s = _SplitInput({m}, true, 4, name + ".qkv_split");
q = s[0];
k = s[1];
v = s[2];
res = s[3];
q = _RoPE({q}, HFHUBROPE, name + ".q_rope", 1000000, 32768);
k = _RoPE({k}, HFHUBROPE, name + ".k_rope", 1000000, 32768);
k = _KVCacheNPU({k}, cache_max, name + ".k_cache");
v = _KVCacheNPU({v}, cache_max, name + ".v_cache");
auto *qk = _Matmul({q, k}, false, true, name + ".qk");
// qk = *qk / std::sqrt(hidden_size);
// qk = _Causalmask({qk}, name + ".mask");
qk = _Softmax({qk}, DIMENSION, true, name + ".softmax");
auto *o = _Matmul({qk, v}, false, false, name + ".qkv");
o = _Quantize({o}, true, (string)name + ".o_proj.quantize");
m = _MergeOutput({o, res}, name + ".or_merge");
// --------------------
_SubgraphBegin(c);
// --------------------
s = _SplitInput({m}, true, 2, name + ".or_split");
o = s[0];
res = s[1];
o = o->view(1, static_cast<int>(seq / chunk / 32), static_cast<int>(32), hidden_size * head_size);
res = res->view(-1, 1, -1, hidden_size * head_size);
o = _LinearINT8({o}, hidden_size * head_size, embedding_size, false, name + ".o_proj");
o = _Dequantize({o}, true, (string)name + ".o_proj.dequantize");
return {o, res};
}
在代码中,以_SubgraphBegin(c, ...);
为分界线完成了论文中算法的主要思路。
_SubgraphBegin(c, MLLM_CPU);
以上的部分是NPU上执行的Shared QKV Linear Layer。SubgraphBegin(c, MLLM_CPU);
以下的部分是在CPU上执行的$O = \text{softmax}(\text{mask}(\frac{QK^T}{\sqrt{d_k}}))V$的部分。之后的_SubgraphBegin(c);
后实现了Shared O Linear Layer。
mllm中的子图的实现方式对于我来说有点奇怪,应该是不能实现sub Graph in sub Graph。是在每次_SubgraphBegin(c);
之后,所有新创建的算子都在这张top的子图上。
其他的图构建方式和上述代码类似,这里就不赘述了。
Context,Net.Convert,Executor
我们可以在main_qwen_npu.cpp
中看到具体的网络执行流程:
- 创建Context
- 创建计算图
- Convert计算图到Net
- 创建Param Loader
- 创建Executor
Context
struct Context {
vector<NetParameter> sub_param_;
vector<BackendType> sub_backend_;
BackendType next_backend = MLLM_DEFAULT;
vector<NetOp *> net_ops;
std::set<NetTensor *> net_tensors;
int idx = 0;
int active_sub = 0;
};
正如其名,用来记录网络参数、backend类型、算子、tensor等全局的参数。
Net.Convert
默认的Net::Convert如下代码所示。它的作用是将网络参数转换为特定后端类型(将NetTensor这个Node抽象转换成实际执行的Tensor抽象),并进行拓扑排序,以构建计算图。
void Net::convert(vector<NetParameter> ¶m, BackendType backend_type, int threadCount) {
for (int ii = 0; ii < (int)param.size(); ++ii) {
auto &sub_param = param[ii];
vector<string> names = {};
auto net_in_tensor = sub_param.net_inputs;
for (const auto &out_t : net_in_tensor) {
tensors_[out_t->name] = std::make_shared<Tensor>(backends_[backend_type].get());
tensors_[out_t->name]->setName(out_t->name);
for (auto &tensor_name : tensor_names_) {
tensor_name.erase(std::remove(tensor_name.begin(), tensor_name.end(), out_t->name), tensor_name.end());
}
names.push_back(out_t->name);
}
for (auto *t:sub_param.net_tensors) {
if(t->in == NULL){
auto *in_tensor = t;
tensors_[in_tensor->name] = std::make_shared<Tensor>(backends_[backend_type].get());
tensors_[in_tensor->name]->setName(in_tensor->name);
input_names_.push_back(in_tensor->name);
inputname_graphidx_[in_tensor->name] = ii;
names.push_back(in_tensor->name);
}
}
tensor_names_.push_back(names);
}
for (int i = 0; i < (int)param.size(); ++i) {
param[i].topologySort();
shared_ptr<Graph> subg_1;
subg_1.reset(new Graph( param[i], backends_[backend_type].get(), tensors_, threadCount));
subGraphs_["G" + std::to_string(i)] = subg_1;
}
}
Executor
Executor是一个对图执行逻辑的封装,见3.3章节的run
QNNPipelineExecutor::run
首先明确,只有在Prefill阶段才会用到QNN。先来看下Chunk Graph是怎么运行起来的,第一个chunk运行完NPU的部分(也就是图中的绿色部分)后进入CPU部分,然后下一个chunk进入NPU部分,以此反复。目前我猜测,主要瓶颈可能还是在CPU上,导致NPU等待,并行度上不去。
QNN Pipeline run 的代码在QNNExecutor.hpp/.cpp
中,函数定义是
void QNNPipelineExecutor::run(Context *ctx, Net *net, vector<shared_ptr<Tensor>> input_tensors) {
...
}
我们先看拆分input的代码:
// input will be split into chunks and execute in pipeline
const int chunk_size = 32;
int chunk_num = (input_tensors[0]->sequence() + chunk_size - 1) / chunk_size;
// create a new tensor for each chunk
vector<vector<shared_ptr<Tensor>>> chunked_tensors_list(chunk_num, vector<shared_ptr<Tensor>>(input_tensors.size()));
// split the tensor in chunks
for (int i = 0; i < chunk_num; ++i) {
// for all inputs in input_tensors
auto &chunked_tensors = chunked_tensors_list[i];
for (int j = 0; j < input_tensors.size(); ++j) {
chunked_tensors[j] = std::make_shared<Tensor>();
chunked_tensors[j]->setBackend(net->backends()[BackendType::MLLM_CPU].get());
chunked_tensors[j]->reshape(1, 1, chunk_size, 1);
chunked_tensors[j]->setName(net->inputNames()[j]);
// use deepCopyFrom for each chunk to avoid memcpy
chunked_tensors[j]->deepCopyFrom(input_tensors[j].get(), false, {0, 0, i * chunk_size, 0});
}
}
作者假设了这里的input_tensors
是Seqence长度一致或者是只有一个元素的。代码逻辑简单,就是对每个input_tensors
中的tensor拆分成为chunks。
再来看计算图修改inputs的代码:
vector<int> flashGid = {};
for (int tid = 0; tid < net->inputNames().size(); ++tid) {
auto input_name = net->inputNames()[tid];
auto input_tensor = chunked_tensors_list[0][tid];
input_tensor->setName(input_name);
net->tensors()[input_name] = input_tensor;
if (std::find(flashGid.begin(), flashGid.end(), net->inGmap()[input_name]) == flashGid.end()) {
flashGid.push_back(net->inGmap()[input_name]);
}
}
for (auto Gid : flashGid) {
net->subGraph()[graphNamingRule(Gid)]->reflashInput(net->tensors());
}
因为切分了Seq为32一个chunk的tensor,这里把每个子图的inputs重新设置为seq=32的tensor。这里的inGmap
函数就是直接传值的,可以改为引用。我们可以看到这里使用的是第一个chunk:chunked_tensors_list[0][tid];
,这是因为在下文中,会有在子线程内根据线程id对自己的input进行更新的操作,即下文中(i == 0)
的scope内部的操作。
图改写完成后,就是offload计算图到对应的Backend:
for (int i = 0; i < (int)net->subGraph().size(); ++i) {
string name = graphNamingRule(i);
auto &g = net->subGraph()[name];
// cast graph to QNNGraph
// the qnn_graph below is where we cast the Graph to QNNGraph
auto expectedBackend = ctx->sub_backend_[i];
if (graphOffloadRule(expectedBackend, i) == MLLM_CPU) {
g->reshape();
g->setUpTensors();
} else if (graphOffloadRule(expectedBackend, i) == MLLM_QNN) {
auto *qnn_graph = dynamic_cast<QNNGraph *>(g.get());
g->reshape();
qnn_graph->setUpTensors(name);
} else {
std::cerr << "Backend Not Support" << std::endl;
exit(1);
}
}
接下来就是具体执行的代码了,作者在这里写了个chunkExecutionFunction:
std::function<void(int chunk_id)> chunkExecutionFunction = [&](int chunk_id) {
for (int i = 0; i < (int)net->subGraph().size(); ++i) {
// make sure chunks execute by order
while (true) {
chunk_mutex.lock();
if (graph_chunk_index[i] == chunk_id) {
graph_chunk_index[i]++;
chunk_mutex.unlock();
break;
} else {
chunk_mutex.unlock();
std::this_thread::yield();
}
}
// make sure current graph is ready for this chunk
// lock the mutex of mutexes at i
mutexes[i].lock();
if (i == 0) {
// update the input tensor for each chunk
for (int tid = 0; tid < net->inputNames().size(); ++tid) {
auto input_name = net->inputNames()[tid];
auto input_tensor = chunked_tensors_list[chunk_id][tid];
unordered_map<string, shared_ptr<Tensor>> map;
map[input_name] = input_tensor;
string graphName = graphNamingRule(i);
net->subGraph()[graphName]->reflashInput(map);
}
}
auto expectedBackend = ctx->sub_backend_[i];
string name = graphNamingRule(i);
if (graphOffloadRule(expectedBackend, i) == MLLM_CPU) {
// execute only one cpu graph at a time
cpu_mutex.lock();
#ifdef DEBUGPRINT
std::cout << "chunk:" << chunk_id << " execute cpu graph " << i << std::endl;
#endif
auto &g = net->subGraph()[name];
if (chunk_id != 0) {
// cpu graph should reshape and setup for every chunk forward for KVCache op
g->reshape();
g->setUpTensors();
}
// only get the result at the last graph
if (i == net->subGraph().size() - 1) {
chunked_result_list = g->forward();
} else {
g->forward();
}
// execute only one cpu graph at a time
cpu_mutex.unlock();
} else if (graphOffloadRule(expectedBackend, i) == MLLM_QNN) {
#ifdef DEBUGPRINT
std::cout << "chunk:" << chunk_id << " execute qnn graph " << i << std::endl;
#endif
auto &g = net->subGraph()[name];
auto *qnn_graph = dynamic_cast<QNNGraph *>(g.get());
qnn_graph->forward(name);
// only get the result at the last graph
if (i == net->subGraph().size() - 1) {
chunked_result_list = qnn_graph->forward(name);
} else {
qnn_graph->forward(name);
}
} else {
std::cerr << "Backend Not Support" << std::endl;
exit(1);
}
PRINT_MEMORY_USAGE((string("execute graph: ") + std::to_string(i)).c_str());
// if it is the last graph, move the result to the final result
if (i == (int)net->subGraph().size() - 1) {
result_.resize(chunked_result_list.size());
if (chunk_id == 0) { // reshape the result tensor when first chunk is executed
for (int tid = 0; tid < chunked_result_list.size(); ++tid) {
result_[tid] = std::make_shared<Tensor>();
result_[tid]->setBackend(net->backends()[BackendType::MLLM_CPU].get());
result_[tid]->reshape(chunked_result_list[tid]->batch(),
chunked_result_list[tid]->head(),
chunk_size * chunk_num,
chunked_result_list[tid]->dimension());
result_[tid]->alloc();
}
}
// move the result to the final result
for (int tid = 0; tid < chunked_result_list.size(); ++tid) {
auto &result_tensor = chunked_result_list[tid];
memcpy(result_[tid]->ptrAt<float>(0, 0, chunk_size * chunk_id, 0), result_tensor->hostPtr<float>(), result_tensor->count() * sizeof(float));
}
}
// unlock the mutex of mutexes at i
mutexes[i].unlock();
}
};
代码中,开头的while循环是检查现在的chunk是否可以被执行,如果不可以被执行则yield这个线程。接下来如果是第一个子图的话,就是首先更新当前子图需要的chunk inputs。接下来就分别按照需求在CPU/NPU上进行运算。如果是最后的一个子图,就将运算的结果写在一个最终的result中,这个result是没有chunk的大小,写入的时候使用offset来指定写入的位置。
然后会有一个ThreadPool来执行这几个chunck子图:
// wrap the thread pool execution in a function and await the thread pool to finish
std::function executeFunction = [&]() {
// use thread pool to manage the threads
ThreadPool thread_pool(4);
for (int i = 0; i < chunk_num; ++i) {
thread_pool.enqueue(std::bind(chunkExecutionFunction, i));
}
};
executeFunction();
QNN Backend
QNN的使用还不熟悉,先学习先。
改进
易用性
-
可以改进前端实现和模型执行的流程,对于子图,更希望是torch中的形式,一个Module可以视为一个子图,使用
to_device()
等函数来标明运行设备。笔者最近在实现一个Eager+Lazy+Static合一的计算图构建模式,可能可以借鉴进去。 -
子图Dispatch是手动的,有完整的计算图,还是用自动的Dispatch算法吧,通用性会更好点。
-
代码中有多处shared_ptr和裸指针混用问题,继承问题可以用
enable_shared_from_this<T>
和static_ptr_cast
来解决的。
性能问题
-
我严重怀疑存在CPU瓶颈+Layout/精度转换瓶颈。导致Chunk之间是没法完全并行起来的。这是最主要的问题。
-
最好使用细粒度的Thread Dispatch方法,OpenMP的线程可能与当前Chunk Thread Pool中的线程干扰。
-
NPU构建图的时候需要不少的Memory,导致峰值内存需求大,可能会使用Swapping memory。