فهرست منبع

调试调研链路

heshixin 1 ماه پیش
والد
کامیت
691bd0af02
1فایلهای تغییر یافته به همراه19 افزوده شده و 7 حذف شده
  1. 19 7
      rocketmq_util.py

+ 19 - 7
rocketmq_util.py

@@ -1,4 +1,4 @@
-import threading, time
+import threading, time,json
 from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message
 
 
@@ -17,8 +17,8 @@ pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --tr
 
 3. 创建rocketmq主题(如果主题不存在)
 rocketmq 控制台创建主题
-./mqadmin updateTopic -t alg_result -c DefaultCluster -n 192.168.1.70:9876
-./mqadmin updateTopic -t alg_request -c DefaultCluster -n 192.168.1.70:9876
+./mqadmin updateTopic -t alg-response -c DefaultCluster -n 192.168.1.70:9876
+./mqadmin updateTopic -t alg-request -c DefaultCluster -n 192.168.1.70:9876
 
 3. 初始化
 RocketClient(name_server_address='http://192.168.1.70:8976')
@@ -28,8 +28,8 @@ RocketClient.send_sync  发送消息
 
 
 """
-ALG_REQ_TOPIC = 'alg_request'  # 算法触发主题
-ALG_RESP_TOPIC = 'alg_result'  # 算法运算结果主题
+ALG_REQ_TOPIC = 'alg-request'  # 算法触发主题
+ALG_RESP_TOPIC = 'alg-response'  # 算法运算结果主题
 
 
 class RocketClient:
@@ -56,7 +56,7 @@ class RocketClient:
                 print('start producer')
 
     @classmethod
-    def start_consuming(cls, topic, on_message, consumer_group='alg_consumer_group', expression='*'):
+    def start_consuming(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*'):
         """
         创建消费端
         :param topic: 监听的消息主题
@@ -119,6 +119,18 @@ if __name__ == '__main__':
     def on_msg(msg):
         try:
             print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
+
+            # 回复运算结果
+            body = {
+                'userName':"test",
+                'coolingStationName':'一期冷站',
+                'deviceName':'1号主机',
+                'coolingCapacity': 1926.9,
+                'COP':6.35,
+                'coolingWaterFlow': 639.2,
+                'time': time.time()
+            }
+            RocketClient.send_sync(ALG_RESP_TOPIC,body=json.dumps(body))
             return ConsumeStatus.CONSUME_SUCCESS
         except Exception as e:
             print(e)
@@ -132,4 +144,4 @@ if __name__ == '__main__':
         if cmd == 'q':
             break
         # 发送算法触发消息
-        RocketClient.send_sync(ALG_REQ_TOPIC, body=cmd)
+        RocketClient.send_sync(ALG_RESP_TOPIC, body=cmd)