Dead Letter Queue
typedkafka.dlq.DeadLetterQueue
Routes failed messages to a dead letter topic.
Wraps an existing producer (KafkaProducer or MockProducer) and sends failed messages to a DLQ topic with error metadata in headers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| producer | Any | A KafkaProducer or MockProducer instance used to send DLQ messages. | required |
| topic_fn | Optional[Callable[[str], str]] | Optional callable that maps original topic to DLQ topic name. Default: appends ".dlq" to the original topic. | None |
| default_topic | Optional[str] | Optional fixed DLQ topic name. If set, all messages go here regardless of original topic. Mutually exclusive with topic_fn. | None |
Raises:
| Type | Description |
|---|---|
| ValueError | If both topic_fn and default_topic are provided. |
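The topic-selection rules described above can be sketched in plain Python. This is only an illustration, not typedkafka's implementation: `resolve_dlq_topic` is a hypothetical helper that mirrors the documented behavior (the default ".dlq" suffix, an optional mapping callable, an optional fixed topic, and a ValueError when both options are given). The documented class performs the mutual-exclusion check at construction time; the sketch does it per call to stay self-contained.

```python
from typing import Callable, Optional


def resolve_dlq_topic(
    original_topic: str,
    topic_fn: Optional[Callable[[str], str]] = None,
    default_topic: Optional[str] = None,
) -> str:
    """Hypothetical helper mirroring the documented DLQ routing rules."""
    # topic_fn and default_topic are documented as mutually exclusive.
    if topic_fn is not None and default_topic is not None:
        raise ValueError("topic_fn and default_topic are mutually exclusive")
    # A fixed topic wins regardless of where the message came from.
    if default_topic is not None:
        return default_topic
    # A custom mapping derives the DLQ topic from the original topic.
    if topic_fn is not None:
        return topic_fn(original_topic)
    # Documented default: append ".dlq" to the original topic name.
    return f"{original_topic}.dlq"


print(resolve_dlq_topic("orders"))                                  # orders.dlq
print(resolve_dlq_topic("orders", topic_fn=lambda t: f"dlq-{t}"))   # dlq-orders
print(resolve_dlq_topic("orders", default_topic="failed-events"))   # failed-events
```

A fixed default_topic suits a single shared failure stream, while topic_fn keeps failures separated per source topic.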
Examples:
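>>> from typedkafka import KafkaProducer
>>> from typedkafka.dlq import DeadLetterQueue
>>> producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
>>> dlq = DeadLetterQueue(producer)
>>>
>>> # Route a failed message: `message` is the message that failed
>>> # processing and `exc` is the exception raised while handling it
>>> dlq.send(message, error=exc)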
send_count
property
Number of messages sent to the DLQ.
send(message, error=None, extra_headers=None)
Send a failed message to the dead letter topic.
Preserves the original message value and key, and adds error metadata as Kafka headers.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Any | A KafkaMessage or MockMessage that failed processing. | required |
| error | Optional[Exception] | The exception that caused the failure (optional). | None |
| extra_headers | Optional[list[tuple[str, bytes]]] | Additional headers to include. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If sending to the DLQ topic fails. |
Headers added automatically:
- dlq.original.topic: Original topic name
- dlq.original.partition: Original partition number
- dlq.original.offset: Original message offset
- dlq.timestamp: Unix timestamp when DLQ send occurred
- dlq.error.message: Error string (if error provided)
- dlq.error.type: Error class name (if error provided)
- dlq.error.traceback: Full traceback (if error provided)
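To make the header list above concrete, here is a minimal sketch of how such headers could be assembled. It is not the library's code: `FailedMessage` and `build_dlq_headers` are hypothetical names, and encoding every value as UTF-8 bytes and using a float-valued timestamp from `time.time()` are assumptions, since the documentation does not state the exact formats.

```python
import time
import traceback
from dataclasses import dataclass
from typing import Optional


@dataclass
class FailedMessage:
    """Hypothetical stand-in for a consumed message (not a typedkafka class)."""
    topic: str
    partition: int
    offset: int


def build_dlq_headers(
    message: FailedMessage,
    error: Optional[Exception] = None,
    extra_headers: Optional[list[tuple[str, bytes]]] = None,
) -> list[tuple[str, bytes]]:
    # Assumption: every header value is encoded as UTF-8 bytes.
    headers = [
        ("dlq.original.topic", message.topic.encode("utf-8")),
        ("dlq.original.partition", str(message.partition).encode("utf-8")),
        ("dlq.original.offset", str(message.offset).encode("utf-8")),
        ("dlq.timestamp", str(time.time()).encode("utf-8")),
    ]
    # Error headers are only present when an exception was supplied.
    if error is not None:
        tb = "".join(
            traceback.format_exception(type(error), error, error.__traceback__)
        )
        headers += [
            ("dlq.error.message", str(error).encode("utf-8")),
            ("dlq.error.type", type(error).__name__.encode("utf-8")),
            ("dlq.error.traceback", tb.encode("utf-8")),
        ]
    # Caller-supplied headers are appended after the automatic ones.
    if extra_headers:
        headers.extend(extra_headers)
    return headers


try:
    raise ValueError("bad payload")
except ValueError as exc:
    for name, value in build_dlq_headers(FailedMessage("orders", 2, 41), error=exc):
        print(name, value[:40])
```

Keeping the original topic, partition, and offset in headers lets a later replay tool locate the source record without parsing the payload.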
typedkafka.dlq.process_with_dlq(message, handler, dlq)
Process a message, routing to DLQ on failure.
Calls the handler with the message. If the handler raises an exception, the message is sent to the dead letter queue with the error details.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Any | A KafkaMessage or MockMessage to process. | required |
| handler | Callable[[Any], None] | Callable that processes the message. Should raise on failure. | required |
| dlq | DeadLetterQueue | DeadLetterQueue instance to route failures to. | required |
Returns:
| Type | Description |
|---|---|
| bool | True if processing succeeded, False if message was sent to DLQ. |
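The control flow described above (call the handler, route to the DLQ on an exception, report the outcome as a bool) can be sketched without a broker. Everything here is illustrative: `RecordingDLQ` is a hypothetical stand-in that only records calls, `process_with_dlq_sketch` is not the library function, and catching `Exception` (rather than a narrower or broader class) is an assumption.

```python
from typing import Any, Callable, Optional


class RecordingDLQ:
    """Hypothetical stand-in exposing only send() and send_count."""

    def __init__(self) -> None:
        self.sent: list[tuple[Any, Optional[Exception]]] = []

    @property
    def send_count(self) -> int:
        return len(self.sent)

    def send(self, message: Any, error: Optional[Exception] = None) -> None:
        # A real DLQ would produce to Kafka; this one just records the call.
        self.sent.append((message, error))


def process_with_dlq_sketch(
    message: Any, handler: Callable[[Any], None], dlq: RecordingDLQ
) -> bool:
    try:
        handler(message)
    except Exception as exc:  # assumption: any Exception routes to the DLQ
        dlq.send(message, error=exc)
        return False
    return True


def handler(msg: str) -> None:
    # Toy handler that fails on one specific payload.
    if msg == "bad":
        raise ValueError("cannot parse")


dlq = RecordingDLQ()
results = [process_with_dlq_sketch(m, handler, dlq) for m in ["ok", "bad", "ok"]]
print(results)         # [True, False, True]
print(dlq.send_count)  # 1
```

Returning a bool instead of re-raising lets a consumer loop keep going after a bad message while still counting failures.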
Examples: