Covalent Bond  0.0.1-alpha
'covalent bond' is a data middle office. As a 2022-2023 Fall SWE final project.
pipeline.cpp
Go to the documentation of this file.
1 #include <wfrest/json.hpp>
2 #include "pipeline.hpp"
3 #include "api.hpp"
4 
5 using namespace wfrest;
6 using Json = nlohmann::json;
7 
8 namespace cb {
9 namespace pipeline {
10 
11 graphContainer::graphContainer(int32_t n) : m_loopTime(n) {}
12 
14  for (auto item : m_graphs) { delete item; }
15 }
16 
18 
20  for (auto& item : m_graphs) {
21  if (item->getId() == idx) { return item; }
22  }
23  return nullptr;
24 }
25 
27  if (!m_isTerminated) {
28  WFGraphTask* graph = WFTaskFactory::create_graph_task([=](WFGraphTask* task) {
29  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
30  "Graph task complete. Wakeup main process\n");
31  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
32  "**************************************************\n");
33  });
34  WFTimerTask* timer = WFTaskFactory::create_timer_task(m_loopTime, 0, [=](WFTimerTask* task) {
35  fmt::print("Loops Graphs by {} sec.\n", m_loopTime);
36  if (!m_isTerminated) {
37  this->execMain();
38  } else {
39  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
40  "Terminated. End loop the graph task. Waiting the graph in queue done.\n");
41  }
42  });
43  auto timerNode = &graph->create_graph_node(timer);
44  for (auto& item : m_graphs) {
45  (graph->create_graph_node(item->generateGraphTask()))-- > (*timerNode);
46  }
47  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
48  "**************************************************\n");
49  graph->start();
50  } else {
51  fmt::print(fg(fmt::color::steel_blue) | fmt::emphasis::italic,
52  "Terminated. End loop the graph task. Waiting the graph in queue done.\n");
53  }
54 }
55 
56 void graphContainer::setTerminated(bool enable) { m_isTerminated = enable; }
57 
58 size_t graphContainer::nums() { return m_graphs.size(); }
59 
60 app::app(const appCfg& cfg)
61  : m_graphs(cfg.graphExecSec), m_web(cfg.webPort, cfg.webRoot), m_rHttp(cfg.rHttpPort) {
62  // connect Redis
64  this->m_VDM.addRedisDevice(new trivial::cbRedisDevice(redisCache, cfg.redisPort, cfg.redisHost,
65  "", cfg.redisPassword, cfg.redisDBNum,
66  cfg.redisSSL));
68 }
69 
81  m_rHttp().POST("/add_graph", [this](const HttpReq* req, HttpResp* resp) {
82  if (req->content_type() != APPLICATION_JSON) {
83  resp->set_status(HttpStatusBadRequest);
84  resp->set_header_pair("Content-Type", "application/json");
85  resp->append_output_body("{\"res\": \"False\"}");
86  return;
87  }
88  Json& kv = req->json();
89  auto t =
90  cb::apiCPP::addGraph(kv["id"].get<int>(), kv["script"].get<std::string>(), &this->m_VDM);
91  if (!t) {
92  resp->set_status(HttpStatusBadRequest);
93  resp->set_header_pair("Content-Type", "application/json");
94  resp->append_output_body("{\"res\": \"False\"}");
95  return;
96  }
97  this->m_graphs.addGraph(t);
98  resp->set_header_pair("Content-Type", "application/json");
99  resp->append_output_body("{\"res\": \"True\"}");
100  });
101 
102  m_rHttp().POST("/add_device", [this](const HttpReq* req, HttpResp* resp) {
103  if (req->content_type() != APPLICATION_JSON) {
104  resp->set_status(HttpStatusBadRequest);
105  resp->set_header_pair("Content-Type", "application/json");
106  resp->append_output_body("{\"res\": \"False\"}");
107  return;
108  }
109  Json& kv = req->json();
110 
111  // get the device type.
112  std::string deviceType = kv["deviceType"].get<std::string>();
113 
114  if (deviceType == "MySQL") {
115  std::string host = kv["host"].get<std::string>();
116  std::string port = kv["port"].get<std::string>();
117  std::string usrName = kv["usrName"].get<std::string>();
118  std::string password = kv["password"].get<std::string>();
119  std::string databaseName = kv["databaseName"].get<std::string>();
121  this->m_VDM.addMySqlDevice(
123  usrName, password, databaseName));
124  resp->set_header_pair("Content-Type", "application/json");
125  resp->append_output_body(fmt::format("{}\"res\": \"{}\" {}", "{", curIdx, "}"));
126  } else if (deviceType == "Redis") {
127  std::string host = kv["host"].get<std::string>();
128  std::string port = kv["port"].get<std::string>();
129  std::string usrName = kv["usrName"].get<std::string>();
130  std::string password = kv["password"].get<std::string>();
131  // TODO
132  }
133  });
134 
135  m_rHttp().GET("/table_at_graph", [=](const HttpReq* req, HttpResp* resp) {
136  if (!req->has_query("idx")) {
137  resp->set_status(500);
138  return;
139  }
140  int32_t _graphId = atoi(req->query("idx").c_str());
141  if (m_graphs.getGraph(_graphId) == nullptr) { return; }
142  auto outs = m_graphs.getGraph(_graphId)
143  ->getOutput();
144  Json kv;
145  kv["row"] = outs->m_shape[0];
146  kv["col"] = outs->m_shape[1];
147  if (outs->m_info.size() == 0) {
148  kv["tableName"] = "DefaultName";
149  } else {
150  kv["tableName"] = outs->m_info[0].getTable();
151  }
152  std::vector<std::string> _tmpColName;
153  for (auto& item : outs->m_info) { _tmpColName.push_back(item.getName()); }
154  kv["colName"] = _tmpColName;
155  resp->Json(kv);
156  });
157 
158  m_rHttp().GET("/num", [=](const HttpReq* req, HttpResp* resp) {
159  std::string tp = req->query("type");
160  if (tp == "task" || tp == "api") {
161  Json kv;
162  kv["res"] = m_graphs.nums();
163  resp->Json(kv);
164  } else if (tp == "device") {
165  Json kv;
167  resp->Json(kv);
168  }
169  });
170 
171  m_rHttp().GET("/trans_graph", [=](const HttpReq* req, HttpResp* resp) {
172  if (!req->has_query("idx")) {
173  resp->set_status(500);
174  return;
175  }
176  int32_t _graphId = atoi(req->query("idx").c_str());
177  if (m_graphs.getGraph(_graphId) == nullptr) { return; }
178  auto outs = m_graphs.getGraph(_graphId);
179  auto nodes = outs->getNodes();
180 
181  /*
182  resp->set_header_pair("Content-Type", "text/html");
183  resp->append_output_body("hhhhhhh");
184  resp->headers["Content-Type"] = "text/html";
185  return;
186  */
187  resp->headers["Content-Type"] = "text/html";
188  trans::outbase(resp);
189  std::map<cb::graph::cbNode*, cb::trans::opMapStruct> mapOpNode;
190  std::map<cb::graph::cbNode*, cb::trans::leafMapStruct> mapLeafNode;
191  std::map<int, cb::graph::cbNode*> toLeaf, toOpNode;
192  int opCodeNow = 0;
193  int leafCodeNow = 0;
194  for (auto item : nodes) {
195  switch (item->nodeT) {
196  case cb::graph::nodeType::Leaf: {
197  mapLeafNode[item].nodeCode = ++leafCodeNow;
198  mapOpNode[item->nextNode].inputNodeCode.push_back(leafCodeNow);
199  mapOpNode[item->nextNode].inputNum++;
200  toLeaf[leafCodeNow] = item;
201  break;
202  }
204  opCodeNow++;
205  mapOpNode[item].nodeCode = opCodeNow;
206  toOpNode[opCodeNow] = item;
207  break;
208  }
209  case cb::graph::nodeType::Output: break;
210  }
211  }
212  // set the node of position(Y)
213  int firstPosy = 0;
214  for (int i = 1; i < opCodeNow + 1; i++) {
215  firstPosy = firstPosy + 100;
216  auto opnode = toOpNode[i];
217  mapOpNode[opnode].posy = firstPosy;
218  for (auto j : mapOpNode[opnode].inputNodeCode) {
219  auto leafNode = toLeaf[j];
220  mapLeafNode[leafNode].posy = firstPosy;
221  firstPosy = firstPosy + 200;
222  }
223  mapOpNode[opnode].posy = (firstPosy + mapOpNode[opnode].posy) / 2;
224  }
225 
226  int finNodePosy = (mapOpNode[toOpNode[1]].posy + mapOpNode[toOpNode[opCodeNow]].posy) / 2;
227  trans::createFinNode(finNodePosy, opCodeNow, resp);
228  for (int i = 1; i < opCodeNow + 1; i++) {
229  auto opnode = toOpNode[i];
230  trans::createOpNode(resp, mapOpNode[opnode]);
231  for (auto j : mapOpNode[opnode].inputNodeCode) {
232  auto leafNode = toLeaf[j];
233  auto device_n = (graph::cbVirtualDeviceNode*)leafNode;
234  auto deviceNode = device_n->getDevice();
235  trans::createLeafNode(resp, mapLeafNode[leafNode], deviceNode);
236  trans::Node_leaf_connect(mapLeafNode[leafNode].nodeCode, i, mapOpNode[opnode].inputNumNow,
237  resp);
238  mapOpNode[opnode].inputNumNow++;
239  }
240  trans::Node_op_connect(i, i - 1, resp);
241  }
242  cb::trans::outbaseo(resp);
243  });
244 }
245 
246 void app::execMain() {
247  // init all
248  initRHttp();
249 
250  // exec all
251  m_graphs.execMain();
252  m_rHttp.execMain();
253  m_web.execMain();
254 }
255 
256 void app::stopMain() {
257  m_rHttp.stopMain();
258  m_web.stopMain();
259  m_graphs.setTerminated(true);
260 }
261 
262 } // namespace pipeline
263 } // namespace cb
cb::pipeline::appCfg::redisPort
const char * redisPort
Definition: pipeline.hpp:92
cb::graph::cbComputeGraph
Definition: cbComputeGraph.hpp:342
cb::pipeline::app::m_graphs
graphContainer m_graphs
Definition: pipeline.hpp:120
cb::trans::createOpNode
void createOpNode(HttpResp *resp, cb::trans::opMapStruct &opMapStruct)
Definition: api.cpp:76
cb::trans::outbaseo
void outbaseo(HttpResp *resp)
Definition: api.cpp:139
cb::graph::cbComputeGraph::getOutput
cbOutputTableStruct * getOutput()
Get the Output object.
Definition: cbComputeGraph.cpp:585
pipeline.hpp
The main pipeline, manage all resources.
cb::pipeline::app::app
app(const appCfg &cfg)
Definition: pipeline.cpp:60
cb::trans::createLeafNode
void createLeafNode(HttpResp *resp, cb::trans::leafMapStruct &leafMapStruct, trivial::cbMySqlDevice *leafNode)
Definition: api.cpp:38
cb::trans::createFinNode
void createFinNode(int posy, int opCodeNow, HttpResp *resp)
Definition: api.cpp:104
cb::pipeline::graphContainer::setTerminated
void setTerminated(bool enable)
Set the Terminated object.
Definition: pipeline.cpp:56
cb::trans::Node_op_connect
void Node_op_connect(int nodeleftnum, int nowinputsnum, HttpResp *resp)
Definition: api.cpp:132
cb::pipeline::graphContainer::addGraph
void addGraph(graph::cbComputeGraph *g)
Definition: pipeline.cpp:17
cb::graph::cbComputeGraph::getNodes
std::vector< cbNode * > getNodes()
Get the Nodes object.
Definition: cbComputeGraph.cpp:587
api.hpp
The C plus plus API section.
cb::pipeline::graphContainer::m_graphs
std::vector< graph::cbComputeGraph * > m_graphs
Definition: pipeline.hpp:81
cb::pipeline::graphContainer::m_loopTime
int32_t m_loopTime
Definition: pipeline.hpp:82
cb::graph::cbVirtualDeviceNode
This node include virtual device infomation and perform the final operation of how to get the data fr...
Definition: cbComputeGraph.hpp:253
cb::pipeline::appCfg
Definition: pipeline.hpp:85
cb::pipeline::graphContainer::m_isTerminated
bool m_isTerminated
Definition: pipeline.hpp:80
cb::pipeline::app::initRHttp
void initRHttp()
Definition: pipeline.cpp:70
cb::pipeline::appCfg::redisDBNum
int32_t redisDBNum
Definition: pipeline.hpp:93
cb
_WIN32
Definition: api.cpp:4
cb::pipeline::graphContainer::nums
size_t nums()
Definition: pipeline.cpp:58
trivial::cbVirtualDeviceManager::addRedisDevice
void addRedisDevice(cbRedisDevice *vd)
Definition: cbVirtualDevice.cpp:118
cb::pipeline::graphContainer::execMain
void execMain()
Definition: pipeline.cpp:26
cb::pipeline::appCfg::redisHost
const char * redisHost
Definition: pipeline.hpp:90
cb::pipeline::appCfg::redisPassword
const char * redisPassword
Definition: pipeline.hpp:91
cb::trans::Node_leaf_connect
void Node_leaf_connect(int nodeleftnum, int noderightnum, int nowinputsnum, HttpResp *resp)
Definition: api.cpp:126
trivial::cbMySqlDevice
Definition: cbVirtualDevice.hpp:95
cb::trans::outbase
void outbase(HttpResp *resp)
Definition: api.cpp:20
cb::graph::nodeType::Operator
@ Operator
Json
nlohmann::json Json
Definition: pipeline.cpp:6
trivial::cbRedisDevice
Definition: cbVirtualDevice.hpp:178
trivial::cbVirtualDeviceManager::addMySqlDevice
void addMySqlDevice(cbMySqlDevice *vd)
Definition: cbVirtualDevice.cpp:114
cb::pipeline::graphContainer::getGraph
graph::cbComputeGraph * getGraph(int32_t idx)
Get the Graph object.
Definition: pipeline.cpp:19
cb::pipeline::appCfg::redisSSL
bool redisSSL
Definition: pipeline.hpp:94
cb::pipeline::graphContainer::~graphContainer
~graphContainer()
Definition: pipeline.cpp:13
trivial::cbVirtualDeviceManager::m_numsRedis
static int32_t m_numsRedis
Definition: cbVirtualDevice.hpp:291
cbOutputTableStruct::m_shape
cbShape< 2 > m_shape
Definition: cbTable.hpp:786
cb::pipeline::app::m_VDM
trivial::cbVirtualDeviceManager m_VDM
Definition: pipeline.hpp:123
cb::graph::nodeType::Output
@ Output
cb::apiCPP::addGraph
cb::graph::cbComputeGraph * addGraph(int32_t idx, const std::string &cmd, trivial::cbVirtualDeviceManager *vdm)
Definition: api.cpp:7
cb::pipeline::app::m_rHttp
utils::cbRestfulHttpServer m_rHttp
Definition: pipeline.hpp:122
trivial::cbVirtualDeviceManager::m_numsMySql
static int32_t m_numsMySql
Definition: cbVirtualDevice.hpp:290