import pika
from pika.exchange_type import ExchangeType
class Producer(object):
    """Publish a message to a fanout exchange over a blocking pika connection.

    Creating an instance opens the connection, opens a channel, and declares
    the queue and the fanout exchange. ``send_message`` publishes one message
    and then closes the channel and the connection, so each instance is good
    for a single send.
    """

    def __init__(self, queue_name, exchange_name, username, password, host, port, virtual_host):
        """Connect to the broker and declare the queue and the exchange.

        Args:
            queue_name: Name of the queue to declare.
            exchange_name: Name of the fanout exchange to declare.
            username: Broker login name.
            password: Broker login password.
            host: Broker host name or address.
            port: Broker port.
            virtual_host: Virtual host to connect to.

        Raises:
            Exception: Whatever pika raises while connecting or declaring.
                If the failure happens after the connection was opened, the
                connection is closed before the error is re-raised.
        """
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(username, password),
        }
        # Establish the connection.
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        try:
            # Open a channel, then declare the queue and the fanout exchange.
            self.channel = self.con.channel()
            self.channel.queue_declare(queue=queue_name)
            self.channel.exchange_declare(exchange=exchange_name, exchange_type=ExchangeType.fanout)
        except Exception:
            # Do not leave a half-initialised connection open behind us.
            if self.con.is_open:
                self.con.close()
            raise

    def send_message(self, queue_name, exchange_name, routing_key, body):
        """Bind the queue to the exchange, publish ``body``, then disconnect.

        For a fanout exchange the routing key is an empty string; the message
        is delivered to every queue bound to the exchange.

        Args:
            queue_name: Queue to bind to the exchange.
            exchange_name: Exchange to bind to and publish on.
            routing_key: Routing key passed to ``basic_publish`` (the binding
                itself always uses an empty routing key).
            body: Message payload.

        The channel and the connection are closed afterwards even when the
        bind or the publish raises; the original error still propagates.
        """
        try:
            # Bind the queue to the exchange.
            self.channel.queue_bind(queue=queue_name, exchange=exchange_name, routing_key="")
            # Publish the message.
            self.channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=body)
        finally:
            try:
                # Close the channel (skipped if it is already closed, so the
                # cleanup cannot mask the original error).
                if self.channel.is_open:
                    self.channel.close()
            finally:
                # Close the connection.
                if self.con.is_open:
                    self.con.close()
if __name__ == '__main__':
    # Connection settings shared by both demo producers.
    broker_settings = {
        "username": "tom",
        "password": "tom@tom",
        "host": "localhost",
        "port": 5672,
        "virtual_host": "/afei",
    }

    # First producer: declare queue "test" and publish one message to "logs".
    p = Producer("test", "logs", **broker_settings)
    p.send_message("test", "logs", "", "have a good time")

    # Second producer: a fresh connection, because send_message disconnects.
    p1 = Producer("test01", "logs", **broker_settings)
    p1.send_message("test01", "logs", "", "good luck !")
# Subscribe (consumer)
import pika
class Consumer(object):
    """Consume messages from a queue over a blocking pika connection.

    Creating an instance opens the connection and a channel.
    ``consume_message`` blocks while consuming and disconnects when
    consuming ends, so each instance is good for a single consume session.
    """

    def __init__(self, queue_name, username, password, host, port, virtual_host):
        """Connect to the broker and open a channel.

        Args:
            queue_name: Name of the queue to consume from.
            username: Broker login name.
            password: Broker login password.
            host: Broker host name or address.
            port: Broker port.
            virtual_host: Virtual host to connect to.

        Raises:
            Exception: Whatever pika raises while connecting or opening the
                channel. If the connection was already open, it is closed
                before the error is re-raised.
        """
        con_param = {
            "host": host,
            "port": port,
            "virtual_host": virtual_host,
            "credentials": pika.credentials.PlainCredentials(username, password),
        }
        # Establish the connection.
        self.con = pika.BlockingConnection(pika.ConnectionParameters(**con_param))
        try:
            # Create the channel.
            self.channel = self.con.channel()
        except Exception:
            # Do not leave the connection open when no channel could be made.
            if self.con.is_open:
                self.con.close()
            raise
        self.queue_name = queue_name

    def consume_message(self):
        """Consume from ``self.queue_name`` and print every delivery.

        Messages are consumed with ``auto_ack=True``. The call blocks inside
        ``start_consuming()``. When consuming ends, whether normally, by
        error, or by ``KeyboardInterrupt``, the channel and the connection
        are closed and any exception continues to propagate.
        """
        def callback(ch, method, properties, body):
            # One-element tuples keep the %-formatting correct even if an
            # argument happens to be a tuple itself.
            print("ch===%r" % (ch,))
            print("method===%r" % (method,))
            print("properties===%r" % (properties,))
            print("[x] Received %r" % (body,))

        try:
            # Register the consumer.
            self.channel.basic_consume(queue=self.queue_name, on_message_callback=callback, auto_ack=True)
            # Start consuming (blocks).
            self.channel.start_consuming()
        finally:
            try:
                # Skipped if the channel is already closed, so the cleanup
                # cannot mask the original error.
                if self.channel.is_open:
                    self.channel.close()
            finally:
                if self.con.is_open:
                    self.con.close()
if __name__ == '__main__':
    try:
        # Consume from queue "test" until interrupted.
        c = Consumer("test", "tom", "tom@tom", "localhost", 5672, "/afei")
        c.consume_message()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the consumer: exit with status 0.
        # SystemExit is raised directly because the `exit` helper is injected
        # by the `site` module for interactive use and is not always defined.
        raise SystemExit(0)