rocketmq_util.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. import threading, time
  2. from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message
  3. # 官方示例
  4. # https://github.com/apache/rocketmq-client-python/tree/master/samples
  5. """
  6. 使用方法 : 仅限linux环境,目前支持基于Debian的发行版
  7. 1. 安装rocketmq-client-cpp(https://github.com/apache/rocketmq-client-python)
  8. wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
  9. dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
  10. 2. 安装 rocketmq-client-python
  11. pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
  12. 3. 创建rocketmq主题(如果主题不存在)
  13. rocketmq 控制台创建主题
  14. ./mqadmin updateTopic -t alg_result -c DefaultCluster -n 192.168.1.70:9876
  15. ./mqadmin updateTopic -t alg_request -c DefaultCluster -n 192.168.1.70:9876
  16. 4. 初始化
  17. RocketClient(name_server_address='http://192.168.1.70:9876')
  18. 后续在其他文件中直接调用RocketClient类方法,无需再次初始化
  19. RocketClient.start_consuming 监听消息
  20. RocketClient.send_sync 发送消息
  21. """
  22. ALG_REQ_TOPIC = 'alg_request' # 算法触发主题
  23. ALG_RESP_TOPIC = 'alg_result' # 算法运算结果主题
  24. class RocketClient:
  25. _instance = None
  26. _lock = threading.Lock() # 类级锁,确保线程安全
  27. _producer = None # 单例生产者
  28. _name_server_address = None # NameServer 地址
  29. def __new__(cls, *args, **kwargs):
  30. """单例模式实现"""
  31. with cls._lock:
  32. if not cls._instance:
  33. cls._instance = super(RocketClient, cls).__new__(cls)
  34. return cls._instance
  35. def __init__(self, name_server_address, producer_group='alg_result_producer'):
  36. """初始化生产者(仅执行一次)"""
  37. with self._lock:
  38. if not self.__class__._producer:
  39. self.__class__._name_server_address = name_server_address
  40. self.__class__._producer = Producer(producer_group)
  41. self.__class__._producer.set_name_server_address(name_server_address)
  42. self.__class__._producer.start()
  43. print('start producer')
  44. @classmethod
  45. def start_consuming(cls, topic, on_message, consumer_group='alg_consumer_group', expression='*'):
  46. """
  47. 创建消费端
  48. :param topic: 监听的消息主题
  49. :param on_message: 收到消息后的回调方法
  50. :param consumer_group: 消费组
  51. :param expression: 消息按tag\key过滤,暂不使用
  52. :return:
  53. """
  54. def consumer_thread():
  55. consumer = PushConsumer(consumer_group)
  56. consumer.set_name_server_address(cls._name_server_address)
  57. consumer.subscribe(topic, on_message, expression=expression)
  58. consumer.start()
  59. print('listening message at {}'.format(topic))
  60. # 消费端是守护进程,需要保证主线程持续运行否则收到消息后客户端会退出
  61. while True:
  62. time.sleep(3)
  63. thread = threading.Thread(target=consumer_thread, daemon=True)
  64. thread.start()
  65. @classmethod
  66. def send_msg_sync(cls, msg: Message):
  67. """
  68. 发送消息
  69. :param cls:
  70. :param msg:
  71. :return:
  72. """
  73. cls._producer.send_sync(msg)
  74. @classmethod
  75. def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
  76. """
  77. 发送消息
  78. :param cls:
  79. :param topic:
  80. :param tags:
  81. :param keys:
  82. :param body:
  83. :return:
  84. """
  85. msg = Message(topic)
  86. if tags:
  87. msg.set_tags(tags)
  88. if keys:
  89. msg.set_keys(keys)
  90. msg.set_body(body.encode('utf-8'))
  91. cls._producer.send_sync(msg)
  92. print(f'[RocketClient] send msg to {topic}')
  93. if __name__ == '__main__':
  94. # 初始化
  95. RocketClient(name_server_address='http://192.168.1.70:9876')
  96. # 收到消息后的回调方法
  97. def on_msg(msg):
  98. try:
  99. print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
  100. return ConsumeStatus.CONSUME_SUCCESS
  101. except Exception as e:
  102. print(e)
  103. return ConsumeStatus.RECONSUME_LATER
  104. # 监听算法触发消息
  105. RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg)
  106. while True:
  107. cmd = input('输入消息内容 (输入 q 退出): ')
  108. if cmd == 'q':
  109. break
  110. # 发送算法触发消息
  111. RocketClient.send_sync(ALG_REQ_TOPIC, body=cmd)