123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- import threading, time,json
- from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message
- # 官方示例
- # https://github.com/apache/rocketmq-client-python/tree/master/samples
- """
- 使用方法 : 仅限linux环境,目前支持基于Debian的发行版
- 1. 安装rocketmq-client-cpp(https://github.com/apache/rocketmq-client-python)
- wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
- dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
- 2. 安装 rocketmq-client-python
- pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
- 3. 创建rocketmq主题(如果主题不存在)
- rocketmq 控制台创建主题
- ./mqadmin updateTopic -t alg-response -c DefaultCluster -n 192.168.1.70:9876
- ./mqadmin updateTopic -t alg-request -c DefaultCluster -n 192.168.1.70:9876
- 4. 初始化
- RocketClient(name_server_address='http://192.168.1.70:9876')
- 后续在其他文件中直接调用RocketClient类方法,无需再次初始化
- RocketClient.start_consuming 监听消息
- RocketClient.send_sync 发送消息
- """
- ALG_REQ_TOPIC = 'alg-request' # Topic carrying algorithm trigger (request) messages
- ALG_RESP_TOPIC = 'alg-response' # Topic carrying algorithm computation results
class RocketClient:
    """Singleton wrapper that owns one shared RocketMQ producer.

    Construct it once with the NameServer address. Afterwards the class
    methods ``start_consuming``, ``send_msg_sync`` and ``send_sync`` can be
    called from any module without keeping a reference to the instance.
    """

    _instance = None  # the single shared instance handed out by __new__
    _lock = threading.Lock()  # class-level lock guarding instance and producer creation
    _producer = None  # shared producer, set once the first __init__ has started it
    _name_server_address = None  # NameServer address recorded by the first __init__

    def __new__(cls, *args, **kwargs):
        """Return the shared instance, creating it on first use."""
        with cls._lock:
            if cls._instance is None:
                cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self, name_server_address, producer_group='alg_result_producer'):
        """Create and start the shared producer; only the first call has any effect.

        :param name_server_address: NameServer address, also reused by consumers
        :param producer_group: producer group name passed to ``Producer``
        """
        cls = self.__class__
        with cls._lock:
            if cls._producer is not None:
                return
            producer = Producer(producer_group)
            producer.set_name_server_address(name_server_address)
            producer.start()
            # Publish the producer only after start() succeeded, so a failed
            # start does not leave an unusable producer behind and a later
            # RocketClient(...) call can retry.
            cls._name_server_address = name_server_address
            cls._producer = producer
            print('start producer')

    @classmethod
    def _require_producer(cls):
        """Return the shared producer, or raise if the client was never initialized."""
        producer = cls._producer
        if producer is None:
            raise RuntimeError(
                'RocketClient is not initialized; '
                'call RocketClient(name_server_address=...) first'
            )
        return producer

    @classmethod
    def start_consuming(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*'):
        """Start a push consumer for ``topic`` on a daemon thread.

        :param topic: topic to subscribe to
        :param on_message: callback invoked with each received message
        :param consumer_group: consumer group name
        :param expression: tag/key filter expression passed to ``subscribe``
            (filtering is not used for now)
        :return: the started daemon thread
        :raises RuntimeError: if the client has not been initialized yet
        """
        address = cls._name_server_address
        if address is None:
            # Fail in the caller instead of inside the background thread,
            # where the error would never reach the caller.
            raise RuntimeError(
                'RocketClient is not initialized; '
                'call RocketClient(name_server_address=...) first'
            )

        def consumer_thread():
            consumer = PushConsumer(consumer_group)
            consumer.set_name_server_address(address)
            consumer.subscribe(topic, on_message, expression=expression)
            consumer.start()
            print('listening message at {}'.format(topic))
            # Park this thread forever so `consumer` stays referenced.
            # NOTE(review): the original comment says the client would
            # otherwise exit after receiving a message - confirm.
            while True:
                time.sleep(3)

        thread = threading.Thread(target=consumer_thread, daemon=True)
        thread.start()
        return thread

    @classmethod
    def send_msg_sync(cls, msg: "Message"):
        """Send an already built message through the shared producer.

        :param msg: message to send
        :return: whatever the producer's ``send_sync`` returns
        :raises RuntimeError: if the client has not been initialized yet
        """
        return cls._require_producer().send_sync(msg)

    @classmethod
    def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
        """Build a message for ``topic`` and send it through the shared producer.

        :param topic: destination topic
        :param body: message body; ``str`` is encoded as UTF-8, ``bytes`` is sent as is
        :param tags: optional message tags (skipped when falsy)
        :param keys: optional message keys (skipped when falsy)
        :return: whatever the producer's ``send_sync`` returns
        :raises RuntimeError: if the client has not been initialized yet
        """
        # Check first so an uninitialized client fails before building a message.
        producer = cls._require_producer()
        msg = Message(topic)
        if tags:
            msg.set_tags(tags)
        if keys:
            msg.set_keys(keys)
        payload = body if isinstance(body, bytes) else body.encode('utf-8')
        msg.set_body(payload)
        result = producer.send_sync(msg)
        print(f'[RocketClient] send msg to {topic}')
        return result
if __name__ == '__main__':
    # Demo: start the shared producer, answer every algorithm request with a
    # fixed sample result, and publish typed lines as algorithm requests.
    RocketClient(name_server_address='http://192.168.1.70:9876')

    def on_msg(msg):
        """Print the received message and reply with a sample result.

        Returns ``CONSUME_SUCCESS`` when the reply was sent, or
        ``RECONSUME_LATER`` when anything in the handler raised.
        """
        try:
            print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
            # Reply with the (sample) computation result.
            body = {
                'userName': "test",
                'coolingStationName': '一期冷站',
                'deviceName': '1号主机',
                'coolingCapacity': 1926.9,
                'COP': 6.35,
                'coolingWaterFlow': 639.2,
                'time': time.time(),
            }
            RocketClient.send_sync(ALG_RESP_TOPIC, body=json.dumps(body))
            return ConsumeStatus.CONSUME_SUCCESS
        except Exception as e:
            # Top-level callback boundary: report the error and request redelivery.
            print(e)
            return ConsumeStatus.RECONSUME_LATER

    # Listen for algorithm trigger messages.
    RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg)

    while True:
        cmd = input('输入消息内容 (输入 q 退出): ')
        if cmd == 'q':
            break
        # Publish the typed text as an algorithm trigger message. It goes to
        # the request topic so the consumer started above receives it; the
        # original sent it to the response topic, which nothing here consumes.
        RocketClient.send_sync(ALG_REQ_TOPIC, body=cmd)
|