123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277 |
- import datetime
- import json
- from iotdb.SessionPool import SessionPool, PoolConfig
- from iotdb.utils.Field import Field
- from iotdb.utils.SessionDataSet import SessionDataSet
- from iotdb.utils.IoTDBConstants import TSDataType
- from threading import Lock
- # 官方示例 https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
- # pip install apache-iotdb==1.3.3 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
- """
- # 初始化客户端,只需要程序启动时初始化一次
- IotDbClient('127.0.0.1', username='root', password='root')
- # 请求数据
- dataset = IotDbClient.query('select * from root.unimat.dev_64')
- # 解析数据
- parse_dataset_by_time(dataset)
- """
- class IotDbClient:
- _instance = None
- _lock = Lock()
- session_pool = None
- tz = datetime.timezone(datetime.timedelta(hours=8))
- def __new__(cls, *args, **kwargs):
- with cls._lock:
- if not cls._instance:
- cls._instance = super().__new__(cls)
- return cls._instance
- def __init__(self, host, username='root', password='root', port=6667, pool_size=100, timeout_ms=3000,
- time_zone="UTC+8", max_retry=0):
- """
- 初始化客户端
- :param host: 服务器地址
- :param username: 用户名
- :param password: 密码
- :param port: 端口 默认 6667
- :param pool_size: 线程池大小
- :param timeout_ms: 请求超时
- :param time_zone: 时区
- :param max_retry: 最大重试次数
- """
- with self._lock:
- if not self.__class__.session_pool:
- self._pool_config = PoolConfig(host=host, port=str(port), user_name=username,
- password=password, fetch_size=1024,
- time_zone=time_zone, max_retry=max_retry)
- self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms)
- @classmethod
- def query(cls, sql) -> SessionDataSet:
- session = IotDbClient.session_pool.get_session()
- try:
- return session.execute_query_statement(sql)
- finally:
- IotDbClient.session_pool.put_back(session)
- def parse_dataset_to_list(dataset: SessionDataSet, timedela=600) -> dict:
- """
- 解析iotdb 非last类型的查询结果
- { "time":[],"code1":[],"code2":[],...}
- :param dataset:
- :param timedela: 对查询结果时间进行偏移
- :return:
- """
- column_names = dataset.column_names
- result = {}
- timed_list = []
- while dataset.has_next():
- line = dataset.next()
- fields = line.get_fields()
- timestamp = line.get_timestamp() / 1000
- origin_time = datetime.datetime.fromtimestamp(timestamp, IotDbClient.tz)
- center_time = origin_time
- if timedela is not None:
- center_time = origin_time + datetime.timedelta(seconds=timedela)
- str_time = center_time.strftime('%Y-%m-%d %H:%M:%S')
- timed_list.append(str_time)
- for index, name in enumerate(column_names[1:]):
- code = name.split('.')[-1]
- v = fields[index]
- value = convert_data_type(v)
- if code in result.keys():
- result[code].append(value)
- else:
- result[code] = [value]
- result['dateTime'] = timed_list
- return result
- def parse_last_data_set(dataset: SessionDataSet):
- """
- 解析last查询的结果
- :param dataset:
- :return:
- """
- result = {}
- while dataset.has_next():
- line = dataset.next()
- # [ column_name, value, ts_data_type]
- fields = line.get_fields()
- column_name = str(fields[0]).split('.')[-1]
- column_value = convert_last_data_type(str(fields[1]), str(fields[2]))
- result[column_name] = {"time": line.get_timestamp(), "value": column_value}
- return result
- def convert_data_type(field: Field):
- """
- 解析iotdb数据
- :param field:
- :return:
- """
- ts_data_type = field.get_data_type()
- if ts_data_type is None:
- return None
- final_value = field.get_string_value()
- if final_value is None or final_value == 'None':
- return None
- if ts_data_type == TSDataType.FLOAT:
- return float(final_value)
- if ts_data_type == TSDataType.INT32:
- return int(final_value)
- def convert_last_data_type(value: str, data_type_name: str):
- """
- 解析last查询的数据类型
- :param value:
- :param data_type_name:
- :return:
- """
- if 'FLOAT' == data_type_name:
- return float(value)
- elif 'INT' == data_type_name:
- return int(value)
- elif 'TEXT' == data_type_name:
- return value
- else:
- return value
- def query_terminal_7days(dbpath="root.org_100.monitor_54", start_time="2025-05-13T19:22:53",
- end_time="2025-05-20T19:22:53") -> dict:
- """
- 查询监测点7天历史数据(间隔10分钟,取该点前后10分钟的值)
- :param dbpath: 数据库时序路径
- :param start_time: 起始时间
- :param end_time: 截至时间
- :return: { 'tempReal': [], 'humiReal': [], 'status':[], ’time‘:[] }
- """
- sql = f"""
- SELECT LAST_VALUE(temperature) AS 'tempReal' , LAST_VALUE(humidity) AS 'humiReal', LAST_VALUE(status) AS 'status'
- FROM {dbpath}
- WHERE time >= {start_time}
- GROUP BY ([{start_time}, {end_time}), 20m)
- """
- session_data_set = IotDbClient.query(sql)
- return parse_dataset_to_list(session_data_set, 10 * 60)
- def query_station_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53",
- end_time="2025-05-20T19:22:53") -> dict:
- """
- 查询冷站7天冷冻总管出水温度
- :param dbpath:
- :param start_time:
- :param end_time:
- :return: { 'chilledWaterMainTempOut':[], 'time':[] }
- """
- sql = f"""
- SELECT LAST_VALUE(chilledWaterMainTempOut) AS 'chilledWaterMainTempOut'
- FROM {dbpath}
- WHERE time >= {start_time}
- GROUP BY ([{start_time}, {end_time}), 20m)
- """
- session_data_set = IotDbClient.query(sql)
- return parse_dataset_to_list(session_data_set, 10 * 60)
- def query_chiller_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53",
- capacityRated=1) -> dict:
- """
- 查询冷机7天历史数据
- :param dbpath:
- :param start_time:
- :param end_time:
- :return:
- """
- sql = f"""
- SELECT LAST_VALUE(loadRatio) AS 'loadRatio' ,
- LAST_VALUE(evapWaterTempOut) AS 'evapWaterTempOut',
- LAST_VALUE(outputActivePower) / 1 AS 'chillerWaterTempOut'
- FROM {dbpath}
- WHERE time >= {start_time}
- GROUP BY ([{start_time}, {end_time}), 20m)
- """
- session_data_set = IotDbClient.query(sql)
- return parse_dataset_to_list(session_data_set, 10 * 60)
- def get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp):
- result = {}
- if len(str(trigger_time_stamp)) == 13:
- trigger_time_stamp = trigger_time_stamp / 1000
- # trigger_time 向前移10分钟
- trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
- start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
- for terminal_id in terminal_ids:
- dbpath = "root.org_" + str(org_id) + ".monitor_" + str(terminal_id)
- data = query_terminal_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
- trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
- result[str(terminal_id)] = data
- return result
- def get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp):
- result = {}
- if len(str(trigger_time_stamp)) == 13:
- trigger_time_stamp = trigger_time_stamp / 1000
- # trigger_time 向前移10分钟
- trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
- start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
- for index, chiller_id in enumerate(chiller_ids):
- dbpath = "root.org_" + str(org_id) + ".dev_" + str(chiller_id)
- data = query_chiller_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
- trigger_time.strftime('%Y-%m-%dT%H:%M:00'), power_rates[index])
- result[str(chiller_id)] = data
- return result
- def get_station_modify_data(org_id, device_id, trigger_time_stamp):
- if len(str(trigger_time_stamp)) == 13:
- trigger_time_stamp = trigger_time_stamp / 1000
- # trigger_time 向前移10分钟
- trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
- start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
- dbpath = "root.org_" + str(org_id) + ".dev_" + str(device_id)
- data = query_station_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
- trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
- return data
- if __name__ == '__main__':
- IotDbClient('192.168.1.70', username='root', password='root')
- # 组织/租户ID
- org_id = 100
- # 冷机ID
- chiller_ids = [159]
- # 超标监测点ID
- terminal_ids = [121, 120]
- # 冷机额定功率
- power_rates = [100, 100]
- # 群控柜ID
- controller_id = 159
- trigger_time_stamp = 1748251717992
- terminal_data = get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp)
- chiller_data = get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp)
- station_data = get_station_modify_data(org_id, controller_id, trigger_time_stamp)
- print(json.dumps(terminal_data))
- print(json.dumps(chiller_data))
- print(json.dumps(station_data))
|