目录
消息队列
产生背景
消息队列介绍
常见的消息队列产品
应用场景
消息队列的消息模型
Kafka的基本介绍
简介
Kafka是一款消息队列中间件产品,来源于领英公司,后期贡献给了Apache,目前是Apache旗下的顶级开源项目,采用语言是Scala
Kafka的架构
Kafka的使用
Kafka的shell命令
Kafka本质上是一个消息队列中间件产品,主要负责消息数据的传递,也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据
创建Topic
查看Topic
查看具体Topic
模拟生产者Producer
模拟消费者Consumer
修改Topic
查看消费组中有多少个消费者
Kafka的Python API的操作
完成生产者代码
import time
from kafka import KafkaProducer
# 同步发送
def sync_send():
global topic, partition, offset
# 2.1- 同步发送数据/消息
metadata = producer.send("test01", value=f"hello_java_{i}".encode("UTF-8")).get()
# metadata = producer.send("test03",value=f"hello_spark_{i}".encode("UTF-8")).get()
# 2.2- 获取元信息中的内容
topic = metadata.topic
partition = metadata.partition
"""
offset消息偏移量,从0开始编号。也就是一条消息在分区中的序号/索引
在不同分区间,消息偏移量是无序
在同一个分区里面,消息偏移量是有序
"""
offset = metadata.offset
print(f"{topic},{partition},{offset},{metadata}")
if __name__ == '__main__':
# 1- 创建生产者
producer = KafkaProducer(
bootstrap_servers=["node1.itcast.cn:9092","node2.itcast.cn:9092"]
)
# 2- 发送消息
for i in range(10):
# 同步发送
# sync_send()
# 2.3- 异步发送
"""
异步发送,需要等待一下,或者明确关闭Producer生产者
"""
producer.send("test01", value=f"hello_hive_{i}".encode("UTF-8"))
time.sleep(1)
# 3- 释放资源/关闭生产者
# producer.close()
完成消费者代码
from kafka import KafkaConsumer
if __name__ == '__main__':
# 1- 创建消费者
consumer = KafkaConsumer(
"test01",
bootstrap_servers=["node1.itcast.cn:9092", "node2.itcast.cn:9092"]
)
# 2- 消费消息
for msg in consumer:
topic = msg.topic
partition = msg.partition
offset = msg.offset
# key和value消费出来都是bytes数据类型,需要进行解码
key = msg.key
value = msg.value
print(f"{topic},{partition},{offset},{key},{value.decode('UTF-8')},{msg}")