Covalent Bond  0.0.1-alpha
'covalent bond' is a data middle office. As a 2022-2023 Fall SWE final project.
cbComputeGraph.hpp
Go to the documentation of this file.
1 
16 #ifndef __SERVER_CB_COMPUTE_GRAPH_HPP_
17 #define __SERVER_CB_COMPUTE_GRAPH_HPP_
18 
19 #ifdef _WIN32
20 #if _MSC_VER > 1000
21 #pragma once
22 #endif
23 #endif
24 
25 #if defined(__unix__) && defined(__clang__)
26 #pragma once
27 #endif
28 
29 #include <workflow/WFGraphTask.h>
30 #include <workflow/WFFacilities.h>
31 #include <workflow/WFTaskFactory.h>
32 
33 #include "luaEngine.hpp"
34 #include "cbOperator.hpp"
36 
37 typedef std::function<void(WFGraphTask*)> graph_callback;
38 
39 namespace cb {
40 namespace graph {
41 
42 class cbComputeGraph;
43 
51 enum class nodeType : uint32_t {
52  Leaf = 1,
53  Output = 2,
54  Operator = 3,
55 };
56 
64  public:
69  cbGraphSharedMem() = default;
70 
76  cbGraphSharedMem(cbGraphSharedMem& rhs) = delete;
77 
83  cbGraphSharedMem(const cbGraphSharedMem& rhs) = delete;
84 
92 
99  cbGraphSharedMem operator=(const cbGraphSharedMem& rhs) = delete;
100 
106 
112  void push(cbVirtualSharedTable* v);
113 
119  void push(cbMySQLCell* v);
120 
126  void push(cbMySQLField* v);
127 
134  void setOutStruct(const cbShape<2>& shape, cbMySQLField** info);
135 
141  size_t getMemUsed();
142 
148  int32_t getCellNum();
149 
156 
157  void clear();
158 
159  private:
161  std::vector<cbVirtualSharedTable*> m_dataFromDevice;
162  std::vector<cbMySQLCell*> m_dataPool;
163  std::vector<cbMySQLField*> m_fields;
164 };
165 
175  cbGraphSharedLuaStack() = default;
176 
182 
189 
195  void execScriptFile(const std::string& filePath);
196 
202  void execScript(const std::string& script);
203 
209  luaJitThread& get();
210 
211  private:
213 };
214 
219 struct cbNode {
224  virtual ~cbNode(){};
225  cbNode(const nodeType& nt);
226  void PointTo(cbNode* ptr);
227 
228  virtual void* generateTask() = 0;
229 
230  static WFMySQLTask* asSQLTask(void* metaTask);
231  static WFGoTask* asGoTask(void* metaTask);
232 
233  // The output.
235 
236  // others
237  bool isFinalOutput = false;
238  bool visited = false;
240  void* task = nullptr;
241  cbNode* nextNode = nullptr;
242  cbComputeGraph* graph = nullptr;
243 };
244 
253 struct cbVirtualDeviceNode final : public cbNode {
255 
256  ~cbVirtualDeviceNode() override final;
258 
264  void* generateTask() override final;
265 
271  void addQuery(const std::string& q);
272 
278  trivial::cbMySqlDevice* getDevice();
279 
280  private:
281  void setMySQLDevice(trivial::cbMySqlDevice* device = nullptr);
282  trivial::cbMySqlDevice* m_device;
283  std::vector<std::string> m_queries;
284 };
285 
286 struct cbRedisCachingNode final : public cbNode {
288 
289  ~cbRedisCachingNode() override final;
290  cbRedisCachingNode() = delete;
291  cbRedisCachingNode(int32_t idx);
292  cbRedisCachingNode(const cbRedisCachingNode& v) = delete;
293 
294  cbRedisCachingNode operator=(const cbRedisCachingNode& v) = delete;
295 
296  void* generateTask() override final;
297 
298  WFRedisTask* _generateSetTask(const std::vector<std::string>& params,
299  const redis_callback& callback_func = nullptr,
300  void* usrData = nullptr, int32_t retryTimes = 3);
301 
302  private:
303  void setRedisDevice(trivial::cbRedisDevice* device = nullptr);
304  int32_t m_idx;
305  trivial::cbRedisDevice* m_device = nullptr;
306 };
307 
313 struct cbOperatorNode : public cbNode {
315 
316  ~cbOperatorNode() override;
317  cbOperatorNode(baseOp* op);
318 
324  void* generateTask() override;
325 
331  void overload(sol::function& funcPtr);
332 
334 };
335 
336 // operator Node;
337 
343  public:
347 
348  cbComputeGraph(int32_t idx);
349  ~cbComputeGraph();
350 
357  bool isDAG();
358 
365  bool isSingleOutput();
366 
367  // create cells belongs to this graph.
373  cbMySQLCell* createCell();
374 
381  cbMySQLCell* createCell(int value);
382 
389  cbMySQLCell* createCell(float value);
390 
397  cbMySQLCell* createCell(double value);
398 
405  cbMySQLCell* createCell(unsigned long long value);
406 
413  cbMySQLCell* createCell(const std::string& value);
414 
422  cbMySQLCell* createCell(const std::string& value, const cbMySQLType& t);
423 
430  cbVirtualDeviceNode* createVirtualDeviceNode(int32_t idx);
431 
438  cbRedisCachingNode* createRedisCachingNode(int32_t idx);
439 
447  cbOperatorNode* createCombineNode(const std::vector<std::string>& keys, const std::string& name);
448 
456  cbOperatorNode* createFilterNode(const sol::function& boolF, const sol::function& exF);
457 
463  void setVirtualDeviceManager(trivial::cbVirtualDeviceManager* virtualDeviceM);
464 
470  void registerNode(cbNode* node);
471 
477  int32_t getId();
478 
485  WFGraphTask* generateGraphTask(const graph_callback& func = nullptr);
486 
493  static void execMain(WFGraphTask* task, cbComputeGraph* graph);
494 
500  void execScriptFile(const std::string& filePath);
501 
507  void execScript(const std::string& script);
508 
514  void addCacheServer(cbRedisCachingNode* v);
515 
521  cbOutputTableStruct* getOutput();
522 
528  std::vector<cbNode*> getNodes();
529 
530  private:
531  int32_t m_idx = 0;
533  std::vector<cbNode*> m_nodes;
534  cbRedisCachingNode* m_cacheNode = nullptr;
535  cbGraphSharedMem* m_sharedMem = nullptr;
536  cbGraphSharedLuaStack* m_sharedLuaStack = nullptr;
537  // Have the state of all Virtual device
538  trivial::cbVirtualDeviceManager* m_virtualDevice = nullptr;
539 };
540 
541 }; // namespace graph
542 } // namespace cb
543 
544 #endif //!__SERVER_CB_COMPUTE_GRAPH_HPP_
trivial
Definition: cbVirtualDevice.cpp:4
cb::graph::cbGraphSharedMem::clear
void clear()
Definition: cbComputeGraph.cpp:46
cb::graph::cbComputeGraph
Definition: cbComputeGraph.hpp:342
luaJitThread
Definition: luaEngine.hpp:46
cb::graph::cbNode::asSQLTask
static WFMySQLTask * asSQLTask(void *metaTask)
Definition: cbComputeGraph.cpp:68
cb::graph::nodeType
nodeType
The node of compute graphs has 3 types: Leaf: the input, mostly the Sql device Output: the virtual ta...
Definition: cbComputeGraph.hpp:51
cb::listNode
Elements of list.
Definition: cbLRUCache.hpp:26
cb::graph::cbGraphSharedMem::m_fields
std::vector< cbMySQLField * > m_fields
Definition: cbComputeGraph.hpp:163
cb::graph::cbGraphSharedLuaStack::m_lua
luaJitThread m_lua
Definition: cbComputeGraph.hpp:212
redis_callback
std::function< void(WFRedisTask *)> redis_callback
Definition: cbVirtualDevice.hpp:33
cb::graph::cbComputeGraph::io
cbOpIO io
Definition: cbComputeGraph.hpp:532
cbOutputTableStruct
Definition: cbTable.hpp:776
cb::graph::cbRedisCachingNode::cbComputeGraph
friend cbComputeGraph
Definition: cbComputeGraph.hpp:287
cb::graph::cbOperatorNode
A operator node. Generate all go task. Then pass the output to the next node's inputs.
Definition: cbComputeGraph.hpp:313
cbMySQLField
A copy move from workflow MySQLResult.h and .inl file.
Definition: cbTable.hpp:404
graph_callback
std::function< void(WFGraphTask *)> graph_callback
_WIN32
Definition: cbComputeGraph.hpp:37
cb::graph::cbGraphSharedMem::getOutStruct
cbOutputTableStruct * getOutStruct()
Get the Out Struct object.
Definition: cbComputeGraph.cpp:44
cb::graph::cbComputeGraph::cbOperatorNode
friend cbOperatorNode
Definition: cbComputeGraph.hpp:346
cb::graph::cbNode::generateTask
virtual void * generateTask()=0
cb::graph::cbOperatorNode::cbComputeGraph
friend cbComputeGraph
Definition: cbComputeGraph.hpp:314
cb::graph::cbVirtualDeviceNode::getDevice
trivial::cbMySqlDevice * getDevice()
Get the Device object.
Definition: cbComputeGraph.cpp:118
cb::graph::cbGraphSharedMem::m_outStruct
cbOutputTableStruct * m_outStruct
Definition: cbComputeGraph.hpp:160
cb::graph::cbGraphSharedMem::cbGraphSharedMem
cbGraphSharedMem()=default
Construct a new cb Graph Shared Mem object.
cb::graph::cbRedisCachingNode
Definition: cbComputeGraph.hpp:286
cb::graph::cbNode::task
void * task
Definition: cbComputeGraph.hpp:240
cb::graph::cbVirtualDeviceNode
This node include virtual device infomation and perform the final operation of how to get the data fr...
Definition: cbComputeGraph.hpp:253
cbMySQLCell
Definition: cbTable.hpp:468
cb::graph::cbNode
Definition: cbComputeGraph.hpp:219
cb::graph::cbGraphSharedLuaStack::get
luaJitThread & get()
get the lua state.
Definition: cbComputeGraph.cpp:62
cb::graph::cbComputeGraph::cbVirtualDeviceNode
friend cbVirtualDeviceNode
Definition: cbComputeGraph.hpp:344
cb::graph::cbNode::isFinalOutput
bool isFinalOutput
Definition: cbComputeGraph.hpp:237
cb
_WIN32
Definition: api.cpp:4
cb::graph::cbNode::io
cbOpIO io
Definition: cbComputeGraph.hpp:234
cb::graph::cbGraphSharedMem::getCellNum
int32_t getCellNum()
Get the Cell Num object.
Definition: cbComputeGraph.cpp:35
cbOpIO
The input and output of one Operator.
Definition: cbOperator.hpp:38
cb::graph::cbVirtualDeviceNode::~cbVirtualDeviceNode
~cbVirtualDeviceNode() override final
Definition: cbComputeGraph.cpp:72
cb::graph::cbNode::nodeT
nodeType nodeT
Definition: cbComputeGraph.hpp:239
cb::graph::cbVirtualDeviceNode::addQuery
void addQuery(const std::string &q)
Add a String type sql sentence to this node. Waiting to be execute.
Definition: cbComputeGraph.cpp:116
cb::graph::cbGraphSharedMem::setOutStruct
void setOutStruct(const cbShape< 2 > &shape, cbMySQLField **info)
Set the Out Struct object.
Definition: cbComputeGraph.cpp:18
cb::graph::cbVirtualDeviceNode::m_queries
std::vector< std::string > m_queries
Definition: cbComputeGraph.hpp:283
cbShape< 2 >
cb::graph::nodeType::Leaf
@ Leaf
cb::graph::cbGraphSharedLuaStack::operator=
cbGraphSharedLuaStack operator=(const cbGraphSharedLuaStack &)=delete
operator of const = is deleted
cb::graph::cbNode::graph
cbComputeGraph * graph
Definition: cbComputeGraph.hpp:242
cb::graph::cbGraphSharedMem::m_dataPool
std::vector< cbMySQLCell * > m_dataPool
Definition: cbComputeGraph.hpp:162
cb::graph::cbGraphSharedLuaStack
Definition: cbComputeGraph.hpp:170
cbVirtualSharedTable
cbVirtualSharedTable is a container of shared memory.
Definition: cbTable.hpp:518
cb::graph::cbNode::~cbNode
virtual ~cbNode()
Destroy the cb Node object Virtual function.
Definition: cbComputeGraph.hpp:224
cb::graph::cbOperatorNode::Op
baseOp * Op
Definition: cbComputeGraph.hpp:333
cbOperator.hpp
The operator of compute graph.
cb::graph::cbGraphSharedLuaStack::execScript
void execScript(const std::string &script)
execute the hard coded script.
Definition: cbComputeGraph.cpp:60
trivial::cbVirtualDeviceManager
Definition: cbVirtualDevice.hpp:260
cbVirtualDevice.hpp
abstract virtual device. Provide MySql/Redis/Kafka warper. All virtual device will handle the connect...
cb::graph::cbGraphSharedMem::push
void push(cbVirtualSharedTable *v)
Push a virtual shared table to mem of graph.
Definition: cbComputeGraph.cpp:12
cb::graph::cbNode::nextNode
cbNode * nextNode
Definition: cbComputeGraph.hpp:241
cb::graph::cbNode::visited
bool visited
Definition: cbComputeGraph.hpp:238
cbMySQLType
cbMySQLType
Definition: cbTable.hpp:375
cb::graph::cbGraphSharedLuaStack::execScriptFile
void execScriptFile(const std::string &filePath)
execute the script file from disk directly.
Definition: cbComputeGraph.cpp:56
cb::graph::cbGraphSharedMem::~cbGraphSharedMem
~cbGraphSharedMem()
Destroy the cb Graph Shared Mem object.
Definition: cbComputeGraph.cpp:7
cb::graph::cbVirtualDeviceNode::setMySQLDevice
void setMySQLDevice(trivial::cbMySqlDevice *device=nullptr)
Definition: cbComputeGraph.cpp:120
cb::graph::cbVirtualDeviceNode::m_device
trivial::cbMySqlDevice * m_device
Definition: cbComputeGraph.hpp:282
cb::graph::nodeType::Operator
@ Operator
baseOp
basic operator.
Definition: cbOperator.hpp:47
cb::graph::cbNode::PointTo
void PointTo(cbNode *ptr)
Definition: cbComputeGraph.cpp:66
cb::graph::cbGraphSharedLuaStack::cbGraphSharedLuaStack
cbGraphSharedLuaStack()=default
Construct a new cb Graph Shared Lua Stack object.
cb::graph::cbVirtualDeviceNode::generateTask
void * generateTask() override final
generate a SQL wf task for now.
Definition: cbComputeGraph.cpp:76
cb::graph::cbNode::asGoTask
static WFGoTask * asGoTask(void *metaTask)
Definition: cbComputeGraph.cpp:70
cb::graph::cbNode::cbNode
cbNode(const nodeType &nt)
Definition: cbComputeGraph.cpp:64
cb::graph::cbGraphSharedMem
The shared memory of compute graph. Include 2 basic components.
Definition: cbComputeGraph.hpp:63
cb::graph::cbComputeGraph::m_nodes
std::vector< cbNode * > m_nodes
Definition: cbComputeGraph.hpp:533
cb::graph::cbVirtualDeviceNode::cbComputeGraph
friend cbComputeGraph
Definition: cbComputeGraph.hpp:254
cb::graph::cbGraphSharedMem::operator=
cbGraphSharedMem operator=(cbGraphSharedMem &rhs)=delete
operator of = is deleted
cb::graph::cbGraphSharedMem::getMemUsed
size_t getMemUsed()
Get the Mem Used object.
Definition: cbComputeGraph.cpp:26
cb::graph::cbGraphSharedMem::m_dataFromDevice
std::vector< cbVirtualSharedTable * > m_dataFromDevice
Definition: cbComputeGraph.hpp:161
cb::graph::nodeType::Output
@ Output
cb::graph::cbComputeGraph::cbRedisCachingNode
friend cbRedisCachingNode
Definition: cbComputeGraph.hpp:345
luaEngine.hpp
A lua engine for execute all c++ embedding in struct. A warper of lua JIT engine.