"""Singleton wrapper around the rocketmq-client-python producer and consumer.

Official samples:
https://github.com/apache/rocketmq-client-python/tree/master/samples

Usage (Linux only; Debian-based distributions are currently supported):

1. Install rocketmq-client-cpp
   (https://github.com/apache/rocketmq-client-python):
       wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
       dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
2. Install rocketmq-client-python:
       pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
3. Create the RocketMQ topics if they do not exist yet, either from the
   RocketMQ console or with mqadmin:
       ./mqadmin updateTopic -t alg-response -c DefaultCluster -n 192.168.1.70:9876
       ./mqadmin updateTopic -t alg-request -c DefaultCluster -n 192.168.1.70:9876
4. Initialise the client once:
       RocketClient(name_server_address='http://192.168.1.70:9876')
   Other modules can then call the class methods directly without
   initialising again:
       RocketClient.start_consuming   listen for messages
       RocketClient.send_sync         send a message
"""
import json
import threading
import time

from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message

ALG_REQ_TOPIC = 'alg-request'  # topic carrying algorithm trigger requests
ALG_RESP_TOPIC = 'alg-response'  # topic carrying algorithm results


class RocketClient:
    """Singleton that owns one shared producer and spawns push consumers.

    Construct it once with the NameServer address; afterwards use the class
    methods (`start_consuming`, `send_sync`, `send_msg_sync`) from anywhere.
    """

    _instance = None
    _lock = threading.Lock()  # class-level lock guarding instance/producer creation
    _producer = None  # the single shared producer
    _name_server_address = None  # NameServer address, reused by consumers

    def __new__(cls, *args, **kwargs):
        """Return the one shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, name_server_address, producer_group='alg_result_producer'):
        """Create and start the shared producer (only the first call has effect).

        :param name_server_address: NameServer address handed to the producer
            and, later, to every consumer created by `start_consuming`
        :param producer_group: producer group name
        """
        owner = self.__class__
        with self._lock:
            if owner._producer is not None:
                return
            producer = Producer(producer_group)
            producer.set_name_server_address(name_server_address)
            producer.start()
            # Publish the producer only after it started successfully, so a
            # failed start does not leave a dead producer cached and a later
            # RocketClient(...) call can retry.
            owner._name_server_address = name_server_address
            owner._producer = producer
            print('start producer')

    @classmethod
    def _require_producer(cls):
        """Return the shared producer, or raise if the client was never initialised."""
        producer = cls._producer
        if producer is None:
            raise RuntimeError(
                'RocketClient is not initialized; '
                'call RocketClient(name_server_address=...) first'
            )
        return producer

    @classmethod
    def start_consuming(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*'):
        """Start a push consumer for `topic` on a background daemon thread.

        :param topic: topic to listen on
        :param on_message: callback invoked with each received message
        :param consumer_group: consumer group name
        :param expression: tag/key filter expression (not used for now)
        :return: the started daemon thread that owns the consumer
        :raises RuntimeError: if the client has not been initialised yet
        """
        name_server_address = cls._name_server_address
        if name_server_address is None:
            # Fail in the caller instead of inside the daemon thread, where the
            # error would otherwise be lost.
            raise RuntimeError(
                'RocketClient is not initialized; '
                'call RocketClient(name_server_address=...) first'
            )

        def consumer_thread():
            consumer = PushConsumer(consumer_group)
            consumer.set_name_server_address(name_server_address)
            consumer.subscribe(topic, on_message, expression=expression)
            consumer.start()
            print('listening message at {}'.format(topic))
            # Keep this thread (and its reference to the consumer) alive;
            # per the original note, the client exits after receiving a
            # message if nothing keeps running.
            while True:
                time.sleep(3)

        thread = threading.Thread(target=consumer_thread, daemon=True)
        thread.start()
        return thread

    @classmethod
    def send_msg_sync(cls, msg: Message):
        """Send an already-built message synchronously.

        :param msg: the message to send
        :return: whatever the producer's `send_sync` returns
        :raises RuntimeError: if the client has not been initialised yet
        """
        return cls._require_producer().send_sync(msg)

    @classmethod
    def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
        """Build a message from its parts and send it synchronously.

        :param topic: destination topic
        :param body: message body; encoded as UTF-8 before sending
        :param tags: optional message tags (skipped when falsy)
        :param keys: optional message keys (skipped when falsy)
        :return: whatever the producer's `send_sync` returns
        :raises RuntimeError: if the client has not been initialised yet
        """
        producer = cls._require_producer()
        msg = Message(topic)
        if tags:
            msg.set_tags(tags)
        if keys:
            msg.set_keys(keys)
        msg.set_body(body.encode('utf-8'))
        result = producer.send_sync(msg)
        print(f'[RocketClient] send msg to {topic}')
        return result


def _demo():
    """Interactive demo: publish typed input as requests and answer each one."""
    # Initialise the shared client.
    RocketClient(name_server_address='http://192.168.1.70:9876')

    # Callback invoked for every received request.
    def on_msg(msg):
        try:
            print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
            # Reply with a (sample) computation result.
            body = {
                'userName': "test",
                'coolingStationName': '一期冷站',
                'deviceName': '1号主机',
                'coolingCapacity': 1926.9,
                'COP': 6.35,
                'coolingWaterFlow': 639.2,
                'time': time.time()
            }
            RocketClient.send_sync(ALG_RESP_TOPIC, body=json.dumps(body, ensure_ascii=False))
            return ConsumeStatus.CONSUME_SUCCESS
        except Exception as e:
            # Callback boundary: report the failure and ask for redelivery.
            print(e)
            return ConsumeStatus.RECONSUME_LATER

    # Listen for algorithm trigger messages.
    RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg)

    while True:
        cmd = input('输入消息内容 (输入 q 退出): ')
        if cmd == 'q':
            break
        # Send the algorithm trigger message. It must go to the request topic,
        # which is the one the consumer above listens on.
        RocketClient.send_sync(ALG_REQ_TOPIC, body=cmd)


if __name__ == '__main__':
    _demo()