|
Covalent Bond
0.0.1-alpha
'covalent bond' is a data middle office. As a 2022-2023 Fall SWE final project.
|
Go to the documentation of this file.
29 auto __shape = item->getShape();
30 tmp +=
sizeof(
cbMySQLCell) * __shape[0] * __shape[1];
38 auto __shape = item->getShape();
39 tmp += __shape[0] * __shape[1];
49 for (
auto item :
m_fields) {
delete item; }
57 m_lua().script_file(filePath);
78 fmt::print(fg(fmt::color::red),
79 "We have not support multi SQL sentence for now\n It's just for testing purpose\n");
84 if (
graph ==
nullptr) {
85 fmt::print(fg(fmt::color::red),
"Graph is not setted for this node.\n");
88 if (
task->get_state() != WFT_STATE_SUCCESS) {
89 fprintf(stderr,
"task error=%d\n",
task->get_error());
93 protocol::MySQLResultCursor cursor(
task->get_resp());
96 if (
task->get_resp()->get_packet_type() == MYSQL_PACKET_ERROR) {
97 fprintf(stderr,
"ERROR. error_code=%d %s\n",
task->get_resp()->get_error_code(),
98 task->get_resp()->get_error_msg().c_str());
135 void* usrData, int32_t retryTimes) {
136 return m_device->
set(params, callback_func, usrData, retryTimes);
149 WFGoTask* goTask = WFTaskFactory::create_go_task(
"execMain", [=]() {
154 goTask->set_callback([=](WFGoTask*
task) {
155 if (
task->get_state() == WFT_STATE_SUCCESS) {
158 graph->m_sharedMem->setOutStruct(io.O.getShape(), io.O.getInfo());
163 fmt::print(fg(fmt::color::red),
"Go Task exec failed.");
169 return (
void*)goTask;
172 void cbOperatorNode::overload(sol::function& funcPtr) { Op->overload(funcPtr); }
174 cbComputeGraph::cbComputeGraph(int32_t idx)
180 auto covalentBoundF = covalentBound[
"F"].get_or_create<sol::table>();
181 auto covalentBoundTable = covalentBound[
"Data"].get_or_create<sol::table>();
184 covalentBoundF.set_function(
199 sol::meta_function::new_index,
207 sol::meta_function::construct,
210 sol::call_constructor,
272 sol::meta_function::construct,
275 sol::call_constructor,
317 covalentBound.new_usertype<
cbOpIO>(
328 covalentBound.new_usertype<
baseOp>(
379 for (
auto item :
m_nodes) {
delete item; }
390 if (next == cur) {
return false; }
397 int __singleOutCnt = 0;
399 if (item->nextNode ==
nullptr) {
401 item->isFinalOutput =
true;
404 return __singleOutCnt == 1 ? true :
false;
463 const std::string& name) {
474 const sol::function& exF) {
495 WFGraphTask* graph = WFTaskFactory::create_graph_task([=](WFGraphTask* task) {
496 fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
497 "Graph task {} complete. Wakeup main process\n",
m_idx);
498 fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
499 "--------------------------------------------------\n");
506 for (int32_t i = 0; i < row; ++i) {
507 for (int32_t j = 0; j < col; ++j) {
509 std::vector<std::string> kv;
510 kv.push_back(gos->genKey4Redis(i, j));
512 if (cell->isDate() || cell->isDatetime() || cell->isString()) {
513 kv.push_back(cell->asString());
514 }
else if (cell->isInt()) {
515 kv.push_back(std::to_string(cell->asInt()));
516 }
else if (cell->isFloat()) {
517 kv.push_back(std::to_string(cell->asFloat()));
518 }
else if (cell->isULL()) {
519 kv.push_back(std::to_string(cell->asULL()));
520 }
else if (cell->asDouble()) {
521 kv.push_back(std::to_string(cell->asDouble()));
526 kv, [](WFRedisTask* task) {},
nullptr, 3);
527 redisSetTask->start();
533 for (
auto& item :
m_nodes) { item->io.I.clear(); }
536 fmt::print(fg(fmt::color::red),
"The graph {} is not DAG or has multi outputs\n",
m_idx);
539 std::map<cbNode*, WFGraphNode*> __cb2WF;
541 WFGraphNode* __tail =
nullptr;
544 switch (item->nodeT) {
546 __cb2WF[item] = &graph->create_graph_node((WFMySQLTask*)item->generateTask());
549 __cb2WF[item] = &graph->create_graph_node((WFGoTask*)item->generateTask());
555 if (item->nextNode !=
nullptr) [[likely]] {
556 (*__cb2WF[item])-- > (*__cb2WF[item->nextNode]);
558 __tail = __cb2WF[item];
565 if (task !=
nullptr && graph !=
nullptr) {
566 fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
567 "--------------------------------------------------\n");
570 fmt::print(fg(fmt::color::red),
"The pointer to task or graph is nullptr\n");
std::vector< std::string > __luaPackedStringAsVec()
cbRedisCachingNode * createRedisCachingNode(int32_t idx)
! for sql only, now.
void setRedisDevice(trivial::cbRedisDevice *device=nullptr)
int32_t getId()
Get the Id object.
void execScriptFile(const std::string &filePath)
execute the script from file.
std::string colTypeAt(int32_t i)
static WFMySQLTask * asSQLTask(void *metaTask)
cbMySQLCell * atPtr(int32_t i, int32_t j)
row major
cbOutputTableStruct * getOutput()
Get the Output object.
cbMySQLField * getInfoAt(int32_t i)
Get the Info At object.
nodeType
The node of compute graphs has 3 types: Leaf: the input, mostly the Sql device Output: the virtual ta...
cbMySQLField ** getInfo()
Get the Info object.
cbVirtualTable works as a reference from shared memory. It only use a shape and SqlCell to define dif...
std::vector< cbMySQLField * > m_fields
std::function< void(WFRedisTask *)> redis_callback
void setVirtualDeviceManager(trivial::cbVirtualDeviceManager *virtualDeviceM)
Set the Virtual Device Manager object.
cbVirtualTable getRow(int32_t i)
Get the Row object.
virtual void execMain()=0
cbRedisDevice * getRedisDevice(int32_t idx)
A operator node. Generate all go task. Then pass the output to the next node's inputs.
A copy move from workflow MySQLResult.h and .inl file.
std::vector< cbNode * > getNodes()
Get the Nodes object.
std::function< void(WFGraphTask *)> graph_callback
_WIN32
bool isDAG()
To judge the Graph is DAG or not.
void setPtrAt(int32_t i, int32_t j, cbMySQLCell *v)
Set the Ptr At object.
cbOutputTableStruct * getOutStruct()
Get the Out Struct object.
bool isSingleOutput()
To judge the graph has single output or not.
trivial::cbMySqlDevice * getDevice()
Get the Device object.
cbOutputTableStruct * m_outStruct
cbRedisCachingNode()=delete
auto storeShapeIndex(T &c, int32_t idx, int32_t const &v) -> void
void update(const cbShape< 2 > &shape, cbMySQLField **info)
This node include virtual device infomation and perform the final operation of how to get the data fr...
void resetShapeH(const cbShape< 2 > &shape)
luaJitThread & get()
get the lua state.
friend cbVirtualDeviceNode
cbOperatorNode * createFilterNode(const sol::function &boolF, const sol::function &exF)
Create a Filter Node object.
void mapShared2Virtual(cbVirtualSharedTable *sharedT, cbVirtualTable *virtualT)
void pushRow(const std::vector< cbMySQLCell * > &row)
void setInfoAt(int32_t i, cbMySQLField *v)
Set the Info At object.
cbVirtualTable getCol(int32_t i)
Get the Col object.
void setTable(const std::string &value)
int32_t getCellNum()
Get the Cell Num object.
The input and output of one Operator.
~cbVirtualDeviceNode() override final
void addQuery(const std::string &q)
Add a String type sql sentence to this node. Waiting to be execute.
cbOperatorNode * createCombineNode(const std::vector< std::string > &keys, const std::string &name)
Create a Combine Node object.
void execScript(const std::string &script)
execute the script
void setOutStruct(const cbShape< 2 > &shape, cbMySQLField **info)
Set the Out Struct object.
auto fetchShapeIndex(T &c, int32_t idx) -> int32_t &
std::vector< std::string > m_queries
cbVirtualDeviceNode * createVirtualDeviceNode(int32_t idx)
create nodes belongs to this graph.
void * generateTask() override
void * generateTask() override final
void overload(const sol::function &func) override final
cbMySqlDevice * getMySqlDevice(int32_t idx)
std::vector< cbMySQLCell * > m_dataPool
trivial::cbVirtualDeviceManager * m_virtualDevice
sol::function luaOverrideFunc
cbVirtualSharedTable is a container of shared memory.
The compute graph(DAG), a prepared graph for task flow to execute. It works as a state machine.
std::vector< cbVirtualTable > I
~cbRedisCachingNode() override final
void execScript(const std::string &script)
execute the hard coded script.
cbMySQLCell *& atPtrRef(int32_t i, int32_t j)
cbMySQLCell * createCell()
Create a Cell object.
abstract virtual device. Provide MySql/Redis/Kafka warper. All virtual device will handle the connect...
cbOperatorNode(baseOp *op)
void push(cbVirtualSharedTable *v)
Push a virtual shared table to mem of graph.
std::string colNameAt(int32_t i)
void execScriptFile(const std::string &filePath)
execute the script file from disk directly.
void addCacheServer(cbRedisCachingNode *v)
The cache server node. Lua binding in.
WFRedisTask * _generateSetTask(const std::vector< std::string > ¶ms, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
void resetShape(const cbShape< 2 > &shape)
~cbGraphSharedMem()
Destroy the cb Graph Shared Mem object.
void setMySQLDevice(trivial::cbMySqlDevice *device=nullptr)
trivial::cbMySqlDevice * m_device
static void execMain(WFGraphTask *task, cbComputeGraph *graph)
Execute the graph task.
WFGraphTask * generateGraphTask(const graph_callback &func=nullptr)
Create the graph task.
void PointTo(cbNode *ptr)
cbRedisCachingNode * m_cacheNode
cbGraphSharedLuaStack * m_sharedLuaStack
void * generateTask() override final
generate a SQL wf task for now.
static WFGoTask * asGoTask(void *metaTask)
cbNode(const nodeType &nt)
The shared memory of compute graph. Include 2 basic components.
std::vector< cbNode * > m_nodes
WFRedisTask * set(const std::vector< std::string > ¶ms, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
size_t getMemUsed()
Get the Mem Used object.
std::vector< cbVirtualSharedTable * > m_dataFromDevice
std::vector< cbMySQLCell * > __luaPackedCellAsVec(cbMySQLCell *v)
trivial::cbRedisDevice * m_device
~cbOperatorNode() override
cbGraphSharedMem * m_sharedMem
void registerNode(cbNode *node)
register a node. Not used by lua binding.
cbShape< 2 > getShape()
Get the Shape object.
std::map< int32_t, int32_t > keyBy(const std::string &colName) const
void overload(const sol::function &func) override final
WFMySQLTask * query(const std::string &q, const mysql_callback &callback_func=nullptr)
friend cbRedisCachingNode