Skip to content

Dead Letter Queue

typedkafka.dlq.DeadLetterQueue

Routes failed messages to a dead letter topic.

Wraps an existing producer (KafkaProducer or MockProducer) and sends failed messages to a DLQ topic with error metadata in headers.

Parameters:

Name Type Description Default
producer Any

A KafkaProducer or MockProducer instance used to send DLQ messages.

required
topic_fn Optional[Callable[[str], str]]

Optional callable that maps original topic to DLQ topic name. Default: appends ".dlq" to the original topic.

None
default_topic Optional[str]

Optional fixed DLQ topic name. If set, all messages go here regardless of original topic. Mutually exclusive with topic_fn.

None

Raises:

Type Description
ValueError

If both topic_fn and default_topic are provided.

Examples:

>>> from typedkafka import KafkaProducer
>>> producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
>>> dlq = DeadLetterQueue(producer)
>>>
>>> # Route a failed message
>>> dlq.send(message, error=exc)
>>> # Custom topic naming
>>> dlq = DeadLetterQueue(producer, topic_fn=lambda t: f"errors.{t}")
>>> # Fixed DLQ topic
>>> dlq = DeadLetterQueue(producer, default_topic="all-errors")

send_count property

Number of messages sent to the DLQ.

send(message, error=None, extra_headers=None)

Send a failed message to the dead letter topic.

Preserves the original message value and key, and adds error metadata as Kafka headers.

Parameters:

Name Type Description Default
message Any

A KafkaMessage or MockMessage that failed processing.

required
error Optional[Exception]

The exception that caused the failure (optional).

None
extra_headers Optional[list[tuple[str, bytes]]]

Additional headers to include.

None

Raises:

Type Description
ProducerError

If sending to the DLQ topic fails.

Headers added automatically
  • dlq.original.topic: Original topic name
  • dlq.original.partition: Original partition number
  • dlq.original.offset: Original message offset
  • dlq.timestamp: Unix timestamp when DLQ send occurred
  • dlq.error.message: Error string (if error provided)
  • dlq.error.type: Error class name (if error provided)
  • dlq.error.traceback: Full traceback (if error provided)

typedkafka.dlq.process_with_dlq(message, handler, dlq)

Process a message, routing to DLQ on failure.

Calls the handler with the message. If the handler raises an exception, the message is sent to the dead letter queue with the error details.

Parameters:

Name Type Description Default
message Any

A KafkaMessage or MockMessage to process.

required
handler Callable[[Any], None]

Callable that processes the message. Should raise on failure.

required
dlq DeadLetterQueue

DeadLetterQueue instance to route failures to.

required

Returns:

Type Description
bool

True if processing succeeded, False if message was sent to DLQ.

Examples:

>>> dlq = DeadLetterQueue(producer)
>>> for msg in consumer:
...     success = process_with_dlq(msg, my_handler, dlq)
...     if success:
...         consumer.commit(msg)