123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- from config.communication_config import *
- import pika
class PikaMessage(object):
    """Wrapper around a blocking pika connection for one exchange/queue pair.

    Connection settings (USERNAME, PASSWORD, HOST, PORT, VIRTUAL_HOST_GET,
    EXCHANGE_TYPE) are read from module-level names; presumably these come
    from the wildcard import of ``config.communication_config`` -- confirm.

    Typical use: ``connect()``, then the ``declare_*`` / ``bind_queue``
    calls, then either ``consume()`` or one of the ``publish_*`` methods,
    and finally ``connection_close()``.
    """

    # Default value for the 'x-delay' header (milliseconds per the RabbitMQ
    # delayed-message-exchange plugin) used by publish_msg_to_delay_queue.
    DEFAULT_DELAY_MS = 90000

    def __init__(self, exchange_name, queue_name, routing_key):
        """Store connection settings and the exchange/queue/routing-key names.

        No network I/O happens here; call ``connect()`` to open the connection.

        :param exchange_name: name of the exchange to declare and publish to
        :param queue_name: name of the queue to declare, bind and consume from
        :param routing_key: routing key used for binding and publishing
        """
        self.username = USERNAME
        self.password = PASSWORD
        self.host = HOST
        self.port = PORT
        self.virtual_host = VIRTUAL_HOST_GET
        self.exchange = exchange_name
        self.exchange_type = EXCHANGE_TYPE
        self.queue_name = queue_name
        self.routing_key = routing_key
        self.credentials = pika.PlainCredentials(USERNAME, PASSWORD)

        # Names used for the delayed-message path.
        self.chillertemp_delay_exchange = "chillertemp.calc.delay.exchange"
        self.chillertemp_delay_exchange_type = "x-delayed-message"
        self.chillertemp_delay_queue = "chillertemp.calc.delay.input"
        self.chillertemp_delay_key = "chillertemp.delay.input"

        # Populated by connect(); kept as None until then so that
        # connection_close() is safe to call on a never-connected instance.
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a blocking connection and a channel on it.

        :return: None
        """
        parameters = pika.ConnectionParameters(host=self.host,
                                               port=self.port,
                                               virtual_host=self.virtual_host,
                                               credentials=self.credentials)
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

    def declare_exchange(self):
        """Declare the main exchange as durable.

        :return: None
        """
        self.channel.exchange_declare(exchange=self.exchange,
                                      exchange_type=self.exchange_type,
                                      durable=True)
        # NOTE: the delay exchange (self.chillertemp_delay_exchange, type
        # self.chillertemp_delay_exchange_type, arguments
        # {"x-delayed-type": "direct"}, durable) is currently NOT declared
        # here; publish_msg_to_delay_queue() relies on it already existing.

    def declare_queue(self):
        """Declare the main queue and the delay queue, both durable.

        :return: None
        """
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        self.channel.queue_declare(queue=self.chillertemp_delay_queue, durable=True)

    def bind_queue(self):
        """Bind the main queue to the main exchange with the routing key.

        :return: None
        """
        self.channel.queue_bind(exchange=self.exchange,
                                queue=self.queue_name,
                                routing_key=self.routing_key)
        # NOTE: the delay queue is currently NOT bound to the delay exchange
        # (routing key self.chillertemp_delay_key); that binding is disabled.

    @staticmethod
    def _handle_body(body):
        """Decode a raw message body and pass it to ``chiller_temp_main.main``."""
        data_from_rabbitmq = body.decode()
        # Imported at call time rather than at module import time;
        # presumably to avoid a circular import -- confirm before moving it.
        from chiller_temp_main import main
        main(data_from_rabbitmq)

    def callback(self, ch, method, properties, body):
        """Handle a message delivered from the main queue.

        The message is decoded and processed directly; forwarding it to the
        delay queue first (publish_msg_to_delay_queue) is currently disabled.

        :param ch: channel the message arrived on (unused)
        :param method: delivery metadata (unused)
        :param properties: message properties (unused)
        :param body: raw message body as bytes
        :return: None
        """
        self._handle_body(body)

    @staticmethod
    def callback_delay(ch, method, properties, body):
        """Handle a message delivered from the delay queue.

        Same processing as ``callback``; takes the same consumer-callback
        arguments, of which only ``body`` is used.

        :return: None
        """
        PikaMessage._handle_body(body)

    def consume(self):
        """Register ``callback`` on the main queue and start consuming.

        ``auto_ack=True`` means deliveries are acknowledged automatically, so
        a message whose processing fails is not redelivered. The call to
        ``start_consuming()`` blocks while consumers are active.

        :return: None
        """
        self.channel.basic_consume(queue=self.queue_name,
                                   on_message_callback=self.callback,
                                   auto_ack=True)
        # NOTE: consuming from self.chillertemp_delay_queue with
        # callback_delay is currently disabled.
        self.channel.start_consuming()

    def connection_close(self):
        """Close the connection if one is open.

        Safe to call when ``connect()`` was never called or the connection
        has already been closed; in those cases it does nothing.

        :return: None
        """
        connection = self.connection
        if connection is not None and connection.is_open:
            connection.close()

    def publish_msg_to_queue(self, body):
        """Publish ``body`` to the main exchange with the main routing key.

        :param body: message payload
        :return: None
        """
        self.channel.basic_publish(exchange=self.exchange,
                                   routing_key=self.routing_key,
                                   body=body)

    def publish_msg_to_delay_queue(self, body, delay_ms=DEFAULT_DELAY_MS):
        """Publish ``body`` to the delay exchange as a persistent message.

        The delay exchange is not declared by this class, so it must already
        exist on the broker.

        :param body: message payload
        :param delay_ms: value for the 'x-delay' header; defaults to
            DEFAULT_DELAY_MS (90000)
        :return: None
        """
        properties = pika.BasicProperties(delivery_mode=2,
                                          headers={'x-delay': delay_ms})
        self.channel.basic_publish(exchange=self.chillertemp_delay_exchange,
                                   routing_key=self.chillertemp_delay_key,
                                   properties=properties,
                                   body=body)
def create_connect():
    """Connect to RabbitMQ, set up the inbound queue and consume from it.

    Builds a PikaMessage for EXCHANGE_GET / QUEUE_NAME_GET / ROUTING_KEY_GET,
    declares the exchange and queues, binds the queue, and then consumes.
    Takes no arguments and returns None; incoming messages are handled by
    the PikaMessage callback.

    The connection is closed when consuming stops, including when one of
    the setup or consume calls raises.
    """
    consumer = PikaMessage(EXCHANGE_GET, QUEUE_NAME_GET, ROUTING_KEY_GET)
    consumer.connect()
    try:
        consumer.declare_exchange()
        consumer.declare_queue()
        consumer.bind_queue()
        consumer.consume()
    finally:
        # Only close a connection that is still open, so a failure caused by
        # a dropped connection is not masked by a second error from close().
        if consumer.connection.is_open:
            consumer.connection_close()
def send_data_to_rabbitmq(body):
    """Publish ``body`` to the outbound exchange over a short-lived connection.

    Builds a PikaMessage for EXCHANGE_SEND / QUEUE_NAME_SEND /
    ROUTING_KEY_SEND, declares the exchange and publishes one message.
    The queue itself is not declared or bound here.

    The connection is closed afterwards, including when declaring the
    exchange or publishing raises.

    :param body: message payload to publish
    :return: None
    """
    publisher = PikaMessage(EXCHANGE_SEND, QUEUE_NAME_SEND, ROUTING_KEY_SEND)
    publisher.connect()
    try:
        publisher.declare_exchange()
        publisher.publish_msg_to_queue(body)
    finally:
        # Only close a connection that is still open, so a publish failure
        # caused by a dropped connection is not masked by an error from close().
        if publisher.connection.is_open:
            publisher.connection_close()
|