# Dependency (install from a shell, not from Python): pip install confluent-kafka
import confluent_kafka
from Config import config
from Logger.logger import elogger
# Module-level logger shared by everything in this file; obtained from the
# project's elogger wrapper under the log name 'listener'.
loggers = elogger(log_name='listener').get_logger()
class Producter:
    """Small wrapper around ``confluent_kafka.Producer`` for voucher-id batches.

    Each call to :meth:`produce_string_messages` sends ONE Kafka message whose
    value is a comma-separated list of ``"<schema>*<voucher_id hex>"`` entries
    and reports whether the broker acknowledged it.
    """

    def __init__(self) -> None:
        """Build the producer from ``config.Zookeeper`` host/port settings.

        Construction errors are logged rather than raised (best-effort, as in
        the original code). The attributes are given safe defaults *before*
        the fallible work so a half-built instance still answers ``False``
        from ``produce_string_messages`` instead of raising ``AttributeError``.
        """
        self.conf = None
        self.kafka_producer = None
        # Outcome of the most recent produce call (set by delivery_callback).
        self.success = False
        # Entries of the most recent payload, before joining with ','.
        self.message = []
        try:
            # NOTE(review): 'bootstrap.servers' is read from the Zookeeper
            # config section; Kafka clients expect a *broker* address here.
            # Confirm that this section really points at a broker.
            self.conf = {
                'bootstrap.servers': f'{config.Zookeeper["Host"]}:{config.Zookeeper["Port"]}',
                'message.send.max.retries': 10,
            }
            self.kafka_producer = confluent_kafka.Producer(self.conf)
        except Exception as e:
            loggers.error(str(e))

    def delivery_callback(self, error, message):
        """Record the delivery report passed to ``on_delivery``.

        Args:
            error: ``None`` on successful delivery, otherwise the error object.
            message: The message the report refers to (unused).
        """
        if error is None:
            self.success = True
        else:
            # Previously dropped silently; make the failure visible.
            self.success = False
            loggers.error(str(error))

    def produce_string_messages(self, topic_name, obj, schema):
        """Send one message listing the voucher ids of ``obj`` to ``topic_name``.

        Args:
            topic_name: Destination Kafka topic.
            obj: Iterable of items exposing ``voucher_id.hex``.
            schema: Prefix put before each id, separated by ``*``.

        Returns:
            ``True`` if the delivery callback reported success for THIS call,
            ``False`` otherwise (including when an exception was logged).
        """
        # Reset per call: a previous success must not mask a failure now.
        self.success = False

        if self.kafka_producer is None:
            loggers.error('Kafka producer is not initialised; message not sent')
            return self.success

        try:
            # Build the payload from scratch; appending to the old list would
            # re-send every entry from earlier calls.
            self.message = [f'{schema}*{i.voucher_id.hex}' for i in obj]
            self.kafka_producer.produce(
                topic_name,
                value=','.join(self.message),
                on_delivery=self.delivery_callback,
            )
            # Block until the delivery callback has fired.
            self.kafka_producer.flush()
        except Exception as e:
            loggers.error(str(e))

        # Returned outside of `finally` so KeyboardInterrupt/SystemExit are
        # no longer swallowed by the return statement.
        return self.success
# References:
# https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html
# https://www.tutorialsbuddy.com/confluent-kafka-python-producer-example