Producer¶
typedkafka.producer.KafkaProducer
¶
A well-documented Kafka producer with full type hints.
This class wraps confluent-kafka's Producer with:
- Comprehensive docstrings on every method
- Full type hints for IDE autocomplete
- Better error messages
- Convenient methods for common operations (send_json, send_string)
- Context manager support for automatic cleanup
Basic Usage
```python
producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
producer.send("my-topic", b"my message", key=b"my-key")
producer.flush()  # Wait for all messages to be delivered
```
With Context Manager
```python
with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
    producer.send("my-topic", b"message")
    # Automatic flush and cleanup on exit
```
Attributes:
| Name | Type | Description |
|---|---|---|
| config | dict[str, Any] | The configuration dictionary used to initialize the producer |
metrics
property
¶
Current metrics for this producer.
Tracks messages sent, errors, and (if stats enabled) byte throughput.
__enter__()
¶
Enter the context manager, returning the producer for use in a with block.
__exit__(exc_type, exc_val, exc_tb)
¶
Exit context manager and cleanup resources.
Automatically flushes all pending messages before exiting.
__init__(config, on_stats=None, logger=None)
¶
Initialize a Kafka producer with the given configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dictionary for the producer. Common options:<br>- bootstrap.servers (str): Comma-separated list of broker addresses, e.g. "localhost:9092" or "broker1:9092,broker2:9092"<br>- client.id (str): An identifier for this client<br>- acks (str\|int): Number of acknowledgments the producer requires; "0" = no acknowledgment, "1" = leader only, "all" = all replicas<br>- compression.type (str): Compression codec ("none", "gzip", "snappy", "lz4", "zstd")<br>- max.in.flight.requests.per.connection (int): Max unacknowledged requests<br>- linger.ms (int): Time to wait before sending a batch<br>- batch.size (int): Maximum size of a message batch in bytes<br>- statistics.interval.ms (int): Stats reporting interval in milliseconds | required |
| on_stats | StatsCallback \| None | Optional callback receiving parsed KafkaStats each reporting interval. Requires statistics.interval.ms to be set in the config. | None |
| logger | KafkaLogger \| None | Optional KafkaLogger for structured logging of producer operations. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If the producer cannot be initialized with the given config |
Examples:
abort_transaction(timeout=30.0)
¶
Abort the current transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait for abort in seconds. | 30.0 |
Raises:
| Type | Description |
|---|---|
| TransactionError | If aborting the transaction fails. |
begin_transaction()
¶
Begin a new transaction.
Raises:
| Type | Description |
|---|---|
| TransactionError | If beginning the transaction fails. |
close()
¶
Close the producer and release resources.
Calls flush() to ensure all queued messages are delivered before closing. It's recommended to use the producer as a context manager instead of calling this method directly.
Examples:
commit_transaction(timeout=30.0)
¶
Commit the current transaction.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait for commit in seconds. | 30.0 |
Raises:
| Type | Description |
|---|---|
| TransactionError | If committing the transaction fails. |
flush(timeout=-1)
¶
Wait for all messages in the queue to be delivered.
Blocks until all messages are sent or the timeout expires.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait in seconds. Use -1 for infinite wait (default); e.g. 5.0 waits up to 5 seconds. | -1 |
Returns:
| Type | Description |
|---|---|
| int | Number of messages still in the queue or internal producer state; 0 means all were delivered. |
Raises:
| Type | Description |
|---|---|
| ProducerError | If flush fails |
Examples:
init_transactions(timeout=30.0)
¶
Initialize the producer for transactions.
Must be called before any transactional methods. Requires the transactional.id configuration to be set.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait for initialization in seconds. | 30.0 |
Raises:
| Type | Description |
|---|---|
| TransactionError | If transaction initialization fails. |
Examples:
send(topic, value, key=None, partition=None, on_delivery=None, headers=None)
¶
Send a message to a Kafka topic.
This method is asynchronous - it returns immediately after queuing the message. Use flush() to wait for delivery confirmation.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name to send the message to | required |
| value | bytes | The message payload as bytes | required |
| key | bytes \| None | Optional message key as bytes. Messages with the same key go to the same partition. | None |
| partition | int \| None | Optional partition number. If None, the partition is chosen by the partitioner. | None |
| on_delivery | DeliveryCallback \| None | Optional callback function called when delivery succeeds or fails. Signature: callback(error, message) | None |
| headers | list[tuple[str, bytes]] \| None | Optional list of (key, value) header tuples to include with the message. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If the message cannot be queued (e.g., queue is full) |
Examples:
send_batch(topic, messages, on_delivery=None)
¶
Send a batch of messages to a Kafka topic.
Each message is a tuple of (value, key). This is more efficient than calling send() repeatedly as it defers polling until after all messages are queued.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name to send the messages to | required |
| messages | list[tuple[bytes, bytes \| None]] | List of (value, key) tuples. Key can be None. | required |
| on_delivery | DeliveryCallback \| None | Optional callback for each message delivery. | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If any message cannot be queued |
Examples:
send_json(topic, value, key=None, partition=None, on_delivery=None)
¶
Send a JSON-serialized message to a Kafka topic.
Convenience method that automatically serializes Python objects to JSON.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name to send the message to | required |
| value | Any | Any JSON-serializable Python object (dict, list, str, int, etc.) | required |
| key | str \| None | Optional string key (will be UTF-8 encoded) | None |
| partition | int \| None | Optional partition number | None |
| on_delivery | DeliveryCallback \| None | Optional callback function for delivery confirmation | None |
Raises:
| Type | Description |
|---|---|
| SerializationError | If the value cannot be serialized to JSON |
| ProducerError | If the message cannot be queued |
Examples:
send_string(topic, value, key=None, partition=None, on_delivery=None)
¶
Send a UTF-8 encoded string message to a Kafka topic.
Convenience method for sending text messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The topic name to send the message to | required |
| value | str | String message to send | required |
| key | str \| None | Optional string key | None |
| partition | int \| None | Optional partition number | None |
| on_delivery | DeliveryCallback \| None | Optional callback function for delivery confirmation | None |
Raises:
| Type | Description |
|---|---|
| ProducerError | If the message cannot be queued |
Examples:
send_typed(topic, value, key=None, partition=None, on_delivery=None, headers=None)
¶
Send a message to a typed topic using its configured serializers.
Provides static type safety: a type checker or IDE will verify that value matches the topic's type parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | TypedTopic[T] | A TypedTopic that specifies serialization. | required |
| value | T | The message value (must match the topic's type parameter). | required |
| key | Any \| None | Optional message key. Requires the topic to have a key_serializer. | None |
| partition | int \| None | Optional partition number. | None |
| on_delivery | DeliveryCallback \| None | Optional delivery report callback. | None |
| headers | list[tuple[str, bytes]] \| None | Optional message headers. | None |
Raises:
| Type | Description |
|---|---|
| SerializationError | If serialization fails or a key is provided without a key_serializer. |
| ProducerError | If the message cannot be queued. |
Examples:
transaction()
¶
Return a context manager for transactional sends.
Automatically begins, commits, or aborts the transaction.
Returns:
| Type | Description |
|---|---|
| TransactionContext | A context manager that manages the transaction lifecycle. |
Raises:
| Type | Description |
|---|---|
| ProducerError | If any transaction operation fails. |
Examples:
typedkafka.producer.TransactionContext
¶
Context manager for Kafka transactions.
Begins a transaction on entry and commits on clean exit. Aborts the transaction if an exception occurs.