123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- import pandas as pd
- import datetime
- from data_initialize_standard.constant import *
- from communication.iotdb_util import *
- # from logs_conf.logger import *
- from logs.logger import *
- from config import communication_config as communicationConfig
- class ExcessTempModify(object):
- def __init__(self, data_excess, dict_code, chiller_outer):
- self.data_excess = data_excess
- self.config = dict_code
- # 由于不同项目末端监测点的采样周期不一致,若无温湿度采样周期则将无法按预期算法修正末端监测点温湿度限制
- # self.modify = modify_data
- self.chiller_outer = chiller_outer
- self.modify = {'stationData': {}, 'terminalData': {}, 'chillerData': {}}
- self.chiller_mode = dict_code['runMode']
- self.sampling_nums = 60/dict_code['samplePeriod']*10 # 根据采样周期进行计算
- def get_modify_data(self):
- IotDbClient(communicationConfig.IOTDB_HOST, port=communicationConfig.IOTDB_PORT,
- username=communicationConfig.IOTDB_USER, password=communicationConfig.IOTDB_PASSWD)
- org_id = self.chiller_outer['orgId']
- controller_id = self.chiller_outer['controllerId']
- chiller_ids = self.chiller_outer['allChillersInfo']['chillerId']
- power_rates = self.chiller_outer['allChillersInfo']['powerRated']
- terminal_ids = list(self.data_excess['ahuMeterId'])
- trigger_time_stamp = pd.to_datetime(self.chiller_outer['triggerTime']).timestamp()
- self.modify['terminalData'] = get_terminal_modify_data(org_id, terminal_ids, trigger_time_stamp)
- self.modify['chillerData'] = get_chiller_modify_data(org_id, chiller_ids, power_rates, trigger_time_stamp)
- self.modify['stationData'] = get_station_modify_data(org_id, controller_id, trigger_time_stamp)
- def data_judge(self):
- self.get_modify_data()
- is_success = 1
- if not self.modify.get('stationData'): # 冷站数据为{}
- is_success = 0
- for terminal in self.modify.get('terminalData', {}).values(): # 监测点数据为{}
- if not terminal:
- is_success = 0
- if self.config["mainTempMode"] == 0:
- for chiller in self.modify.get('chillerData', {}).values(): # 监测点数据为{}
- if not chiller:
- is_success = 0
- if is_success:
- all_chillers = self.chiller_outer["allChillersInfo"]
- # 使用zip组合两个列表并创建字典
- control_mode = {cid: mode
- for cid, mode in zip(all_chillers["chillerId"], all_chillers["waterTempControlMode"])}
- capacity_rated = {cid: mode
- for cid, mode in zip(all_chillers["chillerId"], all_chillers["capacityRated"])}
- for chiller_id, data in self.modify['chillerData'].items():
- if chiller_id in control_mode:
- data["waterTempControlMode"] = control_mode[chiller_id]
- data["capacityRated"] = capacity_rated[chiller_id]
- else:
- is_success = 0
- return is_success
- def transform_dict_to_dataformat(self, station_data, sensor_data, chiller_data):
- """数据格式的转化"""
- # chiller_mode = self.config['runMode']
- time_stamp = station_data['dateTime']
- # 温湿度数据合并
- sensor_data_list = []
- for key in sensor_data.keys():
- sensor_data_list.append(pd.DataFrame(sensor_data[key], index=time_stamp))
- # temp_humi_sensor_id = [int(key) for key in sensor_data.keys()]
- temp_humi_sensor_id = [key for key in sensor_data.keys()]
- sensor_data_df = pd.concat(sensor_data_list, keys=temp_humi_sensor_id, axis=1)
- # sensor_data_df.index = pd.to_datetime([int(i + 28800000) for i in sensor_data_df.index], unit='ms')
- # 通过配置判断总管温度是直接读取还是加权计算-------------------------------------------------------------
- if self.config["mainTempMode"] == 1: # 直接读取总管温度
- water_temp_df = self.main_water_temp(station_data)
- else: # 计算加权水温
- # 通过负载率删选冷机数据,并将所有冷机数据合并
- chiller_data_list = []
- for key in chiller_data.keys():
- val = pd.DataFrame(chiller_data[key], index=time_stamp)
- val.loc[(val['loadRate'] <= 10) | (val['chillerPowerRatio'] <= 10), ['loadRate', 'chillerPowerRatio']] = 0
- val.fillna(0, inplace=True)
- chiller_data_list.append(val)
- # chiller_id_list = [int(key) for key in chiller_data.keys()]
- chiller_id_list = [key for key in chiller_data.keys()]
- chiller_data_df = pd.concat(chiller_data_list, keys=chiller_id_list, axis=1)
- water_temp_df = self.cal_weighted_water_temp(chiller_data_df, chiller_id_list)
- # water_temp_df.index = pd.to_datetime([int(i + 28800000) for i in chiller_data_df.index], unit='ms')
- return sensor_data_df, water_temp_df
- def main_water_temp(self, station_data):
- """直接读取总管温度"""
- weighted_water_temp = pd.DataFrame(index=station_data['dateTime'], columns=['weighted_water_temp', 'init_water_temp'])
- weighted_water_temp['init_water_temp'] = self.config["chillerWaterTempSetInitial"]
- if set(self.config['allChillerControlSelect']) == {0}: # 均为供水控制模式
- if self.chiller_mode == 0:
- weighted_water_temp['weighted_water_temp'] = station_data["chilledWaterMainTempOut"]
- elif self.chiller_mode == 1:
- weighted_water_temp['weighted_water_temp'] = station_data["coolingWaterMainTempOut"]
- else:
- ValueError('chiller mode input error!')
- elif set(self.config['allChillerControlSelect']) == {1}: # 均为回水控制模式
- if self.chiller_mode == 0:
- weighted_water_temp['weighted_water_temp'] = station_data["chilledWaterMainTempIn"]
- elif self.chiller_mode == 1:
- weighted_water_temp['weighted_water_temp'] = station_data["coolingWaterMainTempIn"]
- else:
- ValueError('chiller mode input error!')
- else:
- weighted_water_temp = pd.DataFrame(columns=['weighted_water_temp', 'init_water_temp'])
- return weighted_water_temp
- def cal_weighted_water_temp(self, chiller_data, chiller_id_list):
- """计算加权水温"""
- water_temp_control_mode_index = []
- # water_temp_set_initial_index, water_temp_in_set_initial_index = [], []
- if self.chiller_mode == 0:
- for chiller_id in chiller_id_list:
- water_temp_control_mode_index.append((chiller_id, 'waterTempControlMode'))
- # water_temp_set_initial_index.append((chiller_id, 'chillerWaterTempSetInitial'))
- # water_temp_in_set_initial_index.append((chiller_id, 'chillerWaterTempInSetInitial'))
- elif self.chiller_mode == 1:
- for chiller_id in chiller_id_list:
- water_temp_control_mode_index.append((chiller_id, 'waterTempControlMode'))
- # water_temp_set_initial_index.append((chiller_id, 'heatingWaterTempSetInitial'))
- # water_temp_in_set_initial_index.append((chiller_id, 'heatingWaterTempInSetInitial'))
- #################制热的水温和之前表示不大一样,到底是热水还是冷却水?后期保持一致#########################
- else:
- ValueError('chiller mode input error!')
- # 筛选出相同水温控制模式、相同初始出水温度设定值和相同初始进水温度设定值的数据集,同一时间戳下各冷机的对应参数唯一则代表相同
- chiller_data = chiller_data[chiller_data[water_temp_control_mode_index].nunique(axis=1) == 1]
- # chiller_data = chiller_data[chiller_data[water_temp_set_initial_index].nunique(axis=1) == 1]#这里好像无必要
- # chiller_data = chiller_data[chiller_data[water_temp_in_set_initial_index].nunique(axis=1) == 1]#这里好像无必要
- chiller_data_lwtcm = chiller_data[chiller_data[water_temp_control_mode_index[0]] == 0] # 控制模式为供水
- chiller_data_rwtcm = chiller_data[chiller_data[water_temp_control_mode_index[0]] == 1] # 控制模式为回水
- is_cal = 0 if chiller_data.empty else 1
- if is_cal:
- weighted_water_temp_lwtcm = pd.DataFrame(columns=['weighted_water_temp', 'init_water_temp'])
- weighted_water_temp_rwtcm = pd.DataFrame(columns=['weighted_water_temp', 'init_water_temp'])
- if not chiller_data_lwtcm.empty:
- chiller_data_lwtcm['denominator'], chiller_data_lwtcm['numerator'] = 0.01, 0
- for chiller_id in chiller_id_list:
- chiller_data_lwtcm['denominator'] += chiller_data_lwtcm[(chiller_id, 'chillerPowerRatio')] * \
- chiller_data_lwtcm[(chiller_id, 'capacityRated')].astype("float")
- if self.chiller_mode == 0:
- chiller_data_lwtcm['numerator'] += chiller_data_lwtcm[(chiller_id, 'chillerWaterTempOut')] * \
- chiller_data_lwtcm[(chiller_id, 'chillerPowerRatio')] * \
- chiller_data_lwtcm[(chiller_id, 'capacityRated')].astype("float")
- elif self.chiller_mode == 1:
- chiller_data_lwtcm['numerator'] += chiller_data_lwtcm[(chiller_id, 'heatingWaterTempOut')] * \
- chiller_data_lwtcm[(chiller_id, 'chillerPowerRatio')] * \
- chiller_data_lwtcm[(chiller_id, 'capacityRated')].astype("float")
- else:
- ValueError('chiller mode input error!')
- weighted_water_temp_lwtcm['weighted_water_temp'] = \
- round(chiller_data_lwtcm['numerator'] / chiller_data_lwtcm['denominator'], 2)
- # weighted_water_temp_lwtcm['init_water_temp'] = chiller_data_lwtcm[water_temp_set_initial_index[0]]
- weighted_water_temp_lwtcm['init_water_temp'] = self.config['chillerWaterTempSetInitial']
- if not chiller_data_rwtcm.empty:
- chiller_data_rwtcm['denominator'], chiller_data_rwtcm['numerator'] = 0.01, 0
- for chiller_id in chiller_id_list:
- chiller_data_rwtcm['denominator'] += chiller_data_rwtcm[(chiller_id, 'chillerPowerRatio')] * \
- chiller_data_rwtcm[(chiller_id, 'capacityRated')].astype("float")
- if self.chiller_mode == 0:
- chiller_data_rwtcm['numerator'] += chiller_data_rwtcm[(chiller_id, 'chillerWaterTempIn')] * \
- chiller_data_rwtcm[(chiller_id, 'chillerPowerRatio')] * \
- chiller_data_rwtcm[(chiller_id, 'capacityRated')].astype("float")
- elif self.chiller_mode == 1:
- chiller_data_rwtcm['numerator'] += chiller_data_rwtcm[(chiller_id, 'heatingWaterTempIn')] * \
- chiller_data_rwtcm[(chiller_id, 'chillerPowerRatio')] * \
- chiller_data_rwtcm[(chiller_id, 'capacityRated')].astype("float")
- else:
- ValueError('chiller mode input error!')
- weighted_water_temp_rwtcm['weighted_water_temp'] = \
- round(chiller_data_rwtcm['numerator'] / chiller_data_rwtcm['denominator'], 2)
- # weighted_water_temp_rwtcm['init_water_temp'] = chiller_data_rwtcm[water_temp_in_set_initial_index[0]]
- weighted_water_temp_rwtcm['init_water_temp'] = self.config['chillerWaterTempInSetInitial']
- weighted_water_temp = pd.concat([weighted_water_temp_lwtcm, weighted_water_temp_rwtcm])
- weighted_water_temp = weighted_water_temp.sort_index(axis=0)
- else:
- weighted_water_temp = pd.DataFrame(columns=['weighted_water_temp', 'init_water_temp'])
- return weighted_water_temp
- def modify_air_temp_humi(self, sensor_id_list, sensor_data, water_temp):
- threshold = ModifyTempHumiPara['threshold'] # 修正阈值
- quantile = ModifyTempHumiPara['quantile'] # 修正分位数
- modified_air_temp_dic = {}
- modified_air_humi_dic = {}
- water_temp = water_temp[water_temp['weighted_water_temp'] != 0]
- column_names = sensor_data.columns.tolist() # [(id, 参数名),(id, 参数名), ...] # sensor_data
- for sensor_id in sensor_id_list:
- # su_sensor_id = config[config['returnAirMeterId'] == re_sensor_id].iloc[0]['supplyAirMeterId']
- # if ((re_sensor_id, 'temperature') in column_names) and ((su_sensor_id, 'temperature') in column_names):
- index_df = self.data_excess[self.data_excess['ahuMeterId'] == sensor_id].index.tolist()[0]
- if ((sensor_id, 'tempReal') in column_names) :
- # AHU_start = sensor_data[abs(sensor_data[(sensor_id, 'tempReal')] -
- # sensor_data[(sensor_id, 'tempSupply')]) > 2.0] # 筛选出开启的监测点
- AHU_start = sensor_data[sensor_data[(sensor_id, 'status')] == 1] # 筛选出开启的监测点
- dataset = pd.concat([AHU_start[(sensor_id, 'tempReal')], AHU_start[(sensor_id, 'humiReal')],
- water_temp['weighted_water_temp'], water_temp['init_water_temp']], sort=False,
- keys=['temp', 'humi', 'weighted_water_temp', 'init_water_temp'], axis=1)
- dataset = dataset.dropna()
- if self.chiller_mode == 0:
- upper_temp_limit = self.data_excess.loc[index_df, 'coolingTempUpper']
- upper_humi_limit = self.data_excess.loc[index_df, 'coolingHumiUpper']
- dataset = dataset[dataset['weighted_water_temp'] <= dataset['init_water_temp'] + 0.5]
- # 这里没有考虑其他两种步长的场景,如果要是考虑的话如何考虑呢,如站内冷机的步长不同的情况下
- dataset_temp = dataset[dataset['temp'] >= upper_temp_limit]
- dataset_humi = dataset[dataset['humi'] >= upper_humi_limit]
- if len(dataset_temp) > self.sampling_nums:
- temp_series = dataset_temp.quantile(q=quantile)
- modified_temp_limit = round(temp_series['temp'], 3) + threshold
- else:
- modified_temp_limit = upper_temp_limit
- if len(dataset_humi) > self.sampling_nums:
- humi_series = dataset_humi.quantile(q=quantile)
- modified_humi_limit = min(round(humi_series['humi'], 3) + 10 * threshold, 100)
- else:
- modified_humi_limit = upper_humi_limit
- elif self.chiller_mode == 1:
- # lower_temp_limit = config[config['returnAirMeterId'] == re_sensor_id].iloc[0]['heatingTempDown']
- # lower_temp_limit = sensor_data[(sensor_id, 'heatingTempDown')].iloc[0]
- lower_temp_limit = self.data_excess.loc[index_df, 'heatingTempDown']
- dataset = dataset[dataset['weighted_water_temp'] >= dataset['init_water_temp'] - 0.5]
- # 这里没有考虑其他两种步长的场景,如果要是考虑的话如何考虑呢,如站内冷机的步长不同的情况下
- dataset_temp = dataset[dataset['temp'] <= lower_temp_limit]
- if len(dataset_temp) > self.sampling_nums:
- temp_series = dataset_temp.quantile(q=1-quantile)
- modified_temp_limit = round(temp_series['temp'], 3) - threshold
- else:
- modified_temp_limit = lower_temp_limit
- modified_humi_limit = ''
- else:
- raise ValueError('chiller mode input error!')
- modified_air_temp_dic[sensor_id] = modified_temp_limit
- modified_air_humi_dic[sensor_id] = modified_humi_limit
- return modified_air_temp_dic, modified_air_humi_dic
- def get_modified_air_temp_humi(self):
- logger.critical("============温湿度超标点限值修正模块中关键参数:runMode:%s,threshold:%s,quantile:%s============" %
- (self.config['runMode'], ModifyTempHumiPara['threshold'], ModifyTempHumiPara['quantile']))
- sensor_data_df, chiller_water_temp_df = self.transform_dict_to_dataformat(self.modify['stationData'],
- self.modify['terminalData'],
- self.modify['chillerData'])
- excess_sensor_id = list(self.data_excess['ahuMeterId']) # 超标监测点数据的id列表
- modified_air_temp, modified_air_humi = self.modify_air_temp_humi(excess_sensor_id, sensor_data_df,
- chiller_water_temp_df)
- # re_sensor_id_list = station_data['config']['returnAirMeterId'] # 这里应该是所有监测点的数据
- modified_meter_id = list(modified_air_temp.keys()) # 有修正的监测点Id列表
- for sensor in excess_sensor_id:
- if sensor in modified_meter_id:
- index_df = self.data_excess[self.data_excess['ahuMeterId'] == sensor].index.tolist()[0]
- if self.chiller_mode == 0:
- self.data_excess.loc[index_df, 'coolingTempUpper'] = modified_air_temp[sensor]
- self.data_excess.loc[index_df, 'coolingHumiUpper'] = modified_air_humi[sensor]
- elif self.chiller_mode == 1:
- self.data_excess.loc[index_df, 'heatingTempDown'] = modified_air_temp[sensor]
- else:
- raise ValueError('chiller mode input error!')
- return self.data_excess
- # if __name__ == '__main__':
- # data_temp_humi = pd.DataFrame({
- # "ahuMeterId": ["3000638", "3000694", "3000654"],
- # "terminalName": ["监测点A", "监测点B", "监测点C"],
- # "coolingTempUpper": [25.0, 26.0, 24],
- # "coolingHumiUpper": [60.0, 65.0, 65],
- # "heatingTempDown": [18.0, 17.5, 18],
- # "tempReal": [24.5, 25.3, 25],
- # "humiReal": [50, 50, 50],
- # })
- #
- # station_data_ = {"chilledWaterMainTempOut": [7, 7, 7],
- # 'dateTime': ["2025-4-3 12:00:00", "2025-4-4 12:00:00", "2025-4-5 12:00:00"]}
- # sensor_data_ = {
- # "3000638": {"name": "sensor1", "tempReal": [10, 20, 30], "humiReal": [10, 20, 30], "tempSupply": [1, 2, 3]},
- # "3000694": {"name": "sensor2", "tempReal": [10, 20, 30], "humiReal": [10, 20, 30], "tempSupply": [1, 2, 3]},
- # "3000654": {"name": "sensor3", "tempReal": [10, 20, 30], "humiReal": [10, 20, 30], "tempSupply": [1, 2, 3]}}
- # # 冷机的历史7天数据
- # chiller_data_ = {"301": {"name": 1, "loadRate": [20, 20, 30], "chillerPowerRatio": [50, 50, 50],
- # "chillerWaterTempOut": [7, 7, 7],
- # 'waterTempControlMode': 0, 'capacityRated': 10},
- # "302": {"name": 1, "loadRate": [10, 20, 30], "chillerPowerRatio": [50, 50, 50],
- # "chillerWaterTempOut": [7, 7, 7],
- # 'waterTempControlMode': 0, 'capacityRated': 10}}
- # modify_data = {'terminalData': sensor_data_, 'chillerData': chiller_data_, 'stationData': station_data_}
- # config_info = {"handleService": 1, "controlMode": 1, "isHardwareControl": 1, "calPeriod": 3,
- # "minControlStep": 0.1, "chillerWaterTempSetInitial": 7, "chillerWaterTempSetUpper": 12,
- # "chillerWaterTempSetLower": 7, "energyMode": 1, "controlBasis": 0, "tempMargin": 0.2,
- # "humiMargin": 2, "mainTempMode": 0, "samplePeriod": 60, "upTempControlPeriod": 10,
- # "downTempControlPeriod": 12, 'runMode': 0, 'allChillerControlSelect': [0, 0, 0]}
- # etm = ExcessTempModify(data_temp_humi, config_info, modify_data)
- # data = etm.get_modified_air_temp_humi()
- # print(data)
|