Covalent Bond  0.0.1-alpha
'covalent bond' is a data middle office. As a 2022-2023 Fall SWE final project.
cbVirtualDevice.cpp
Go to the documentation of this file.
1 #include "cbVirtualDevice.hpp"
2 #include <workflow/WFOperator.h>
3 
4 namespace trivial {
5 void cbVirtualDevice::reloadConnection(const std::string& _port, const std::string& _host,
6  const std::string& _userName, const std::string& _passWord,
7  const std::string& _charSet,
8  const std::string& _dataBaseName) {
9  port = _port;
10  host = _host;
11  usrName = _userName;
12  passWord = _passWord;
13  charSet = _charSet;
14  dataBaseName = _dataBaseName;
15 }
16 
18  std::stringstream ss;
19  ss << "mysql://" << usrName << ":" << passWord << "@" << host << ":" << port << "/"
20  << dataBaseName;
21  if (!charSet.empty()) {
22  ss << "?character_set=" << charSet << "&character_set_results=" << charSet;
23  }
24  Url = ss.str();
25  m_conn->deinit();
26  m_conn->init(Url);
27 }
28 
29 int32_t cbMySqlDevice::getIdx() { return m_idx; }
30 
31 WFMySQLTask* cbMySqlDevice::query(const std::string& q, const mysql_callback& callback_func) {
32  return m_conn->create_query_task(q, callback_func);
33 }
34 
35 std::vector<WFMySQLTask*> cbMySqlDevice::sequentialQuery(
36  const std::vector<std::string>& q, const std::vector<mysql_callback>& callback_func) {
37  if (q.size() != callback_func.size()) {
38  fmt::print(fg(fmt::color::crimson) | fmt::emphasis::bold,
39  "[ Error ] sequential task's vector size is not same. Get query size={}, but "
40  "callback_func size={}",
41  q.size(), callback_func.size());
42  }
43  size_t len_size = q.size();
44  std::vector<WFMySQLTask*> ans;
45  for (size_t i = 0; i < len_size; ++i) {
46  ans.push_back(m_conn->create_query_task(q[i], callback_func[i]));
47  }
48  return ans;
49 }
50 
51 void cbMySqlDevice::execMain(WFMySQLTask* t) { t->start(); }
52 
53 void cbMySqlDevice::execMain(const std::vector<WFMySQLTask*>& t) {
54  auto& generalTask = (*t[0]) > t[1];
55  size_t len_size = t.size();
56  for (size_t i = 2; i < len_size; ++i) { generalTask.push_back(t[i]); }
57  generalTask.start();
58 }
59 
61  // if no ssl: redis://:password@host:port/dbnum?query#fragment
62  // if ssl: rediss://:password@host:port/dbnum?query#fragment
63  std::stringstream ss;
64  if (m_isSSL) {
65  ss << "rediss://:";
66  } else {
67  ss << "redis://:";
68  }
69  ss << passWord << "@" << host << ":" << port << "/" << std::to_string(m_dbnum);
70  // << "?query#fragment";
71  Url = ss.str();
72 }
73 
74 WFRedisTask* cbRedisDevice::set(const std::vector<std::string>& params,
75  const redis_callback& callback_func, void* usrData,
76  int32_t retryTimes) {
77  WFRedisTask* task = WFTaskFactory::create_redis_task(Url, retryTimes, callback_func);
78  protocol::RedisRequest* req = task->get_req();
79  req->set_request("SET", params);
80  task->user_data = usrData;
81  return task;
82 }
83 
84 WFRedisTask* cbRedisDevice::get(const std::vector<std::string>& params,
85  const redis_callback& callback_func, void* usrData,
86  int32_t retryTimes) {
87  WFRedisTask* task = WFTaskFactory::create_redis_task(Url, retryTimes, callback_func);
88  protocol::RedisRequest* req = task->get_req();
89  req->set_request("GET", params);
90  task->user_data = usrData;
91  return task;
92 }
93 
94 WFRedisTask* cbRedisDevice::exists(const std::vector<std::string>& params,
95  const redis_callback& callback_func, void* usrData,
96  int32_t retryTimes) {
97  WFRedisTask* task = WFTaskFactory::create_redis_task(Url, retryTimes, callback_func);
98  protocol::RedisRequest* req = task->get_req();
99  req->set_request("EXISTS", params);
100  task->user_data = usrData;
101  return task;
102 }
103 
104 void cbRedisDevice::execMain(WFRedisTask* task) { task->start(); }
105 
107  // TODO
108 }
109 
113 
115  m_mySqlPool[m_numsMySql++] = {vd, true};
116 }
117 
119  m_redisPool[m_numsRedis++] = {vd, true};
120 }
121 
123  m_kafkaPool[m_numsKafka++] = {vd, true};
124 }
125 
127  delete m_mySqlPool[idx].first;
128  m_mySqlPool[idx].second = false;
129 }
130 
132  delete m_redisPool[idx].first;
133  m_redisPool[idx].second = false;
134 }
135 
137  delete m_kafkaPool[idx].first;
138  m_kafkaPool[idx].second = false;
139 }
140 
142  if (m_mySqlPool[idx].second) { return m_mySqlPool[idx].first; }
143  return nullptr;
144 }
145 
147  if (m_redisPool[idx].second) { return m_redisPool[idx].first; }
148  return nullptr;
149 }
150 
152  if (m_kafkaPool[idx].second) { return m_kafkaPool[idx].first; }
153  return nullptr;
154 }
155 
156 } // namespace trivial
trivial
Definition: cbVirtualDevice.cpp:4
mysql_callback
std::function< void(WFMySQLTask *)> mysql_callback
_WIN32
Definition: cbVirtualDevice.hpp:32
trivial::cbVirtualDevice::Url
std::string Url
Definition: cbVirtualDevice.hpp:68
trivial::cbMySqlDevice::m_idx
int32_t m_idx
Definition: cbVirtualDevice.hpp:170
trivial::cbVirtualDeviceManager::m_kafkaPool
std::map< int32_t, std::pair< cbKafkaDevice *, bool > > m_kafkaPool
! index, {pointer, visibility}
Definition: cbVirtualDevice.hpp:297
trivial::cbRedisDevice::updateUrl
void updateUrl() override final
Definition: cbVirtualDevice.cpp:60
trivial::cbVirtualDeviceManager::m_numsKafka
static int32_t m_numsKafka
Definition: cbVirtualDevice.hpp:292
trivial::cbRedisDevice::execMain
static void execMain(WFRedisTask *task)
User should not use this function. All task should be added to graph node in order to execute all tas...
Definition: cbVirtualDevice.cpp:104
trivial::cbRedisDevice::get
WFRedisTask * get(const std::vector< std::string > &params, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
Definition: cbVirtualDevice.cpp:84
redis_callback
std::function< void(WFRedisTask *)> redis_callback
Definition: cbVirtualDevice.hpp:33
trivial::cbVirtualDeviceManager::getKafkaDevice
cbKafkaDevice * getKafkaDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:151
trivial::cbVirtualDeviceManager::getRedisDevice
cbRedisDevice * getRedisDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:146
trivial::cbKafkaDevice::updateUrl
void updateUrl() override final
Definition: cbVirtualDevice.cpp:106
trivial::cbVirtualDeviceManager::m_mySqlPool
std::map< int32_t, std::pair< cbMySqlDevice *, bool > > m_mySqlPool
Definition: cbVirtualDevice.hpp:295
trivial::cbMySqlDevice::m_conn
WFMySQLConnection * m_conn
Definition: cbVirtualDevice.hpp:171
trivial::cbVirtualDevice::port
std::string port
Definition: cbVirtualDevice.hpp:61
trivial::cbRedisDevice::exists
WFRedisTask * exists(const std::vector< std::string > &params, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
Definition: cbVirtualDevice.cpp:94
trivial::cbVirtualDevice::dataBaseName
std::string dataBaseName
Definition: cbVirtualDevice.hpp:66
trivial::cbVirtualDeviceManager::addRedisDevice
void addRedisDevice(cbRedisDevice *vd)
Definition: cbVirtualDevice.cpp:118
trivial::cbVirtualDeviceManager::removeMySqlDevice
void removeMySqlDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:126
trivial::cbMySqlDevice::updateUrl
void updateUrl() override final
Definition: cbVirtualDevice.cpp:17
trivial::cbRedisDevice::m_dbnum
int32_t m_dbnum
Definition: cbVirtualDevice.hpp:244
trivial::cbRedisDevice::m_isSSL
bool m_isSSL
Definition: cbVirtualDevice.hpp:243
trivial::cbMySqlDevice::execMain
static void execMain(WFMySQLTask *t)
User should not use this function. All task should be added to graph node in order to execute all tas...
Definition: cbVirtualDevice.cpp:51
trivial::cbVirtualDeviceManager::getMySqlDevice
cbMySqlDevice * getMySqlDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:141
trivial::cbKafkaDevice
Definition: cbVirtualDevice.hpp:252
cbVirtualDevice.hpp
abstract virtual device. Provide MySql/Redis/Kafka warper. All virtual device will handle the connect...
trivial::cbMySqlDevice
Definition: cbVirtualDevice.hpp:95
trivial::cbVirtualDeviceManager::removeRedisDevice
void removeRedisDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:131
trivial::cbVirtualDeviceManager::addKafkaDevice
void addKafkaDevice(cbKafkaDevice *vd)
Definition: cbVirtualDevice.cpp:122
trivial::cbVirtualDeviceManager::removeKafkaDevice
void removeKafkaDevice(int32_t idx)
Definition: cbVirtualDevice.cpp:136
trivial::cbRedisDevice
Definition: cbVirtualDevice.hpp:178
trivial::cbVirtualDevice::reloadConnection
void reloadConnection(const std::string &_port, const std::string &_host, const std::string &_userName, const std::string &_passWord, const std::string &_charSet, const std::string &_dataBaseName)
Definition: cbVirtualDevice.cpp:5
trivial::cbVirtualDevice::host
std::string host
Definition: cbVirtualDevice.hpp:62
trivial::cbMySqlDevice::sequentialQuery
std::vector< WFMySQLTask * > sequentialQuery(const std::vector< std::string > &q, const std::vector< mysql_callback > &callback_func)
Definition: cbVirtualDevice.cpp:35
trivial::cbRedisDevice::set
WFRedisTask * set(const std::vector< std::string > &params, const redis_callback &callback_func=nullptr, void *usrData=nullptr, int32_t retryTimes=3)
Definition: cbVirtualDevice.cpp:74
trivial::cbVirtualDeviceManager::addMySqlDevice
void addMySqlDevice(cbMySqlDevice *vd)
Definition: cbVirtualDevice.cpp:114
trivial::cbVirtualDevice::passWord
std::string passWord
Definition: cbVirtualDevice.hpp:64
trivial::cbVirtualDevice::charSet
std::string charSet
Definition: cbVirtualDevice.hpp:65
trivial::cbVirtualDevice::usrName
std::string usrName
Definition: cbVirtualDevice.hpp:63
trivial::cbMySqlDevice::getIdx
int32_t getIdx()
Get the Idx object.
Definition: cbVirtualDevice.cpp:29
trivial::cbVirtualDeviceManager::m_numsRedis
static int32_t m_numsRedis
Definition: cbVirtualDevice.hpp:291
trivial::cbVirtualDeviceManager::m_redisPool
std::map< int32_t, std::pair< cbRedisDevice *, bool > > m_redisPool
! index, {pointer, visibility}
Definition: cbVirtualDevice.hpp:296
trivial::cbMySqlDevice::query
WFMySQLTask * query(const std::string &q, const mysql_callback &callback_func=nullptr)
Definition: cbVirtualDevice.cpp:31
trivial::cbVirtualDeviceManager::m_numsMySql
static int32_t m_numsMySql
Definition: cbVirtualDevice.hpp:290