Skip to main content

Integration

Programming Interfaces

Python Producer Example

from kafka import KafkaProducer

import json
# Initialize the producer. Each message value is serialized to JSON text and
# then encoded as UTF-8 bytes before being handed to the client.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Send one message to the 'user-events' topic.
producer.send('user-events', {
    'user_id': 1234,
    'action': 'login',
    'timestamp': '2024-02-07T12:00:00Z',
})

# send() only queues the record for background delivery. Without an explicit
# flush, a short-lived script can exit before the record reaches the broker.
producer.flush()
# Release network resources once everything has been delivered.
producer.close()

Python Consumer Example

from kafka import KafkaConsumer
import json
# Initialize the consumer, subscribed to the 'user-events' topic as a member
# of the 'user-event-processors' group. Each message value is decoded from
# UTF-8 bytes and parsed as JSON.
consumer = KafkaConsumer(
    'user-events',
    bootstrap_servers=['localhost:9092'],
    # Per the kafka-python docs: start from the oldest available message when
    # the group has no committed offset yet.
    auto_offset_reset='earliest',
    group_id='user-event-processors',
    value_deserializer=lambda x: json.loads(x.decode('utf-8')),
)

# Consume messages. Iterating the consumer blocks while waiting for new
# records, so this loop runs until the process is interrupted.
try:
    for message in consumer:
        print(f"Received: {message.value}")
finally:
    # Always close the consumer on the way out (e.g. on Ctrl+C) so its
    # connections are released.
    consumer.close()