Covalent Bond  0.0.1-alpha
'covalent bond' is a data middle office. As a 2022-2023 Fall SWE final project.
cbComputeGraph.cpp
Go to the documentation of this file.
1 #include "cbComputeGraph.hpp"
3 
4 namespace cb {
5 namespace graph {
6 
8  for (auto item : m_dataFromDevice) { delete item; }
9  for (auto item : m_dataPool) { delete item; }
10 }
11 
13 
14 void cbGraphSharedMem::push(cbMySQLCell* v) { m_dataPool.push_back(v); }
15 
16 void cbGraphSharedMem::push(cbMySQLField* v) { m_fields.push_back(v); }
17 
19  if (!m_outStruct) {
20  m_outStruct = new cbOutputTableStruct(shape, info);
21  } else {
22  m_outStruct->update(shape, info);
23  }
24 }
25 
27  size_t tmp = sizeof(cbMySQLCell) * m_dataPool.size();
28  for (auto& item : m_dataFromDevice) {
29  auto __shape = item->getShape();
30  tmp += sizeof(cbMySQLCell) * __shape[0] * __shape[1];
31  }
32  return tmp;
33 }
34 
36  int32_t tmp = m_dataPool.size();
37  for (auto& item : m_dataFromDevice) {
38  auto __shape = item->getShape();
39  tmp += __shape[0] * __shape[1];
40  }
41  return tmp;
42 }
43 
45 
47  for (auto item : m_dataFromDevice) { delete item; }
48  for (auto item : m_dataPool) { delete item; }
49  for (auto item : m_fields) { delete item; }
50  m_dataFromDevice.clear();
51  m_dataPool.clear();
52  m_fields.clear();
53  // m_outStruct->clear();
54 }
55 
56 void cbGraphSharedLuaStack::execScriptFile(const std::string& filePath) {
57  m_lua().script_file(filePath);
58 }
59 
60 void cbGraphSharedLuaStack::execScript(const std::string& script) { m_lua().script(script); }
61 
63 
64 cbNode::cbNode(const nodeType& nt) : nodeT(nt) {}
65 
66 void cbNode::PointTo(cbNode* ptr) { nextNode = ptr; }
67 
68 WFMySQLTask* cbNode::asSQLTask(void* metaTask) { return (WFMySQLTask*)metaTask; }
69 
70 WFGoTask* cbNode::asGoTask(void* metaTask) { return (WFGoTask*)metaTask; }
71 
73  // no members need to been free
74 }
75 
77  if (m_queries.size() != 1) {
78  fmt::print(fg(fmt::color::red),
79  "We have not support multi SQL sentence for now\n It's just for testing purpose\n");
80  return nullptr;
81  }
82  return (void*)m_device->query(m_queries[0], [=](WFMySQLTask* task) -> void {
83  // 1. Judge the graph is bind to this node.
84  if (graph == nullptr) {
85  fmt::print(fg(fmt::color::red), "Graph is not setted for this node.\n");
86  }
87  // 2. If the task is success.
88  if (task->get_state() != WFT_STATE_SUCCESS) {
89  fprintf(stderr, "task error=%d\n", task->get_error());
90  return;
91  }
92  // 3. get the cursor.
93  protocol::MySQLResultCursor cursor(task->get_resp());
94 
95  // 4. Judge the packet is ok.
96  if (task->get_resp()->get_packet_type() == MYSQL_PACKET_ERROR) {
97  fprintf(stderr, "ERROR. error_code=%d %s\n", task->get_resp()->get_error_code(),
98  task->get_resp()->get_error_msg().c_str());
99  }
100 
101  // store all data.
102  cbVirtualSharedTable* __data = new cbVirtualSharedTable(&cursor);
103  graph->m_sharedMem->push(__data);
104 
105  // Store the virtual table to the io.O port. And pass to the next Node's inputs port.
106  mapShared2Virtual(__data, &io.O);
107  if (nextNode) { nextNode->io.I.push_back(io.O); }
108  if (isFinalOutput) {
110  graph->io.O = io.O;
111  }
112  return;
113  });
114 }
115 
116 void cbVirtualDeviceNode::addQuery(const std::string& q) { m_queries.push_back(q); }
117 
119 
121 
123 
125 
127 
129  // TODO series set.
130  return nullptr;
131 }
132 
133 WFRedisTask* cbRedisCachingNode::_generateSetTask(const std::vector<std::string>& params,
134  const redis_callback& callback_func,
135  void* usrData, int32_t retryTimes) {
136  return m_device->set(params, callback_func, usrData, retryTimes);
137 }
138 
140 
142  Op->luaOverrideFunc = sol::nil;
143  delete Op;
144 }
145 
147 
149  WFGoTask* goTask = WFTaskFactory::create_go_task("execMain", [=]() {
150  Op->io.I = io.I;
151  Op->execMain();
152  });
153 
154  goTask->set_callback([=](WFGoTask* task) {
155  if (task->get_state() == WFT_STATE_SUCCESS) {
156  io.O = Op->io.O;
157  if (isFinalOutput) {
158  graph->m_sharedMem->setOutStruct(io.O.getShape(), io.O.getInfo());
159  graph->io.O = io.O;
160  }
161  if (nextNode) { nextNode->io.I.push_back(io.O); }
162  } else {
163  fmt::print(fg(fmt::color::red), "Go Task exec failed.");
164  return;
165  }
166  Op->io.I.clear();
167  });
168 
169  return (void*)goTask;
170 }
171 
172 void cbOperatorNode::overload(sol::function& funcPtr) { Op->overload(funcPtr); }
173 
174 cbComputeGraph::cbComputeGraph(int32_t idx)
175  : m_idx(idx),
176  m_sharedMem(new cbGraphSharedMem()),
177  m_sharedLuaStack(new cbGraphSharedLuaStack()) {
178  // bind all functions to lua state.
179  auto covalentBound = m_sharedLuaStack->get()()["Cb"].get_or_create<sol::table>();
180  auto covalentBoundF = covalentBound["F"].get_or_create<sol::table>();
181  auto covalentBoundTable = covalentBound["Data"].get_or_create<sol::table>();
182 
183  // bind numerous functions.
184  covalentBoundF.set_function(
185  "refNode", sol::overload([](cbVirtualDeviceNode* v) -> cbNode* { return (cbNode*)v; },
186  [](cbOperatorNode* v) -> cbNode* { return (cbNode*)v; }));
187 
188  covalentBoundF.set_function("__cpp_packedCellAsVec", &__luaPackedCellAsVec);
189  covalentBoundF.set_function("__cpp_packedStringAsVec", &__luaPackedStringAsVec);
190 
191  // bind virtual table/shape, etc;
192  covalentBoundTable.new_usertype<cbShape<2>>(
193  "Shape",
194 
195  "numElements", &cbShape<2>::numElements,
196 
197  sol::meta_function::index, sol::resolve<int32_t&(cbShape<2>&, int32_t)>(fetchShapeIndex),
198 
199  sol::meta_function::new_index,
200  sol::resolve<void(cbShape<2>&, int32_t, int32_t const&)>(storeShapeIndex)
201 
202  );
203 
204  covalentBoundTable.new_usertype<cbVirtualTable>(
205  "KVTable",
206 
207  sol::meta_function::construct,
208  sol::factories([](cbShape<2>& shape) { return MAKE_SHARED(cbVirtualTable)(shape); }),
209 
210  sol::call_constructor,
211  sol::factories([](cbShape<2>& shape) { return MAKE_SHARED(cbVirtualTable)(shape); }),
212 
213  "resetShape", &cbVirtualTable::resetShape,
214 
215  "resetShapeH", &cbVirtualTable::resetShapeH,
216 
217  "setInfoAt", &cbVirtualTable::setInfoAt,
218 
219  "getInfoAt", &cbVirtualTable::getInfoAt,
220 
221  "setPtrAt", &cbVirtualTable::setPtrAt,
222 
223  "getShape", &cbVirtualTable::getShape,
224 
225  "atPtr", &cbVirtualTable::atPtr,
226 
227  "atPtrRef", &cbVirtualTable::atPtrRef,
228 
229  "colNameAt", &cbVirtualTable::colNameAt,
230 
231  "colTypeAt", &cbVirtualTable::colTypeAt,
232 
233  "pushRow", &cbVirtualTable::pushRow,
234 
235  "keyBy", &cbVirtualTable::keyBy,
236 
237  "getCol", &cbVirtualTable::getCol,
238 
239  "getRow", &cbVirtualTable::getRow,
240 
241  "print", &cbVirtualTable::str
242 
243  );
244 
245  covalentBound.new_usertype<cbMySQLField>(
246 
247  "KVField",
248 
249  "setTableName", &cbMySQLField::setTable
250 
251  );
252 
253  // bind enumerate cbMySQLType
254  covalentBound.new_enum<cbMySQLType>(
255 
256  "CellType", {{"Float", cbMySQLType::Float},
257  {"Double", cbMySQLType::Double},
258  {"Int", cbMySQLType::Int},
259  {"ULL", cbMySQLType::ULL},
260  {"String", cbMySQLType::String},
261  {"Date", cbMySQLType::Date},
262  {"Time", cbMySQLType::Time},
263  {"DataTime", cbMySQLType::DataTime},
264  {"Null", cbMySQLType::Null}}
265 
266  );
267 
268  // bind graph.
269  covalentBound.new_usertype<cbComputeGraph>(
270  "ComputeGraph",
271 
272  sol::meta_function::construct,
273  sol::factories([](const int32_t idx) { return MAKE_SHARED(cbComputeGraph)(idx); }),
274 
275  sol::call_constructor,
276  sol::factories([](const int32_t idx) { return MAKE_SHARED(cbComputeGraph)(idx); }),
277 
278  "isDAG", &cbComputeGraph::isDAG,
279 
280  "isSingleOutput", &cbComputeGraph::isSingleOutput,
281 
282  "createKVCell",
283  sol::overload([](cbComputeGraph& p) { return p.createCell(); },
284  [](cbComputeGraph& p, int value) { return p.createCell(value); },
285  [](cbComputeGraph& p, float value) { return p.createCell(value); },
286  [](cbComputeGraph& p, double value) { return p.createCell(value); },
287  [](cbComputeGraph& p, unsigned long long value) { return p.createCell(value); },
288  [](cbComputeGraph& p, const std::string& value) { return p.createCell(value); },
289  [](cbComputeGraph& p, const std::string& value, const cbMySQLType& t) {
290  return p.createCell(value, t);
291  }),
292 
293  "createVirtualDeviceNode", &cbComputeGraph::createVirtualDeviceNode,
294 
295  "createCombineNode", &cbComputeGraph::createCombineNode,
296 
297  "createRedisCachingNode", &cbComputeGraph::createRedisCachingNode,
298 
299  "createFilterNode", &cbComputeGraph::createFilterNode,
300 
302 
304 
306 
308 
309  "addCacheServer", &cbComputeGraph::addCacheServer
310 
311  );
312 
313  // bind this to graph
314  m_sharedLuaStack->get()()["ThisGraph"].get_or_create<cbComputeGraph>(this);
315 
316  // cbIO
317  covalentBound.new_usertype<cbOpIO>(
318 
319  "OpIO",
320 
321  "I", &cbOpIO::I,
322 
323  "O", &cbOpIO::O
324 
325  );
326 
327  // bind base Op
328  covalentBound.new_usertype<baseOp>(
329 
330  "cbBaseOp",
331 
332  "io", &baseOp::io
333 
334  );
335 
336  // bind CombineOp
337  covalentBound.new_usertype<cbOpCombine>(
338 
339  "cbOpCombine",
340 
341  "overrideFunc", &cbOpCombine::overload
342 
343  );
344 
345  // bind cbVirtualDeviceNode
346  covalentBound.new_usertype<cbVirtualDeviceNode>(
347 
348  "VirtualDeviceNode",
349 
350  "PointTo", &cbVirtualDeviceNode::PointTo,
351 
352  "addQuery", &cbVirtualDeviceNode::addQuery,
353 
355 
356  );
357 
358  // bind caching node
359  covalentBound.new_usertype<cbRedisCachingNode>("RedisCachingNode");
360 
361  // bind operation node
362  covalentBound.new_usertype<cbOperatorNode>(
363 
364  "OperatorNode",
365 
366  "PointTo", &cbOperatorNode::PointTo,
367 
368  "io", &cbOperatorNode::io,
369 
370  "Op", &cbOperatorNode::Op
371 
372  );
373 }
374 
376  delete m_sharedMem;
377  delete m_sharedLuaStack;
378  if (!m_cacheNode) delete m_cacheNode;
379  for (auto item : m_nodes) { delete item; }
380 }
381 
383  if (!isSingleOutput()) return false;
384  // n^2 is ok, the graph is not such big.
385  for (auto item : m_nodes) {
386  cbNode* cur = item;
387  cbNode* next = cur;
388  while (next->nextNode != nullptr) {
389  next = next->nextNode;
390  if (next == cur) { return false; }
391  }
392  }
393  return true;
394 }
395 
397  int __singleOutCnt = 0;
398  for (auto& item : m_nodes) {
399  if (item->nextNode == nullptr) {
400  __singleOutCnt++;
401  item->isFinalOutput = true;
402  }
403  }
404  return __singleOutCnt == 1 ? true : false;
405 }
406 
408  cbMySQLCell* ans = new cbMySQLCell();
409  m_sharedMem->push(ans);
410  return ans;
411 }
412 
414  cbMySQLCell* ans = new cbMySQLCell(value);
415  m_sharedMem->push(ans);
416  return ans;
417 }
418 
420  cbMySQLCell* ans = new cbMySQLCell(value);
421  m_sharedMem->push(ans);
422  return ans;
423 }
424 
426  cbMySQLCell* ans = new cbMySQLCell(value);
427  m_sharedMem->push(ans);
428  return ans;
429 }
430 
431 cbMySQLCell* cbComputeGraph::createCell(unsigned long long value) {
432  cbMySQLCell* ans = new cbMySQLCell(value);
433  m_sharedMem->push(ans);
434  return ans;
435 }
436 
437 cbMySQLCell* cbComputeGraph::createCell(const std::string& value) {
438  cbMySQLCell* ans = new cbMySQLCell(value);
439  m_sharedMem->push(ans);
440  return ans;
441 }
442 
443 cbMySQLCell* cbComputeGraph::createCell(const std::string& value, const cbMySQLType& t) {
444  cbMySQLCell* ans = new cbMySQLCell(value, t);
445  m_sharedMem->push(ans);
446  return ans;
447 }
448 
452  this->registerNode(ans);
453  return ans;
454 }
455 
457  cbRedisCachingNode* ans = new cbRedisCachingNode(idx);
459  return ans;
460 }
461 
462 cbOperatorNode* cbComputeGraph::createCombineNode(const std::vector<std::string>& keys,
463  const std::string& name) {
464  cbOpCombine* ansOp = new cbOpCombine(keys, name);
465 
466  ansOp->overload(this->m_sharedLuaStack->get()()["Cb"]["Op"]["CombineOp"]);
467 
468  cbOperatorNode* ans = new cbOperatorNode(ansOp);
469  this->registerNode(ans);
470  return ans;
471 }
472 
474  const sol::function& exF) {
475  cbOpFilter* ansOp = new cbOpFilter(boolF, exF);
476 
477  ansOp->overload(this->m_sharedLuaStack->get()()["Cb"]["Op"]["Filter"]);
478  cbOperatorNode* ans = new cbOperatorNode(ansOp);
479  this->registerNode(ans);
480  return ans;
481 }
482 
484  m_virtualDevice = virtualDeviceM;
485 }
486 
488  m_nodes.push_back(node);
489  node->graph = this;
490 }
491 
492 int32_t cbComputeGraph::getId() { return m_idx; }
493 
495  WFGraphTask* graph = WFTaskFactory::create_graph_task([=](WFGraphTask* task) {
496  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
497  "Graph task {} complete. Wakeup main process\n", m_idx);
498  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
499  "--------------------------------------------------\n");
500 
501  // If the caching set is not nullptr. Caching all table to l3 redis server.
502  if (m_cacheNode && m_sharedMem->getOutStruct() != nullptr) {
503  int32_t row = m_sharedMem->getOutStruct()->m_shape[0];
504  int32_t col = m_sharedMem->getOutStruct()->m_shape[1];
505  auto gos = m_sharedMem->getOutStruct();
506  for (int32_t i = 0; i < row; ++i) {
507  for (int32_t j = 0; j < col; ++j) {
508  // To String and get packed to key and value.
509  std::vector<std::string> kv;
510  kv.push_back(gos->genKey4Redis(i, j));
511  auto cell = io.O.atPtr(i, j);
512  if (cell->isDate() || cell->isDatetime() || cell->isString()) {
513  kv.push_back(cell->asString());
514  } else if (cell->isInt()) {
515  kv.push_back(std::to_string(cell->asInt()));
516  } else if (cell->isFloat()) {
517  kv.push_back(std::to_string(cell->asFloat()));
518  } else if (cell->isULL()) {
519  kv.push_back(std::to_string(cell->asULL()));
520  } else if (cell->asDouble()) {
521  kv.push_back(std::to_string(cell->asDouble()));
522  }
523 
524  // Generate redis tasks.
525  auto redisSetTask = m_cacheNode->_generateSetTask(
526  kv, [](WFRedisTask* task) {}, nullptr, 3);
527  redisSetTask->start();
528  }
529  }
530  }
531  // clear tmp memory;
532  m_sharedMem->clear();
533  for (auto& item : m_nodes) { item->io.I.clear(); }
534  });
535  if (!(isDAG() && isSingleOutput())) {
536  fmt::print(fg(fmt::color::red), "The graph {} is not DAG or has multi outputs\n", m_idx);
537  return nullptr;
538  }
539  std::map<cbNode*, WFGraphNode*> __cb2WF;
540 
541  WFGraphNode* __tail = nullptr;
542 
543  for (auto item : m_nodes) {
544  switch (item->nodeT) {
545  case nodeType::Leaf:
546  __cb2WF[item] = &graph->create_graph_node((WFMySQLTask*)item->generateTask());
547  break;
548  case nodeType::Operator:
549  __cb2WF[item] = &graph->create_graph_node((WFGoTask*)item->generateTask());
550  break;
551  case nodeType::Output: break;
552  }
553  }
554  for (auto item : m_nodes) {
555  if (item->nextNode != nullptr) [[likely]] {
556  (*__cb2WF[item])-- > (*__cb2WF[item->nextNode]);
557  } else {
558  __tail = __cb2WF[item];
559  }
560  }
561  return graph;
562 }
563 
564 void cbComputeGraph::execMain(WFGraphTask* task, cbComputeGraph* graph) {
565  if (task != nullptr && graph != nullptr) {
566  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
567  "--------------------------------------------------\n");
568  task->start();
569  } else {
570  fmt::print(fg(fmt::color::red), "The pointer to task or graph is nullptr\n");
571  }
572 }
573 
574 void cbComputeGraph::execScriptFile(const std::string& filePath) {
575  m_sharedLuaStack->execScriptFile(filePath);
576 }
577 
578 void cbComputeGraph::execScript(const std::string& script) { m_sharedLuaStack->execScript(script); }
579 
581  m_cacheNode = v;
582  v->graph = this;
583 }
584 
586 
587 std::vector<cbNode*> cbComputeGraph::getNodes() { return m_nodes; }
588 
589 }; // namespace graph
590 } // namespace cb
cb::graph::cbGraphSharedMem::clear
void clear()
Definition: cbComputeGraph.cpp:46
__luaPackedStringAsVec
std::vector< std::string > __luaPackedStringAsVec()
Definition: cbTable.cpp:474
cb::graph::cbComputeGraph::createRedisCachingNode
cbRedisCachingNode * createRedisCachingNode(int32_t idx)
! for sql only, now.
Definition: cbComputeGraph.cpp:456
cbMySQLType::String
@ String
cb::graph::cbComputeGraph
Definition: cbComputeGraph.hpp:342
cb::graph::cbRedisCachingNode::setRedisDevice
void setRedisDevice(trivial::cbRedisDevice *device=nullptr)
Definition: cbComputeGraph.cpp:139
cbOpIO::O
cbVirtualTable O
Definition: cbOperator.hpp:40
cb::graph::cbComputeGraph::getId
int32_t getId()
Get the Id object.
Definition: cbComputeGraph.cpp:492
cbVirtualTable::str
void str()
Definition: cbTable.cpp:199
cbOpCombine
Definition: cbOperator.hpp:108
luaJitThread
Definition: luaEngine.hpp:46
cb::graph::cbComputeGraph::execScriptFile
void execScriptFile(const std::string &filePath)
execute the script from file.
Definition: cbComputeGraph.cpp:574
cbVirtualTable::colTypeAt
std::string colTypeAt(int32_t i)
Definition: cbTable.cpp:197
cb::graph::cbNode::asSQLTask
static WFMySQLTask * asSQLTask(void *metaTask)
Definition: cbComputeGraph.cpp:68
cbMySQLType::Float
@ Float
cbVirtualTable::atPtr
cbMySQLCell * atPtr(int32_t i, int32_t j)
row major
Definition: cbTable.cpp:128
cb::graph::cbComputeGraph::getOutput
cbOutputTableStruct * getOutput()
Get the Output object.
Definition: cbComputeGraph.cpp:585
cbVirtualTable::getInfoAt
cbMySQLField * getInfoAt(int32_t i)
Get the Info At object.
Definition: cbTable.cpp:124
cb::graph::nodeType
nodeType
The node of compute graphs has 3 types: Leaf: the input, mostly the Sql device Output: the virtual ta...
Definition: cbComputeGraph.hpp:51
cbVirtualTable::getInfo
cbMySQLField ** getInfo()
Get the Info object.
Definition: cbTable.cpp:115
cbVirtualTable
cbVirtualTable works as a reference from shared memory. It only use a shape and SqlCell to define dif...
Definition: cbTable.hpp:605
cb::listNode
Elements of list.
Definition: cbLRUCache.hpp:26
cb::graph::cbGraphSharedMem::m_fields
std::vector< cbMySQLField * > m_fields
Definition: cbComputeGraph.hpp:163
cb::graph::cbGraphSharedLuaStack::m_lua
luaJitThread m_lua
Definition: cbComputeGraph.hpp:212
redis_callback
std::function< void(WFRedisTask *)> redis_callback
Definition: cbVirtualDevice.hpp:33
cb::graph::cbComputeGraph::setVirtualDeviceManager
void setVirtualDeviceManager(trivial::cbVirtualDeviceManager *virtualDeviceM)
Set the Virtual Device Manager object.
Definition: cbComputeGraph.cpp:483
cbVirtualTable::getRow
cbVirtualTable getRow(int32_t i)
Get the Row object.
Definition: cbTable.cpp:134
cb::graph::cbComputeGraph::io
cbOpIO io
Definition: cbComputeGraph.hpp:532
cbOutputTableStruct
Definition: cbTable.hpp:776
baseOp::execMain
virtual void execMain()=0
trivial::cbVirtualDeviceManager::getRedisDevice
cbRedisDevice * getRedisDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:146
baseOp::io
cbOpIO io
Definition: cbOperator.hpp:54
cb::graph::cbOperatorNode
A operator node. Generate all go task. Then pass the output to the next node's inputs.
Definition: cbComputeGraph.hpp:313
cbMySQLField
A copy move from workflow MySQLResult.h and .inl file.
Definition: cbTable.hpp:404
cb::graph::cbComputeGraph::getNodes
std::vector< cbNode * > getNodes()
Get the Nodes object.
Definition: cbComputeGraph.cpp:587
cbMySQLType::Time
@ Time
graph_callback
std::function< void(WFGraphTask *)> graph_callback
_WIN32
Definition: cbComputeGraph.hpp:37
cb::graph::cbComputeGraph::isDAG
bool isDAG()
To judge the Graph is DAG or not.
Definition: cbComputeGraph.cpp:382
cbVirtualTable::setPtrAt
void setPtrAt(int32_t i, int32_t j, cbMySQLCell *v)
Set the Ptr At object.
Definition: cbTable.cpp:132
cb::graph::cbGraphSharedMem::getOutStruct
cbOutputTableStruct * getOutStruct()
Get the Out Struct object.
Definition: cbComputeGraph.cpp:44
cb::graph::cbComputeGraph::isSingleOutput
bool isSingleOutput()
To judge the graph has single output or not.
Definition: cbComputeGraph.cpp:396
cb::graph::cbComputeGraph::cbOperatorNode
friend cbOperatorNode
Definition: cbComputeGraph.hpp:346
cb::graph::cbVirtualDeviceNode::getDevice
trivial::cbMySqlDevice * getDevice()
Get the Device object.
Definition: cbComputeGraph.cpp:118
cb::graph::cbGraphSharedMem::m_outStruct
cbOutputTableStruct * m_outStruct
Definition: cbComputeGraph.hpp:160
cbMySQLType::DataTime
@ DataTime
cb::graph::cbRedisCachingNode::cbRedisCachingNode
cbRedisCachingNode()=delete
cb::graph::cbRedisCachingNode
Definition: cbComputeGraph.hpp:286
storeShapeIndex
auto storeShapeIndex(T &c, int32_t idx, int32_t const &v) -> void
Definition: cbTable.hpp:798
cb::graph::cbNode::task
void * task
Definition: cbComputeGraph.hpp:240
cbOutputTableStruct::update
void update(const cbShape< 2 > &shape, cbMySQLField **info)
Definition: cbTable.cpp:217
cb::graph::cbVirtualDeviceNode
This node include virtual device infomation and perform the final operation of how to get the data fr...
Definition: cbComputeGraph.hpp:253
cbMySQLCell
Definition: cbTable.hpp:468
cbOpFilter
TODO.
Definition: cbOperator.hpp:138
cbVirtualTable::resetShapeH
void resetShapeH(const cbShape< 2 > &shape)
Definition: cbTable.cpp:109
cb::graph::cbNode
Definition: cbComputeGraph.hpp:219
cb::graph::cbGraphSharedLuaStack::get
luaJitThread & get()
get the lua state.
Definition: cbComputeGraph.cpp:62
cb::graph::cbComputeGraph::cbVirtualDeviceNode
friend cbVirtualDeviceNode
Definition: cbComputeGraph.hpp:344
cb::graph::cbComputeGraph::createFilterNode
cbOperatorNode * createFilterNode(const sol::function &boolF, const sol::function &exF)
Create a Filter Node object.
Definition: cbComputeGraph.cpp:473
mapShared2Virtual
void mapShared2Virtual(cbVirtualSharedTable *sharedT, cbVirtualTable *virtualT)
Definition: cbTable.cpp:232
cbVirtualTable::pushRow
void pushRow(const std::vector< cbMySQLCell * > &row)
Definition: cbTable.cpp:189
cbVirtualTable::setInfoAt
void setInfoAt(int32_t i, cbMySQLField *v)
Set the Info At object.
Definition: cbTable.cpp:119
cbVirtualTable::getCol
cbVirtualTable getCol(int32_t i)
Get the Col object.
Definition: cbTable.cpp:145
cb::graph::cbNode::isFinalOutput
bool isFinalOutput
Definition: cbComputeGraph.hpp:237
cb
_WIN32
Definition: api.cpp:4
cb::graph::cbNode::io
cbOpIO io
Definition: cbComputeGraph.hpp:234
cbMySQLField::setTable
void setTable(const std::string &value)
Definition: cbTable.cpp:295
cb::graph::cbGraphSharedMem::getCellNum
int32_t getCellNum()
Get the Cell Num object.
Definition: cbComputeGraph.cpp:35
cbMySQLType::Double
@ Double
cbOpIO
The input and output of one Operator.
Definition: cbOperator.hpp:38
cb::graph::cbVirtualDeviceNode::~cbVirtualDeviceNode
~cbVirtualDeviceNode() override final
Definition: cbComputeGraph.cpp:72
cb::graph::cbVirtualDeviceNode::addQuery
void addQuery(const std::string &q)
Add a String type sql sentence to this node. Waiting to be execute.
Definition: cbComputeGraph.cpp:116
cb::graph::cbComputeGraph::createCombineNode
cbOperatorNode * createCombineNode(const std::vector< std::string > &keys, const std::string &name)
Create a Combine Node object.
Definition: cbComputeGraph.cpp:462
cb::graph::cbComputeGraph::execScript
void execScript(const std::string &script)
execute the script
Definition: cbComputeGraph.cpp:578
cb::graph::cbGraphSharedMem::setOutStruct
void setOutStruct(const cbShape< 2 > &shape, cbMySQLField **info)
Set the Out Struct object.
Definition: cbComputeGraph.cpp:18
fetchShapeIndex
auto fetchShapeIndex(T &c, int32_t idx) -> int32_t &
Definition: cbTable.hpp:793
cb::graph::cbVirtualDeviceNode::cbVirtualDeviceNode
cbVirtualDeviceNode()
Definition: cbComputeGraph.cpp:122
cbMySQLType::ULL
@ ULL
cb::graph::cbVirtualDeviceNode::m_queries
std::vector< std::string > m_queries
Definition: cbComputeGraph.hpp:283
cb::graph::cbComputeGraph::m_idx
int32_t m_idx
Definition: cbComputeGraph.hpp:531
cbShape< 2 >
cb::graph::cbComputeGraph::createVirtualDeviceNode
cbVirtualDeviceNode * createVirtualDeviceNode(int32_t idx)
create nodes belongs to this graph.
Definition: cbComputeGraph.cpp:449
cb::graph::nodeType::Leaf
@ Leaf
cb::graph::cbOperatorNode::generateTask
void * generateTask() override
Definition: cbComputeGraph.cpp:148
cb::graph::cbNode::graph
cbComputeGraph * graph
Definition: cbComputeGraph.hpp:242
cb::graph::cbRedisCachingNode::generateTask
void * generateTask() override final
Definition: cbComputeGraph.cpp:128
cbOpFilter::overload
void overload(const sol::function &func) override final
Definition: cbOperator.cpp:75
trivial::cbVirtualDeviceManager::getMySqlDevice
cbMySqlDevice * getMySqlDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:141
cb::graph::cbGraphSharedMem::m_dataPool
std::vector< cbMySQLCell * > m_dataPool
Definition: cbComputeGraph.hpp:162
cb::graph::cbGraphSharedLuaStack
Definition: cbComputeGraph.hpp:170
cb::graph::cbComputeGraph::m_virtualDevice
trivial::cbVirtualDeviceManager * m_virtualDevice
Definition: cbComputeGraph.hpp:538
baseOp::luaOverrideFunc
sol::function luaOverrideFunc
Definition: cbOperator.hpp:58
cbVirtualSharedTable
cbVirtualSharedTable is a container of shared memory.
Definition: cbTable.hpp:518
cb::graph::cbComputeGraph::~cbComputeGraph
~cbComputeGraph()
Definition: cbComputeGraph.cpp:375
cbComputeGraph.hpp
The compute graph(DAG), a prepared graph for task flow to execute. It works as a state machine.
cbOpIO::I
std::vector< cbVirtualTable > I
Definition: cbOperator.hpp:39
cb::graph::cbRedisCachingNode::~cbRedisCachingNode
~cbRedisCachingNode() override final
Definition: cbComputeGraph.cpp:126
cb::graph::cbOperatorNode::Op
baseOp * Op
Definition: cbComputeGraph.hpp:333
cb::graph::cbGraphSharedLuaStack::execScript
void execScript(const std::string &script)
execute the hard coded script.
Definition: cbComputeGraph.cpp:60
trivial::cbVirtualDeviceManager
Definition: cbVirtualDevice.hpp:260
cbVirtualTable::atPtrRef
cbMySQLCell *& atPtrRef(int32_t i, int32_t j)
Definition: cbTable.cpp:130
cb::graph::cbComputeGraph::createCell
cbMySQLCell * createCell()
Create a Cell object.
Definition: cbComputeGraph.cpp:407
cbVirtualDevice.hpp
abstract virtual device. Provide MySql/Redis/Kafka warper. All virtual device will handle the connect...
cb::graph::cbOperatorNode::cbOperatorNode
cbOperatorNode(baseOp *op)
Definition: cbComputeGraph.cpp:146
trivial::cbMySqlDevice
Definition: cbVirtualDevice.hpp:95
cbMySQLType::Null
@ Null
cb::graph::cbGraphSharedMem::push
void push(cbVirtualSharedTable *v)
Push a virtual shared table to mem of graph.
Definition: cbComputeGraph.cpp:12
cb::graph::cbNode::nextNode
cbNode * nextNode
Definition: cbComputeGraph.hpp:241
cbVirtualTable::colNameAt
std::string colNameAt(int32_t i)
Definition: cbTable.cpp:195
cbMySQLType
cbMySQLType
Definition: cbTable.hpp:375
cb::graph::cbGraphSharedLuaStack::execScriptFile
void execScriptFile(const std::string &filePath)
execute the script file from disk directly.
Definition: cbComputeGraph.cpp:56
cb::graph::cbComputeGraph::addCacheServer
void addCacheServer(cbRedisCachingNode *v)
The cache server node. Lua binding in.
Definition: cbComputeGraph.cpp:580
cb::graph::cbRedisCachingNode::_generateSetTask
WFRedisTask * _generateSetTask(const std::vector< std::string > &params, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
Definition: cbComputeGraph.cpp:133
cbVirtualTable::resetShape
void resetShape(const cbShape< 2 > &shape)
Definition: cbTable.cpp:98
cb::graph::cbGraphSharedMem::~cbGraphSharedMem
~cbGraphSharedMem()
Destroy the cb Graph Shared Mem object.
Definition: cbComputeGraph.cpp:7
cb::graph::cbVirtualDeviceNode::setMySQLDevice
void setMySQLDevice(trivial::cbMySqlDevice *device=nullptr)
Definition: cbComputeGraph.cpp:120
cb::graph::cbVirtualDeviceNode::m_device
trivial::cbMySqlDevice * m_device
Definition: cbComputeGraph.hpp:282
cb::graph::nodeType::Operator
@ Operator
cb::graph::cbComputeGraph::execMain
static void execMain(WFGraphTask *task, cbComputeGraph *graph)
Execute the graph task.
Definition: cbComputeGraph.cpp:564
cb::graph::cbComputeGraph::generateGraphTask
WFGraphTask * generateGraphTask(const graph_callback &func=nullptr)
Create the graph task.
Definition: cbComputeGraph.cpp:494
baseOp
basic operator.
Definition: cbOperator.hpp:47
cb::graph::cbNode::PointTo
void PointTo(cbNode *ptr)
Definition: cbComputeGraph.cpp:66
cb::graph::cbComputeGraph::m_cacheNode
cbRedisCachingNode * m_cacheNode
Definition: cbComputeGraph.hpp:534
cb::graph::cbComputeGraph::m_sharedLuaStack
cbGraphSharedLuaStack * m_sharedLuaStack
Definition: cbComputeGraph.hpp:536
cbMySQLType::Int
@ Int
trivial::cbRedisDevice
Definition: cbVirtualDevice.hpp:178
cb::graph::cbVirtualDeviceNode::generateTask
void * generateTask() override final
generate a SQL wf task for now.
Definition: cbComputeGraph.cpp:76
cb::graph::cbNode::asGoTask
static WFGoTask * asGoTask(void *metaTask)
Definition: cbComputeGraph.cpp:70
cb::graph::cbNode::cbNode
cbNode(const nodeType &nt)
Definition: cbComputeGraph.cpp:64
cb::graph::cbGraphSharedMem
The shared memory of compute graph. Include 2 basic components.
Definition: cbComputeGraph.hpp:63
cb::graph::cbComputeGraph::m_nodes
std::vector< cbNode * > m_nodes
Definition: cbComputeGraph.hpp:533
trivial::cbRedisDevice::set
WFRedisTask * set(const std::vector< std::string > &params, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
Definition: cbVirtualDevice.cpp:74
cb::graph::cbGraphSharedMem::getMemUsed
size_t getMemUsed()
Get the Mem Used object.
Definition: cbComputeGraph.cpp:26
cb::graph::cbGraphSharedMem::m_dataFromDevice
std::vector< cbVirtualSharedTable * > m_dataFromDevice
Definition: cbComputeGraph.hpp:161
__luaPackedCellAsVec
std::vector< cbMySQLCell * > __luaPackedCellAsVec(cbMySQLCell *v)
Definition: cbTable.cpp:467
cb::graph::cbRedisCachingNode::m_device
trivial::cbRedisDevice * m_device
Definition: cbComputeGraph.hpp:305
cb::graph::cbOperatorNode::~cbOperatorNode
~cbOperatorNode() override
Definition: cbComputeGraph.cpp:141
cb::graph::cbComputeGraph::m_sharedMem
cbGraphSharedMem * m_sharedMem
Definition: cbComputeGraph.hpp:535
cb::graph::cbComputeGraph::registerNode
void registerNode(cbNode *node)
register a node. Not used by lua binding.
Definition: cbComputeGraph.cpp:487
cbVirtualTable::getShape
cbShape< 2 > getShape()
Get the Shape object.
Definition: cbTable.hpp:687
cbOutputTableStruct::m_shape
cbShape< 2 > m_shape
Definition: cbTable.hpp:786
cbVirtualTable::keyBy
std::map< int32_t, int32_t > keyBy(const std::string &colName) const
Definition: cbTable.cpp:156
cb::graph::nodeType::Output
@ Output
cbOpCombine::overload
void overload(const sol::function &func) override final
Definition: cbOperator.cpp:34
trivial::cbMySqlDevice::query
WFMySQLTask * query(const std::string &q, const mysql_callback &callback_func=nullptr)
Definition: cbVirtualDevice.cpp:31
MAKE_SHARED
#define MAKE_SHARED(x)
Definition: pch.hpp:182
cb::graph::cbComputeGraph::cbRedisCachingNode
friend cbRedisCachingNode
Definition: cbComputeGraph.hpp:345
cbMySQLType::Date
@ Date