iotdb_util.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. from iotdb.SessionPool import SessionPool, PoolConfig
  2. from iotdb.utils.Field import Field
  3. from iotdb.utils.SessionDataSet import SessionDataSet
  4. from iotdb.utils.IoTDBConstants import TSDataType
  5. from threading import Lock
# Official example: https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
# pip install apache-iotdb -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
"""
Usage

# Initialise the client; this only has to be done once, at program start-up
IotDbClient('127.0.0.1', username='root', password='root')
# Run a query
dataset = IotDbClient.query('select * from root.unimat.dev_64')
# Parse the result
parse_dataset_by_time(dataset)
"""
  17. class IotDbClient:
  18. _instance = None
  19. _lock = Lock()
  20. session_pool = None
  21. def __new__(cls, *args, **kwargs):
  22. with cls._lock:
  23. if not cls._instance:
  24. cls._instance = super().__new__(cls)
  25. return cls._instance
  26. def __init__(self, host, username='root', password='root', port=6667, pool_size=10, timeout_ms=3000,
  27. time_zone="UTC+8", max_retry=0):
  28. """
  29. 初始化客户端
  30. :param host: 服务器地址
  31. :param username: 用户名
  32. :param password: 密码
  33. :param port: 端口 默认 6667
  34. :param pool_size: 线程池大小
  35. :param timeout_ms: 请求超时
  36. :param time_zone: 时区
  37. :param max_retry: 最大重试次数
  38. """
  39. with self._lock:
  40. if not self.__class__.session_pool:
  41. self._pool_config = PoolConfig(host=host, port=str(port), user_name=username,
  42. password=password, fetch_size=1024,
  43. time_zone=time_zone, max_retry=max_retry)
  44. self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms)
  45. print("init iotDbClient")
  46. @classmethod
  47. def query(cls, sql) -> SessionDataSet:
  48. session = IotDbClient.session_pool.get_session()
  49. try:
  50. return session.execute_query_statement(sql)
  51. finally:
  52. IotDbClient.session_pool.put_back(session)
  53. def parse_dataset_by_time(dataset: SessionDataSet) -> list:
  54. """
  55. 解析iotdb 非last类型的查询结果
  56. [ {"time": 时间戳,"code1":value1,"code2":value2,...},{"time": 时间戳2,"code1":value1,"code2":value2,...}]
  57. :param dataset:
  58. :return:
  59. """
  60. column_names = dataset.column_names
  61. result = []
  62. while dataset.has_next():
  63. line = dataset.next()
  64. fields = line.get_fields()
  65. timed_line = {"time": line.get_timestamp()}
  66. for index, name in enumerate(column_names[1:]):
  67. code = name.split('.')[-1]
  68. field = fields[index]
  69. value = convert_data_type(field)
  70. timed_line[code] = value
  71. result.append(timed_line)
  72. return result
  73. def parse_dataset_to_list(dataset: SessionDataSet) -> dict:
  74. """
  75. 解析iotdb 非last类型的查询结果
  76. { "time":[],"code1":[],"code2":[],...}
  77. :param dataset:
  78. :return:
  79. """
  80. column_names = dataset.column_names
  81. result = {}
  82. timed_list = []
  83. while dataset.has_next():
  84. line = dataset.next()
  85. fields = line.get_fields()
  86. timed_list.append(line.get_timestamp())
  87. for index, name in enumerate(column_names[1:]):
  88. code = name.split('.')[-1]
  89. value = convert_data_type(fields[index])
  90. if code in result.keys():
  91. result[code].append(value)
  92. else:
  93. result[code] = [value]
  94. result['time'] = timed_list
  95. return result
  96. def parse_last_data_set(dataset: SessionDataSet):
  97. """
  98. 解析last查询的结果
  99. :param dataset:
  100. :return:
  101. """
  102. result = {}
  103. while dataset.has_next():
  104. line = dataset.next()
  105. # [ column_name, value, ts_data_type]
  106. fields = line.get_fields()
  107. column_name = str(fields[0]).split('.')[-1]
  108. column_value = convert_last_data_type(str(fields[1]), str(fields[2]))
  109. result[column_name] = {"time": line.get_timestamp(), "value": column_value}
  110. return result
  111. def convert_data_type(field: Field):
  112. """
  113. 解析iotdb数据
  114. :param field:
  115. :return:
  116. """
  117. ts_data_type = field.get_data_type()
  118. final_value = None
  119. if ts_data_type == TSDataType.FLOAT:
  120. final_value = field.get_float_value()
  121. elif ts_data_type == TSDataType.INT:
  122. final_value = field.get_int_value()
  123. elif ts_data_type == TSDataType.STRING:
  124. final_value = field.get_string_value()
  125. elif ts_data_type == TSDataType.BOOL:
  126. final_value = field.get_bool_value()
  127. elif ts_data_type == TSDataType.DATETIME:
  128. final_value = field.get_date_value()
  129. else:
  130. pass
  131. if final_value is None:
  132. return -1
  133. if ts_data_type == TSDataType.FLOAT:
  134. return float(final_value)
  135. def convert_last_data_type(value: str, data_type_name: str):
  136. """
  137. 解析last查询的数据类型
  138. :param value:
  139. :param data_type_name:
  140. :return:
  141. """
  142. if 'FLOAT' == data_type_name:
  143. return float(value)
  144. elif 'INT' == data_type_name:
  145. return int(value)
  146. elif 'TEXT' == data_type_name:
  147. return value
  148. else:
  149. return value
  150. def parse_dataset_by_code(dataset: SessionDataSet) -> dict:
  151. """
  152. 转换iotdb查询到的数据格式{"code1":[{time:time1,value:value1}],...}
  153. :param dataset:
  154. :return:
  155. """
  156. column_names = dataset.column_names
  157. result = {}
  158. while dataset.has_next():
  159. line = dataset.next()
  160. fields = line.get_fields()
  161. for index, name in enumerate(column_names[1:]):
  162. code = name.split('.')[-1]
  163. field = fields[index]
  164. value = convert_data_type(field)
  165. timed_value = {"time": line.get_timestamp(), "value": value}
  166. if code in result:
  167. result[code].append(timed_value)
  168. else:
  169. result[code] = [timed_value]
  170. return result
  171. if __name__ == '__main__':
  172. IotDbClient('127.0.0.1', username='root', password='root')
  173. dataSet = IotDbClient.query("select * from root.unimat.dev_64")
  174. print(parse_dataset_to_list(dataSet))