from iotdb.SessionPool import SessionPool, PoolConfig
from iotdb.utils.Field import Field
from iotdb.utils.SessionDataSet import SessionDataSet
from iotdb.utils.IoTDBConstants import TSDataType
from threading import Lock

# Official example:
# https://github.com/apache/iotdb/blob/rc/2.0.1/iotdb-client/client-py/session_pool_example.py
# pip install apache-iotdb -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
#
# Usage:
#   # Initialise the client; this only has to happen once, at program start.
#   IotDbClient('127.0.0.1', username='root', password='root')
#   # Run a query.
#   dataset = IotDbClient.query('select * from root.unimat.dev_64')
#   # Parse the result.
#   parse_dataset_by_time(dataset)


class IotDbClient:
    """Singleton holder of one shared IoTDB ``SessionPool``.

    ``__new__`` always hands back the same instance, and ``__init__`` builds
    the pool only the first time it runs; constructor arguments passed on
    later calls are ignored. Queries go through the ``query`` classmethod.
    """

    _instance = None
    _lock = Lock()
    # Shared pool, created by the first ``__init__`` call.
    session_pool = None

    def __new__(cls, *args, **kwargs):
        """Return the single shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, host, username='root', password='root', port=6667,
                 pool_size=10, timeout_ms=3000, time_zone="UTC+8",
                 max_retry=0):
        """Create the shared session pool if it does not exist yet.

        :param host: server address
        :param username: user name
        :param password: password
        :param port: server port, default 6667 (passed to ``PoolConfig`` as a
            string)
        :param pool_size: pool size, passed to ``SessionPool`` as its second
            positional argument
        :param timeout_ms: timeout in milliseconds, passed to ``SessionPool``
            as its third positional argument.
            NOTE(review): the original comment calls this the request
            timeout; confirm whether the client treats it as the wait
            timeout for obtaining a session instead.
        :param time_zone: time zone forwarded to ``PoolConfig``
        :param max_retry: maximum retry count forwarded to ``PoolConfig``
        """
        with self._lock:
            if not self.__class__.session_pool:
                self._pool_config = PoolConfig(
                    host=host,
                    port=str(port),
                    user_name=username,
                    password=password,
                    fetch_size=1024,
                    time_zone=time_zone,
                    max_retry=max_retry,
                )
                self.__class__.session_pool = SessionPool(
                    self._pool_config, pool_size, timeout_ms
                )
                print("init iotDbClient")

    @classmethod
    def query(cls, sql) -> SessionDataSet:
        """Run ``sql`` on a pooled session and return the resulting data set.

        :param sql: query statement to execute
        :return: the ``SessionDataSet`` produced by the session
        :raises RuntimeError: if the pool has not been initialised yet
        """
        pool = cls.session_pool
        if pool is None:
            # Fail with an explicit message instead of an AttributeError on
            # None when the client was never constructed.
            raise RuntimeError(
                "IotDbClient is not initialised; "
                "call IotDbClient(host, ...) before query()"
            )
        session = pool.get_session()
        try:
            return session.execute_query_statement(sql)
        finally:
            # NOTE(review): the session goes back to the pool before the
            # caller has consumed the data set; confirm this is safe if the
            # data set fetches further rows lazily through that session.
            pool.put_back(session)


# TSDataType member name -> (Field accessor name, cast to a native type).
# Built through hasattr() so that a member missing from the installed client
# version is skipped instead of raising AttributeError.
_FIELD_READERS = {
    getattr(TSDataType, type_name): reader
    for type_name, reader in (
        ("BOOLEAN", ("get_bool_value", bool)),
        ("INT32", ("get_int_value", int)),
        ("INT64", ("get_long_value", int)),
        ("FLOAT", ("get_float_value", float)),
        ("DOUBLE", ("get_double_value", float)),
        ("TEXT", ("get_string_value", None)),
        ("STRING", ("get_string_value", None)),
        ("DATE", ("get_date_value", None)),
    )
    if hasattr(TSDataType, type_name)
}


def _to_bool(text: str) -> bool:
    """Interpret the textual form of a boolean value ("true"/"false")."""
    return text.strip().lower() == "true"


# Data type name reported by a ``last`` query -> cast applied to the value.
# "INT" is kept because the original implementation accepted it.
_LAST_VALUE_CASTS = {
    "INT": int,
    "INT32": int,
    "INT64": int,
    "FLOAT": float,
    "DOUBLE": float,
    "BOOLEAN": _to_bool,
}


def _column_codes(dataset: SessionDataSet) -> list:
    """Return the last path segment of every column except the first one.

    The first column is skipped; presumably it is the time column, since the
    row fields are matched against the remaining columns.
    """
    return [name.split('.')[-1] for name in dataset.column_names[1:]]


def parse_dataset_by_time(dataset: SessionDataSet) -> list:
    """Parse a non-``last`` query result into one dict per row.

    Shape: ``[{"time": ts, "code1": value1, "code2": value2, ...}, ...]``
    where each code is the last path segment of the column name. Columns
    that share a last segment overwrite each other within a row.

    :param dataset: query result to consume
    :return: list of row dicts
    """
    codes = _column_codes(dataset)
    rows = []
    while dataset.has_next():
        record = dataset.next()
        row = {"time": record.get_timestamp()}
        for code, field in zip(codes, record.get_fields()):
            row[code] = convert_data_type(field)
        rows.append(row)
    return rows


def parse_dataset_to_list(dataset: SessionDataSet) -> dict:
    """Parse a non-``last`` query result into one list per column.

    Shape: ``{"code1": [...], "code2": [...], ..., "time": [...]}``.
    An empty data set yields ``{"time": []}``.

    :param dataset: query result to consume
    :return: dict mapping each code (and ``"time"``) to its list of values
    """
    codes = _column_codes(dataset)
    result = {}
    timestamps = []
    while dataset.has_next():
        record = dataset.next()
        timestamps.append(record.get_timestamp())
        for code, field in zip(codes, record.get_fields()):
            result.setdefault(code, []).append(convert_data_type(field))
    result['time'] = timestamps
    return result


def parse_last_data_set(dataset: SessionDataSet) -> dict:
    """Parse the result of a ``last`` query.

    Shape: ``{"code": {"time": ts, "value": value}, ...}``.

    :param dataset: query result to consume
    :return: dict keyed by the last path segment of each series name
    """
    result = {}
    while dataset.has_next():
        record = dataset.next()
        # Row layout: [column_name, value, ts_data_type]
        fields = record.get_fields()
        code = str(fields[0]).split('.')[-1]
        value = convert_last_data_type(str(fields[1]), str(fields[2]))
        result[code] = {"time": record.get_timestamp(), "value": value}
    return result


def convert_data_type(field: Field):
    """Extract the value of ``field`` as a native Python object.

    Previously only FLOAT values were returned; every other type either fell
    through to an implicit ``None`` or failed on a non-existent ``TSDataType``
    member. All types listed in ``_FIELD_READERS`` are handled now.

    :param field: one cell of a row record
    :return: the converted value, or ``-1`` when the field is null or its
        data type has no registered reader
    """
    reader = _FIELD_READERS.get(field.get_data_type())
    if reader is None:
        return -1
    accessor_name, cast = reader
    raw_value = getattr(field, accessor_name)()
    if raw_value is None:
        return -1
    return raw_value if cast is None else cast(raw_value)


def convert_last_data_type(value: str, data_type_name: str):
    """Convert the textual value of a ``last`` query row to its real type.

    :param value: value rendered as text
    :param data_type_name: data type name reported by the query, e.g.
        ``INT32``, ``INT64``, ``FLOAT``, ``DOUBLE``, ``BOOLEAN``, ``TEXT``
    :return: the value cast according to ``_LAST_VALUE_CASTS``; any other
        type name (including ``TEXT``) returns ``value`` unchanged
    """
    cast = _LAST_VALUE_CASTS.get(data_type_name)
    return value if cast is None else cast(value)


def parse_dataset_by_code(dataset: SessionDataSet) -> dict:
    """Group a query result by column code.

    Shape: ``{"code1": [{"time": ts, "value": value}, ...], ...}``.

    :param dataset: query result to consume
    :return: dict mapping each code to its list of timed values
    """
    codes = _column_codes(dataset)
    result = {}
    while dataset.has_next():
        record = dataset.next()
        for code, field in zip(codes, record.get_fields()):
            timed_value = {
                "time": record.get_timestamp(),
                "value": convert_data_type(field),
            }
            result.setdefault(code, []).append(timed_value)
    return result


if __name__ == '__main__':
    IotDbClient('127.0.0.1', username='root', password='root')
    dataSet = IotDbClient.query("select * from root.unimat.dev_64")
    print(parse_dataset_to_list(dataSet))