Cookbook¶
Practical recipes and patterns for common Kafka use cases.
Exactly-Once Processing¶
The read-process-write pattern using transactions for exactly-once semantics.
```python
from typedkafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "my-processor-1",
})
producer.init_transactions()

consumer = KafkaConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "processor-group",
    "isolation.level": "read_committed",
    "enable.auto.commit": False,
})
consumer.subscribe(["input-topic"])

for msg in consumer:
    with producer.transaction():
        data = msg.value_as_json()
        result = transform(data)
        producer.send_json("output-topic", result)
        # Commit input offset within the same transaction
        producer._producer.send_offsets_to_transaction(
            consumer._consumer.position(consumer._consumer.assignment()),
            consumer._consumer.consumer_group_metadata(),
        )
```
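Because the input offset is committed through the producer's transaction rather than by the consumer, a crash before the commit simply means the message is re-read on restart, and `read_committed` consumers of `output-topic` never see results from an aborted attempt.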
Fan-Out Pattern¶
Send one event to multiple topics atomically using transactions.
```python
from typedkafka import KafkaProducer

producer = KafkaProducer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "fan-out",
})
producer.init_transactions()

event = {"user_id": 123, "action": "purchase", "amount": 99.99}

with producer.transaction():
    producer.send_json("analytics-events", event)
    producer.send_json("billing-events", {
        "user_id": event["user_id"],
        "amount": event["amount"],
    })
    producer.send_json("notifications", {
        "user_id": event["user_id"],
        "message": f"Purchase of ${event['amount']} confirmed",
    })
# All three succeed or all fail together
```
Graceful Shutdown¶
Handle signals and flush pending messages on shutdown.
```python
import signal

from typedkafka import KafkaProducer, KafkaConsumer

shutdown = False

def on_signal(sig, frame):
    global shutdown
    shutdown = True

signal.signal(signal.SIGINT, on_signal)
signal.signal(signal.SIGTERM, on_signal)

producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
consumer = KafkaConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "enable.auto.commit": False,
})
consumer.subscribe(["events"])

try:
    while not shutdown:
        msg = consumer.poll(timeout=1.0)
        if msg:
            process(msg)
            producer.send_json("results", {"processed": True})
            consumer.commit(msg)
finally:
    producer.flush(timeout=10.0)
    consumer.close()
```
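If the poll loop runs outside the main thread, a `threading.Event` is a drop-in replacement for the module-level flag. A minimal sketch reusing the consumer and loop from above:

```python
import signal
import threading

stop = threading.Event()

# Signal handlers always run in the main thread; the Event can be
# checked safely from whichever worker thread owns the poll loop.
signal.signal(signal.SIGINT, lambda sig, frame: stop.set())
signal.signal(signal.SIGTERM, lambda sig, frame: stop.set())

while not stop.is_set():
    msg = consumer.poll(timeout=1.0)
    if msg:
        process(msg)
        consumer.commit(msg)
```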
Type-Safe Topics¶
Use TypedTopic for static type checking across producers and consumers.
```python
from typedkafka import KafkaProducer, KafkaConsumer
from typedkafka.topics import json_topic, string_topic

# Define typed topics once, use everywhere
events = json_topic("user-events")       # TypedTopic[Any]
logs = string_topic("application-logs")  # TypedTopic[str]

# Producer — IDE catches type mismatches
producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
producer.send_typed(events, {"user_id": 123, "action": "click"})
producer.send_typed(logs, "Application started")
# producer.send_typed(logs, {"wrong": "type"})  # IDE error: dict is not str
producer.flush()

# Consumer — decode with type information
consumer = KafkaConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
})
consumer.subscribe(["user-events", "application-logs"])

for msg in consumer:
    if msg.topic == "user-events":
        data = msg.decode(events)  # typed as Any
        print(data["user_id"])
    elif msg.topic == "application-logs":
        text = msg.decode(logs)  # typed as str
        print(text.upper())
```
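Because `json_topic` yields `TypedTopic[Any]`, the decoded value can be narrowed further by hand with the standard library's `TypedDict` and `cast`; the `UserEvent` shape below is illustrative, not part of typedkafka:

```python
from typing import TypedDict, cast

class UserEvent(TypedDict):
    user_id: int
    action: str

for msg in consumer:
    if msg.topic == "user-events":
        # cast() narrows the Any returned by decode() for the type
        # checker; it performs no validation at runtime.
        data = cast(UserEvent, msg.decode(events))
        print(data["user_id"], data["action"])
```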
Structured Logging¶
Add structured logging to Kafka operations.
```python
import logging

from typedkafka import KafkaProducer
from typedkafka.logging import KafkaLogger, LogContext

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)

kafka_logger = KafkaLogger(
    logging.getLogger("kafka"),
    default_context=LogContext(client_id="my-service"),
)

producer = KafkaProducer(
    {"bootstrap.servers": "localhost:9092"},
    logger=kafka_logger,
)

# Sends, transactions, and errors are logged automatically
producer.send("events", b"message")
# Output: INFO kafka - send topic=events client_id=my-service
```
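Since passing `logger=None` makes logging a no-op (see Tips below), the same logger can be toggled per environment. A small sketch; the `APP_ENV` variable name is an assumption, not a typedkafka convention:

```python
import os

# Enable structured Kafka logging only in development;
# logger=None turns the logging path into a no-op.
env = os.getenv("APP_ENV", "dev")
producer = KafkaProducer(
    {"bootstrap.servers": "localhost:9092"},
    logger=kafka_logger if env == "dev" else None,
)
```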
Consumer Lag Monitoring¶
Track consumer metrics and statistics.
```python
from typedkafka import KafkaConsumer
from typedkafka.metrics import KafkaStats

def on_stats(stats: KafkaStats):
    """Called every statistics.interval.ms."""
    for topic, data in stats.raw.get("topics", {}).items():
        for pid, pdata in data.get("partitions", {}).items():
            lag = pdata.get("consumer_lag", -1)
            if lag > 10000:
                print(f"High lag on {topic}[{pid}]: {lag}")

consumer = KafkaConsumer(
    {
        "bootstrap.servers": "localhost:9092",
        "group.id": "my-group",
        "statistics.interval.ms": 5000,
    },
    on_stats=on_stats,
)
consumer.subscribe(["events"])

for msg in consumer:
    process(msg)
    consumer.commit(msg)

# Check runtime metrics
print(f"Received: {consumer.metrics.messages_received}")
```
Dead Letter Queue with Retry¶
Combine retry logic with a dead letter queue for robust message processing.
```python
from typedkafka import KafkaProducer, KafkaConsumer
from typedkafka.dlq import DeadLetterQueue, process_with_dlq
from typedkafka.retry import retry, RetryPolicy

dlq_producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
dlq = DeadLetterQueue(dlq_producer)

consumer = KafkaConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "enable.auto.commit": False,
})
consumer.subscribe(["orders"])

policy = RetryPolicy(max_attempts=3, initial_delay=1.0, max_delay=10.0)

@retry(policy)
def handle(msg):
    data = msg.value_as_json()
    save_to_database(data)

for msg in consumer:
    # Commit either way: the message was either processed or routed to the DLQ
    success = process_with_dlq(msg, handle, dlq)
    consumer.commit(msg)

print(f"Sent {dlq.send_count} messages to DLQ")
```
Tips¶
- Transactions add overhead, so batch multiple operations into a single transaction (see the sketch below).
- Use a DLQ for poison messages; don't let one bad message block the pipeline.
- Monitor consumer lag by enabling `statistics.interval.ms` in production.
- Type your topics: `TypedTopic` catches serialization bugs at development time.
- Log selectively: `KafkaLogger` is a no-op when `logger=None`, so you can toggle it per environment.
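For the first tip, a sketch that amortizes transaction overhead by committing one transaction per batch of sends, reusing the transactional producer from the recipes above (offset handling is omitted; see the exactly-once recipe):

```python
batch = []
for msg in consumer:
    batch.append(msg.value_as_json())
    if len(batch) >= 100:
        # One transaction commit for 100 sends instead of 100 commits
        with producer.transaction():
            for item in batch:
                producer.send_json("output-topic", item)
        batch.clear()
```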