Async

typedkafka.aio.AsyncKafkaProducer

Async Kafka producer wrapping confluent-kafka with asyncio support.

Uses a thread pool to run confluent-kafka's synchronous operations without blocking the event loop.

Note

This implementation wraps synchronous confluent-kafka calls in a ThreadPoolExecutor, so it does not provide true non-blocking async I/O: each blocking call is offloaded to a worker thread. For high-throughput scenarios, consider tuning the executor's max_workers parameter.
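
The offloading pattern described in this note can be sketched with the standard library alone. In the sketch below, `blocking_produce` is a hypothetical stand-in for a synchronous confluent-kafka call and `offload` is an illustrative helper. Neither is part of typedkafka, whose internals may differ.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def blocking_produce(topic: str, value: bytes) -> int:
    """Hypothetical stand-in for a synchronous client call."""
    time.sleep(0.05)  # simulate blocking I/O
    return len(value)


async def offload(executor: ThreadPoolExecutor, func, *args, **kwargs):
    """Run a blocking callable on the executor and await its result."""
    loop = asyncio.get_running_loop()
    # run_in_executor accepts positional arguments only, so keyword
    # arguments are bound with functools.partial.
    return await loop.run_in_executor(executor, partial(func, *args, **kwargs))


async def main() -> list:
    # max_workers caps how many blocking calls can run at the same time.
    with ThreadPoolExecutor(max_workers=4) as executor:
        calls = [
            offload(executor, blocking_produce, "topic", b"x" * n)
            for n in range(1, 5)
        ]
        # The event loop stays free while the worker threads sleep.
        return await asyncio.gather(*calls)
```

With `max_workers=4` the four calls overlap. With `max_workers=1` they would run one after another, which is why the pool size matters for throughput.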

Examples:

>>> async with AsyncKafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
...     await producer.send("topic", b"message")
...     await producer.flush()

Attributes:

Name Type Description
config

The configuration dictionary used to initialize the producer.

__aenter__() async

Async context manager entry.

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

__init__(config, executor=None)

Initialize an async Kafka producer.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary for the producer.

required
executor Optional[ThreadPoolExecutor]

Optional ThreadPoolExecutor. If None, a default one is created.

None

Raises:

Type Description
ProducerError

If the producer cannot be initialized.
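
A minimal sketch of supplying a dedicated executor through the `executor` parameter. The pool size, thread name prefix, and the helper names `make_executor` and `produce_one` are arbitrary choices for illustration. Whether the producer shuts down a caller-supplied executor on close is not stated here, so the sketch assumes it does not and cleans up explicitly.

```python
from concurrent.futures import ThreadPoolExecutor


def make_executor(max_workers: int = 8) -> ThreadPoolExecutor:
    """Build a dedicated pool for Kafka calls (the sizing is only an example)."""
    return ThreadPoolExecutor(
        max_workers=max_workers, thread_name_prefix="kafka-producer"
    )


async def produce_one(bootstrap_servers: str = "localhost:9092") -> None:
    # Imported lazily so this module loads even where typedkafka is absent.
    from typedkafka.aio import AsyncKafkaProducer

    executor = make_executor()
    try:
        config = {"bootstrap.servers": bootstrap_servers}
        async with AsyncKafkaProducer(config, executor=executor) as producer:
            await producer.send("topic", b"message")
            await producer.flush()
    finally:
        # Assumption: the producer leaves a caller-supplied executor open.
        # Calling shutdown() on an already shut down pool is harmless.
        executor.shutdown(wait=True)
```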

close() async

Flush and close the producer.

flush(timeout=-1) async

Asynchronously wait for all queued messages to be delivered.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait in seconds. -1 for infinite.

-1

Returns:

Type Description
int

Number of messages still in the queue.
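
Because `flush` returns the number of messages still queued, a bounded timeout can be combined with a check on that value. The helper below is a hypothetical sketch. It relies only on the `flush(timeout)` signature documented above and accepts any object that provides it.

```python
async def flush_with_deadline(producer, timeout: float = 5.0) -> bool:
    """Return True if the queue drained within the timeout."""
    remaining = await producer.flush(timeout=timeout)
    if remaining > 0:
        # Messages are still pending; the caller can retry or raise.
        print(f"{remaining} message(s) still queued after {timeout}s")
    return remaining == 0
```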

send(topic, value, key=None, partition=None) async

Asynchronously send a message to a Kafka topic.

Parameters:

Name Type Description Default
topic str

The topic name to send the message to.

required
value bytes

The message payload as bytes.

required
key Optional[bytes]

Optional message key as bytes.

None
partition Optional[int]

Optional partition number.

None

Raises:

Type Description
ProducerError

If the message cannot be queued.

send_json(topic, value, key=None, partition=None) async

Asynchronously send a JSON-serialized message.

Parameters:

Name Type Description Default
topic str

The topic name.

required
value Any

Any JSON-serializable Python object.

required
key Optional[str]

Optional string key.

None
partition Optional[int]

Optional partition number.

None

Raises:

Type Description
SerializationError

If JSON serialization fails.

ProducerError

If the message cannot be queued.
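
The wire format behind "JSON-serialized" is not spelled out here. A plausible reading, stated as an assumption rather than confirmed library behavior, is that the value is encoded with `json.dumps` as UTF-8 and the string key is UTF-8 encoded as well. `encode_json` below is a hypothetical helper illustrating that step, along with the kind of failure that would surface as SerializationError.

```python
import json
from typing import Any, Optional, Tuple


def encode_json(value: Any, key: Optional[str] = None) -> Tuple[bytes, Optional[bytes]]:
    """Encode a payload and optional key as UTF-8 bytes (assumed format)."""
    try:
        payload = json.dumps(value).encode("utf-8")
    except (TypeError, ValueError) as exc:
        # json.dumps raises TypeError for unsupported types and ValueError
        # for circular references. A wrapper library would typically
        # re-raise its own error type at this point.
        raise ValueError(f"value is not JSON-serializable: {exc}") from exc
    key_bytes = key.encode("utf-8") if key is not None else None
    return payload, key_bytes
```

For example, `encode_json({"id": 1}, key="user-1")` yields the pair `(b'{"id": 1}', b"user-1")`, while passing a `set` fails because sets have no JSON representation.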

send_string(topic, value, key=None, partition=None) async

Asynchronously send a UTF-8 encoded string message.

Parameters:

Name Type Description Default
topic str

The topic name.

required
value str

String message to send.

required
key Optional[str]

Optional string key.

None
partition Optional[int]

Optional partition number.

None

Raises:

Type Description
ProducerError

If the message cannot be queued.
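
A sketch of queueing several string messages concurrently and flushing once at the end. `send_lines` is a hypothetical helper that uses only the `send_string` and `flush` signatures documented above.

```python
import asyncio
from typing import Iterable


async def send_lines(producer, topic: str, lines: Iterable[str]) -> int:
    """Queue every line concurrently, then flush once.

    Returns the number of messages still queued after the flush.
    """
    sends = [producer.send_string(topic, line) for line in lines]
    await asyncio.gather(*sends)
    # One flush after the batch avoids waiting on delivery per message.
    return await producer.flush()
```

Since each send is offloaded to a thread pool, concurrent sends are not guaranteed to be queued in list order. When ordering matters, awaiting each `send_string` call in a plain loop is the safer choice.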

typedkafka.aio.AsyncKafkaConsumer

Async Kafka consumer wrapping confluent-kafka with asyncio support.

Uses a thread pool to run confluent-kafka's synchronous poll without blocking the event loop. Supports async for iteration.

Note

This implementation uses a ThreadPoolExecutor to wrap synchronous confluent-kafka calls. It does not provide true non-blocking async I/O. Each blocking call is offloaded to a thread pool. For high-throughput scenarios, consider tuning the executor's max_workers parameter.

Examples:

>>> async with AsyncKafkaConsumer(config) as consumer:
...     consumer.subscribe(["topic"])
...     async for msg in consumer:
...         print(msg.value_as_string())

Attributes:

Name Type Description
config

The configuration dictionary used to initialize the consumer.

__aenter__() async

Async context manager entry.

__aexit__(exc_type, exc_val, exc_tb) async

Async context manager exit.

__aiter__() async

Asynchronously iterate over messages indefinitely.

Yields:

Type Description
AsyncIterator[KafkaMessage]

KafkaMessage objects as they arrive.
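
Because iteration never ends on its own, the calling code decides when to stop. The hypothetical helper below collects a fixed number of messages and then breaks out of the loop. It works with any object that supports `async for`.

```python
async def take(consumer, limit: int) -> list:
    """Collect up to `limit` messages, then stop iterating."""
    messages = []
    if limit <= 0:
        return messages
    async for msg in consumer:
        messages.append(msg)
        if len(messages) >= limit:
            break  # the iterator would otherwise keep waiting for messages
    return messages
```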

__init__(config, executor=None)

Initialize an async Kafka consumer.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary for the consumer.

required
executor Optional[ThreadPoolExecutor]

Optional ThreadPoolExecutor.

None

Raises:

Type Description
ConsumerError

If the consumer cannot be initialized.

close() async

Close the consumer and leave the consumer group.

commit(message=None, asynchronous=True) async

Asynchronously commit offsets.

Parameters:

Name Type Description Default
message Any

Specific message whose offset to commit. If None, commits the offsets of all messages consumed so far.

None
asynchronous bool

If True, commit asynchronously.

True
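
A sketch of committing only after a message has been processed, which gives at-least-once handling. It assumes the consumer was configured with `"enable.auto.commit": False` (a standard confluent-kafka setting) and that `asynchronous=False` waits for the commit to complete, as it does in confluent-kafka. `handle_then_commit` and `handler` are hypothetical names.

```python
async def handle_then_commit(consumer, handler, timeout: float = 1.0) -> bool:
    """Poll once, process the message, then commit its offset.

    Returns False when no message arrived within the timeout.
    """
    msg = await consumer.poll(timeout=timeout)
    if msg is None:
        return False
    handler(msg)  # if this raises, the offset is never committed
    # Assumed to block (off the event loop) until the commit completes.
    await consumer.commit(message=msg, asynchronous=False)
    return True
```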

poll(timeout=1.0) async

Asynchronously poll for a single message.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait in seconds.

1.0

Returns:

Type Description
Optional[KafkaMessage]

A KafkaMessage, or None if timeout expired.

Raises:

Type Description
ConsumerError

If an error occurs during polling.
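
Since `poll` returns None when the timeout expires, a polling loop has to treat None as "nothing yet" rather than as a message. The hypothetical helper below reads until several consecutive polls come back empty.

```python
async def drain(consumer, max_idle_polls: int = 3, timeout: float = 1.0) -> list:
    """Read messages until `max_idle_polls` consecutive polls return None."""
    messages = []
    idle = 0
    while idle < max_idle_polls:
        msg = await consumer.poll(timeout=timeout)
        if msg is None:
            idle += 1  # timeout expired without a message
            continue
        idle = 0  # any message resets the idle counter
        messages.append(msg)
    return messages
```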

subscribe(topics, **kwargs)

Subscribe to topics.

Parameters:

Name Type Description Default
topics list[str]

List of topic names.

required
**kwargs Any

Additional arguments passed to confluent-kafka subscribe.

{}

typedkafka.aio.MessageBatch

A batch of messages for efficient processing.

Attributes:

Name Type Description
messages

The list of messages in this batch.

Examples:

>>> async for batch in batch_consume(consumer, batch_size=50):
...     for msg in batch:
...         process(msg)

topics property

Set of topics represented in this batch.
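
To illustrate what a `topics` property of this kind computes, here is a hypothetical stand-in class. It is not the library's MessageBatch, and it assumes each message exposes its topic as a `topic` attribute, which this page does not confirm.

```python
from dataclasses import dataclass, field
from typing import Any, Iterator


@dataclass
class BatchSketch:
    """Hypothetical stand-in for MessageBatch, for illustration only."""

    messages: list = field(default_factory=list)

    def __iter__(self) -> Iterator[Any]:
        return iter(self.messages)

    def __len__(self) -> int:
        return len(self.messages)

    @property
    def topics(self) -> set:
        # Assumes every message has a `topic` attribute.
        return {msg.topic for msg in self.messages}
```

A set is a natural return type because a batch drawn from several subscriptions can repeat the same topic many times.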