rocketmq_link.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. import threading, time,json
  2. from logs.logger import *
  3. from config import communicationConfig
  4. from rocketmq.client import PushConsumer, ConsumeStatus, Producer, Message, ReceivedMessage
  5. # 官方示例
  6. # https://github.com/apache/rocketmq-client-python/tree/master/samples
  7. """
  8. 使用方法 : 仅限linux环境,目前支持基于Debian的发行版
  9. 1. 安装rocketmq-client-cpp(https://github.com/apache/rocketmq-client-python)
  10. wget https://github.com/apache/rocketmq-client-cpp/releases/download/2.0.0/rocketmq-client-cpp-2.0.0.amd64.deb
  11. dpkg -i rocketmq-client-cpp-2.0.0.amd64.deb
  12. 2. 安装 rocketmq-client-python
  13. pip install rocketmq-client-python -i http://mirrors.aliyun.com/pypi/simple --trusted-host mirrors.aliyun.com
  14. 3. 创建rocketmq主题(如果主题不存在)
  15. rocketmq 控制台创建主题
  16. ./mqadmin updateTopic -t alg-response -c DefaultCluster -n 192.168.1.70:9876
  17. ./mqadmin updateTopic -t alg-request -c DefaultCluster -n 192.168.1.70:9876
  18. 3. 初始化
  19. RocketClient(name_server_address='http://192.168.1.70:8976')
  20. 后续在其他文件中直接调用RocketClient类方法,无需再次初始化
  21. RocketClient.start_consuming 监听消息
  22. RocketClient.send_sync 发送消息
  23. """
  24. ALG_REQ_TOPIC = communicationConfig.ROCKET_COP_IN_TOPIC # 算法触发主题
  25. ALG_RESP_TOPIC = communicationConfig.ROCKET_COP_OUT_TOPIC # 算法运算结果主题
  26. ALG_REQ_TAG = communicationConfig.ROCKET_COP_IN_TAG
  27. ALG_RESP_TAG = communicationConfig.ROCKET_COP_OUT_TAG
  28. class RocketClient:
  29. _instance = None
  30. _lock = threading.Lock() # 类级锁,确保线程安全
  31. _producer = None # 单例生产者
  32. _name_server_address = None # NameServer 地址
  33. def __new__(cls, *args, **kwargs):
  34. """单例模式实现"""
  35. with cls._lock:
  36. if not cls._instance:
  37. cls._instance = super(RocketClient, cls).__new__(cls)
  38. return cls._instance
  39. def __init__(self, name_server_address, producer_group='alg_cop_result_producer', access_key='', access_secret=''):
  40. """初始化生产者(仅执行一次)"""
  41. with self._lock:
  42. if not self.__class__._producer:
  43. self.__class__._name_server_address = name_server_address
  44. self.__class__._producer = Producer(producer_group)
  45. self.__class__._producer.set_name_server_address(name_server_address)
  46. self.__class__._producer.set_session_credentials(access_key,access_secret,'')
  47. self.__class__._producer.start()
  48. logger.critical("========Rocket server {} ========".format(name_server_address))
  49. logger.critical("========Rocket cop producer created ========")
  50. @classmethod
  51. def start_consuming(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*'):
  52. """
  53. 创建消费端
  54. :param topic: 监听的消息主题
  55. :param on_message: 收到消息后的回调方法
  56. :param consumer_group: 消费组
  57. :param expression: 消息按tag:key过滤,暂不使用
  58. :return:
  59. """
  60. def consumer_thread():
  61. consumer = PushConsumer(consumer_group)
  62. consumer.set_name_server_address(cls._name_server_address)
  63. consumer.subscribe(topic, on_message, expression=expression)
  64. consumer.start()
  65. logger.critical('listening message at {}'.format(topic))
  66. # 消费端是守护进程,需要保证主线程持续运行否则收到消息后客户端会退出
  67. while True:
  68. time.sleep(3)
  69. thread = threading.Thread(target=consumer_thread, daemon=True)
  70. thread.start()
  71. @classmethod
  72. def start_consume(cls, topic, on_message, consumer_group='alg-req-consumer-group', expression='*',
  73. access_key='', access_secret=''):
  74. consumer = PushConsumer(consumer_group)
  75. consumer.set_name_server_address(cls._name_server_address)
  76. consumer.set_session_credentials(access_key,access_secret,'')
  77. consumer.subscribe(topic, on_message, expression=expression)
  78. consumer.start()
  79. logger.critical('listening message at {}:{}'.format(topic,expression))
  80. # 保证主线程运行
  81. while True:
  82. time.sleep(3600)
  83. @classmethod
  84. def send_msg_sync(cls, msg: Message):
  85. """
  86. 发送消息
  87. :param cls:
  88. :param msg:
  89. :return:
  90. """
  91. cls._producer.send_sync(msg)
  92. @classmethod
  93. def send_sync(cls, topic: str, body: str, tags=None, keys=None, ):
  94. """
  95. 发送消息
  96. :param cls:
  97. :param topic:
  98. :param tags:
  99. :param keys:
  100. :param body:
  101. :return:
  102. """
  103. msg = Message(topic)
  104. if tags:
  105. msg.set_tags(tags)
  106. if keys:
  107. msg.set_keys(keys)
  108. msg.set_body(body.encode('utf-8'))
  109. cls._producer.send_sync(msg)
  110. logger.critical(f'[RocketClient] send msg to {topic}')
  111. def on_rocket_msg(cop_msg:ReceivedMessage):
  112. from main import main
  113. try:
  114. main(cop_msg.body.decode())
  115. return ConsumeStatus.CONSUME_SUCCESS
  116. except Exception as e:
  117. return ConsumeStatus.RECONSUME_LATER
  118. def start_rocket():
  119. RocketClient(name_server_address=communicationConfig.ROCKET_SERVER_ADDR,
  120. access_key=communicationConfig.ROCKET_ACCESS_KEY,
  121. access_secret=communicationConfig.ROCKET_ACCESS_SECRET)
  122. # 监听算法触发消息
  123. RocketClient.start_consume(ALG_REQ_TOPIC, on_rocket_msg, expression=ALG_REQ_TAG)
  124. def send_data_to_rocket(json_msg):
  125. # 回复算法结果
  126. RocketClient.send_sync(ALG_RESP_TOPIC, body=json_msg, tags=ALG_RESP_TAG)
  127. if __name__ == '__main__':
  128. """
  129. 测试rocket通信使用
  130. """
  131. # 初始化
  132. RocketClient(name_server_address='http://192.168.1.70:9876')
  133. print("rocket 初始化")
  134. # 收到消息后的回调方法
  135. def on_msg(msg):
  136. try:
  137. print(f'msgId: {msg.id}, body: {msg.body.decode("utf-8")}')
  138. # 回复运算结果
  139. body = {
  140. 'userName':"test",
  141. 'coolingStationName':'一期冷站',
  142. 'deviceName':'1号主机',
  143. 'coolingCapacity': 1926.9,
  144. 'COP':6.35,
  145. 'coolingWaterFlow': 639.2,
  146. 'time': time.time()
  147. }
  148. RocketClient.send_sync(ALG_RESP_TOPIC,
  149. body=json.dumps(body, ensure_ascii=False),
  150. tags=ALG_RESP_TAG)
  151. return ConsumeStatus.CONSUME_SUCCESS
  152. except Exception as e:
  153. print(e)
  154. return ConsumeStatus.RECONSUME_LATER
  155. # 监听算法触发消息
  156. RocketClient.start_consuming(ALG_REQ_TOPIC, on_msg, expression=ALG_REQ_TAG)
  157. while True:
  158. cmd = input('输入消息内容 (输入 q 退出): ')
  159. if cmd == 'q':
  160. break
  161. # 发送算法触发消息
  162. RocketClient.send_sync(ALG_RESP_TOPIC, body=cmd, tags=ALG_RESP_TAG)