Explorar el Código

7天数据查询

heshixin hace 1 semana
padre
commit
90dab3fd53
Se han modificado 1 ficheros con 272 adiciones y 198 borrados
  1. 272 198
      iotdb_util.py

+ 272 - 198
iotdb_util.py

@@ -1,198 +1,272 @@
-from iotdb.SessionPool import SessionPool, PoolConfig
-from iotdb.utils.Field import Field
-from iotdb.utils.SessionDataSet import SessionDataSet
-from iotdb.utils.IoTDBConstants import TSDataType
-from threading import Lock
-
-# 官方示例 https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
-# pip install apache-iotdb -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
-
-"""
-使用方法
-# 初始化客户端,只需要程序启动时初始化一次
-IotDbClient('127.0.0.1', username='root', password='root')
-# 请求数据
-dataset = IotDbClient.query('select * from root.unimat.dev_64')
-# 解析数据
-parse_dataset_by_time(dataset)
-
-"""
-
-
-class IotDbClient:
-    _instance = None
-    _lock = Lock()
-
-    session_pool = None
-
-    def __new__(cls, *args, **kwargs):
-        with cls._lock:
-            if not cls._instance:
-                cls._instance = super().__new__(cls)
-        return cls._instance
-
-    def __init__(self, host, username='root', password='root', port=6667, pool_size=10, timeout_ms=3000,
-                 time_zone="UTC+8", max_retry=0):
-        """
-        初始化客户端
-        :param host: 服务器地址
-        :param username: 用户名
-        :param password: 密码
-        :param port: 端口 默认 6667
-        :param pool_size: 线程池大小
-        :param timeout_ms: 请求超时
-        :param time_zone: 时区
-        :param max_retry: 最大重试次数
-        """
-        with self._lock:
-            if not self.__class__.session_pool:
-                self._pool_config = PoolConfig(host=host, port=str(port), user_name=username,
-                                               password=password, fetch_size=1024,
-                                               time_zone=time_zone, max_retry=max_retry)
-                self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms)
-                print("init iotDbClient")
-
-    @classmethod
-    def query(cls, sql) -> SessionDataSet:
-        session = IotDbClient.session_pool.get_session()
-        try:
-            return session.execute_query_statement(sql)
-        finally:
-            IotDbClient.session_pool.put_back(session)
-
-
-def parse_dataset_by_time(dataset: SessionDataSet) -> list:
-    """
-    解析iotdb 非last类型的查询结果
-    [ {"time": 时间戳,"code1":value1,"code2":value2,...},{"time": 时间戳2,"code1":value1,"code2":value2,...}]
-    :param dataset:
-    :return:
-    """
-    column_names = dataset.column_names
-    result = []
-    while dataset.has_next():
-        line = dataset.next()
-        fields = line.get_fields()
-        timed_line = {"time": line.get_timestamp()}
-        for index, name in enumerate(column_names[1:]):
-            code = name.split('.')[-1]
-            field = fields[index]
-            value = convert_data_type(field)
-            timed_line[code] = value
-        result.append(timed_line)
-    return result
-
-
-def parse_dataset_to_list(dataset: SessionDataSet) -> dict:
-    """
-    解析iotdb 非last类型的查询结果
-    { "time":[],"code1":[],"code2":[],...}
-    :param dataset:
-    :return:
-    """
-    column_names = dataset.column_names
-    result = {}
-    timed_list = []
-    while dataset.has_next():
-        line = dataset.next()
-        fields = line.get_fields()
-        timed_list.append(line.get_timestamp())
-        for index, name in enumerate(column_names[1:]):
-            code = name.split('.')[-1]
-            value = convert_data_type(fields[index])
-            if code in result.keys():
-                result[code].append(value)
-            else:
-                result[code] = [value]
-        result['time'] = timed_list
-    return result
-
-
-def parse_last_data_set(dataset: SessionDataSet):
-    """
-    解析last查询的结果
-    :param dataset:
-    :return:
-    """
-    result = {}
-    while dataset.has_next():
-        line = dataset.next()
-        # [ column_name, value, ts_data_type]
-        fields = line.get_fields()
-        column_name = str(fields[0]).split('.')[-1]
-        column_value = convert_last_data_type(str(fields[1]), str(fields[2]))
-        result[column_name] = {"time": line.get_timestamp(), "value": column_value}
-
-    return result
-
-
-def convert_data_type(field: Field):
-    """
-    解析iotdb数据
-    :param field:
-    :return:
-    """
-    ts_data_type = field.get_data_type()
-    final_value = None
-    if ts_data_type == TSDataType.FLOAT:
-        final_value = field.get_float_value()
-    elif ts_data_type == TSDataType.INT:
-        final_value = field.get_int_value()
-    elif ts_data_type == TSDataType.STRING:
-        final_value = field.get_string_value()
-    elif ts_data_type == TSDataType.BOOL:
-        final_value = field.get_bool_value()
-    elif ts_data_type == TSDataType.DATETIME:
-        final_value = field.get_date_value()
-    else:
-        pass
-    if final_value is None:
-        return -1
-    if ts_data_type == TSDataType.FLOAT:
-        return float(final_value)
-
-
-def convert_last_data_type(value: str, data_type_name: str):
-    """
-    解析last查询的数据类型
-    :param value:
-    :param data_type_name:
-    :return:
-    """
-    if 'FLOAT' == data_type_name:
-        return float(value)
-    elif 'INT' == data_type_name:
-        return int(value)
-    elif 'TEXT' == data_type_name:
-        return value
-    else:
-        return value
-
-
-def parse_dataset_by_code(dataset: SessionDataSet) -> dict:
-    """
-    转换iotdb查询到的数据格式{"code1":[{time:time1,value:value1}],...}
-    :param dataset:
-    :return:
-    """
-    column_names = dataset.column_names
-    result = {}
-    while dataset.has_next():
-        line = dataset.next()
-        fields = line.get_fields()
-        for index, name in enumerate(column_names[1:]):
-            code = name.split('.')[-1]
-            field = fields[index]
-            value = convert_data_type(field)
-            timed_value = {"time": line.get_timestamp(), "value": value}
-            if code in result:
-                result[code].append(timed_value)
-            else:
-                result[code] = [timed_value]
-    return result
-
-
-if __name__ == '__main__':
-    IotDbClient('127.0.0.1', username='root', password='root')
-    dataSet = IotDbClient.query("select * from root.unimat.dev_64")
-    print(parse_dataset_to_list(dataSet))
+import datetime
+import json
+
+from iotdb.SessionPool import SessionPool, PoolConfig
+from iotdb.utils.Field import Field
+from iotdb.utils.SessionDataSet import SessionDataSet
+from iotdb.utils.IoTDBConstants import TSDataType
+from threading import Lock
+
+# 官方示例 https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
+# pip install apache-iotdb -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
+
+"""
+
+# 初始化客户端,只需要程序启动时初始化一次
+IotDbClient('127.0.0.1', username='root', password='root')
+# 请求数据
+dataset = IotDbClient.query('select * from root.unimat.dev_64')
+# 解析数据
+parse_dataset_by_time(dataset)
+
+"""
+
+
+class IotDbClient:
+    _instance = None
+    _lock = Lock()
+    session_pool = None
+    tz = datetime.timezone(datetime.timedelta(hours=8))
+
+    def __new__(cls, *args, **kwargs):
+        with cls._lock:
+            if not cls._instance:
+                cls._instance = super().__new__(cls)
+        return cls._instance
+
+    def __init__(self, host, username='root', password='root', port=6667, pool_size=100, timeout_ms=3000,
+                 time_zone="UTC+8", max_retry=0):
+        """
+        初始化客户端
+        :param host: 服务器地址
+        :param username: 用户名
+        :param password: 密码
+        :param port: 端口 默认 6667
+        :param pool_size: 线程池大小
+        :param timeout_ms: 请求超时
+        :param time_zone: 时区
+        :param max_retry: 最大重试次数
+        """
+        with self._lock:
+            if not self.__class__.session_pool:
+                self._pool_config = PoolConfig(host=host, port=str(port), user_name=username,
+                                               password=password, fetch_size=1024,
+                                               time_zone=time_zone, max_retry=max_retry)
+                self.__class__.session_pool = SessionPool(self._pool_config, pool_size, timeout_ms)
+
+    @classmethod
+    def query(cls, sql) -> SessionDataSet:
+        session = IotDbClient.session_pool.get_session()
+        try:
+            return session.execute_query_statement(sql)
+        finally:
+            IotDbClient.session_pool.put_back(session)
+
+
+def parse_dataset_to_list(dataset: SessionDataSet, timedela=600) -> dict:
+    """
+    解析iotdb 非last类型的查询结果
+    { "time":[],"code1":[],"code2":[],...}
+    :param dataset:
+    :param timedela: 对查询结果时间进行偏移
+    :return:
+    """
+    column_names = dataset.column_names
+    result = {}
+    timed_list = []
+    while dataset.has_next():
+        line = dataset.next()
+        fields = line.get_fields()
+        timestamp = line.get_timestamp() / 1000
+        origin_time = datetime.datetime.fromtimestamp(timestamp, IotDbClient.tz)
+        center_time = origin_time
+        if timedela is not None:
+            center_time = origin_time + datetime.timedelta(seconds=timedela)
+        str_time = center_time.strftime('%Y-%m-%d %H:%M:%S')
+        timed_list.append(str_time)
+        for index, name in enumerate(column_names[1:]):
+            code = name.split('.')[-1]
+            v = fields[index]
+            value = convert_data_type(v)
+            if code in result.keys():
+                result[code].append(value)
+            else:
+                result[code] = [value]
+        result['dateTime'] = timed_list
+    return result
+
+
+def parse_last_data_set(dataset: SessionDataSet):
+    """
+    解析last查询的结果
+    :param dataset:
+    :return:
+    """
+    result = {}
+    while dataset.has_next():
+        line = dataset.next()
+        # [ column_name, value, ts_data_type]
+        fields = line.get_fields()
+        column_name = str(fields[0]).split('.')[-1]
+        column_value = convert_last_data_type(str(fields[1]), str(fields[2]))
+        result[column_name] = {"time": line.get_timestamp(), "value": column_value}
+
+    return result
+
+
+def convert_data_type(field: Field):
+    """
+    解析iotdb数据
+    :param field:
+    :return:
+    """
+    ts_data_type = field.get_data_type()
+    if ts_data_type is None:
+        return None
+    final_value = field.get_string_value()
+    if final_value is None or final_value == 'None':
+        return None
+    if ts_data_type == TSDataType.FLOAT:
+        return float(final_value)
+    if ts_data_type == TSDataType.INT32:
+        return int(final_value)
+
+
+def convert_last_data_type(value: str, data_type_name: str):
+    """
+    解析last查询的数据类型
+    :param value:
+    :param data_type_name:
+    :return:
+    """
+    if 'FLOAT' == data_type_name:
+        return float(value)
+    elif 'INT' == data_type_name:
+        return int(value)
+    elif 'TEXT' == data_type_name:
+        return value
+    else:
+        return value
+
+
+def query_terminal_7days(dbpath="root.org_100.monitor_54", start_time="2025-05-13T19:22:53",
+                         end_time="2025-05-20T19:22:53") -> dict:
+    """
+    查询监测点7天历史数据(间隔10分钟,取该点前后10分钟的值)
+    :param dbpath: 数据库时序路径
+    :param start_time: 起始时间
+    :param end_time: 截至时间
+    :return: { 'tempReal': [], 'humiReal': [], 'status':[], ’time‘:[] }
+    """
+    sql = f"""
+           SELECT LAST_VALUE(temperature) AS 'tempReal' , LAST_VALUE(humidity) AS 'humiReal', LAST_VALUE(status) AS 'status' 
+           FROM {dbpath}
+           WHERE time >= {start_time} 
+           GROUP BY ([{start_time}, {end_time}), 20m)
+        """
+    session_data_set = IotDbClient.query(sql)
+    return parse_dataset_to_list(session_data_set, 10 * 60)
+
+
+def query_station_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53",
+                        end_time="2025-05-20T19:22:53") -> dict:
+    """
+    查询冷站7天冷冻总管出水温度
+    :param dbpath:
+    :param start_time:
+    :param end_time:
+    :return: { 'chilledWaterMainTempOut':[], 'time':[] }
+    """
+    sql = f"""
+           SELECT LAST_VALUE(chilledWaterMainTempOut) AS 'chilledWaterMainTempOut' 
+           FROM {dbpath}
+           WHERE time >= {start_time} 
+           GROUP BY ([{start_time}, {end_time}), 20m) 
+        """
+    session_data_set = IotDbClient.query(sql)
+    return parse_dataset_to_list(session_data_set, 10 * 60)
+
+
+def query_chiller_7days(dbpath="root.org_100.dev_159", start_time="2025-05-13T19:22:53", end_time="2025-05-20T19:22:53",
+                        capacityRated=1) -> dict:
+    """
+    查询冷机7天历史数据
+    :param dbpath:
+    :param start_time:
+    :param end_time:
+    :return:
+    """
+    sql = f"""
+           SELECT LAST_VALUE(loadRatio) AS 'loadRatio' , 
+                  LAST_VALUE(evapWaterTempOut) AS 'evapWaterTempOut', 
+                  LAST_VALUE(outputActivePower) / 1 AS 'chillerWaterTempOut'
+           FROM {dbpath}
+           WHERE time >= {start_time} 
+           GROUP BY ([{start_time}, {end_time}), 20m)
+        """
+    session_data_set = IotDbClient.query(sql)
+    return parse_dataset_to_list(session_data_set, 10 * 60)
+
+
+def get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp):
+    result = {}
+    if len(str(trigger_time_stamp)) == 13:
+        trigger_time_stamp = trigger_time_stamp / 1000
+    # trigger_time 向前移10分钟
+    trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
+    start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
+
+    for terminal_id in terminal_ids:
+        dbpath = "root.org_" + str(org_id) + ".monitor_" + str(terminal_id)
+
+        data = query_terminal_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
+                                    trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
+        result[str(terminal_id)] = data
+    return result
+
+
+def get_chiller_modify_data(org_id, chiller_ids, capacityRates, trigger_time_stamp):
+    result = {}
+    if len(str(trigger_time_stamp)) == 13:
+        trigger_time_stamp = trigger_time_stamp / 1000
+    # trigger_time 向前移10分钟
+    trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
+    start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
+    for index, chiller_id in enumerate(chiller_ids):
+        dbpath = "root.org_" + str(org_id) + ".dev_" + str(chiller_id)
+        data = query_chiller_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
+                                   trigger_time.strftime('%Y-%m-%dT%H:%M:00'), capacityRates[index])
+        result[str(chiller_id)] = data
+
+    return result
+
+
+def get_station_modify_data(org_id, device_id, trigger_time_stamp):
+    if len(str(trigger_time_stamp)) == 13:
+        trigger_time_stamp = trigger_time_stamp / 1000
+    # trigger_time 向前移10分钟
+    trigger_time = datetime.datetime.fromtimestamp(trigger_time_stamp, IotDbClient.tz)
+    start_time = trigger_time - datetime.timedelta(days=7) - datetime.timedelta(minutes=10)
+
+    dbpath = "root.org_" + str(org_id) + ".dev_" + str(device_id)
+    data = query_station_7days(dbpath, start_time.strftime('%Y-%m-%dT%H:%M:00'),
+                               trigger_time.strftime('%Y-%m-%dT%H:%M:00'))
+    return data
+
+
+if __name__ == '__main__':
+    IotDbClient('192.168.1.70', username='root', password='root')
+
+    org_id = 100
+    chiller_ids = [159]
+    terminal_ids = [121, 120]
+    capacity_rates = [100, 100]
+    main_id = 159
+
+    trigger_time_stamp = 1748251717992
+    terminal_data = get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp)
+    chiller_data = get_chiller_modify_data(org_id, chiller_ids, capacity_rates, trigger_time_stamp)
+    station_data = get_station_modify_data(org_id, main_id, trigger_time_stamp)
+    print(json.dumps(terminal_data))
+    print(json.dumps(chiller_data))
+    print(json.dumps(station_data))