rabbitmq_link.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. from config.communication_config import *
  2. import pika
  3. class PikaMessage(object):
  4. def __init__(self, exchange_name, queue_name, routing_key):
  5. """
  6. 初始化参数:
  7. 用户名,密码,ip,端口,交换机,交换机类型,队列名称,路由key
  8. """
  9. self.username = USERNAME
  10. self.password = PASSWORD
  11. self.host = HOST
  12. self.port = PORT
  13. self.virtual_host = VIRTUAL_HOST_GET
  14. self.exchange = exchange_name
  15. self.exchange_type = EXCHANGE_TYPE
  16. self.queue_name = queue_name
  17. self.routing_key = routing_key
  18. self.credentials = pika.PlainCredentials(USERNAME, PASSWORD)
  19. self.chillertemp_delay_exchange = "chillertemp.calc.delay.exchange"
  20. self.chillertemp_delay_exchange_type = "x-delayed-message"
  21. self.chillertemp_delay_queue = "chillertemp.calc.delay.input"
  22. self.chillertemp_delay_key = "chillertemp.delay.input"
  23. def connect(self):
  24. """
  25. 建立连接
  26. :return: None
  27. """
  28. self.connection = pika.BlockingConnection(pika.ConnectionParameters(host=self.host,
  29. port=self.port,
  30. virtual_host=self.virtual_host,
  31. credentials=self.credentials))
  32. self.channel = self.connection.channel()
  33. def declare_exchange(self):
  34. """
  35. 声明交换机
  36. :return: None
  37. """
  38. self.channel.exchange_declare(exchange=self.exchange,
  39. exchange_type=self.exchange_type,
  40. durable=True)
  41. # self.channel.exchange_declare(exchange=self.chillertemp_delay_exchange,
  42. # exchange_type=self.chillertemp_delay_exchange_type,
  43. # arguments={"x-delayed-type": "direct"},
  44. # durable=True)
  45. def declare_queue(self):
  46. """
  47. 声明队列
  48. :return: None
  49. """
  50. self.channel.queue_declare(queue=self.queue_name, durable=True)
  51. self.channel.queue_declare(queue=self.chillertemp_delay_queue, durable=True)
  52. def bind_queue(self):
  53. """
  54. 将交换机下的队列名与路由key绑定起来
  55. :return: None
  56. """
  57. self.channel.queue_bind(exchange=self.exchange,
  58. queue=self.queue_name,
  59. routing_key=self.routing_key)
  60. # self.channel.queue_bind(exchange=self.chillertemp_delay_exchange,
  61. # queue=self.chillertemp_delay_queue,
  62. # routing_key=self.chillertemp_delay_key)
  63. def callback(self, ch, method, properties, body):
  64. """
  65. 将rabbitmq监听队列的消息发送到延迟队列
  66. :param ch:
  67. :param method:
  68. :param properties:
  69. :param body:
  70. :return:
  71. """
  72. # self.publish_msg_to_delay_queue(body)
  73. data_from_rabbitmq = body.decode()
  74. from chiller_temp_main import main
  75. main(data_from_rabbitmq)
  76. @staticmethod
  77. def callback_delay(ch, method, properties, body):
  78. data_from_rabbitmq = body.decode()
  79. from chiller_temp_main import main
  80. main(data_from_rabbitmq)
  81. def consume(self):
  82. """
  83. 消费消息
  84. :return: None
  85. """
  86. self.channel.basic_consume(self.queue_name, self.callback, True)
  87. # self.channel.basic_consume(self.chillertemp_delay_queue, self.callback_delay, True)
  88. self.channel.start_consuming()
  89. def connection_close(self):
  90. self.connection.close()
  91. def publish_msg_to_queue(self, body):
  92. """
  93. 发布消息
  94. :return: None
  95. """
  96. self.channel.basic_publish(exchange=self.exchange,
  97. routing_key=self.routing_key,
  98. body=body)
  99. def publish_msg_to_delay_queue(self, body):
  100. """
  101. 发布消息到延迟队列
  102. :return: None
  103. """
  104. self.channel.basic_publish(exchange=self.chillertemp_delay_exchange,
  105. routing_key=self.chillertemp_delay_key,
  106. properties=pika.BasicProperties(delivery_mode=2, headers={'x-delay': 90000}),
  107. body=body)
  108. def create_connect():
  109. """
  110. 创建连接,并且返回原始数据值
  111. :param queue_name: 队列的名称
  112. :param routing_key: 与队列绑定的路由key
  113. :return: 原始传输数据值
  114. """
  115. # 从这里开始exchange_name, queue_name, routing_key
  116. rabbitmq_connection_get = PikaMessage(EXCHANGE_GET, QUEUE_NAME_GET, ROUTING_KEY_GET) # 实例化
  117. rabbitmq_connection_get.connect() # 建立连接
  118. rabbitmq_connection_get.declare_exchange() # 声明交换机
  119. rabbitmq_connection_get.declare_queue() # 声明队列
  120. rabbitmq_connection_get.bind_queue() # 绑定队列
  121. rabbitmq_connection_get.consume() # 消费
  122. rabbitmq_connection_get.connection_close() # 关闭连接
  123. def send_data_to_rabbitmq(body):
  124. rabbitmq_connection_get = PikaMessage(EXCHANGE_SEND, QUEUE_NAME_SEND, ROUTING_KEY_SEND) # 实例化
  125. rabbitmq_connection_get.connect() # 建立连接
  126. rabbitmq_connection_get.declare_exchange() # 声明交换机
  127. rabbitmq_connection_get.publish_msg_to_queue(body)
  128. rabbitmq_connection_get.connection_close() # 关闭连接