Configuration
typedkafka.config.ProducerConfig
Bases: _SecurityConfigMixin
Type-safe builder for Kafka producer configuration.
Provides a fluent API with full type hints and validation for common producer configuration options.
Examples:
>>> config = (ProducerConfig()
... .bootstrap_servers("localhost:9092")
... .compression("gzip")
... .acks("all")
... .build())
>>>
>>> from typedkafka import KafkaProducer
>>> producer = KafkaProducer(config)
>>> # With multiple brokers
>>> config = (ProducerConfig()
... .bootstrap_servers("broker1:9092,broker2:9092,broker3:9092")
... .client_id("my-application")
... .build())
__init__()
Initialize an empty producer configuration.
acks(acks)
Set the number of acknowledgments required.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| acks | Union[Literal['0', '1', 'all'], int] | Acknowledgment level. "0" or 0: no acknowledgment (fire and forget); "1" or 1: leader acknowledgment only; "all" or -1: all in-sync replicas must acknowledge. | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
Examples:
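A minimal sketch of how the accepted values relate to each other, assuming the string and integer forms listed above name the same three levels. The `normalize_acks` helper is hypothetical and is not part of typedkafka; how the library stores the value internally is not confirmed here.

```python
from typing import Union

# Hypothetical helper, not typedkafka's implementation: it maps every
# documented spelling of an acknowledgment level onto one integer.
_ACKS_ALIASES = {"0": 0, "1": 1, "all": -1, 0: 0, 1: 1, -1: -1}


def normalize_acks(acks: Union[str, int]) -> int:
    """Return 0, 1, or -1 for a documented acks value."""
    try:
        return _ACKS_ALIASES[acks]
    except KeyError:
        raise ValueError(f"unsupported acks value: {acks!r}") from None


# The string and integer spellings collapse to the same levels.
levels = [normalize_acks(value) for value in ("0", 1, "all", -1)]
print(levels)
```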
batch_size(bytes_size)
Set maximum batch size in bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bytes_size | int | Maximum batch size (default: 16384) | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
bootstrap_servers(servers)
Set the Kafka broker addresses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| servers | str | Comma-separated list of broker addresses. Example: "localhost:9092" or "broker1:9092,broker2:9092" | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
Examples:
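To make the comma-separated format concrete, the sketch below splits such a string into host and port pairs. The `split_brokers` function is a hypothetical illustration, not a typedkafka API, and it assumes every entry carries an explicit port.

```python
def split_brokers(servers: str) -> list[tuple[str, int]]:
    """Split "host:port,host:port" into (host, port) pairs."""
    brokers = []
    for entry in servers.split(","):
        # rpartition splits on the last colon, so the port is always the tail.
        host, _, port = entry.strip().rpartition(":")
        brokers.append((host, int(port)))
    return brokers


print(split_brokers("broker1:9092,broker2:9092"))
```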
build(validate=False)
Build and return the configuration dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| validate | bool | If True, verify required fields and configuration consistency. | False |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Configuration dict ready for KafkaProducer |
Raises:
| Type | Description |
|---|---|
| ConfigurationError | If validate is True and configuration is invalid. |
Examples:
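A rough sketch of what `validate=True` could check, assuming `bootstrap.servers` is a required field and that a transactional ID with idempotence explicitly disabled counts as inconsistent. Both rules, the stand-in `ConfigurationError` class, and the `build_config` function are assumptions made for illustration; the library's actual checks may differ.

```python
from typing import Any


class ConfigurationError(Exception):
    """Stand-in for the library's exception so the sketch runs on its own."""


def build_config(settings: dict[str, Any], validate: bool = False) -> dict[str, Any]:
    """Return a copy of the settings, optionally checking them first."""
    if validate:
        # Assumed required field.
        if not settings.get("bootstrap.servers"):
            raise ConfigurationError("bootstrap.servers is required")
        # Assumed consistency rule: transactions need idempotence.
        if "transactional.id" in settings and settings.get("enable.idempotence") is False:
            raise ConfigurationError("transactional.id requires enable.idempotence")
    return dict(settings)
```

Without `validate`, the sketch returns whatever was set, which mirrors the documented default of `validate=False`.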
client_id(client_id)
Set the client ID for this producer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client_id | str | Client identifier string | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
compression(compression_type)
Set the compression codec.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| compression_type | Literal['none', 'gzip', 'snappy', 'lz4', 'zstd'] | Compression algorithm to use | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
enable_idempotence(enabled=True)
Enable the idempotent producer (requires acks=all).
When enabled, the producer ensures that messages are delivered exactly once and in order per partition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| enabled | bool | True to enable idempotence (default: True) | True |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
exactly_once(bootstrap_servers, transactional_id)
classmethod
Preset for exactly-once semantics.
Configures an idempotent, transactional producer with acks=all.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bootstrap_servers | str | Broker addresses | required |
| transactional_id | str | Unique transactional identifier | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Pre-configured ProducerConfig |
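For orientation, the settings this preset describes correspond roughly to the dictionary below, assuming the builder emits librdkafka-style keys (`enable.idempotence`, `acks`, `transactional.id`). The `exactly_once_settings` function is a hypothetical illustration; the exact keys and value types the preset produces are not confirmed here.

```python
def exactly_once_settings(bootstrap_servers: str, transactional_id: str) -> dict[str, object]:
    """Hypothetical equivalent of the preset, written as a plain dict."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "enable.idempotence": True,  # idempotent producer
        "acks": "all",               # all in-sync replicas must acknowledge
        "transactional.id": transactional_id,
    }


print(exactly_once_settings("localhost:9092", "orders-txn-1"))
```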
from_env(prefix='KAFKA_')
classmethod
Load configuration from environment variables.
Reads environment variables with the given prefix and maps them to producer configuration options.
Supported variables:
- {prefix}BOOTSTRAP_SERVERS
- {prefix}ACKS
- {prefix}COMPRESSION
- {prefix}LINGER_MS
- {prefix}TRANSACTIONAL_ID
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | Environment variable prefix (default: "KAFKA_") | 'KAFKA_' |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | ProducerConfig with values from environment |
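The prefix lookup can be sketched as follows. The mapping from variable suffix to librdkafka-style key is an assumption, as is the `settings_from_env` function itself; the sketch also leaves every value as a string, whereas the library may convert types.

```python
import os
from typing import Mapping, Optional

# Assumed mapping from variable suffix to configuration key.
_SUFFIX_TO_KEY = {
    "BOOTSTRAP_SERVERS": "bootstrap.servers",
    "ACKS": "acks",
    "COMPRESSION": "compression.type",
    "LINGER_MS": "linger.ms",
    "TRANSACTIONAL_ID": "transactional.id",
}


def settings_from_env(
    prefix: str = "KAFKA_",
    environ: Optional[Mapping[str, str]] = None,
) -> dict[str, str]:
    """Collect the supported variables that are present under the prefix."""
    env = os.environ if environ is None else environ
    settings = {}
    for suffix, key in _SUFFIX_TO_KEY.items():
        value = env.get(prefix + suffix)
        if value is not None:
            settings[key] = value
    return settings


# A dict stands in for os.environ; KAFKA_ACKS is ignored under the APP_ prefix.
example_env = {
    "APP_BOOTSTRAP_SERVERS": "localhost:9092",
    "APP_LINGER_MS": "20",
    "KAFKA_ACKS": "all",
}
print(settings_from_env(prefix="APP_", environ=example_env))
```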
high_throughput(bootstrap_servers)
classmethod
Preset for high-throughput scenarios.
Configures lz4 compression, acks=1, and batching with a 50 ms linger and a 64 KB batch size.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| bootstrap_servers | str | Broker addresses | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Pre-configured ProducerConfig |
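Written out as a plain dictionary, the described preset might look like the sketch below. It assumes librdkafka-style keys and that 64KB means 64 * 1024 bytes; the `high_throughput_settings` function is hypothetical and the library's actual output is not confirmed here.

```python
def high_throughput_settings(bootstrap_servers: str) -> dict[str, object]:
    """Hypothetical equivalent of the preset, written as a plain dict."""
    return {
        "bootstrap.servers": bootstrap_servers,
        "compression.type": "lz4",
        "acks": 1,                 # leader acknowledgment only
        "linger.ms": 50,           # wait up to 50 ms to fill a batch
        "batch.size": 64 * 1024,   # assumed reading of "64KB"
    }


print(high_throughput_settings("localhost:9092"))
```

The trade-off is the usual one: longer linger and larger batches raise throughput at the cost of per-message latency, and acks=1 gives up some durability compared with acks=all.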
linger_ms(milliseconds)
Set time to wait before sending a batch.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| milliseconds | int | Time to wait for more messages before sending | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
max_in_flight_requests(count)
Set maximum number of unacknowledged requests.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| count | int | Max in-flight requests per connection (1-5 recommended) | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
retries(count)
Set number of retries for failed sends.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| count | int | Number of retries (default: 2147483647, which is effectively infinite) | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
set(key, value)
Set a custom configuration parameter.
Use this for advanced configurations not covered by type-safe methods.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Configuration key | required |
| value | Any | Configuration value | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
stats_interval_ms(milliseconds)
Enable statistics reporting at the given interval.
When set, confluent-kafka will emit internal statistics at this interval.
Use the on_stats parameter on KafkaProducer to receive parsed stats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| milliseconds | int | Stats reporting interval in milliseconds (e.g. 5000 for every 5s). | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
Raises:
| Type | Description |
|---|---|
| ValueError | If milliseconds is negative. |
Examples:
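The documented validation can be sketched as below, assuming the value ends up under librdkafka's `statistics.interval.ms` key. The `stats_interval_setting` function is a hypothetical stand-in, not the library's code.

```python
def stats_interval_setting(milliseconds: int) -> dict[str, int]:
    """Reject negative intervals, then return the assumed config entry."""
    if milliseconds < 0:
        raise ValueError("milliseconds must not be negative")
    # Assumed key name, taken from librdkafka's configuration properties.
    return {"statistics.interval.ms": milliseconds}


print(stats_interval_setting(5000))
```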
transactional_id(txn_id)
Set the transactional ID for exactly-once semantics.
Requires enable_idempotence(True) and acks("all").
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| txn_id | str | Unique transactional identifier | required |
Returns:
| Type | Description |
|---|---|
| ProducerConfig | Self for method chaining |
typedkafka.config.ConsumerConfig
Bases: _SecurityConfigMixin
Type-safe builder for Kafka consumer configuration.
Provides a fluent API with full type hints and validation for common consumer configuration options.
Examples:
>>> config = (ConsumerConfig()
... .bootstrap_servers("localhost:9092")
... .group_id("my-consumer-group")
... .auto_offset_reset("earliest")
... .build())
>>>
>>> from typedkafka import KafkaConsumer
>>> consumer = KafkaConsumer(config)
__init__()
Initialize an empty consumer configuration.
auto_commit_interval_ms(milliseconds)
Set frequency of automatic offset commits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| milliseconds | int | Commit interval in milliseconds (default: 5000) | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
auto_offset_reset(reset)
Set behavior when no initial offset exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| reset | Literal['earliest', 'latest', 'none'] | Offset reset behavior. "earliest": start from the beginning; "latest": start from the end (skip existing messages); "none": raise an error if no offset exists. | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
Examples:
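The three policies can be illustrated with a small model of where a consumer would start reading a partition. The `resolve_start_offset` function and its parameters are hypothetical; the sketch covers only the "no committed offset" case described above and ignores out-of-range offsets.

```python
from typing import Optional


def resolve_start_offset(committed: Optional[int], low: int, high: int, reset: str) -> int:
    """Pick a starting offset given the partition's low/high watermarks."""
    if committed is not None:
        # A committed offset always wins; the reset policy is not consulted.
        return committed
    if reset == "earliest":
        return low
    if reset == "latest":
        return high
    if reset == "none":
        raise LookupError("no committed offset and reset policy is 'none'")
    raise ValueError(f"unknown reset policy: {reset!r}")


print(resolve_start_offset(None, low=0, high=120, reset="earliest"))
print(resolve_start_offset(None, low=0, high=120, reset="latest"))
print(resolve_start_offset(57, low=0, high=120, reset="latest"))
```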
bootstrap_servers(servers)
Set the Kafka broker addresses.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| servers | str | Comma-separated list of broker addresses | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
build(validate=False)
Build and return the configuration dictionary.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| validate | bool | If True, verify that required fields (bootstrap.servers, group.id) are set. | False |
Returns:
| Type | Description |
|---|---|
| dict[str, Any] | Configuration dict ready for KafkaConsumer |
Raises:
| Type | Description |
|---|---|
| ConfigurationError | If validate is True and required fields are missing. |
client_id(client_id)
Set the client ID for this consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| client_id | str | Client identifier string | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
enable_auto_commit(enabled=True)
Enable or disable automatic offset commits.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| enabled | bool | True to auto-commit, False for manual commits | True |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
from_env(prefix='KAFKA_')
classmethod
Load configuration from environment variables.
Supported variables:
- {prefix}BOOTSTRAP_SERVERS
- {prefix}GROUP_ID
- {prefix}AUTO_OFFSET_RESET
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | Environment variable prefix (default: "KAFKA_") | 'KAFKA_' |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | ConsumerConfig with values from environment |
group_id(group_id)
Set the consumer group ID (required for subscribe()).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| group_id | str | Consumer group identifier | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
max_poll_interval_ms(milliseconds)
Set maximum time between polls.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| milliseconds | int | Max poll interval in milliseconds (default: 300000) | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
max_poll_records(count)
Set maximum records returned in a single poll.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| count | int | Max records per poll | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
session_timeout_ms(milliseconds)
Set consumer session timeout.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| milliseconds | int | Session timeout in milliseconds (default: 10000) | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
set(key, value)
Set a custom configuration parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| key | str | Configuration key | required |
| value | Any | Configuration value | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
stats_interval_ms(milliseconds)
Enable statistics reporting at the given interval.
When set, confluent-kafka will emit internal statistics at this interval.
Use the on_stats parameter on KafkaConsumer to receive parsed stats.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| milliseconds | int | Stats reporting interval in milliseconds (e.g. 5000 for every 5s). | required |
Returns:
| Type | Description |
|---|---|
| ConsumerConfig | Self for method chaining |
Raises:
| Type | Description |
|---|---|
| ValueError | If milliseconds is negative. |
Examples:
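The underlying client delivers statistics as a JSON string, so a handler typically parses it and picks out a few fields. The sketch below does that with the standard library only. The sample payload is made up and heavily shortened, `summarize_stats` is a hypothetical helper, and the shape of the object that `on_stats` actually receives from typedkafka is not confirmed here.

```python
import json

# Made-up sample holding a few top-level fields from librdkafka's
# statistics format; real payloads are much larger.
raw_stats = '{"name": "rdkafka#consumer-1", "type": "consumer", "rxmsgs": 42}'


def summarize_stats(raw: str) -> str:
    """Parse a statistics JSON string and format a one-line summary."""
    stats = json.loads(raw)
    return f"{stats['name']} ({stats['type']}): {stats['rxmsgs']} messages received"


print(summarize_stats(raw_stats))
```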