import datetime import json from iotdb.SessionPool import SessionPool, PoolConfig from iotdb.utils.Field import Field from iotdb.utils.SessionDataSet import SessionDataSet from iotdb.utils.IoTDBConstants import TSDataType from threading import Lock # 官方示例 https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py # pip install apache-iotdb==1.3.3 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com """ # 初始化客户端,只需要程序启动时初始化一次 IotDbClient('127.0.0.1', username='root', password='root') # 请求数据 dataset = IotDbClient.query('select * from root.unimat.dev_64') # 解析数据 parse_dataset_to_list(dataset) """ class IotDbClient: _instance = None _lock = Lock() session_pool = None tz = datetime.timezone(datetime.timedelta(hours=8)) def __new__(cls, *args, **kwargs): with cls._lock: if not cls._instance: cls._instance = super().__new__(cls) return cls._instance def __init__(self, host, username='root', password='root', port=6667, pool_size=100, timeout_ms=3000, time_zone="UTC+8", max_retry=0): """ 初始化客户端 :param host: 服务器地址 :param username: 用户名 :param password: 密码 :param port: 端口 默认 6667 :param pool_size: 线程池大小 :param timeout_ms: 请求超时 :param time_zone: 时区 :param max_retry: 最大重试次数 """ with self._lock: if not self.__class__.session_pool: self._pool_config = PoolConfig(host=host, port=str(port), user_name=username, password=password, fetch_size=1024, time_zone=time_zone, max_retry=max_retry) self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms) @classmethod def query(cls, sql) -> SessionDataSet | None: session = None try: session = IotDbClient.session_pool.get_session() return session.execute_query_statement(sql) except Exception as e: return None finally: if session is not None: IotDbClient.session_pool.put_back(session) def parse_dataset_to_list(dataset: SessionDataSet, timedela=600) -> dict: """ 解析iotdb 非last类型的查询结果 { "time":[],"code1":[],"code2":[],...} :param dataset: :param timedela: 对查询结果时间进行偏移 :return: """ if dataset is None: return {} column_names = dataset.column_names result = {} timed_list = [] while dataset.has_next(): line = dataset.next() fields = line.get_fields() timestamp = line.get_timestamp() / 1000 origin_time = datetime.datetime.fromtimestamp(timestamp, IotDbClient.tz) center_time = origin_time if timedela is not None: center_time = origin_time + datetime.timedelta(seconds=timedela) str_time = center_time.strftime('%Y-%m-%d %H:%M:%S') timed_list.append(str_time) for index, name in enumerate(column_names[1:]): code = name.split('.')[-1] v = fields[index] value = convert_data_type(v) if code in result.keys(): result[code].append(value) else: result[code] = [value] result['dateTime'] = timed_list return result def parse_last_data_set(dataset: SessionDataSet): """ 解析last查询的结果 :param dataset: :return: """ if dataset is None: return {} result = {} while dataset.has_next(): line = dataset.next() # [ column_name, value, ts_data_type] fields = line.get_fields() column_name = str(fields[0]).split('.')[-1] column_value = convert_last_data_type(str(fields[1]), str(fields[2])) result[column_name] = {"time": line.get_timestamp(), "value": column_value} return result def convert_data_type(field: Field): """ 解析iotdb数据 :param field: :return: """ ts_data_type = field.get_data_type() if ts_data_type is None: return None final_value = field.get_string_value() if final_value is None or final_value == 'None': return None if ts_data_type == TSDataType.FLOAT: return float(final_value) if ts_data_type == TSDataType.INT32: return int(final_value) def convert_last_data_type(value: str, data_type_name: str): """ 解析last查询的数据类型 :param value: :param data_type_name: :return: """ if 'FLOAT' == data_type_name: return float(value) elif 'INT' == data_type_name: return int(value) elif 'TEXT' == data_type_name: return value else: return value def query_terminal_7days(dbpath="root.org_100.monitor_54", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53") -> dict: """ 查询监测点7天历史数据(间隔10分钟,取该点前后10分钟的值) :param dbpath: 数据库时序路径 :param start_time: 起始时间 :param end_time: 截至时间 :return: { 'tempReal': [], 'humiReal': [], 'status':[], ’time‘:[] } """ sql = f""" SELECT LAST_VALUE(temperature) AS 'tempReal' , LAST_VALUE(humidity) AS 'humiReal', LAST_VALUE(status) AS 'status' FROM {dbpath} WHERE time >= {start_time} GROUP BY ([{start_time}, {end_time}), 20m) """ session_data_set = IotDbClient.query(sql) return parse_dataset_to_list(session_data_set, 10 * 60) def query_station_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53") -> dict: """ 查询冷站7天冷冻总管出水温度 :param dbpath: :param start_time: :param end_time: :return: { 'chilledWaterMainTempOut':[], 'time':[] } """ sql = f""" SELECT LAST_VALUE(chilledWaterMainTempOut) AS 'chilledWaterMainTempOut' FROM {dbpath} WHERE time >= {start_time} GROUP BY ([{start_time}, {end_time}), 20m) """ session_data_set = IotDbClient.query(sql) return parse_dataset_to_list(session_data_set, 10 * 60) def query_chiller_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53", capacityRated=1) -> dict: """ 查询冷机7天历史数据 :param dbpath: :param start_time: :param end_time: :return: """ sql = f""" SELECT LAST_VALUE(loadRatio) AS 'loadRate' , LAST_VALUE(evapWaterTempOut) AS 'chillerWaterTempOut', LAST_VALUE(outputActivePower) / 1 AS 'chillerPowerRatio' FROM {dbpath} WHERE time >= {start_time} GROUP BY ([{start_time}, {end_time}), 20m) """ session_data_set = IotDbClient.query(sql) return parse_dataset_to_list(session_data_set, 10 * 60) def get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp): result = {} if len(str(trigger_time_stamp)) == 13: trigger_time_stamp = trigger_time_stamp / 1000 # trigger_time 向前移10分钟 trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz) start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10) for terminal_id in terminal_ids: dbpath = "root.org_" + str(org_id) + ".monitor_" + str(terminal_id) data = query_terminal_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'), trigger_time.strftime('%Y-%m-%dT%H:%M:00')) result[str(terminal_id)] = data return result def get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp): result = {} if len(str(trigger_time_stamp)) == 13: trigger_time_stamp = trigger_time_stamp / 1000 # trigger_time 向前移10分钟 trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz) start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10) for index, chiller_id in enumerate(chiller_ids): dbpath = "root.org_" + str(org_id) + ".dev_" + str(chiller_id) data = query_chiller_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'), trigger_time.strftime('%Y-%m-%dT%H:%M:00'), power_rates[index]) result[str(chiller_id)] = data return result def get_station_modify_data(org_id, device_id, trigger_time_stamp): if len(str(trigger_time_stamp)) == 13: trigger_time_stamp = trigger_time_stamp / 1000 # trigger_time 向前移10分钟 trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz) start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10) dbpath = "root.org_" + str(org_id) + ".dev_" + str(device_id) data = query_station_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'), trigger_time.strftime('%Y-%m-%dT%H:%M:00')) return data if __name__ == '__main__': IotDbClient('192.168.1.70', username='root', password='root') # 组织/租户ID org_id = 100 # 冷机ID chiller_ids = [170] # 超标监测点ID terminal_ids = [58, 59, 60] # 冷机额定功率 power_rates = [200] # 群控柜ID controller_id = 168 trigger_time_stamp = 1748517100841 terminal_data = get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp) chiller_data = get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp) station_data = get_station_modify_data(org_id, controller_id, trigger_time_stamp) print(json.dumps(terminal_data)) print(json.dumps(chiller_data)) print(json.dumps(station_data))