from config.communicationConfig import *
import pika


class PikaMessage(object):
    """Small helper around a blocking pika connection to RabbitMQ.

    One instance is tied to a single exchange / queue / routing-key triple
    and exposes the declare, bind, consume and publish steps as methods.
    """

    def __init__(self, exchange_name, queue_name, routing_key):
        """Store connection settings and the exchange/queue/routing key.

        USERNAME, PASSWORD, HOST, PORT, VIRTUAL_HOST_GET and EXCHANGE_TYPE
        are not defined in this file; presumably they are provided by the
        star import from ``config.communicationConfig``.

        :param exchange_name: name of the exchange to declare / publish to
        :param queue_name: name of the queue to declare / consume from
        :param routing_key: routing key used for binding and publishing
        """
        self.username = USERNAME
        self.password = PASSWORD
        self.host = HOST
        self.port = PORT
        self.virtual_host = VIRTUAL_HOST_GET
        self.exchange = exchange_name
        self.exchange_type = EXCHANGE_TYPE
        self.queue_name = queue_name
        self.routing_key = routing_key
        self.credentials = pika.PlainCredentials(USERNAME, PASSWORD)

        # Names used by the delayed-message path.
        self.cop_delay_exchange = "cop.calc.delay.exchange"
        self.cop_delay_exchange_type = "x-delayed-message"
        self.cop_delay_queue = "cop.calc.delay.input"
        self.cop_delay_key = "cop.delay.input"

        # Populated by connect(); kept as None until then so that
        # connection_close() is safe to call at any time.
        self.connection = None
        self.channel = None

    def connect(self):
        """Open a blocking connection and a channel on it.

        :return: None
        """
        parameters = pika.ConnectionParameters(
            host=self.host,
            port=self.port,
            virtual_host=self.virtual_host,
            credentials=self.credentials,
        )
        self.connection = pika.BlockingConnection(parameters)
        self.channel = self.connection.channel()

    def declare_exchange(self):
        """Declare the (durable) exchange this instance is bound to.

        :return: None
        """
        self.channel.exchange_declare(
            exchange=self.exchange,
            exchange_type=self.exchange_type,
            durable=True,
        )
        # Declaration of the delay exchange is currently disabled:
        # self.channel.exchange_declare(exchange=self.cop_delay_exchange,
        #                               exchange_type=self.cop_delay_exchange_type,
        #                               arguments={"x-delayed-type": "direct"},
        #                               durable=True)

    def declare_queue(self):
        """Declare the main queue and the delay queue, both durable.

        :return: None
        """
        self.channel.queue_declare(queue=self.queue_name, durable=True)
        self.channel.queue_declare(queue=self.cop_delay_queue, durable=True)

    def bind_queue(self):
        """Bind the main queue to the exchange with the routing key.

        :return: None
        """
        self.channel.queue_bind(
            exchange=self.exchange,
            queue=self.queue_name,
            routing_key=self.routing_key,
        )
        # Binding of the delay queue is currently disabled:
        # self.channel.queue_bind(exchange=self.cop_delay_exchange,
        #                         queue=self.cop_delay_queue,
        #                         routing_key=self.cop_delay_key)

    def callback(self, ch, method, properties, body):
        """Handle one message from the main queue.

        The body is decoded and handed to ``main.main``. Forwarding to the
        delay queue (see the commented-out call below) is disabled.

        :param ch: channel the message arrived on (unused)
        :param method: delivery metadata (unused)
        :param properties: message properties (unused)
        :param body: raw message payload; decoded with ``body.decode()``
        :return: None
        """
        # self.publish_msg_to_delay_queue(body)
        data_from_rabbitmq = body.decode()
        # Imported at call time rather than at module import time;
        # presumably to avoid a circular import with ``main`` - TODO confirm.
        from main import main
        main(data_from_rabbitmq)

    @staticmethod
    def callback_delay(ch, method, properties, body):
        """Handle one message from the delay queue.

        Decodes the body and hands it to ``main.main``.

        :param ch: channel the message arrived on (unused)
        :param method: delivery metadata (unused)
        :param properties: message properties (unused)
        :param body: raw message payload; decoded with ``body.decode()``
        :return: None
        """
        data_from_rabbitmq = body.decode()
        from main import main
        main(data_from_rabbitmq)

    def consume(self):
        """Register ``callback`` on the main queue and start consuming.

        Messages are consumed with ``auto_ack=True``. This call delegates to
        ``channel.start_consuming()`` and does not return until that does.

        :return: None
        """
        self.channel.basic_consume(
            queue=self.queue_name,
            on_message_callback=self.callback,
            auto_ack=True,
        )
        # self.channel.basic_consume(self.cop_delay_queue, self.callback_delay, True)
        self.channel.start_consuming()

    def connection_close(self):
        """Close the connection if one is open.

        Does nothing when ``connect()`` was never called or the connection
        is already closed, so it can be used safely in cleanup paths without
        masking an earlier error.

        :return: None
        """
        if self.connection is not None and self.connection.is_open:
            self.connection.close()

    def publish_msg_to_queue(self, body):
        """Publish ``body`` to the exchange using the routing key.

        :param body: message payload to publish
        :return: None
        """
        self.channel.basic_publish(
            exchange=self.exchange,
            routing_key=self.routing_key,
            body=body,
        )

    def publish_msg_to_delay_queue(self, body, delay_ms=90000):
        """Publish ``body`` to the delay exchange.

        The message is sent with ``delivery_mode=2`` and an ``x-delay``
        header.

        :param body: message payload to publish
        :param delay_ms: value of the ``x-delay`` header; defaults to 90000
            (interpreted as milliseconds by the RabbitMQ delayed-message
            exchange plugin - verify against the broker setup)
        :return: None
        """
        self.channel.basic_publish(
            exchange=self.cop_delay_exchange,
            routing_key=self.cop_delay_key,
            properties=pika.BasicProperties(
                delivery_mode=2,
                headers={'x-delay': delay_ms},
            ),
            body=body,
        )


def create_connect():
    """Connect to the inbound queue and consume messages from it.

    Builds a ``PikaMessage`` for EXCHANGE_GET / QUEUE_NAME_GET /
    ROUTING_KEY_GET, declares the exchange and queues, binds the queue and
    then consumes. The connection is closed when consuming ends, including
    when it ends with an exception.

    :return: None
    """
    consumer = PikaMessage(EXCHANGE_GET, QUEUE_NAME_GET, ROUTING_KEY_GET)
    consumer.connect()
    try:
        consumer.declare_exchange()
        consumer.declare_queue()
        consumer.bind_queue()
        consumer.consume()
    finally:
        # Always release the connection, even if declaring or consuming fails.
        consumer.connection_close()


def send_data_to_rabbitmq(body):
    """Publish a single message to the outbound exchange.

    Builds a ``PikaMessage`` for EXCHANGE_SEND / QUEUE_NAME_SEND /
    ROUTING_KEY_SEND, declares the exchange, publishes ``body`` and closes
    the connection, including when publishing fails.

    :param body: message payload to publish
    :return: None
    """
    publisher = PikaMessage(EXCHANGE_SEND, QUEUE_NAME_SEND, ROUTING_KEY_SEND)
    publisher.connect()
    try:
        publisher.declare_exchange()
        publisher.publish_msg_to_queue(body)
    finally:
        # Always release the connection, even if declaring or publishing fails.
        publisher.connection_close()