0
点赞
收藏
分享

微信扫一扫

Kafka与MySQL的组合使用

陬者 2022-04-01 阅读 325

这是做的数据采集实验,我们在做的时候,要先把zookeeper和kafka服务启动,可以参考我上一篇.https://blog.csdn.net/hhjdshz/article/details/123881242?spm=1001.2014.3001.5501

 学生表student

sno

sname

ssex

sage

95001

John

M

23

95002

Tom

M

23

  1. 读取student表的数据内容,将其转为JSON格式,发送给Kafka;

 

在导入kafka包之前,我们要安装Python 第三方库 kafka-Python包

​
python -m pip install  kafka-python --force-reinstall -i Simple Index pip

​

完整代码

#从导入kafka 生产者的包
from kafka import KafkaProducer
import json
import pymysql.cursors
# 连接kafka
producer=KafkaProducer(bootstrap_servers='localhost:9092', value_serializer=lambda v: json.dumps(v).encode('utf-8'))
# 连接自己的mysql数据库
connect=pymysql.Connect(host='localhost', port=3306, user='root',  passwd='123456',  db='valentines', charset='utf8')
# 获取游标对象
cursor = connect.cursor()
# 查询数据
sql = "select * from student"
#执行SQL语句
result = cursor.execute(sql)
#fetchall查询所有结果
data = cursor.fetchall()
for msg in data:
    asd = {}
    asd['sno']=msg[0]
    asd['sname']=msg[1]
    asd['ssex']=msg[2]
    asd['sage']=msg[3]
    producer.send('json_topic', data)
    #提交事务
    connect.commit()
    print(asd)
connect.close()

2.再从Kafka中获取到JSON格式数据,打印出来;

完整代码:

from kafka import KafkaConsumer
import json
import pymysql.cursors

consumer = KafkaConsumer('json_topic', bootstrap_servers=['localhost:9092'], group_id=None, auto_offset_reset='earliest')

for msg in consumer:
    msgs = str(msg.value, encoding="utf-8")
    python_data = json.loads(msgs)
    print(python_data)
举报

相关推荐

0 条评论