kafka creating message producer and client consumer
To create a message producer that publishes message to a kafka, we can do the follow:-
import os
from kafka import KafkaProducer
def main():
kafka_host = os.getenv('KAFKA_HOST')
# Connect to Kafka broker
producer = KafkaProducer(bootstrap_servers=f'{kafka_host}:9092')
# Send a message to a topic
producer.send('my-topic', b'Hello, Kafka!')
# Optional: wait for all messages to be sent and close the producer
producer.flush()
producer.close()
if __name__ == "__main__":
main()
And if you trying to create a message producer, you can use the simple code here:
import os
from kafka import KafkaConsumer
kafka_host = os.getenv('KAFKA_HOST')
# Connect to Kafka and subscribe to a topic
consumer = KafkaConsumer(
'my-topic',
bootstrap_servers=f'{kafka_host}:9092',
auto_offset_reset='earliest', # or 'latest'
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: x.decode('utf-8')
)
print("Listening for messages...")
for message in consumer:
print(f"[{message.partition}] {message.offset} - {message.value}")
What if you're trying to reset the offset back to 1 or even later? This is useful when you're trying to replay a transactions.
import os
from kafka import KafkaConsumer, TopicPartition
kafka_host = os.getenv('KAFKA_HOST')
bootstrap_servers = [f'{kafka_host}:9092']
topic = 'my-topic'
mygroup = 'my-group'
partition = 0
desired_offset = 1
# Create consumer without subscribing with enable auto commit false
consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers,
auto_offset_reset='earliest',
enable_auto_commit=False,
group_id=mygroup)
# Assign specific partition
tp = TopicPartition(topic, partition)
consumer.assign([tp])
# Seek to desired offset
consumer.seek(tp, desired_offset)
# Start consuming from offset 1
for message in consumer:
print(f"Offset: {message.offset}, Key: {message.key}, Value: {message.value}")
Comments