# ==========算法中涉及的相关变量========== # EnergyMode = {'极限节能': 1, '标准节能': 2, '安全节能': 3} # 控制模式决定温度限制值,先写死,后续作为产品配置参数 # 在进行温湿度计算时,按照以下温度和湿度范围进行筛选,避免异常数据导致计算发生错误 TerminalTempRange = {'lowerLimit': 0, 'upperLimit': 50} TerminalHumiRange = {'lowerLimit': 0, 'upperLimit': 100} # 在进行温湿度限值修正时,用到以下阈值,先写死,后续视情况优化 ModifyTempHumiPara = {'threshold': 0.5, 'quantile': 0.75} # 分别代表阈值和分位数 # 根据监测点情况进行水温计算时,用到以下系数,分别代表温度超标、湿度超标、露点超标、温度不超标、湿度不超标 Coefficient = {'outTemp': 1, 'outHumi': 0.1, 'outDewPoint': 1.25, 'inTemp': 0.5, 'inHumi': 0.05} # 温湿度超标情况判断结果 ExcessDescription = {'0': '未超标', '1': '温度超标', '2': '露点超标', '3': '湿度超标'} ExpectedStructure = { "terminalInfo": ["ahuMeterId", "ahuName", "ahuStartTime", "coolingTempUpper", "coolingHumiUpper", "tempReal", "humiReal"], "controlConfigInfo": ["handleService", "controlMode", "isHardwareControl", "calPeriod", "minControlStep", "chillerWaterTempSetInitial", "chillerWaterTempSetUpper", "chillerWaterTempSetLower", "energyMode", "controlBasis", "tempMargin", "humiMargin", "mainTempMode", "samplePeriod", "upTempControlPeriod", "downTempControlPeriod"], "chillerConfigInfo": ["waterTempControlMode", "isContinuousRun", "coolingShutdownOffset", "coolingRestartOffset", "safetyPreDiffLowerLimit", "safetyLoadRatioLowerLimit", "chillerTempStep"], "chillerSafetyPara": ["circuitNumber", "circuitParameter"], "stationInfo": ["userName", "stationName", "stationId", "runMode", "triggerTime", "chilledWaterMainTempOut", "chilledWaterMainTempIn", "allChillersInfo"], "chillerInfo": ["chillerId", "chillerName", "capacityRated", "refriType", "chillerWaterTempSet", "chillerOffTimeLatest", "offSetTempLatestCooling", "chillerWaterTempOut", "chillerWaterTempIn", "runStatus", "chillerPowerRatio", "loadRate", "upControlTimeLatest", "downControlTimeLatest"], } # def check_data_integrity(dic_input): # # 定义预期的数据结构 # missing_fields = [] # # # 检查顶层字段缺失 # for top_key in ExpectedStructure: # if top_key not in dic_input: # missing_fields.append(top_key) # # # 检查子字段缺失 # # for top_key in ExpectedStructure: # if top_key in dic_input: # current_data = dic_input[top_key] # if isinstance(current_data, dict): # expected_subkeys = ExpectedStructure[top_key] # for sub_key in expected_subkeys: # if sub_key not in current_data: # missing_fields.append(f"{top_key}.{sub_key}") # # # 检查空值的递归函数 # def find_empty_fields(data, parent_path='', empty_fields=None): # if empty_fields is None: # empty_fields = set() # # if isinstance(data, dict): # for key, value in data.items(): # new_path = f"{parent_path}.{key}" if parent_path else key # # 检查当前值是否为空 # if value is None: # empty_fields.add(new_path) # elif isinstance(value, str) and value.strip() == '': # empty_fields.add(new_path) # elif isinstance(value, (list, dict)) and not value: # empty_fields.add(new_path) # # 递归检查子元素 # find_empty_fields(value, new_path, empty_fields) # elif isinstance(data, list): # if not data: # 空列表 # empty_fields.add(parent_path) # else: # for index, item in enumerate(data): # new_path = f"{parent_path}[{index}]" # # 检查当前元素是否为空 # if item is None: # empty_fields.add(new_path) # elif isinstance(item, str) and item.strip() == '': # empty_fields.add(new_path) # elif isinstance(item, (list, dict)) and not item: # empty_fields.add(new_path) # # 递归检查子元素 # find_empty_fields(item, new_path, empty_fields) # else: # # 检查字符串是否为空 # if isinstance(data, str) and data.strip() == '': # empty_fields.add(parent_path) # return empty_fields # # # 查找所有空值路径 # empty_paths = find_empty_fields(dic_input) # # return missing_fields, empty_paths # # dic_input = {} # # # 执行检查 # missing_fields, empty_paths = check_data_integrity(dic_input) # # # 输出结果 # print("输入参数中缺失关键字段,退出水温智控算法计算") # print("输入参数中关键数据存在空值,退出水温智控算法计算") # print("缺失字段检查结果:") # if missing_fields: # print("缺失字段:") # print("\n".join(missing_fields)) # else: # print("无缺失字段") # # print("\n空值字段检查结果:") # if empty_paths: # print("存在空值的字段路径:") # print("\n".join(sorted(empty_paths))) # else: # print("无空值字段")