|
@@ -0,0 +1,190 @@
|
|
|
+import threading, time,json
|
|
|
+from logs.logger import *
|
|
|
+from config import communication_config as communicationConfig
|
|
|
+from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message, ReceivedMessage
|
|
|
+
|
|
|
+# 官方示例
|
|
|
+# https://github.com/apache/rocketmq-client-python/tree/master/samples
|
|
|
+
|
|
|
+
|
|
|
+"""
|
|
|
+使用方法 : 仅限linux环境,目前支持基于Debian的发行版
|
|
|
+1. 安装rocketmq-client-cpp(https://github.com/apache/rocketmq-client-python)
|
|
|
+wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
|
|
|
+dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
|
|
|
+
|
|
|
+2. 安装 rocketmq-client-python
|
|
|
+pip install rocketmq-client-python==2.0.0 -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
|
|
|
+
|
|
|
+3. 创建rocketmq主题(如果主题不存在)
|
|
|
+rocketmq 控制台创建主题
|
|
|
+./mqadmin updateTopic -t alg-response -c DefaultCluster -n 192.168.1.70:9876
|
|
|
+./mqadmin updateTopic -t alg-request -c DefaultCluster -n 192.168.1.70:9876
|
|
|
+
|
|
|
+3. 初始化
|
|
|
+RocketClient(name_server_address='http://192.168.1.70:8976')
|
|
|
+后续在其他文件中直接调用RocketClient类方法,无需再次初始化
|
|
|
+RocketClient.start_consuming 监听消息
|
|
|
+RocketClient.send_sync 发送消息
|
|
|
+
|
|
|
+
|
|
|
+"""
|
|
|
+ALG_REQ_TOPIC = communicationConfig.ROCKET_CHILLER_TEMP_IN_TOPIC # 算法触发主题
|
|
|
+ALG_RESP_TOPIC = communicationConfig.ROCKET_CHILLER_TEMP_OUT_TOPIC # 算法运算结果主题
|
|
|
+ALG_REQ_TAG = communicationConfig.ROCKET_CHILLER_TEMP_IN_TAG
|
|
|
+ALG_RESP_TAG = communicationConfig.ROCKET_CHILLER_TEMP_OUT_TAG
|
|
|
+
|
|
|
+class RocketClient:
|
|
|
+ _instance = None
|
|
|
+ _lock = threading.Lock() # 类级锁,确保线程安全
|
|
|
+ _producer = None # 单例生产者
|
|
|
+ _name_server_address = None # NameServer 地址
|
|
|
+
|
|
|
+ def __new__(cls, *args, **kwargs):
|
|
|
+ """单例模式实现"""
|
|
|
+ with cls._lock:
|
|
|
+ if not cls._instance:
|
|
|
+ cls._instance = super(RocketClient, cls).__new__(cls)
|
|
|
+ return cls._instance
|
|
|
+
|
|
|
+ def __init__(self, name_server_address, producer_group='alg_cop_result_producer', access_key='', access_secret=''):
|
|
|
+ """初始化生产者(仅执行一次)"""
|
|
|
+ with self._lock:
|
|
|
+ if not self.__class__._producer:
|
|
|
+ self.__class__._name_server_address = name_server_address
|
|
|
+ self.__class__._producer = Producer(producer_group)
|
|
|
+ self.__class__._producer.set_name_server_address(name_server_address)
|
|
|
+ self.__class__._producer.set_session_credentials(access_key,access_secret,'')
|
|
|
+ self.__class__._producer.start()
|
|
|
+ logger.critical("========Rocket server {} ========".format(name_server_address))
|
|
|
+ logger.critical("========Rocket cop producer created ========")
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def start_consuming(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*'):
|
|
|
+ """
|
|
|
+ 创建消费端
|
|
|
+ :param topic: 监听的消息主题
|
|
|
+ :param on_message: 收到消息后的回调方法
|
|
|
+ :param consumer_group: 消费组
|
|
|
+ :param expression: 消息按tag:key过滤,暂不使用
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ def consumer_thread():
|
|
|
+ consumer = PushConsumer(consumer_group)
|
|
|
+ consumer.set_name_server_address(cls._name_server_address)
|
|
|
+ consumer.subscribe(topic, on_message, expression=expression)
|
|
|
+ consumer.start()
|
|
|
+ logger.critical('listening message at {}'.format(topic))
|
|
|
+ # 消费端是守护进程,需要保证主线程持续运行否则收到消息后客户端会退出
|
|
|
+ while True:
|
|
|
+ time.sleep(3)
|
|
|
+
|
|
|
+ thread = threading.Thread(target=consumer_thread, daemon=True)
|
|
|
+ thread.start()
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def start_consume(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*',
|
|
|
+ access_key='', access_secret=''):
|
|
|
+ consumer = PushConsumer(consumer_group)
|
|
|
+ consumer.set_name_server_address(cls._name_server_address)
|
|
|
+ consumer.set_session_credentials(access_key,access_secret,'')
|
|
|
+ consumer.subscribe(topic, on_message, expression=expression)
|
|
|
+ consumer.start()
|
|
|
+ logger.critical('listening message at {}:{}'.format(topic,expression))
|
|
|
+ # 保证主线程运行
|
|
|
+ while True:
|
|
|
+ time.sleep(3600)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def send_msg_sync(cls, msg: Message):
|
|
|
+ """
|
|
|
+ 发送消息
|
|
|
+ :param cls:
|
|
|
+ :param msg:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ cls._producer.send_sync(msg)
|
|
|
+
|
|
|
+ @classmethod
|
|
|
+ def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
|
|
|
+ """
|
|
|
+ 发送消息
|
|
|
+ :param cls:
|
|
|
+ :param topic:
|
|
|
+ :param tags:
|
|
|
+ :param keys:
|
|
|
+ :param body:
|
|
|
+ :return:
|
|
|
+ """
|
|
|
+ msg = Message(topic)
|
|
|
+ if tags:
|
|
|
+ msg.set_tags(tags)
|
|
|
+ if keys:
|
|
|
+ msg.set_keys(keys)
|
|
|
+ msg.set_body(body.encode('utf-8'))
|
|
|
+ cls._producer.send_sync(msg)
|
|
|
+ logger.critical(f'[RocketClient] send msg to {topic}')
|
|
|
+
|
|
|
+def on_rocket_msg(msg:ReceivedMessage):
|
|
|
+ from main import main
|
|
|
+ try:
|
|
|
+ main(msg.body.decode())
|
|
|
+ return ConsumeStatus.CONSUME_SUCCESS
|
|
|
+ except Exception as e:
|
|
|
+ return ConsumeStatus.RECONSUME_LATER
|
|
|
+
|
|
|
+
|
|
|
+def start_rocket():
|
|
|
+ RocketClient(name_server_address=communicationConfig.ROCKET_SERVER_ADDR,
|
|
|
+ access_key=communicationConfig.ROCKET_ACCESS_KEY,
|
|
|
+ access_secret=communicationConfig.ROCKET_ACCESS_SECRET)
|
|
|
+ # 监听算法触发消息
|
|
|
+ RocketClient.start_consume(ALG_REQ_TOPIC, on_rocket_msg, expression=ALG_REQ_TAG)
|
|
|
+
|
|
|
+def send_data_to_rocket(json_msg):
|
|
|
+ # 回复算法结果
|
|
|
+ RocketClient.send_sync(ALG_RESP_TOPIC, body=json_msg, tags=ALG_RESP_TAG)
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+if __name__ == '__main__':
|
|
|
+ """
|
|
|
+ 测试rocket通信使用
|
|
|
+ """
|
|
|
+ # 初始化
|
|
|
+ RocketClient(name_server_address='http://192.168.1.70:9876')
|
|
|
+ print("rocket 初始化")
|
|
|
+
|
|
|
+ # 收到消息后的回调方法
|
|
|
+ def on_msg(msg):
|
|
|
+ try:
|
|
|
+ print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
|
|
|
+
|
|
|
+ # 回复运算结果
|
|
|
+ body = {
|
|
|
+ 'userName':"test",
|
|
|
+ 'coolingStationName':'一期冷站',
|
|
|
+ 'deviceName':'1号主机',
|
|
|
+ 'coolingCapacity': 1926.9,
|
|
|
+ 'COP':6.35,
|
|
|
+ 'coolingWaterFlow': 639.2,
|
|
|
+ 'time': time.time()
|
|
|
+ }
|
|
|
+ RocketClient.send_sync(ALG_RESP_TOPIC,
|
|
|
+ body=json.dumps(body, ensure_ascii=False),
|
|
|
+ tags=ALG_RESP_TAG)
|
|
|
+ return ConsumeStatus.CONSUME_SUCCESS
|
|
|
+ except Exception as e:
|
|
|
+ print(e)
|
|
|
+ return ConsumeStatus.RECONSUME_LATER
|
|
|
+
|
|
|
+ # 监听算法触发消息
|
|
|
+ RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg, expression=ALG_REQ_TAG)
|
|
|
+
|
|
|
+ while True:
|
|
|
+ cmd = input('输入消息内容 (输入 q 退出): ')
|
|
|
+ if cmd == 'q':
|
|
|
+ break
|
|
|
+ # 发送算法触发消息
|
|
|
+ RocketClient.send_sync(ALG_RESP_TOPIC, body=cmd, tags=ALG_RESP_TAG)
|