Producer

typedkafka.producer.KafkaProducer

A well-documented Kafka producer with full type hints.

This class wraps confluent-kafka's Producer with:

- Comprehensive docstrings on every method
- Full type hints for IDE autocomplete
- Better error messages
- Convenient methods for common operations (send_json, send_string)
- Context manager support for automatic cleanup

Basic Usage

producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
producer.send("my-topic", b"my message", key=b"my-key")
producer.flush()  # Wait for all messages to be delivered

With Context Manager

with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
    producer.send("my-topic", b"message")
    # Automatic flush and cleanup on exit

JSON Messages

producer.send_json("events", {"user_id": 123, "action": "click"})

Attributes:

- config: The configuration dictionary used to initialize the producer

metrics property

Current metrics for this producer.

Tracks messages sent, errors, and (if stats enabled) byte throughput.
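
As a sketch, the metrics object could feed a periodic log line. Only the `messages_sent` attribute appears in the examples on this page, so the helper below is hypothetical and any further attribute names would need to be checked against the metrics class:

```python
def summarize(metrics) -> str:
    """Format a one-line metrics summary suitable for periodic logging.

    Assumes only the documented `messages_sent` attribute.
    """
    return f"messages_sent={metrics.messages_sent}"
```

Usage:

>>> print(summarize(producer.metrics))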

__enter__()

Enter context manager.

Returns:

- KafkaProducer: self

Examples:

>>> with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
...     producer.send("topic", b"message")

__exit__(exc_type, exc_val, exc_tb)

Exit context manager and cleanup resources.

Automatically flushes all pending messages before exiting.

__init__(config, on_stats=None, logger=None)

Initialize a Kafka producer with the given configuration.

Parameters:

- config (dict[str, Any], required): Configuration dictionary for the producer. Common options:
    - bootstrap.servers (str): Comma-separated list of broker addresses, e.g. "localhost:9092" or "broker1:9092,broker2:9092"
    - client.id (str): An identifier for this client
    - acks (str | int): Number of acknowledgments the producer requires; "0" = no acknowledgment, "1" = leader only, "all" = all replicas
    - compression.type (str): Compression codec ("none", "gzip", "snappy", "lz4", "zstd")
    - max.in.flight.requests.per.connection (int): Max unacknowledged requests
    - linger.ms (int): Time to wait before sending a batch
    - batch.size (int): Maximum size of a message batch in bytes
    - statistics.interval.ms (int): Stats reporting interval in milliseconds
- on_stats (StatsCallback | None, default None): Optional callback receiving parsed KafkaStats each reporting interval. Requires statistics.interval.ms to be set in config.
- logger (KafkaLogger | None, default None): Optional KafkaLogger for structured logging of producer operations.

Raises:

- ProducerError: If the producer cannot be initialized with the given config

Examples:

>>> # Basic producer
>>> producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
>>> # Producer with metrics
>>> producer = KafkaProducer({
...     "bootstrap.servers": "localhost:9092",
...     "statistics.interval.ms": 5000,
... })
>>> print(producer.metrics.messages_sent)

abort_transaction(timeout=30.0)

Abort the current transaction.

Parameters:

- timeout (float, default 30.0): Maximum time to wait for abort in seconds.

Raises:

- TransactionError: If aborting the transaction fails.

begin_transaction()

Begin a new transaction.

Raises:

- TransactionError: If beginning the transaction fails.
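
The begin/commit/abort calls compose into the usual try/except pattern. A minimal sketch (the helper name is ours; it assumes init_transactions() has already been called on a producer configured with transactional.id, as shown under init_transactions):

```python
def send_transactionally(producer, topic, payloads):
    """Send all payloads atomically: commit on success, abort on any error."""
    producer.begin_transaction()
    try:
        for payload in payloads:
            producer.send(topic, payload)
        producer.commit_transaction()
    except Exception:
        # Roll back everything queued since begin_transaction()
        producer.abort_transaction()
        raise
```

This is equivalent in spirit to the transaction() context manager documented below, which wraps the same lifecycle.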

close()

Close the producer and release resources.

Calls flush() to ensure all queued messages are delivered before closing. It's recommended to use the producer as a context manager instead of calling this method directly.

Examples:

>>> producer = KafkaProducer({"bootstrap.servers": "localhost:9092"})
>>> try:
...     producer.send("topic", b"message")
... finally:
...     producer.close()  # Ensure cleanup
>>> # Better: use context manager
>>> with KafkaProducer({"bootstrap.servers": "localhost:9092"}) as producer:
...     producer.send("topic", b"message")

commit_transaction(timeout=30.0)

Commit the current transaction.

Parameters:

- timeout (float, default 30.0): Maximum time to wait for commit in seconds.

Raises:

- TransactionError: If committing the transaction fails.

flush(timeout=-1)

Wait for all messages in the queue to be delivered.

Blocks until all messages are sent or the timeout expires.

Parameters:

- timeout (float, default -1): Maximum time to wait in seconds. Use -1 for an infinite wait (the default); e.g. timeout=5.0 waits up to 5 seconds.

Returns:

- int: Number of messages still in the queue/internal producer state; 0 means all delivered.

Raises:

- ProducerError: If flush fails

Examples:

>>> # Wait for all messages to be delivered
>>> producer.send("topic", b"message 1")
>>> producer.send("topic", b"message 2")
>>> remaining = producer.flush()
>>> if remaining == 0:
...     print("All messages delivered successfully")
>>> # Wait up to 5 seconds
>>> remaining = producer.flush(timeout=5.0)
>>> if remaining > 0:
...     print(f"Warning: {remaining} messages not delivered after 5 seconds")

init_transactions(timeout=30.0)

Initialize the producer for transactions.

Must be called before any transactional methods. Requires the transactional.id configuration to be set.

Parameters:

- timeout (float, default 30.0): Maximum time to wait for initialization in seconds.

Raises:

- TransactionError: If transaction initialization fails.

Examples:

>>> producer = KafkaProducer({
...     "bootstrap.servers": "localhost:9092",
...     "transactional.id": "my-txn-id",
... })
>>> producer.init_transactions()

send(topic, value, key=None, partition=None, on_delivery=None, headers=None)

Send a message to a Kafka topic.

This method is asynchronous - it returns immediately after queuing the message. Use flush() to wait for delivery confirmation.

Parameters:

- topic (str, required): The topic name to send the message to
- value (bytes, required): The message payload as bytes
- key (bytes | None, default None): Optional message key as bytes. Messages with the same key go to the same partition.
- partition (int | None, default None): Optional partition number. If None, the partition is chosen by the partitioner.
- on_delivery (DeliveryCallback | None, default None): Optional callback function called when delivery succeeds or fails. Signature: callback(error, message)
- headers (list[tuple[str, bytes]] | None, default None): Optional list of (key, value) header tuples to include with the message.

Raises:

- ProducerError: If the message cannot be queued (e.g., queue is full)

Examples:

>>> # Send a simple message
>>> producer.send("my-topic", b"Hello, Kafka!")
>>> # Send with a key for partitioning
>>> producer.send("user-events", b"event data", key=b"user-123")
>>> # Send with headers
>>> producer.send("topic", b"data", headers=[("trace-id", b"abc123")])
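
The on_delivery callback has the documented signature callback(error, message). A minimal sketch that records outcomes (the list names are ours; error being None on success follows confluent-kafka's delivery-report convention):

```python
delivered, failed = [], []

def on_delivery(error, message):
    """Delivery report callback: error is set on failure, None on success."""
    if error is not None:
        failed.append(error)
    else:
        delivered.append(message)
```

Usage:

>>> producer.send("my-topic", b"payload", on_delivery=on_delivery)
>>> producer.flush()  # delivery callbacks fire while flushing/polling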

send_batch(topic, messages, on_delivery=None)

Send a batch of messages to a Kafka topic.

Each message is a tuple of (value, key). This is more efficient than calling send() repeatedly as it defers polling until after all messages are queued.

Parameters:

- topic (str, required): The topic name to send the messages to
- messages (list[tuple[bytes, bytes | None]], required): List of (value, key) tuples. Key can be None.
- on_delivery (DeliveryCallback | None, default None): Optional callback for each message delivery.

Raises:

- ProducerError: If any message cannot be queued

Examples:

>>> producer.send_batch("events", [
...     (b"event1", b"key1"),
...     (b"event2", b"key2"),
...     (b"event3", None),
... ])
>>> producer.flush()

send_json(topic, value, key=None, partition=None, on_delivery=None)

Send a JSON-serialized message to a Kafka topic.

Convenience method that automatically serializes Python objects to JSON.

Parameters:

- topic (str, required): The topic name to send the message to
- value (Any, required): Any JSON-serializable Python object (dict, list, str, int, etc.)
- key (str | None, default None): Optional string key (will be UTF-8 encoded)
- partition (int | None, default None): Optional partition number
- on_delivery (DeliveryCallback | None, default None): Optional callback function for delivery confirmation

Raises:

- SerializationError: If the value cannot be serialized to JSON
- ProducerError: If the message cannot be queued

Examples:

>>> # Send a dict as JSON
>>> producer.send_json("events", {"user_id": 123, "action": "click"})
>>> # Send with a string key
>>> producer.send_json("user-data", {"name": "Alice"}, key="user-123")
>>> # Send a list
>>> producer.send_json("numbers", [1, 2, 3, 4, 5])

send_string(topic, value, key=None, partition=None, on_delivery=None)

Send a UTF-8 encoded string message to a Kafka topic.

Convenience method for sending text messages.

Parameters:

- topic (str, required): The topic name to send the message to
- value (str, required): String message to send
- key (str | None, default None): Optional string key
- partition (int | None, default None): Optional partition number
- on_delivery (DeliveryCallback | None, default None): Optional callback function for delivery confirmation

Raises:

- ProducerError: If the message cannot be queued

Examples:

>>> producer.send_string("logs", "Application started successfully")
>>> producer.send_string("user-messages", "Hello!", key="user-123")

send_typed(topic, value, key=None, partition=None, on_delivery=None, headers=None)

Send a message to a typed topic using its configured serializers.

Provides static type safety: a type checker or IDE will verify that value matches the topic's type parameter.

Parameters:

- topic (TypedTopic[T], required): A TypedTopic that specifies serialization.
- value (T, required): The message value (must match the topic's type parameter).
- key (Any | None, default None): Optional message key. Requires the topic to have a key_serializer.
- partition (int | None, default None): Optional partition number.
- on_delivery (DeliveryCallback | None, default None): Optional delivery report callback.
- headers (list[tuple[str, bytes]] | None, default None): Optional message headers.

Raises:

- SerializationError: If serialization fails or a key is provided without a key_serializer.
- ProducerError: If the message cannot be queued.

Examples:

>>> from typedkafka.topics import json_topic, string_topic
>>> events = json_topic("events")
>>> producer.send_typed(events, {"user_id": 123, "action": "click"})
>>>
>>> logs = string_topic("logs")
>>> producer.send_typed(logs, "Application started")

transaction()

Return a context manager for transactional sends.

Automatically begins, commits, or aborts the transaction.

Returns:

- TransactionContext: A context manager that manages the transaction lifecycle.

Raises:

- ProducerError: If any transaction operation fails.

Examples:

>>> producer = KafkaProducer({
...     "bootstrap.servers": "localhost:9092",
...     "transactional.id": "my-txn-id",
... })
>>> producer.init_transactions()
>>> with producer.transaction():
...     producer.send("topic", b"msg1")
...     producer.send("topic", b"msg2")

typedkafka.producer.TransactionContext

Context manager for Kafka transactions.

Begins a transaction on entry and commits on clean exit. Aborts the transaction if an exception occurs.