iotdb_util.py 9.6 KB


  1. import datetime
  2. import json
  3. from iotdb.SessionPool import SessionPool, PoolConfig
  4. from iotdb.utils.Field import Field
  5. from iotdb.utils.SessionDataSet import SessionDataSet
  6. from iotdb.utils.IoTDBConstants import TSDataType
  7. from threading import Lock
  8. # 官方示例 https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
  9. # pip install apache-iotdb -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
  10. """
  11. # 初始化客户端,只需要程序启动时初始化一次
  12. IotDbClient('127.0.0.1', username='root', password='root')
  13. # 请求数据
  14. dataset = IotDbClient.query('select * from root.unimat.dev_64')
  15. # 解析数据
  16. parse_dataset_by_time(dataset)
  17. """
  18. class IotDbClient:
  19. _instance = None
  20. _lock = Lock()
  21. session_pool = None
  22. tz = datetime.timezone(datetime.timedelta(hours=8))
  23. def __new__(cls, *args, **kwargs):
  24. with cls._lock:
  25. if not cls._instance:
  26. cls._instance = super().__new__(cls)
  27. return cls._instance
  28. def __init__(self, host, username='root', password='root', port=6667, pool_size=100, timeout_ms=3000,
  29. time_zone="UTC+8", max_retry=0):
  30. """
  31. 初始化客户端
  32. :param host: 服务器地址
  33. :param username: 用户名
  34. :param password: 密码
  35. :param port: 端口 默认 6667
  36. :param pool_size: 线程池大小
  37. :param timeout_ms: 请求超时
  38. :param time_zone: 时区
  39. :param max_retry: 最大重试次数
  40. """
  41. with self._lock:
  42. if not self.__class__.session_pool:
  43. self._pool_config = PoolConfig(host=host, port=str(port), user_name=username,
  44. password=password, fetch_size=1024,
  45. time_zone=time_zone, max_retry=max_retry)
  46. self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms)
  47. @classmethod
  48. def query(cls, sql) -> SessionDataSet:
  49. session = IotDbClient.session_pool.get_session()
  50. try:
  51. return session.execute_query_statement(sql)
  52. finally:
  53. IotDbClient.session_pool.put_back(session)
  54. def parse_dataset_to_list(dataset: SessionDataSet, timedela=600) -> dict:
  55. """
  56. 解析iotdb 非last类型的查询结果
  57. { "time":[],"code1":[],"code2":[],...}
  58. :param dataset:
  59. :param timedela: 对查询结果时间进行偏移
  60. :return:
  61. """
  62. column_names = dataset.column_names
  63. result = {}
  64. timed_list = []
  65. while dataset.has_next():
  66. line = dataset.next()
  67. fields = line.get_fields()
  68. timestamp = line.get_timestamp() / 1000
  69. origin_time = datetime.datetime.fromtimestamp(timestamp, IotDbClient.tz)
  70. center_time = origin_time
  71. if timedela is not None:
  72. center_time = origin_time + datetime.timedelta(seconds=timedela)
  73. str_time = center_time.strftime('%Y-%m-%d %H:%M:%S')
  74. timed_list.append(str_time)
  75. for index, name in enumerate(column_names[1:]):
  76. code = name.split('.')[-1]
  77. v = fields[index]
  78. value = convert_data_type(v)
  79. if code in result.keys():
  80. result[code].append(value)
  81. else:
  82. result[code] = [value]
  83. result['dateTime'] = timed_list
  84. return result
  85. def parse_last_data_set(dataset: SessionDataSet):
  86. """
  87. 解析last查询的结果
  88. :param dataset:
  89. :return:
  90. """
  91. result = {}
  92. while dataset.has_next():
  93. line = dataset.next()
  94. # [ column_name, value, ts_data_type]
  95. fields = line.get_fields()
  96. column_name = str(fields[0]).split('.')[-1]
  97. column_value = convert_last_data_type(str(fields[1]), str(fields[2]))
  98. result[column_name] = {"time": line.get_timestamp(), "value": column_value}
  99. return result
  100. def convert_data_type(field: Field):
  101. """
  102. 解析iotdb数据
  103. :param field:
  104. :return:
  105. """
  106. ts_data_type = field.get_data_type()
  107. if ts_data_type is None:
  108. return None
  109. final_value = field.get_string_value()
  110. if final_value is None or final_value == 'None':
  111. return None
  112. if ts_data_type == TSDataType.FLOAT:
  113. return float(final_value)
  114. if ts_data_type == TSDataType.INT32:
  115. return int(final_value)
  116. def convert_last_data_type(value: str, data_type_name: str):
  117. """
  118. 解析last查询的数据类型
  119. :param value:
  120. :param data_type_name:
  121. :return:
  122. """
  123. if 'FLOAT' == data_type_name:
  124. return float(value)
  125. elif 'INT' == data_type_name:
  126. return int(value)
  127. elif 'TEXT' == data_type_name:
  128. return value
  129. else:
  130. return value
  131. def query_terminal_7days(dbpath="root.org_100.monitor_54", start_time="2025-05-13T19:22:53",
  132. end_time="2025-05-20T19:22:53") -> dict:
  133. """
  134. 查询监测点7天历史数据(间隔10分钟,取该点前后10分钟的值)
  135. :param dbpath: 数据库时序路径
  136. :param start_time: 起始时间
  137. :param end_time: 截至时间
  138. :return: { 'tempReal': [], 'humiReal': [], 'status':[], ’time‘:[] }
  139. """
  140. sql = f"""
  141. SELECT LAST_VALUE(temperature) AS 'tempReal' , LAST_VALUE(humidity) AS 'humiReal', LAST_VALUE(status) AS 'status'
  142. FROM {dbpath}
  143. WHERE time >= {start_time}
  144. GROUP BY ([{start_time}, {end_time}), 20m)
  145. """
  146. session_data_set = IotDbClient.query(sql)
  147. return parse_dataset_to_list(session_data_set, 10 * 60)
  148. def query_station_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53",
  149. end_time="2025-05-20T19:22:53") -> dict:
  150. """
  151. 查询冷站7天冷冻总管出水温度
  152. :param dbpath:
  153. :param start_time:
  154. :param end_time:
  155. :return: { 'chilledWaterMainTempOut':[], 'time':[] }
  156. """
  157. sql = f"""
  158. SELECT LAST_VALUE(chilledWaterMainTempOut) AS 'chilledWaterMainTempOut'
  159. FROM {dbpath}
  160. WHERE time >= {start_time}
  161. GROUP BY ([{start_time}, {end_time}), 20m)
  162. """
  163. session_data_set = IotDbClient.query(sql)
  164. return parse_dataset_to_list(session_data_set, 10 * 60)
  165. def query_chiller_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53",
  166. capacityRated=1) -> dict:
  167. """
  168. 查询冷机7天历史数据
  169. :param dbpath:
  170. :param start_time:
  171. :param end_time:
  172. :return:
  173. """
  174. sql = f"""
  175. SELECT LAST_VALUE(loadRatio) AS 'loadRatio' ,
  176. LAST_VALUE(evapWaterTempOut) AS 'evapWaterTempOut',
  177. LAST_VALUE(outputActivePower) / 1 AS 'chillerWaterTempOut'
  178. FROM {dbpath}
  179. WHERE time >= {start_time}
  180. GROUP BY ([{start_time}, {end_time}), 20m)
  181. """
  182. session_data_set = IotDbClient.query(sql)
  183. return parse_dataset_to_list(session_data_set, 10 * 60)
  184. def get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp):
  185. result = {}
  186. if len(str(trigger_time_stamp)) == 13:
  187. trigger_time_stamp = trigger_time_stamp / 1000
  188. # trigger_time 向前移10分钟
  189. trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
  190. start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
  191. for terminal_id in terminal_ids:
  192. dbpath = "root.org_" + str(org_id) + ".monitor_" + str(terminal_id)
  193. data = query_terminal_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
  194. trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
  195. result[str(terminal_id)] = data
  196. return result
  197. def get_chiller_modify_data(org_id, chiller_ids, capacityRates, trigger_time_stamp):
  198. result = {}
  199. if len(str(trigger_time_stamp)) == 13:
  200. trigger_time_stamp = trigger_time_stamp / 1000
  201. # trigger_time 向前移10分钟
  202. trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
  203. start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
  204. for index, chiller_id in enumerate(chiller_ids):
  205. dbpath = "root.org_" + str(org_id) + ".dev_" + str(chiller_id)
  206. data = query_chiller_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
  207. trigger_time.strftime('%Y-%m-%dT%H:%M:00'), capacityRates[index])
  208. result[str(chiller_id)] = data
  209. return result
  210. def get_station_modify_data(org_id, device_id, trigger_time_stamp):
  211. if len(str(trigger_time_stamp)) == 13:
  212. trigger_time_stamp = trigger_time_stamp / 1000
  213. # trigger_time 向前移10分钟
  214. trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
  215. start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
  216. dbpath = "root.org_" + str(org_id) + ".dev_" + str(device_id)
  217. data = query_station_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
  218. trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
  219. return data
  220. if __name__ == '__main__':
  221. IotDbClient('192.168.1.70', username='root', password='root')
  222. org_id = 100
  223. chiller_ids = [159]
  224. terminal_ids = [121, 120]
  225. power_rates = [100, 100]
  226. controller_id = 159
  227. trigger_time_stamp = 1748251717992
  228. terminal_data = get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp)
  229. chiller_data = get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp)
  230. station_data = get_station_modify_data(org_id, controller_id, trigger_time_stamp)
  231. print(json.dumps(terminal_data))
  232. print(json.dumps(chiller_data))
  233. print(json.dumps(station_data))