heshixin před 1 týdnem
rodič
revize
0310022778
6 změnil soubory, kde provedl 232 přidání a 3 odebrání
  1. 9 1
      README.MD
  2. 190 0
      communication/rocketmq_link.py
  3. 8 1
      config/communicationConfig.py
  4. 20 1
      config/config.py
  5. 3 0
      main.py
  6. 2 0
      startup.sh

+ 9 - 1
README.MD

@@ -18,7 +18,15 @@
 1. 检查服务器pyhon环境需要高于3.10(python3、python3-venv)
 2. mkdir /opt/alg && python3 -m venv alg_env
 3. source alg/env/bin/activate
-
+4. 安装第三方库
+5. 启动服务
+    ```shell
+    bash starup.sh
+    ```
+6. 日志文件
+   ```shell
+   /var/log/cop.log
+   ```
 
 # 阿里云第三方库下载加速
 ```shell

+ 190 - 0
communication/rocketmq_link.py

@@ -0,0 +1,190 @@
+import threading, time,json
+from logs.logger import *
+from config import communicationConfig
+from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message, ReceivedMessage
+
+# 官方示例
+# https://github.com/apache/rocketmq-client-python/tree/master/samples
+
+
+"""
+使用方法 : 仅限linux环境,目前支持基于Debian的发行版
+1. 安装rocketmq-client-cpp(https://github.com/apache/rocketmq-client-python)
+wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
+dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
+
+2. 安装 rocketmq-client-python
+pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
+
+3. 创建rocketmq主题(如果主题不存在)
+rocketmq 控制台创建主题
+./mqadmin updateTopic -t alg-response -c DefaultCluster -n 192.168.1.70:9876
+./mqadmin updateTopic -t alg-request -c DefaultCluster -n 192.168.1.70:9876
+
+3. 初始化
+RocketClient(name_server_address='http://192.168.1.70:8976')
+后续在其他文件中直接调用RocketClient类方法,无需再次初始化
+RocketClient.start_consuming 监听消息
+RocketClient.send_sync  发送消息
+
+
+"""
+ALG_REQ_TOPIC = communicationConfig.ROCKET_COP_IN_TOPIC  # 算法触发主题
+ALG_RESP_TOPIC = communicationConfig.ROCKET_COP_OUT_TOPIC  # 算法运算结果主题
+ALG_REQ_TAG = communicationConfig.ROCKET_COP_IN_TAG
+ALG_RESP_TAG = communicationConfig.ROCKET_COP_OUT_TAG
+
+class RocketClient:
+    _instance = None
+    _lock = threading.Lock()  # 类级锁,确保线程安全
+    _producer = None  # 单例生产者
+    _name_server_address = None  # NameServer 地址
+
+    def __new__(cls, *args, **kwargs):
+        """单例模式实现"""
+        with cls._lock:
+            if not cls._instance:
+                cls._instance = super(RocketClient, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self, name_server_address, producer_group='alg_cop_result_producer', access_key='', access_secret=''):
+        """初始化生产者(仅执行一次)"""
+        with self._lock:
+            if not self.__class__._producer:
+                self.__class__._name_server_address = name_server_address
+                self.__class__._producer = Producer(producer_group)
+                self.__class__._producer.set_name_server_address(name_server_address)
+                self.__class__._producer.set_session_credentials(access_key,access_secret,'')
+                self.__class__._producer.start()
+                logger.critical("========Rocket server {} ========".format(name_server_address))
+                logger.critical("========Rocket cop producer created ========")
+
+    @classmethod
+    def start_consuming(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*'):
+        """
+        创建消费端
+        :param topic: 监听的消息主题
+        :param on_message: 收到消息后的回调方法
+        :param consumer_group: 消费组
+        :param expression: 消息按tag:key过滤,暂不使用
+        :return:
+        """
+        def consumer_thread():
+            consumer = PushConsumer(consumer_group)
+            consumer.set_name_server_address(cls._name_server_address)
+            consumer.subscribe(topic, on_message, expression=expression)
+            consumer.start()
+            logger.critical('listening message at {}'.format(topic))
+            # 消费端是守护进程,需要保证主线程持续运行否则收到消息后客户端会退出
+            while True:
+                time.sleep(3)
+
+        thread = threading.Thread(target=consumer_thread, daemon=True)
+        thread.start()
+
+    @classmethod
+    def start_consume(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*',
+                      access_key='', access_secret=''):
+        consumer = PushConsumer(consumer_group)
+        consumer.set_name_server_address(cls._name_server_address)
+        consumer.set_session_credentials(access_key,access_secret,'')
+        consumer.subscribe(topic, on_message, expression=expression)
+        consumer.start()
+        logger.critical('listening message at {}:{}'.format(topic,expression))
+        # 保证主线程运行
+        while True:
+            time.sleep(3600)
+
+    @classmethod
+    def send_msg_sync(cls, msg: Message):
+        """
+        发送消息
+        :param cls:
+        :param msg:
+        :return:
+        """
+        cls._producer.send_sync(msg)
+
+    @classmethod
+    def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
+        """
+        发送消息
+        :param cls:
+        :param topic:
+        :param tags:
+        :param keys:
+        :param body:
+        :return:
+        """
+        msg = Message(topic)
+        if tags:
+            msg.set_tags(tags)
+        if keys:
+            msg.set_keys(keys)
+        msg.set_body(body.encode('utf-8'))
+        cls._producer.send_sync(msg)
+        logger.critical(f'[RocketClient] send msg to  {topic}')
+
+def on_rocket_msg(cop_msg:ReceivedMessage):
+    from main import main
+    try:
+        main(cop_msg.body.decode())
+        return ConsumeStatus.CONSUME_SUCCESS
+    except Exception as e:
+        return ConsumeStatus.RECONSUME_LATER
+
+
+def start_rocket():
+    RocketClient(name_server_address=communicationConfig.ROCKET_SERVER_ADDR,
+                 access_key=communicationConfig.ROCKET_ACCESS_KEY,
+                 access_secret=communicationConfig.ROCKET_ACCESS_SECRET)
+    # 监听算法触发消息
+    RocketClient.start_consume(ALG_REQ_TOPIC, on_rocket_msg, expression=ALG_REQ_TAG)
+
+def send_data_to_rocket(json_msg):
+    # 回复算法结果
+    RocketClient.send_sync(ALG_RESP_TOPIC, body=json_msg, tags=ALG_RESP_TAG)
+
+
+
+
+if __name__ == '__main__':
+    """
+    测试rocket通信使用
+    """
+    # 初始化
+    RocketClient(name_server_address='http://192.168.1.70:9876')
+    print("rocket 初始化")
+
+    # 收到消息后的回调方法
+    def on_msg(msg):
+        try:
+            print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
+
+            # 回复运算结果
+            body = {
+                'userName':"test",
+                'coolingStationName':'一期冷站',
+                'deviceName':'1号主机',
+                'coolingCapacity': 1926.9,
+                'COP':6.35,
+                'coolingWaterFlow': 639.2,
+                'time': time.time()
+            }
+            RocketClient.send_sync(ALG_RESP_TOPIC,
+                                   body=json.dumps(body, ensure_ascii=False),
+                                   tags=ALG_RESP_TAG)
+            return ConsumeStatus.CONSUME_SUCCESS
+        except Exception as e:
+            print(e)
+            return ConsumeStatus.RECONSUME_LATER
+
+    # 监听算法触发消息
+    RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg, expression=ALG_REQ_TAG)
+
+    while True:
+        cmd = input('输入消息内容 (输入 q 退出): ')
+        if cmd == 'q':
+            break
+        # 发送算法触发消息
+        RocketClient.send_sync(ALG_RESP_TOPIC, body=cmd, tags=ALG_RESP_TAG)

+ 8 - 1
config/communicationConfig.py

@@ -1,4 +1,3 @@
-
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 # 与rabbitmq通讯相关的参数设置
@@ -23,3 +22,11 @@ VIRTUAL_HOST_SEND = configs["rabbitmq_config"]["virtual_host"]  # 获取虚拟
 
 emEmbaic = mysql_configs["em_embasic"]
 emBsem = mysql_configs["em_bsem"]
+
+ROCKET_SERVER_ADDR = configs["rocketmq_config"]["host"]
+ROCKET_ACCESS_KEY = configs["rocketmq_config"]["access_key"]
+ROCKET_ACCESS_SECRET = configs["rocketmq_config"]["access_secret"]
+ROCKET_COP_IN_TOPIC = configs["rocketmq_config"]["topic_cop_in"]
+ROCKET_COP_OUT_TOPIC = configs["rocketmq_config"]["topic_cop_out"]
+ROCKET_COP_IN_TAG = configs["rocketmq_config"]["tag_cop_in"]
+ROCKET_COP_OUT_TAG = configs["rocketmq_config"]["tag_cop_out"]

+ 20 - 1
config/config.py

@@ -14,7 +14,26 @@ configs = {
                         "routing_key_get": os.getenv("ROUTING_KEY_GET", "cop.input"),
                         "exchange_send": os.getenv("EXCHANGE_SEND", "sem.server.exchange"),
                         "queue_name_send": os.getenv("QUEUE_NAME_SEND", "sem.server.queue.cop.output"),
-                        "routing_key_send": os.getenv("ROUTING_KEY_SEND", "cop.output")}}
+                        "routing_key_send": os.getenv("ROUTING_KEY_SEND", "cop.output")},
+
+    "rocketmq_config": {
+
+            "access_key": os.getenv("ROCKET_USER","test"),
+            "access_secret": os.getenv("ROCKET_PASSWD", "test"),
+            #"host": os.getenv("ROCKET_HOST_ADDR","http://192.168.1.70:9876"),
+            "host": os.getenv("ROCKET_HOST_ADDR","http://172.17.102.51:9876"),
+            # cop 触发主题
+            "topic_cop_in": os.getenv("TOPIC_COP_IN","alg-request"),
+            "tag_cop_in": os.getenv("TAG_COP_IN","alg-cop-req"),
+            # cop 响应主题
+            "topic_cop_out": os.getenv("TOPIC_COP_OUT","alg-response"),
+            "tag_cop_out": os.getenv("TAG_COP_OUT","alg-cop-resp"),
+
+    }
+
+
+    }
+
 
 
 # configs = {

+ 3 - 0
main.py

@@ -2,6 +2,7 @@ import json
 from logs.logger import *
 from calculation.calculation_process import CalculationProcess
 from communication.rabbitmq_link import create_connect, send_data_to_rabbitmq
+# from communication.rocketmq_link import start_rocket, send_data_to_rocket
 
 
 def main(dict_origin):
@@ -17,9 +18,11 @@ def main(dict_origin):
     dict_results = json.dumps(dict_results, ensure_ascii=False)
     logger.critical("Output: {}".format(dict_results))
     send_data_to_rabbitmq(dict_results)
+    # send_data_to_rocket(dict_results)
 
 
 if __name__ == '__main__':
     logger.critical("========Create connection for rabbitmq========")
     create_connect()
+    # start_rocket()
 

+ 2 - 0
startup.sh

@@ -0,0 +1,2 @@
+source /opt/alg/alg_env/bin/activate && \
+nohup python3 main.py  >> /var/log/cop.log  2>&1 &