0
点赞
收藏
分享

微信扫一扫

kafka 生产,消费的的几个小问题

瑾谋 2022-08-02 阅读 35
  1. 生产的代码比较简单:

import json
from kafka import KafkaProducer
from decimal import Decimal
import decimal

class DecimalEncoder(json.JSONEncoder):
def default(self, o):
if isinstance(o, decimal.Decimal):
return float(o)
super(DecimalEncoder, self).default(o)

def producer():
host_port = 'localhost:9095'
topic = '1111'
producer = KafkaProducer(bootstrap_servers=[host_port], value_serializer=lambda v: json.dumps(v).encode('utf-8'))
msg_dict = {
"sleep_time": 10,
"db_config": {
"database": "test_1",
"host": "xxxx",
"user": "root",
"password": "root"
},
"table": "msg",
"msg": "Hello World"
}
msg_dict = {'price_cost': Decimal('6729.0716'), 'location_id': 193, 'product_id': 842892}

msg = json.dumps(msg_dict,cls=DecimalEncoder)
producer.send(topic, msg, partition=0)
print('send success')
producer.close()


if __name__ == '__main__':
producer()

  1. 消费的代码涉及到的东西比较多,个人觉得群组功能挺不错的:

import json
from pykafka import KafkaClient
def customer2():

hosts = 'localhost:9095,localhost:9092'
client = KafkaClient(hosts=hosts)
topic = client.topics['1111']
consumer = topic.get_simple_consumer(consumer_group=b'123456', auto_commit_interval_ms=1,
auto_commit_enable=True)
for msg in consumer:
a = json.loads(msg.value,encoding='utf-8')
print(a)
if __name__ == '__main__':
customer2()

问答:

  1. kafka,在没有消费端消费消息的情况下,生产消息,启动消费端获取不到数据,在启动消费端后推送消息又可以获取到数据?
    消费端使用​​​pykafka​​​ 而不是使用​​kafka​​​,使用​​get_simple_consumer​​方法来获取数据,基本上每次指定一个新的group都会获取前边所有历史数据
  2. 消费端指定多个host?
    host使用逗号隔开.

懂得,原来世界如此简单!



举报

相关推荐

0 条评论