Configuration

typedkafka.config.ProducerConfig

Bases: _SecurityConfigMixin

Type-safe builder for Kafka producer configuration.

Provides a fluent API with full type hints and validation for common producer configuration options.

Examples:

>>> config = (ProducerConfig()
...     .bootstrap_servers("localhost:9092")
...     .compression("gzip")
...     .acks("all")
...     .build())
>>>
>>> from typedkafka import KafkaProducer
>>> producer = KafkaProducer(config)
>>> # With multiple brokers
>>> config = (ProducerConfig()
...     .bootstrap_servers("broker1:9092,broker2:9092,broker3:9092")
...     .client_id("my-application")
...     .build())
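The chaining above works because each setter follows the fluent-builder pattern. As an illustration only (this is not typedkafka's actual implementation), a minimal sketch of the idea: each setter records a librdkafka-style key and returns self, and build() hands back a plain dict.

```python
from typing import Any


class MiniProducerConfig:
    """Toy illustration of the fluent-builder pattern ProducerConfig follows."""

    def __init__(self) -> None:
        self._config: dict[str, Any] = {}

    def bootstrap_servers(self, servers: str) -> "MiniProducerConfig":
        self._config["bootstrap.servers"] = servers
        return self  # returning self is what makes chaining work

    def acks(self, acks: str) -> "MiniProducerConfig":
        self._config["acks"] = acks
        return self

    def build(self) -> dict[str, Any]:
        return dict(self._config)


config = MiniProducerConfig().bootstrap_servers("localhost:9092").acks("all").build()
```

Each setter mutates shared state and returns the builder itself, which is why the parenthesized multi-line chains in the examples above read top to bottom.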

__init__()

Initialize an empty producer configuration.

acks(acks)

Set the number of acknowledgments required.

Parameters:

Name Type Description Default
acks Union[Literal['0', '1', 'all'], int]

Acknowledgment level:
  • "0" or 0: No acknowledgment (fire and forget)
  • "1" or 1: Leader acknowledgment only
  • "all" or -1: All in-sync replicas must acknowledge

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Examples:

>>> config = ProducerConfig().acks("all")  # Maximum durability
>>> config = ProducerConfig().acks("1")    # Leader only
>>> config = ProducerConfig().acks("0")    # No acknowledgment

batch_size(bytes_size)

Set maximum batch size in bytes.

Parameters:

Name Type Description Default
bytes_size int

Maximum batch size (default: 16384)

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Examples:

>>> config = ProducerConfig().batch_size(32768)  # 32KB batches

bootstrap_servers(servers)

Set the Kafka broker addresses.

Parameters:

Name Type Description Default
servers str

Comma-separated list of broker addresses. Example: "localhost:9092" or "broker1:9092,broker2:9092"

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Examples:

>>> config = ProducerConfig().bootstrap_servers("localhost:9092")
>>> config = ProducerConfig().bootstrap_servers("b1:9092,b2:9092,b3:9092")

build(validate=False)

Build and return the configuration dictionary.

Parameters:

Name Type Description Default
validate bool

If True, verify required fields and configuration consistency.

False

Returns:

Type Description
dict[str, Any]

Configuration dict ready for KafkaProducer

Raises:

Type Description
ConfigurationError

If validate is True and configuration is invalid.

Examples:

>>> config = (ProducerConfig()
...     .bootstrap_servers("localhost:9092")
...     .acks("all")
...     .build())
>>> from typedkafka import KafkaProducer
>>> producer = KafkaProducer(config)

client_id(client_id)

Set the client ID for this producer.

Parameters:

Name Type Description Default
client_id str

Client identifier string

required

Returns:

Type Description
ProducerConfig

Self for method chaining

compression(compression_type)

Set the compression codec.

Parameters:

Name Type Description Default
compression_type Literal['none', 'gzip', 'snappy', 'lz4', 'zstd']

Compression algorithm to use

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Examples:

>>> config = ProducerConfig().compression("gzip")
>>> config = ProducerConfig().compression("zstd")  # Best compression

enable_idempotence(enabled=True)

Enable idempotent producer (requires acks=all).

When enabled, the producer ensures that messages are written exactly once and in order within each partition, even when sends are retried.

Parameters:

Name Type Description Default
enabled bool

True to enable idempotence (default: True)

True

Returns:

Type Description
ProducerConfig

Self for method chaining

exactly_once(bootstrap_servers, transactional_id) classmethod

Preset for exactly-once semantics.

Configures idempotent, transactional producer with acks=all.

Parameters:

Name Type Description Default
bootstrap_servers str

Broker addresses

required
transactional_id str

Unique transactional identifier

required

Returns:

Type Description
ProducerConfig

Pre-configured ProducerConfig
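Based on the description above, the preset bundles idempotence, a transactional ID, and acks=all. A sketch of the dict it would plausibly produce, using the standard librdkafka key names (the exact keys typedkafka emits are an assumption here):

```python
# Hypothetical equivalent of the exactly_once preset, expressed as the
# raw config dict. Idempotence plus a transactional.id gives
# exactly-once semantics; acks=all is mandatory for both.
def exactly_once_dict(bootstrap_servers: str, transactional_id: str) -> dict:
    return {
        "bootstrap.servers": bootstrap_servers,
        "enable.idempotence": True,
        "acks": "all",
        "transactional.id": transactional_id,
    }


config = exactly_once_dict("localhost:9092", "orders-txn-1")
```

The same result should be reachable by chaining bootstrap_servers(), enable_idempotence(), acks("all"), and transactional_id() by hand; the preset just saves the boilerplate.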

from_env(prefix='KAFKA_') classmethod

Load configuration from environment variables.

Reads environment variables with the given prefix and maps them to producer configuration options.

Supported variables
  • {prefix}BOOTSTRAP_SERVERS
  • {prefix}ACKS
  • {prefix}COMPRESSION
  • {prefix}LINGER_MS
  • {prefix}TRANSACTIONAL_ID

Parameters:

Name Type Description Default
prefix str

Environment variable prefix (default: "KAFKA_")

'KAFKA_'

Returns:

Type Description
ProducerConfig

ProducerConfig with values from environment
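The lookup logic is presumably a prefix-based scan of the environment. A sketch of that idea, returning a plain dict for illustration (the suffix-to-key mapping below is an assumption, not typedkafka's code):

```python
import os

# Hypothetical sketch of from_env's prefix lookup: for each supported
# suffix, read {prefix}{SUFFIX} and map it to a librdkafka-style key.
def config_from_env(prefix: str = "KAFKA_") -> dict:
    mapping = {
        "BOOTSTRAP_SERVERS": "bootstrap.servers",
        "ACKS": "acks",
        "COMPRESSION": "compression.type",
        "LINGER_MS": "linger.ms",
        "TRANSACTIONAL_ID": "transactional.id",
    }
    config = {}
    for env_suffix, key in mapping.items():
        value = os.environ.get(prefix + env_suffix)
        if value is not None:  # unset variables are simply skipped
            config[key] = value
    return config


os.environ["KAFKA_BOOTSTRAP_SERVERS"] = "localhost:9092"
os.environ["KAFKA_ACKS"] = "all"
config = config_from_env()
```

A custom prefix (e.g. "MYAPP_KAFKA_") lets several applications share one environment without their Kafka settings colliding.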

high_throughput(bootstrap_servers) classmethod

Preset for high-throughput scenarios.

Configures lz4 compression, acks=1, batching with 50ms linger and 64KB batch size.

Parameters:

Name Type Description Default
bootstrap_servers str

Broker addresses

required

Returns:

Type Description
ProducerConfig

Pre-configured ProducerConfig
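Per the description above, the preset sets lz4 compression, acks=1, a 50 ms linger, and 64 KB batches. Expressed as the raw dict it would plausibly build (librdkafka key names assumed):

```python
# Hypothetical dict behind the high_throughput preset: trade a little
# latency (linger.ms) and durability (acks=1) for larger, compressed
# batches and higher overall throughput.
high_throughput = {
    "bootstrap.servers": "localhost:9092",
    "compression.type": "lz4",
    "acks": "1",
    "linger.ms": 50,
    "batch.size": 65536,  # 64 KB
}
```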

linger_ms(milliseconds)

Set time to wait before sending a batch.

Parameters:

Name Type Description Default
milliseconds int

Time to wait for more messages before sending

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Examples:

>>> # Wait up to 10ms to batch messages
>>> config = ProducerConfig().linger_ms(10)

max_in_flight_requests(count)

Set maximum number of unacknowledged requests.

Parameters:

Name Type Description Default
count int

Max in-flight requests per connection (1-5 recommended)

required

Returns:

Type Description
ProducerConfig

Self for method chaining

retries(count)

Set number of retries for failed sends.

Parameters:

Name Type Description Default
count int

Number of retries (default: 2147483647, effectively infinite)

required

Returns:

Type Description
ProducerConfig

Self for method chaining

set(key, value)

Set a custom configuration parameter.

Use this for advanced configurations not covered by type-safe methods.

Parameters:

Name Type Description Default
key str

Configuration key

required
value Any

Configuration value

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Examples:

>>> config = ProducerConfig().set("queue.buffering.max.messages", 100000)

stats_interval_ms(milliseconds)

Enable statistics reporting at the given interval.

When set, confluent-kafka will emit internal statistics at this interval. Use the on_stats parameter on KafkaProducer to receive parsed stats.

Parameters:

Name Type Description Default
milliseconds int

Stats reporting interval in milliseconds (e.g. 5000 for every 5s).

required

Returns:

Type Description
ProducerConfig

Self for method chaining

Raises:

Type Description
ValueError

If milliseconds is negative.

Examples:

>>> config = ProducerConfig().stats_interval_ms(5000).build()
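Under the hood, librdkafka emits these statistics as a JSON string, which a callback typically parses before picking out fields of interest. A sketch with a tiny fabricated excerpt (real payloads contain many more fields; "name", "msg_cnt", and "tx" are top-level fields in librdkafka's statistics schema):

```python
import json

# Fabricated, truncated example of a librdkafka statistics payload,
# for illustration only. The on_stats parameter on KafkaProducer
# presumably delivers the parsed form of such a payload.
raw = '{"name": "producer-1", "msg_cnt": 0, "tx": 42}'
stats = json.loads(raw)
```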

transactional_id(txn_id)

Set the transactional ID for exactly-once semantics.

Requires enable_idempotence(True) and acks("all").

Parameters:

Name Type Description Default
txn_id str

Unique transactional identifier

required

Returns:

Type Description
ProducerConfig

Self for method chaining

typedkafka.config.ConsumerConfig

Bases: _SecurityConfigMixin

Type-safe builder for Kafka consumer configuration.

Provides a fluent API with full type hints and validation for common consumer configuration options.

Examples:

>>> config = (ConsumerConfig()
...     .bootstrap_servers("localhost:9092")
...     .group_id("my-consumer-group")
...     .auto_offset_reset("earliest")
...     .build())
>>>
>>> from typedkafka import KafkaConsumer
>>> consumer = KafkaConsumer(config)
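Assuming the builder maps onto the standard librdkafka key names (an assumption about typedkafka's internals), the example chain above would resolve to a dict along these lines:

```python
# Hypothetical dict produced by the ConsumerConfig example above.
# group.id identifies the consumer group for partition assignment;
# auto.offset.reset applies only when the group has no committed offset.
consumer_config = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-consumer-group",
    "auto.offset.reset": "earliest",
}
```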

__init__()

Initialize an empty consumer configuration.

auto_commit_interval_ms(milliseconds)

Set frequency of automatic offset commits.

Parameters:

Name Type Description Default
milliseconds int

Commit interval (default: 5000)

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

auto_offset_reset(reset)

Set behavior when no initial offset exists.

Parameters:

Name Type Description Default
reset Literal['earliest', 'latest', 'none']

Offset reset behavior:
  • "earliest": Start from the beginning
  • "latest": Start from the end (skip existing messages)
  • "none": Raise an error if no offset exists

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

Examples:

>>> # Process all messages from the beginning
>>> config = ConsumerConfig().auto_offset_reset("earliest")
>>>
>>> # Only process new messages
>>> config = ConsumerConfig().auto_offset_reset("latest")

bootstrap_servers(servers)

Set the Kafka broker addresses.

Parameters:

Name Type Description Default
servers str

Comma-separated list of broker addresses

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

build(validate=False)

Build and return the configuration dictionary.

Parameters:

Name Type Description Default
validate bool

If True, verify that required fields (bootstrap.servers, group.id) are set.

False

Returns:

Type Description
dict[str, Any]

Configuration dict ready for KafkaConsumer

Raises:

Type Description
ConfigurationError

If validate is True and required fields are missing.

client_id(client_id)

Set the client ID for this consumer.

Parameters:

Name Type Description Default
client_id str

Client identifier string

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

enable_auto_commit(enabled=True)

Enable or disable automatic offset commits.

Parameters:

Name Type Description Default
enabled bool

True to auto-commit, False for manual commits

True

Returns:

Type Description
ConsumerConfig

Self for method chaining

Examples:

>>> # Manual offset management
>>> config = ConsumerConfig().enable_auto_commit(False)

from_env(prefix='KAFKA_') classmethod

Load configuration from environment variables.

Supported variables
  • {prefix}BOOTSTRAP_SERVERS
  • {prefix}GROUP_ID
  • {prefix}AUTO_OFFSET_RESET

Parameters:

Name Type Description Default
prefix str

Environment variable prefix (default: "KAFKA_")

'KAFKA_'

Returns:

Type Description
ConsumerConfig

ConsumerConfig with values from environment

group_id(group_id)

Set the consumer group ID (required for subscribe()).

Parameters:

Name Type Description Default
group_id str

Consumer group identifier

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

Examples:

>>> config = ConsumerConfig().group_id("my-application-consumers")

max_poll_interval_ms(milliseconds)

Set maximum time between polls.

Parameters:

Name Type Description Default
milliseconds int

Max poll interval (default: 300000)

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

max_poll_records(count)

Set maximum records returned in a single poll.

Parameters:

Name Type Description Default
count int

Max records per poll

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

session_timeout_ms(milliseconds)

Set consumer session timeout.

Parameters:

Name Type Description Default
milliseconds int

Session timeout (default: 10000)

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

set(key, value)

Set a custom configuration parameter.

Parameters:

Name Type Description Default
key str

Configuration key

required
value Any

Configuration value

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

stats_interval_ms(milliseconds)

Enable statistics reporting at the given interval.

When set, confluent-kafka will emit internal statistics at this interval. Use the on_stats parameter on KafkaConsumer to receive parsed stats.

Parameters:

Name Type Description Default
milliseconds int

Stats reporting interval in milliseconds (e.g. 5000 for every 5s).

required

Returns:

Type Description
ConsumerConfig

Self for method chaining

Raises:

Type Description
ValueError

If milliseconds is negative.

Examples:

>>> config = ConsumerConfig().stats_interval_ms(5000).build()