Kafka: creating a message producer and a client consumer

 

To create a message producer that publishes messages to Kafka, we can do the following:


import os
from kafka import KafkaProducer

def main():
    """Publish one greeting message to the ``my-topic`` Kafka topic.

    The broker host is read from the ``KAFKA_HOST`` environment variable and
    the broker is contacted on port 9092.

    Raises:
        RuntimeError: If ``KAFKA_HOST`` is unset or empty.
    """
    kafka_host = os.getenv('KAFKA_HOST')
    if not kafka_host:
        # Fail fast: otherwise the f-string below would silently build the
        # bogus broker address 'None:9092'.
        raise RuntimeError('KAFKA_HOST environment variable is not set')

    # Connect to Kafka broker
    producer = KafkaProducer(bootstrap_servers=f'{kafka_host}:9092')
    try:
        # Send a message to a topic (send() is asynchronous)
        producer.send('my-topic', b'Hello, Kafka!')

        # Wait for all buffered messages to be sent
        producer.flush()
    finally:
        # Release the producer's connections even if sending failed
        producer.close()


if __name__ == "__main__":
    main()


And if you are trying to create a message consumer, you can use the simple code here:


import os
from kafka import KafkaConsumer

kafka_host = os.getenv('KAFKA_HOST')
if not kafka_host:
    # Fail fast: otherwise the f-string below would silently build the
    # bogus broker address 'None:9092'.
    raise RuntimeError('KAFKA_HOST environment variable is not set')

# Connect to Kafka and subscribe to a topic
consumer = KafkaConsumer(
    'my-topic',
    bootstrap_servers=f'{kafka_host}:9092',
    auto_offset_reset='earliest',  # or 'latest'
    enable_auto_commit=True,
    group_id='my-group',
    # Records with a null value (e.g. tombstones) have nothing to decode,
    # so pass None through instead of failing on None.decode().
    value_deserializer=lambda x: x.decode('utf-8') if x is not None else None,
)

print("Listening for messages...")

try:
    # Iterating the consumer blocks and yields records as they arrive
    for message in consumer:
        print(f"[{message.partition}] {message.offset} - {message.value}")
finally:
    # Always close the consumer when the loop is interrupted or fails
    consumer.close()


What if you want to reset the offset back to 1, or to some later offset? This is useful when you want to replay transactions.

import os
from kafka import KafkaConsumer, TopicPartition

kafka_host = os.getenv('KAFKA_HOST')
if not kafka_host:
    # Fail fast: otherwise the f-string below would silently build the
    # bogus broker address 'None:9092'.
    raise RuntimeError('KAFKA_HOST environment variable is not set')
bootstrap_servers = [f'{kafka_host}:9092']

topic = 'my-topic'
mygroup = 'my-group'
partition = 0
desired_offset = 1

# Create the consumer without subscribing, with auto commit disabled
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
                         auto_offset_reset='earliest',
                         enable_auto_commit=False,
                         group_id=mygroup)

try:
    # Assign a specific partition manually instead of subscribing
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])

    # Seek to the desired offset
    consumer.seek(tp, desired_offset)

    # Start consuming from desired_offset
    for message in consumer:
        print(f"Offset: {message.offset}, Key: {message.key}, Value: {message.value}")
finally:
    # Always close the consumer when the loop is interrupted or fails
    consumer.close()




Comments

Popular posts from this blog

gemini cli getting file not defined error

NodeJS: Error: spawn EINVAL in window for node version 20.20 and 18.20

vllm : Failed to infer device type