Covalent Bond  0.0.1-alpha
'covalent bond' is a data middle office. As a 2022-2023 Fall SWE final project.
cbVirtualDevice.hpp
Go to the documentation of this file.
1 
12 #ifndef __SERVER_CB_VIRTUAL_DEVICE_HPP_
13 #define __SERVER_CB_VIRTUAL_DEVICE_HPP_
14 
15 #ifdef _WIN32
16 #if _MSC_VER > 1000
17 #pragma once
18 #endif
19 #endif
20 
21 #if defined(__unix__) && defined(__clang__)
22 #pragma once
23 #endif
24 
25 #include "../../pch.hpp"
26 
27 #include <workflow/MySQLResult.h>
28 #include <workflow/WFMySQLConnection.h>
29 #include <workflow/WFRedisServer.h>
30 #include <workflow/WFTaskFactory.h>
31 
32 using mysql_callback = std::function<void(WFMySQLTask*)>;
33 using redis_callback = std::function<void(WFRedisTask*)>;
34 
35 namespace trivial {
40 enum class virtualDeviceType {
41  Redis = 0,
42  MySql = 1,
43  Kafka = 2,
44  HttpDefine = 3,
45 };
46 
48  cbVirtualDevice() = default;
49  cbVirtualDevice(const std::string& _port, const std::string& _host, const std::string& _userName,
50  const std::string& _passWord, const std::string& _charSet,
51  const std::string _dataBaseName, const virtualDeviceType& v)
52  : deviceType(v),
53  port(_port),
54  host(_host),
55  usrName(_userName),
56  passWord(_passWord),
57  charSet(_charSet),
58  dataBaseName(_dataBaseName) {}
59 
61  std::string port;
62  std::string host;
63  std::string usrName;
64  std::string passWord;
65  std::string charSet;
66  std::string dataBaseName;
67 
68  std::string Url;
69 
80  void reloadConnection(const std::string& _port, const std::string& _host,
81  const std::string& _userName, const std::string& _passWord,
82  const std::string& _charSet, const std::string& _dataBaseName);
83 
88  virtual void updateUrl() = 0;
89 };
90 
95 struct cbMySqlDevice final : public cbVirtualDevice {
97  m_conn->deinit();
98  delete m_conn;
99  }
100  cbMySqlDevice(int32_t _idx) : m_idx(_idx) { m_conn = new WFMySQLConnection(_idx); }
101  cbMySqlDevice(int32_t _idx, const std::string& _port, const std::string& _host,
102  const std::string& _userName, const std::string& _passWord,
103  const std::string& _dataBaseName, const std::string& _charSet = "")
104  : cbVirtualDevice(_port, _host, _userName, _passWord, _charSet, _dataBaseName,
106  m_idx(_idx) {
107  m_conn = new WFMySQLConnection(_idx);
108  // In my implementation. The mysql client should give the url like this:
109  // mysql://username:password@host:port/dbname?character_set=charset&character_set_results=charset
110  // And I assume the charset of result and original database are same.
111  std::stringstream ss;
112  ss << "mysql://" << _userName << ":" << _passWord << "@" << _host << ":" << _port << "/"
113  << _dataBaseName;
114  if (!_charSet.empty()) {
115  ss << "?character_set=" << _charSet << "&character_set_results=" << _charSet;
116  }
117  Url = ss.str();
118  // Init the connection;
119  m_conn->init(Url);
120  }
121 
126  void updateUrl() override final;
127 
135  WFMySQLTask* query(const std::string& q, const mysql_callback& callback_func = nullptr);
136 
144  std::vector<WFMySQLTask*> sequentialQuery(const std::vector<std::string>& q,
145  const std::vector<mysql_callback>& callback_func);
152  static void execMain(WFMySQLTask* t);
153 
160  static void execMain(const std::vector<WFMySQLTask*>& t);
161 
167  int32_t getIdx();
168 
169  private:
170  int32_t m_idx = 0;
171  WFMySQLConnection* m_conn = nullptr;
172 };
173 
178 struct cbRedisDevice final : public cbVirtualDevice {
180  cbRedisDevice(int32_t idx) : m_idx(idx) {}
181  cbRedisDevice(int32_t idx, const std::string& _port, const std::string& _host,
182  const std::string& _userName, const std::string& _passWord, int32_t dbnum,
183  bool isSsl = false)
184  : cbVirtualDevice(_port, _host, _userName, _passWord, "", "", virtualDeviceType::Redis),
185  m_isSSL(isSsl),
186  m_dbnum(dbnum),
187  m_idx(idx) {
188  // if no ssl: redis://:password@host:port/dbnum?query#fragment
189  // if ssl: rediss://:password@host:port/dbnum?query#fragment
190  updateUrl();
191  }
192 
193  void updateUrl() override final;
194 
204  WFRedisTask* set(const std::vector<std::string>& params,
205  const redis_callback& callback_func = nullptr, void* usrData = nullptr,
206  int32_t retryTimes = 3);
207 
217  WFRedisTask* get(const std::vector<std::string>& params,
218  const redis_callback& callback_func = nullptr, void* usrData = nullptr,
219  int32_t retryTimes = 3);
220 
230  WFRedisTask* exists(const std::vector<std::string>& params,
231  const redis_callback& callback_func = nullptr, void* usrData = nullptr,
232  int32_t retryTimes = 3);
233 
240  static void execMain(WFRedisTask* task);
241 
242  private:
243  bool m_isSSL = false;
244  int32_t m_dbnum = 0;
245  int32_t m_idx = 0;
246 };
247 
252 struct cbKafkaDevice final : public cbVirtualDevice {
253  void updateUrl() override final;
254 };
255 
261  public:
262  cbVirtualDeviceManager() = default;
264  cbVirtualDeviceManager operator=(const cbVirtualDeviceManager& t) = delete;
265 
267  for (auto& item : m_mySqlPool) {
268  if (item.second.second) { delete item.second.first; }
269  }
270  for (auto& item : m_redisPool) {
271  if (item.second.second) { delete item.second.first; }
272  }
273  for (auto& item : m_kafkaPool) {
274  if (item.second.second) { delete item.second.first; }
275  }
276  }
277 
278  void addMySqlDevice(cbMySqlDevice* vd);
279  void addRedisDevice(cbRedisDevice* vd);
280  void addKafkaDevice(cbKafkaDevice* vd);
281 
282  void removeMySqlDevice(int32_t idx);
283  void removeRedisDevice(int32_t idx);
284  void removeKafkaDevice(int32_t idx);
285 
286  cbMySqlDevice* getMySqlDevice(int32_t idx);
287  cbRedisDevice* getRedisDevice(int32_t idx);
288  cbKafkaDevice* getKafkaDevice(int32_t idx);
289 
290  static int32_t m_numsMySql;
291  static int32_t m_numsRedis;
292  static int32_t m_numsKafka;
293 
294  private:
295  std::map<int32_t, std::pair<cbMySqlDevice*, bool>> m_mySqlPool;
296  std::map<int32_t, std::pair<cbRedisDevice*, bool>> m_redisPool;
297  std::map<int32_t, std::pair<cbKafkaDevice*, bool>> m_kafkaPool;
298 };
299 
300 }; // namespace trivial
301 
302 #endif //! __SERVER_CB_VIRTUAL_DEVICE_HPP_
trivial
Definition: cbVirtualDevice.cpp:4
mysql_callback
std::function< void(WFMySQLTask *)> mysql_callback
_WIN32
Definition: cbVirtualDevice.hpp:32
trivial::cbVirtualDeviceManager::~cbVirtualDeviceManager
~cbVirtualDeviceManager()
Definition: cbVirtualDevice.hpp:266
trivial::virtualDeviceType::MySql
@ MySql
trivial::cbVirtualDevice::Url
std::string Url
Definition: cbVirtualDevice.hpp:68
trivial::cbVirtualDevice::cbVirtualDevice
cbVirtualDevice()=default
trivial::cbMySqlDevice::m_idx
int32_t m_idx
Definition: cbVirtualDevice.hpp:170
trivial::cbMySqlDevice::cbMySqlDevice
cbMySqlDevice(int32_t _idx, const std::string &_port, const std::string &_host, const std::string &_userName, const std::string &_passWord, const std::string &_dataBaseName, const std::string &_charSet="")
Definition: cbVirtualDevice.hpp:101
trivial::cbVirtualDeviceManager::m_kafkaPool
std::map< int32_t, std::pair< cbKafkaDevice *, bool > > m_kafkaPool
! index, {pointer, visibility}
Definition: cbVirtualDevice.hpp:297
trivial::cbVirtualDevice::updateUrl
virtual void updateUrl()=0
trivial::cbVirtualDevice
Definition: cbVirtualDevice.hpp:47
trivial::cbVirtualDeviceManager::m_numsKafka
static int32_t m_numsKafka
Definition: cbVirtualDevice.hpp:292
trivial::cbVirtualDevice::deviceType
virtualDeviceType deviceType
Definition: cbVirtualDevice.hpp:60
redis_callback
std::function< void(WFRedisTask *)> redis_callback
Definition: cbVirtualDevice.hpp:33
trivial::virtualDeviceType::HttpDefine
@ HttpDefine
trivial::cbVirtualDeviceManager::m_mySqlPool
std::map< int32_t, std::pair< cbMySqlDevice *, bool > > m_mySqlPool
Definition: cbVirtualDevice.hpp:295
trivial::cbMySqlDevice::m_conn
WFMySQLConnection * m_conn
Definition: cbVirtualDevice.hpp:171
trivial::virtualDeviceType::Redis
@ Redis
trivial::cbVirtualDevice::port
std::string port
Definition: cbVirtualDevice.hpp:61
trivial::virtualDeviceType::Kafka
@ Kafka
trivial::cbVirtualDevice::dataBaseName
std::string dataBaseName
Definition: cbVirtualDevice.hpp:66
trivial::cbVirtualDevice::cbVirtualDevice
cbVirtualDevice(const std::string &_port, const std::string &_host, const std::string &_userName, const std::string &_passWord, const std::string &_charSet, const std::string _dataBaseName, const virtualDeviceType &v)
Definition: cbVirtualDevice.hpp:49
trivial::cbMySqlDevice::updateUrl
void updateUrl() override final
Definition: cbVirtualDevice.cpp:17
trivial::cbRedisDevice::cbRedisDevice
cbRedisDevice(int32_t idx, const std::string &_port, const std::string &_host, const std::string &_userName, const std::string &_passWord, int32_t dbnum, bool isSsl=false)
Definition: cbVirtualDevice.hpp:181
trivial::cbMySqlDevice::execMain
static void execMain(WFMySQLTask *t)
User should not use this function. All task should be added to graph node in order to execute all tas...
Definition: cbVirtualDevice.cpp:51
trivial::cbRedisDevice::~cbRedisDevice
~cbRedisDevice()
Definition: cbVirtualDevice.hpp:179
trivial::virtualDeviceType
virtualDeviceType
Definition: cbVirtualDevice.hpp:40
trivial::cbKafkaDevice
Definition: cbVirtualDevice.hpp:252
trivial::cbVirtualDeviceManager
Definition: cbVirtualDevice.hpp:260
trivial::cbMySqlDevice
Definition: cbVirtualDevice.hpp:95
trivial::cbMySqlDevice::cbMySqlDevice
cbMySqlDevice(int32_t _idx)
Definition: cbVirtualDevice.hpp:100
trivial::cbMySqlDevice::~cbMySqlDevice
~cbMySqlDevice()
Definition: cbVirtualDevice.hpp:96
trivial::cbRedisDevice::cbRedisDevice
cbRedisDevice(int32_t idx)
Definition: cbVirtualDevice.hpp:180
trivial::cbRedisDevice
Definition: cbVirtualDevice.hpp:178
trivial::cbVirtualDevice::reloadConnection
void reloadConnection(const std::string &_port, const std::string &_host, const std::string &_userName, const std::string &_passWord, const std::string &_charSet, const std::string &_dataBaseName)
Definition: cbVirtualDevice.cpp:5
trivial::cbVirtualDevice::host
std::string host
Definition: cbVirtualDevice.hpp:62
trivial::cbMySqlDevice::sequentialQuery
std::vector< WFMySQLTask * > sequentialQuery(const std::vector< std::string > &q, const std::vector< mysql_callback > &callback_func)
Definition: cbVirtualDevice.cpp:35
trivial::cbVirtualDevice::passWord
std::string passWord
Definition: cbVirtualDevice.hpp:64
trivial::cbVirtualDevice::charSet
std::string charSet
Definition: cbVirtualDevice.hpp:65
trivial::cbVirtualDevice::usrName
std::string usrName
Definition: cbVirtualDevice.hpp:63
trivial::cbMySqlDevice::getIdx
int32_t getIdx()
Get the Idx object.
Definition: cbVirtualDevice.cpp:29
trivial::cbVirtualDeviceManager::m_numsRedis
static int32_t m_numsRedis
Definition: cbVirtualDevice.hpp:291
trivial::cbVirtualDeviceManager::m_redisPool
std::map< int32_t, std::pair< cbRedisDevice *, bool > > m_redisPool
! index, {pointer, visibility}
Definition: cbVirtualDevice.hpp:296
trivial::cbMySqlDevice::query
WFMySQLTask * query(const std::string &q, const mysql_callback &callback_func=nullptr)
Definition: cbVirtualDevice.cpp:31
trivial::cbVirtualDeviceManager::m_numsMySql
static int32_t m_numsMySql
Definition: cbVirtualDevice.hpp:290