使仅订阅消息的子集成为可能。例如,我们将能够仅将关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
通过路由来匹配对应的消息
一、消息发布端
#!/usr/bin/env python
import pika
import sys
import json
import datetime
def get_message():
# 产生消息入口处
for i in range(100): # 生成100条消息
for str_t in ['info', 'warning', 'error']: # 生成三种类型的消息
message = json.dumps({'id': "%s-90000%s" % (str_t, i), "amount": 100 * i, "name": "%s" % str_t,
"createtime": str(datetime.datetime.now())})
producer(message, str_t)
def producer(message, severity):
# 登陆并创建信道
connection = pika.BlockingConnection(
pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct', durable=True)
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)
print(" [x] Sent %r:%r" % (severity, message))
connection.close()
if __name__ == "__main__":
get_message() # 程序执行入口
二、接收所有的消息all
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct',durable=True)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
for severity in ['info','warning','error']:
channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity)
print(' [*] Waiting for all logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
三、接收所有的消息info
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct',durable=True)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# for severity in ['info','warning','error']:
# channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key=severity)
channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key='info')
print(' [*] Waiting for info. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
四、接收所有的消息error
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct',durable=True)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key='error')
print(' [*] Waiting for error. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
五、接收消息warning
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(virtual_host='/melon_demo', host='82.156.19.94', port=5672,
credentials=pika.PlainCredentials('guest', 'guest')))
channel = connection.channel()
channel.exchange_declare(exchange='direct_logs', exchange_type='direct',durable=True)
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
# for severity in ['info','warning','error']:
channel.queue_bind( exchange='direct_logs', queue=queue_name, routing_key='warning')
print(' [*] Waiting for warning. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
心有猛虎,细嗅蔷薇