Sfoglia il codice sorgente

上传文件至 ''

rocketmq 操作工具
heshxin 1 mese fa
parent
commit
eaedbd3a48
1 ha cambiato i file con 135 aggiunte e 0 eliminazioni
  1. 135 0
      rocketmq_util.py

+ 135 - 0
rocketmq_util.py

@@ -0,0 +1,135 @@
+import threading, time
+from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message
+
+
+# 官方示例
+# https://github.com/apache/rocketmq-client-python/tree/master/samples
+
+
+"""
+使用方法 : 仅限linux环境,目前支持基于Debian的发行版
+1. 安装rocketmq-client-cpp(https://github.com/apache/rocketmq-client-python)
+wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
+dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
+
+2. 安装 rocketmq-client-python
+pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
+
+3. 创建rocketmq主题(如果主题不存在)
+rocketmq 控制台创建主题
+./mqadmin updateTopic -t alg_result -c DefaultCluster -n 192.168.1.70:9876
+./mqadmin updateTopic -t alg_request -c DefaultCluster -n 192.168.1.70:9876
+
+3. 初始化
+RocketClient(name_server_address='http://192.168.1.70:8976')
+后续在其他文件中直接调用RocketClient类方法,无需再次初始化
+RocketClient.start_consuming 监听消息
+RocketClient.send_sync  发送消息
+
+
+"""
+ALG_REQ_TOPIC = 'alg_request'  # 算法触发主题
+ALG_RESP_TOPIC = 'alg_result'  # 算法运算结果主题
+
+
+class RocketClient:
+    _instance = None
+    _lock = threading.Lock()  # 类级锁,确保线程安全
+    _producer = None  # 单例生产者
+    _name_server_address = None  # NameServer 地址
+
+    def __new__(cls, *args, **kwargs):
+        """单例模式实现"""
+        with cls._lock:
+            if not cls._instance:
+                cls._instance = super(RocketClient, cls).__new__(cls)
+        return cls._instance
+
+    def __init__(self, name_server_address, producer_group='alg_result_producer'):
+        """初始化生产者(仅执行一次)"""
+        with self._lock:
+            if not self.__class__._producer:
+                self.__class__._name_server_address = name_server_address
+                self.__class__._producer = Producer(producer_group)
+                self.__class__._producer.set_name_server_address(name_server_address)
+                self.__class__._producer.start()
+                print('start producer')
+
+    @classmethod
+    def start_consuming(cls, topic, on_message, consumer_group='alg_consumer_group', expression='*'):
+        """
+        创建消费端
+        :param topic: 监听的消息主题
+        :param on_message: 收到消息后的回调方法
+        :param consumer_group: 消费组
+        :param expression: 消息按tag\key过滤,暂不使用
+        :return:
+        """
+
+        def consumer_thread():
+            consumer = PushConsumer(consumer_group)
+            consumer.set_name_server_address(cls._name_server_address)
+            consumer.subscribe(topic, on_message, expression=expression)
+            consumer.start()
+            print('listening message at {}'.format(topic))
+            # 消费端是守护进程,需要保证主线程持续运行否则收到消息后客户端会退出
+            while True:
+                time.sleep(3)
+
+        thread = threading.Thread(target=consumer_thread, daemon=True)
+        thread.start()
+
+    @classmethod
+    def send_msg_sync(cls, msg: Message):
+        """
+        发送消息
+        :param cls:
+        :param msg:
+        :return:
+        """
+        cls._producer.send_sync(msg)
+
+    @classmethod
+    def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
+        """
+        发送消息
+        :param cls:
+        :param topic:
+        :param tags:
+        :param keys:
+        :param body:
+        :return:
+        """
+        msg = Message(topic)
+        if tags:
+            msg.set_tags(tags)
+        if keys:
+            msg.set_keys(keys)
+        msg.set_body(body.encode('utf-8'))
+        cls._producer.send_sync(msg)
+        print(f'[RocketClient] send msg to  {topic}')
+
+
+if __name__ == '__main__':
+    # 初始化
+    RocketClient(name_server_address='http://192.168.1.70:9876')
+
+
+    # 收到消息后的回调方法
+    def on_msg(msg):
+        try:
+            print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
+            return ConsumeStatus.CONSUME_SUCCESS
+        except Exception as e:
+            print(e)
+            return ConsumeStatus.RECONSUME_LATER
+
+    # 监听算法触发消息
+    RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg)
+
+    while True:
+        cmd = input('输入消息内容 (输入 q 退出): ')
+        if cmd == 'q':
+            break
+        # 发送算法触发消息
+        RocketClient.send_sync(ALG_REQ_TOPIC, body=cmd)