iotdb_util.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. import datetime
  2. import json
  3. from iotdb.SessionPool import SessionPool, PoolConfig
  4. from iotdb.utils.Field import Field
  5. from iotdb.utils.SessionDataSet import SessionDataSet
  6. from iotdb.utils.IoTDBConstants import TSDataType
  7. from threading import Lock
  8. # 官方示例 https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
  9. # pip install apache-iotdb==1.3.3 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
  10. """
  11. # 初始化客户端,只需要程序启动时初始化一次
  12. IotDbClient('127.0.0.1', username='root', password='root')
  13. # 请求数据
  14. dataset = IotDbClient.query('select * from root.unimat.dev_64')
  15. # 解析数据
  16. parse_dataset_to_list(dataset)
  17. """
  18. class IotDbClient:
  19. _instance = None
  20. _lock = Lock()
  21. session_pool = None
  22. tz = datetime.timezone(datetime.timedelta(hours=8))
  23. def __new__(cls, *args, **kwargs):
  24. with cls._lock:
  25. if not cls._instance:
  26. cls._instance = super().__new__(cls)
  27. return cls._instance
  28. def __init__(self, host, username='root', password='root', port=6667, pool_size=100, timeout_ms=3000,
  29. time_zone="UTC+8", max_retry=0):
  30. """
  31. 初始化客户端
  32. :param host: 服务器地址
  33. :param username: 用户名
  34. :param password: 密码
  35. :param port: 端口 默认 6667
  36. :param pool_size: 线程池大小
  37. :param timeout_ms: 请求超时
  38. :param time_zone: 时区
  39. :param max_retry: 最大重试次数
  40. """
  41. with self._lock:
  42. if not self.__class__.session_pool:
  43. self._pool_config = PoolConfig(host=host, port=str(port), user_name=username,
  44. password=password, fetch_size=1024,
  45. time_zone=time_zone, max_retry=max_retry)
  46. self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms)
  47. @classmethod
  48. def query(cls, sql) -> SessionDataSet | None:
  49. session = None
  50. try:
  51. session = IotDbClient.session_pool.get_session()
  52. return session.execute_query_statement(sql)
  53. except Exception as e:
  54. return None
  55. finally:
  56. if session is not None:
  57. IotDbClient.session_pool.put_back(session)
  58. def parse_dataset_to_list(dataset: SessionDataSet, timedela=600) -> dict:
  59. """
  60. 解析iotdb 非last类型的查询结果
  61. { "time":[],"code1":[],"code2":[],...}
  62. :param dataset:
  63. :param timedela: 对查询结果时间进行偏移
  64. :return:
  65. """
  66. if dataset is None:
  67. return {}
  68. column_names = dataset.column_names
  69. result = {}
  70. timed_list = []
  71. while dataset.has_next():
  72. line = dataset.next()
  73. fields = line.get_fields()
  74. timestamp = line.get_timestamp() / 1000
  75. origin_time = datetime.datetime.fromtimestamp(timestamp, IotDbClient.tz)
  76. center_time = origin_time
  77. if timedela is not None:
  78. center_time = origin_time + datetime.timedelta(seconds=timedela)
  79. str_time = center_time.strftime('%Y-%m-%d %H:%M:%S')
  80. timed_list.append(str_time)
  81. for index, name in enumerate(column_names[1:]):
  82. code = name.split('.')[-1]
  83. v = fields[index]
  84. value = convert_data_type(v)
  85. if code in result.keys():
  86. result[code].append(value)
  87. else:
  88. result[code] = [value]
  89. result['dateTime'] = timed_list
  90. return result
  91. def parse_last_data_set(dataset: SessionDataSet):
  92. """
  93. 解析last查询的结果
  94. :param dataset:
  95. :return:
  96. """
  97. if dataset is None:
  98. return {}
  99. result = {}
  100. while dataset.has_next():
  101. line = dataset.next()
  102. # [ column_name, value, ts_data_type]
  103. fields = line.get_fields()
  104. column_name = str(fields[0]).split('.')[-1]
  105. column_value = convert_last_data_type(str(fields[1]), str(fields[2]))
  106. result[column_name] = {"time": line.get_timestamp(), "value": column_value}
  107. return result
  108. def convert_data_type(field: Field):
  109. """
  110. 解析iotdb数据
  111. :param field:
  112. :return:
  113. """
  114. ts_data_type = field.get_data_type()
  115. if ts_data_type is None:
  116. return None
  117. final_value = field.get_string_value()
  118. if final_value is None or final_value == 'None':
  119. return None
  120. if ts_data_type == TSDataType.FLOAT:
  121. return float(final_value)
  122. if ts_data_type == TSDataType.INT32:
  123. return int(final_value)
  124. def convert_last_data_type(value: str, data_type_name: str):
  125. """
  126. 解析last查询的数据类型
  127. :param value:
  128. :param data_type_name:
  129. :return:
  130. """
  131. if 'FLOAT' == data_type_name:
  132. return float(value)
  133. elif 'INT' == data_type_name:
  134. return int(value)
  135. elif 'TEXT' == data_type_name:
  136. return value
  137. else:
  138. return value
  139. def query_terminal_7days(dbpath="root.org_100.monitor_54", start_time="2025-05-13T19:22:53",
  140. end_time="2025-05-20T19:22:53") -> dict:
  141. """
  142. 查询监测点7天历史数据(间隔10分钟,取该点前后10分钟的值)
  143. :param dbpath: 数据库时序路径
  144. :param start_time: 起始时间
  145. :param end_time: 截至时间
  146. :return: { 'tempReal': [], 'humiReal': [], 'status':[], ’time‘:[] }
  147. """
  148. sql = f"""
  149. SELECT LAST_VALUE(temperature) AS 'tempReal' , LAST_VALUE(humidity) AS 'humiReal', LAST_VALUE(status) AS 'status'
  150. FROM {dbpath}
  151. WHERE time >= {start_time}
  152. GROUP BY ([{start_time}, {end_time}), 20m)
  153. """
  154. session_data_set = IotDbClient.query(sql)
  155. return parse_dataset_to_list(session_data_set, 10 * 60)
  156. def query_station_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53",
  157. end_time="2025-05-20T19:22:53") -> dict:
  158. """
  159. 查询冷站7天冷冻总管出水温度
  160. :param dbpath:
  161. :param start_time:
  162. :param end_time:
  163. :return: { 'chilledWaterMainTempOut':[], 'time':[] }
  164. """
  165. sql = f"""
  166. SELECT LAST_VALUE(chilledWaterMainTempOut) AS 'chilledWaterMainTempOut'
  167. FROM {dbpath}
  168. WHERE time >= {start_time}
  169. GROUP BY ([{start_time}, {end_time}), 20m)
  170. """
  171. session_data_set = IotDbClient.query(sql)
  172. return parse_dataset_to_list(session_data_set, 10 * 60)
  173. def query_chiller_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53",
  174. capacityRated=1) -> dict:
  175. """
  176. 查询冷机7天历史数据
  177. :param dbpath:
  178. :param start_time:
  179. :param end_time:
  180. :return:
  181. """
  182. sql = f"""
  183. SELECT LAST_VALUE(loadRatio) AS 'loadRatio' ,
  184. LAST_VALUE(evapWaterTempOut) AS 'evapWaterTempOut',
  185. LAST_VALUE(outputActivePower) / 1 AS 'chillerWaterTempOut'
  186. FROM {dbpath}
  187. WHERE time >= {start_time}
  188. GROUP BY ([{start_time}, {end_time}), 20m)
  189. """
  190. session_data_set = IotDbClient.query(sql)
  191. return parse_dataset_to_list(session_data_set, 10 * 60)
  192. def get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp):
  193. result = {}
  194. if len(str(trigger_time_stamp)) == 13:
  195. trigger_time_stamp = trigger_time_stamp / 1000
  196. # trigger_time 向前移10分钟
  197. trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
  198. start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
  199. for terminal_id in terminal_ids:
  200. dbpath = "root.org_" + str(org_id) + ".monitor_" + str(terminal_id)
  201. data = query_terminal_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
  202. trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
  203. result[str(terminal_id)] = data
  204. return result
  205. def get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp):
  206. result = {}
  207. if len(str(trigger_time_stamp)) == 13:
  208. trigger_time_stamp = trigger_time_stamp / 1000
  209. # trigger_time 向前移10分钟
  210. trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
  211. start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
  212. for index, chiller_id in enumerate(chiller_ids):
  213. dbpath = "root.org_" + str(org_id) + ".dev_" + str(chiller_id)
  214. data = query_chiller_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
  215. trigger_time.strftime('%Y-%m-%dT%H:%M:00'), power_rates[index])
  216. result[str(chiller_id)] = data
  217. return result
  218. def get_station_modify_data(org_id, device_id, trigger_time_stamp):
  219. if len(str(trigger_time_stamp)) == 13:
  220. trigger_time_stamp = trigger_time_stamp / 1000
  221. # trigger_time 向前移10分钟
  222. trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
  223. start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
  224. dbpath = "root.org_" + str(org_id) + ".dev_" + str(device_id)
  225. data = query_station_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
  226. trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
  227. return data
  228. if __name__ == '__main__':
  229. IotDbClient('192.168.1.70', username='root', password='root')
  230. # 组织/租户ID
  231. org_id = 100
  232. # 冷机ID
  233. chiller_ids = [159]
  234. # 超标监测点ID
  235. terminal_ids = [121, 120]
  236. # 冷机额定功率
  237. power_rates = [100, 100]
  238. # 群控柜ID
  239. controller_id = 159
  240. trigger_time_stamp = 1748251717992
  241. terminal_data = get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp)
  242. chiller_data = get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp)
  243. station_data = get_station_modify_data(org_id, controller_id, trigger_time_stamp)
  244. print(json.dumps(terminal_data))
  245. print(json.dumps(chiller_data))
  246. print(json.dumps(station_data))