Retry

typedkafka.retry.retry(max_attempts=3, backoff_base=0.5, backoff_max=30.0, jitter=True, retryable_exceptions=None)

Decorator that retries a function on failure with exponential backoff.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `max_attempts` | `int` | Maximum number of attempts (including the first call). | `3` |
| `backoff_base` | `float` | Base delay in seconds for exponential backoff. | `0.5` |
| `backoff_max` | `float` | Maximum delay in seconds between retries. | `30.0` |
| `jitter` | `bool` | If True, add random jitter to backoff delay. | `True` |
| `retryable_exceptions` | `Optional[Sequence[type[BaseException]]]` | Exception types to retry on. Defaults to (KafkaError,) if not specified. | `None` |
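To make the interaction between `backoff_base`, `backoff_max`, and `jitter` concrete, here is a minimal sketch of one common exponential backoff schedule. The doubling formula and the "full jitter" strategy are assumptions made for illustration; this page does not state the exact formula typedkafka uses, and `backoff_delay` is a hypothetical helper, not part of the library.

```python
import random


def backoff_delay(attempt, backoff_base=0.5, backoff_max=30.0, jitter=True):
    """Delay in seconds before retry number `attempt` (1 = first retry).

    Hypothetical helper: assumes the delay doubles on each retry.
    """
    # Double the base delay on each retry, then cap it at backoff_max.
    delay = min(backoff_base * 2 ** (attempt - 1), backoff_max)
    if jitter:
        # Assumed "full jitter": pick a random delay in [0, delay].
        delay = random.uniform(0, delay)
    return delay


# Deterministic schedule for the default parameters, with jitter disabled.
schedule = [backoff_delay(n, jitter=False) for n in range(1, 9)]
print(schedule)  # [0.5, 1.0, 2.0, 4.0, 8.0, 16.0, 30.0, 30.0]
```

Under these assumptions the cap takes effect from the seventh retry onward, so with the default `max_attempts=3` only the first two delays (0.5 s and 1.0 s) would ever be used.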

Returns:

| Type | Description |
| --- | --- |
| `Callable[[F], F]` | Decorated function that retries on failure. |

Examples:

>>> from typedkafka.retry import retry
>>> from typedkafka.exceptions import ProducerError
>>>
>>> @retry(max_attempts=3, backoff_base=1.0)
... def send_message(producer, topic, value):
...     producer.send(topic, value)
...     producer.flush()
>>>
>>> # Retry only specific exceptions
>>> @retry(max_attempts=5, retryable_exceptions=(ProducerError,))
... def produce(producer, data):
...     producer.send_json("events", data)
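For readers who want to see how a decorator with these parameters might behave, the following is a self-contained sketch. `retry_sketch` is a hypothetical stand-in, not typedkafka's implementation: it omits jitter, defaults to retrying on `Exception` rather than `KafkaError`, accepts an extra `sleep` argument so the demo does not wait, and assumes the last exception is re-raised once attempts are exhausted (behavior this page does not document).

```python
import functools
import time


def retry_sketch(max_attempts=3, backoff_base=0.5, backoff_max=30.0,
                 retryable_exceptions=(Exception,), sleep=time.sleep):
    """Hypothetical retry decorator for illustration only."""
    retryable = tuple(retryable_exceptions)

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            # max_attempts counts the first call, so attempts run 1..max_attempts.
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except retryable:
                    if attempt == max_attempts:
                        raise  # assumed: surface the last error when exhausted
                    sleep(min(backoff_base * 2 ** (attempt - 1), backoff_max))
        return wrapper
    return decorator


calls = []


@retry_sketch(max_attempts=3, retryable_exceptions=(ConnectionError,),
              sleep=lambda seconds: None)  # no real waiting in the demo
def flaky_send():
    calls.append(1)
    if len(calls) < 3:
        raise ConnectionError("broker unavailable")
    return "delivered"


result = flaky_send()  # fails twice, succeeds on the third attempt
```

In this sketch `max_attempts=3` means one initial call plus at most two retries, which matches the "including the first call" wording in the parameter description.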

typedkafka.retry.RetryPolicy

Configurable retry policy for Kafka operations.

Provides a reusable retry configuration that can be applied to multiple operations.

Examples:

>>> from typedkafka.retry import RetryPolicy
>>>
>>> policy = RetryPolicy(max_attempts=5, backoff_base=1.0)
>>> result = policy.execute(lambda: producer.send("topic", b"msg"))

__init__(max_attempts=3, backoff_base=0.5, backoff_max=30.0, jitter=True, retryable_exceptions=None)

Initialize a retry policy.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `max_attempts` | `int` | Maximum number of attempts (including the first call). | `3` |
| `backoff_base` | `float` | Base delay in seconds for exponential backoff. | `0.5` |
| `backoff_max` | `float` | Maximum delay in seconds between retries. | `30.0` |
| `jitter` | `bool` | If True, add random jitter to delays. | `True` |
| `retryable_exceptions` | `Optional[Sequence[type[BaseException]]]` | Exception types to retry on. Defaults to (KafkaError,) if not specified. | `None` |

execute(func, *args, **kwargs)

Execute a function with retry logic.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `func` | `Callable[..., Any]` | The function to execute. | required |
| `*args` | `Any` | Positional arguments to pass to the function. | `()` |
| `**kwargs` | `Any` | Keyword arguments to pass to the function. | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | The return value of the function. |

Examples:

>>> policy = RetryPolicy(max_attempts=3)
>>> policy.execute(producer.send, "topic", b"value")
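The sketch below illustrates two points about `execute`: positional and keyword arguments are forwarded to the function on every attempt, and one policy object can be reused across unrelated operations. `SketchRetryPolicy` is a hypothetical stand-in written so the example runs without a broker; it is not typedkafka's class. It omits jitter, defaults to retrying on `Exception`, and assumes that exceptions outside `retryable_exceptions` propagate immediately.

```python
import time


class SketchRetryPolicy:
    """Hypothetical stand-in for illustration; not typedkafka's RetryPolicy."""

    def __init__(self, max_attempts=3, backoff_base=0.5, backoff_max=30.0,
                 retryable_exceptions=(Exception,), sleep=time.sleep):
        self.max_attempts = max_attempts
        self.backoff_base = backoff_base
        self.backoff_max = backoff_max
        self.retryable_exceptions = tuple(retryable_exceptions)
        self.sleep = sleep  # injectable so the demo does not actually wait

    def execute(self, func, *args, **kwargs):
        for attempt in range(1, self.max_attempts + 1):
            try:
                # Forward the caller's arguments unchanged on every attempt.
                return func(*args, **kwargs)
            except self.retryable_exceptions:
                if attempt == self.max_attempts:
                    raise
                delay = min(self.backoff_base * 2 ** (attempt - 1), self.backoff_max)
                self.sleep(delay)


policy = SketchRetryPolicy(max_attempts=3, retryable_exceptions=(TimeoutError,),
                           sleep=lambda seconds: None)

attempts = {"send": 0}


def send(topic, value, key=None):
    attempts["send"] += 1
    if attempts["send"] == 1:
        raise TimeoutError("first attempt timed out")
    return (topic, value, key)


def parse(raw):
    return int(raw)  # raises ValueError, which this policy does not retry


# Same policy, two operations: one retried, one failing immediately.
sent = policy.execute(send, "topic", b"value", key=b"k1")

try:
    policy.execute(parse, "not-a-number")
    parse_error = None
except ValueError as exc:
    parse_error = exc
```

Passing the callable and its arguments separately, as in `policy.execute(send, "topic", b"value")`, is equivalent to wrapping the call in a zero-argument lambda; the former avoids late-binding surprises when the arguments are loop variables.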