Async¶
typedkafka.aio.AsyncKafkaProducer ¶
Async Kafka producer wrapping confluent-kafka with asyncio support.
Uses a thread pool to run confluent-kafka's synchronous operations without blocking the event loop.
Note
This implementation uses a ThreadPoolExecutor to wrap synchronous confluent-kafka calls. It does not provide true non-blocking async I/O. Each blocking call is offloaded to a thread pool. For high-throughput scenarios, consider tuning the executor's max_workers parameter.
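The offloading described above follows the standard `run_in_executor` pattern. A minimal, self-contained sketch of that pattern using only the standard library (the function names here are illustrative, not the library's internals):

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

def blocking_call(x):
    # Stands in for a synchronous confluent-kafka operation.
    return x * 2

async def main():
    # A dedicated executor, analogous to the one the async wrappers use;
    # max_workers bounds how many blocking calls can run concurrently.
    executor = ThreadPoolExecutor(max_workers=4)
    loop = asyncio.get_running_loop()
    # Offload the blocking call so the event loop stays responsive.
    result = await loop.run_in_executor(executor, blocking_call, 21)
    executor.shutdown(wait=True)
    return result

print(asyncio.run(main()))  # prints 42
```

Raising `max_workers` allows more blocking calls in flight at once, at the cost of more threads.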
Examples:
>>> async with AsyncKafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
... await producer.send("topic", b"message")
... await producer.flush()
Attributes:
| Name | Type | Description |
|---|---|---|
| config | | The configuration dictionary used to initialize the producer. |
__aenter__() async ¶
Async context manager entry.
__aexit__(exc_type, exc_val, exc_tb) async ¶
Async context manager exit.
__init__(config, executor=None) ¶
Initialize an async Kafka producer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dictionary for the producer. | required |
| executor | Optional[ThreadPoolExecutor] | Optional ThreadPoolExecutor. If None, a default one is created. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If the producer cannot be initialized. |
close() async ¶
Flush and close the producer.
flush(timeout=-1) async ¶
Asynchronously wait for all queued messages to be delivered.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait in seconds. -1 for infinite. | -1 |
Returns:
| Type | Description |
|---|---|
| int | Number of messages still in the queue. |
send(topic, value, key=None, partition=None) async ¶
Asynchronously send a message to a Kafka topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name to send the message to. | required |
| value | bytes | The message payload as bytes. | required |
| key | Optional[bytes] | Optional message key as bytes. | None |
| partition | Optional[int] | Optional partition number. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If the message cannot be queued. |
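A common way to make a fire-and-forget `produce()` awaitable is to bridge confluent-kafka's delivery callback to an `asyncio.Future`. This is a stdlib-only sketch of that general pattern, not `typedkafka`'s actual implementation; `produce_with_future` and `on_delivery` are illustrative names:

```python
import asyncio

async def produce_with_future(produce_fn):
    # Bridge a callback-based produce call to an awaitable result.
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_delivery(err, msg):
        # Delivery reports may arrive on a librdkafka thread, so hop back
        # to the event loop thread before touching the Future.
        if err is not None:
            loop.call_soon_threadsafe(fut.set_exception, RuntimeError(str(err)))
        else:
            loop.call_soon_threadsafe(fut.set_result, msg)

    # Stand-in for producer.produce(topic, value, on_delivery=on_delivery).
    produce_fn(on_delivery)
    return await fut

# Simulate a successful delivery report (err=None).
result = asyncio.run(produce_with_future(lambda cb: cb(None, "delivered")))
print(result)  # prints delivered
```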
send_json(topic, value, key=None, partition=None) async ¶
Asynchronously send a JSON-serialized message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name. | required |
| value | Any | Any JSON-serializable Python object. | required |
| key | Optional[str] | Optional string key. | None |
| partition | Optional[int] | Optional partition number. | None |
Raises:
| Type | Description |
|---|---|
| SerializationError | If JSON serialization fails. |
| ProducerError | If the message cannot be queued. |
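The serialization step is presumably equivalent to `json.dumps` followed by UTF-8 encoding before delegating to `send()`. A stdlib-only sketch of that assumption (`encode_json` is an illustrative helper, not part of the library):

```python
import json

def encode_json(value, key=None):
    # Serialize the object to JSON, then encode to UTF-8 bytes; string
    # keys are UTF-8 encoded the same way.
    payload = json.dumps(value).encode("utf-8")
    encoded_key = key.encode("utf-8") if key is not None else None
    return payload, encoded_key

payload, k = encode_json({"user": "ada", "active": True}, key="user-1")
print(payload)  # prints b'{"user": "ada", "active": true}'
```

Objects that `json.dumps` cannot handle (e.g. sets, datetimes) would surface here as a serialization failure.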
send_string(topic, value, key=None, partition=None) async ¶
Asynchronously send a UTF-8 encoded string message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name. | required |
| value | str | String message to send. | required |
| key | Optional[str] | Optional string key. | None |
| partition | Optional[int] | Optional partition number. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If the message cannot be queued. |
typedkafka.aio.AsyncKafkaConsumer ¶
Async Kafka consumer wrapping confluent-kafka with asyncio support.
Uses a thread pool to run confluent-kafka's synchronous poll without
blocking the event loop. Supports async for iteration.
Note
This implementation uses a ThreadPoolExecutor to wrap synchronous confluent-kafka calls. It does not provide true non-blocking async I/O. Each blocking call is offloaded to a thread pool. For high-throughput scenarios, consider tuning the executor's max_workers parameter.
Examples:
>>> async with AsyncKafkaConsumer(config) as consumer:
... consumer.subscribe(["topic"])
... async for msg in consumer:
... print(msg.value_as_string())
Attributes:
| Name | Type | Description |
|---|---|---|
| config | | The configuration dictionary used to initialize the consumer. |
__aenter__() async ¶
Async context manager entry.
__aexit__(exc_type, exc_val, exc_tb) async ¶
Async context manager exit.
__aiter__() async ¶
Async iterate over messages indefinitely.
Yields:
| Type | Description |
|---|---|
| AsyncIterator[KafkaMessage] | KafkaMessage objects as they arrive. |
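An async iterator like this is typically built on repeated polling: call `poll()` in a loop, skip the `None` results that signal a timeout, and yield each message. A minimal stdlib sketch of that pattern (finite here for demonstration; the real iterator runs indefinitely, and `PollingIterator` is an illustrative name):

```python
import asyncio

class PollingIterator:
    # Simulates a consumer whose poll() returns None on timeout and a
    # message otherwise; the iterator only surfaces real messages.
    def __init__(self, poll_results):
        self._results = iter(poll_results)

    def __aiter__(self):
        return self

    async def __anext__(self):
        for item in self._results:
            if item is not None:  # None means the poll timed out; keep polling
                return item
        raise StopAsyncIteration

async def collect():
    out = []
    async for msg in PollingIterator([None, "a", None, "b"]):
        out.append(msg)
    return out

print(asyncio.run(collect()))  # prints ['a', 'b']
```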
__init__(config, executor=None) ¶
Initialize an async Kafka consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dictionary for the consumer. | required |
| executor | Optional[ThreadPoolExecutor] | Optional ThreadPoolExecutor. | None |
Raises:
| Type | Description |
|---|---|
| ConsumerError | If the consumer cannot be initialized. |
close() async ¶
Close the consumer and leave the consumer group.
commit(message=None, asynchronous=True) async ¶
Asynchronously commit offsets.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Any | Specific message to commit. If None, commits all consumed offsets. | None |
| asynchronous | bool | If True, commit asynchronously. | True |
poll(timeout=1.0) async ¶
Asynchronously poll for a single message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait in seconds. | 1.0 |
Returns:
| Type | Description |
|---|---|
| Optional[KafkaMessage] | A KafkaMessage, or None if the timeout expired. |
Raises:
| Type | Description |
|---|---|
| ConsumerError | If an error occurs during polling. |
subscribe(topics, **kwargs) ¶
Subscribe to topics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | list[str] | List of topic names. | required |
| **kwargs | Any | Additional arguments passed to confluent-kafka subscribe. | {} |
typedkafka.aio.MessageBatch ¶
A batch of messages for efficient processing.
Attributes:
| Name | Type | Description |
|---|---|---|
| messages | | The list of messages in this batch. |
Examples:
>>> async for batch in batch_consume(consumer, batch_size=50):
... for msg in batch:
... process(msg)
topics property ¶
Set of topics represented in this batch.
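A property like this can be modeled as a set comprehension over the batch's messages. An illustrative stand-in using only the standard library (the `Batch` class and the (topic, value) tuple layout are assumptions, not the library's actual types):

```python
from dataclasses import dataclass, field

@dataclass
class Batch:
    # Messages are (topic, value) pairs here for simplicity; the real
    # MessageBatch holds KafkaMessage objects.
    messages: list = field(default_factory=list)

    @property
    def topics(self):
        # Set of distinct topics represented in this batch.
        return {topic for topic, _ in self.messages}

b = Batch(messages=[("orders", b"1"), ("orders", b"2"), ("users", b"3")])
print(sorted(b.topics))  # prints ['orders', 'users']
```

A set is the natural return type here: duplicates collapse, and membership checks (`"orders" in batch.topics`) are O(1).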