0
点赞
收藏
分享

微信扫一扫

使用python 结合confluent-kafka做一个生产者

兽怪海北 2022-03-12 阅读 78
pip install confluent-kafka
import confluent_kafka
from Config import config
from Logger.logger import elogger

loggers = elogger(log_name='listener').get_logger()
class Producter:
    def __init__(self) -> None:
        try:
            self.conf = {'bootstrap.servers': f'{config.Zookeeper["Host"]}:{config.Zookeeper["Port"]}','message.send.max.retries': 10}
            self.kafka_producer = confluent_kafka.Producer(self.conf)
            self.success = False
            self.message = []
        except Exception as e:
            loggers.error(str(e))

    def delivery_callback(self,error, message):
        if error is None:
            self.success = True


    def produce_string_messages(self, topic_name, obj,schema):
        try:
            for i in obj:
                self.message.append(f'{schema}*{i.voucher_id.hex}')
            self.kafka_producer.produce(topic_name, value=','.join(self.message), on_delivery=self.delivery_callback)
            self.kafka_producer.flush()
        except Exception as e:
            loggers.error(str(e))
        finally:
            return self.success


https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html

https://www.tutorialsbuddy.com/confluent-kafka-python-producer-example

举报

相关推荐

0 条评论