0
点赞
收藏
分享

微信扫一扫

李沐之经典卷积神经网络

安七月读书 2024-01-12 阅读 10

目录

消息队列

产生背景

消息队列介绍

常见的消息队列产品

应用场景

 消息队列的消息模型

Kafka的基本介绍

简介

Kafka的架构

Kafka的使用

Kafka的shell命令

Kafka的Python API的操作

完成生产者代码

完成消费者代码


消息队列

产生背景

消息队列介绍

常见的消息队列产品

应用场景

 消息队列的消息模型

Kafka的基本介绍

简介

Kafka是一款消息队列中间件产品,来源于领英公司,后期贡献给了Apache,目前是Apache旗下的顶级开源项目,采用语言是Scala

Kafka的架构

Kafka的使用

Kafka的shell命令

      Kafka本质上是一个消息队列中间件产品,主要负责消息数据的传递,也就说学习Kafka 也就是学习如何使用Kafka生产数据,以及如何使用Kafka来消费数据

创建Topic

查看Topic

查看具体Topic

模拟生产者Producer

 模拟消费者Consumer

修改Topic

查看消费组中有多少个消费者

Kafka的Python API的操作

完成生产者代码

import time

from kafka import KafkaProducer

# 同步发送
def sync_send():
    global topic, partition, offset
    # 2.1- 同步发送数据/消息
    metadata = producer.send("test01", value=f"hello_java_{i}".encode("UTF-8")).get()
    # metadata = producer.send("test03",value=f"hello_spark_{i}".encode("UTF-8")).get()
    # 2.2- 获取元信息中的内容
    topic = metadata.topic
    partition = metadata.partition
    """
        offset消息偏移量,从0开始编号。也就是一条消息在分区中的序号/索引
        在不同分区间,消息偏移量是无序
        在同一个分区里面,消息偏移量是有序
    """
    offset = metadata.offset
    print(f"{topic},{partition},{offset},{metadata}")


if __name__ == '__main__':

    # 1- 创建生产者
    producer = KafkaProducer(
        bootstrap_servers=["node1.itcast.cn:9092","node2.itcast.cn:9092"]
    )

    # 2- 发送消息
    for i in range(10):
        # 同步发送
        # sync_send()

        # 2.3- 异步发送
        """
            异步发送,需要等待一下,或者明确关闭Producer生产者
        """
        producer.send("test01", value=f"hello_hive_{i}".encode("UTF-8"))

    time.sleep(1)

    # 3- 释放资源/关闭生产者
    # producer.close()

完成消费者代码

from kafka import KafkaConsumer

if __name__ == '__main__':

    # 1- 创建消费者
    consumer = KafkaConsumer(
        "test01",
        bootstrap_servers=["node1.itcast.cn:9092", "node2.itcast.cn:9092"]
    )

    # 2- 消费消息
    for msg in consumer:
        topic = msg.topic
        partition = msg.partition
        offset = msg.offset
        # key和value消费出来都是bytes数据类型,需要进行解码
        key = msg.key
        value = msg.value

        print(f"{topic},{partition},{offset},{key},{value.decode('UTF-8')},{msg}")

 

举报

相关推荐

0 条评论