Consumer

typedkafka.consumer.KafkaConsumer

A well-documented Kafka consumer with full type hints.

This class wraps confluent-kafka's Consumer with:

- Comprehensive docstrings on every method
- Full type hints for IDE autocomplete
- Better error messages
- Convenient message deserialization methods
- Context manager support for automatic cleanup
- Iterator protocol for easy message consumption

Basic Usage

consumer = KafkaConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "auto.offset.reset": "earliest"
})
consumer.subscribe(["my-topic"])
for msg in consumer:
    print(f"Received: {msg.value_as_string()}")

With Context Manager

with KafkaConsumer(config) as consumer:
    consumer.subscribe(["topic"])
    for msg in consumer:
        process(msg)

Attributes:

Name Type Description
config

The configuration dictionary used to initialize the consumer

metrics property

Current metrics for this consumer.

Tracks messages received, errors, and (if stats enabled) byte throughput.

__enter__()

Enter context manager.

__exit__(exc_type, exc_val, exc_tb)

Exit context manager and cleanup resources.

__init__(config, on_stats=None, value_deserializer=None, logger=None)

Initialize a Kafka consumer with the given configuration.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary for the consumer. Common options:

- bootstrap.servers (str): Comma-separated list of broker addresses
- group.id (str): Consumer group ID (required for subscribe())
- client.id (str): An identifier for this client
- auto.offset.reset (str): What to do when there is no initial offset; "earliest" = start from beginning, "latest" = start from end
- enable.auto.commit (bool): Automatically commit offsets (default: True)
- auto.commit.interval.ms (int): Frequency of offset commits in milliseconds
- max.poll.interval.ms (int): Max time between polls before the consumer is kicked from the group
- session.timeout.ms (int): Timeout for detecting consumer failures
- statistics.interval.ms (int): Stats reporting interval in milliseconds

required
on_stats StatsCallback | None

Optional callback receiving parsed KafkaStats each reporting interval. Requires statistics.interval.ms to be set in config.

None
logger KafkaLogger | None

Optional KafkaLogger for structured logging of consumer operations.

None
value_deserializer Callable[[bytes], Any] | None

Optional function to automatically deserialize message values. When set, the deserializer is available for typed consumption patterns; individual messages can also be decoded on demand via msg.value_as(deserializer).

None

Raises:

Type Description
ConsumerError

If the consumer cannot be initialized

Examples:

>>> # Basic consumer
>>> consumer = KafkaConsumer({
...     "bootstrap.servers": "localhost:9092",
...     "group.id": "my-consumer-group",
...     "auto.offset.reset": "earliest"
... })
>>> # Consumer with metrics
>>> consumer = KafkaConsumer({
...     "bootstrap.servers": "localhost:9092",
...     "group.id": "my-group",
...     "statistics.interval.ms": 5000,
... })
>>> print(consumer.metrics.messages_received)
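As a sketch, a value_deserializer is simply a callable from bytes to any Python object. The JSON helper below is a hypothetical example (not part of typedkafka) of what could be passed at construction:

```python
import json
from typing import Any


def json_value_deserializer(raw: bytes) -> Any:
    """Decode a message value from UTF-8 JSON (illustrative helper)."""
    return json.loads(raw.decode("utf-8"))


# Would be passed at construction, e.g.:
# consumer = KafkaConsumer(config, value_deserializer=json_value_deserializer)
print(json_value_deserializer(b'{"user_id": 42}'))
```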

__iter__()

Iterate over messages indefinitely.

Uses the configured poll_timeout (default 1.0s). Configure via the poll_timeout property.

Yields:

Type Description
KafkaMessage

KafkaMessage objects as they arrive

Examples:

>>> for msg in consumer:
...     print(f"Received: {msg.value_as_string()}")
...     consumer.commit(msg)
>>> # With custom poll timeout
>>> consumer.poll_timeout = 5.0
>>> for msg in consumer:
...     process(msg)

assign(partitions)

Manually assign partitions to this consumer.

Parameters:

Name Type Description Default
partitions list[Any]

List of TopicPartition objects to assign.

required

Raises:

Type Description
ConsumerError

If the assignment fails

Examples:

>>> from confluent_kafka import TopicPartition
>>> consumer.assign([TopicPartition("my-topic", 0), TopicPartition("my-topic", 1)])

assignment()

Get the current partition assignment.

Returns:

Type Description
list[Any]

List of TopicPartition objects currently assigned to this consumer.

Raises:

Type Description
ConsumerError

If retrieving the assignment fails

Examples:

>>> partitions = consumer.assignment()
>>> for tp in partitions:
...     print(f"{tp.topic} [{tp.partition}]")

close()

Close the consumer and leave the consumer group.

It's recommended to use the consumer as a context manager instead of calling this method directly.

Examples:

>>> consumer = KafkaConsumer(config)
>>> try:
...     consumer.subscribe(["topic"])
...     for msg in consumer:
...         process(msg)
... finally:
...     consumer.close()
>>> # Better: use context manager
>>> with KafkaConsumer(config) as consumer:
...     consumer.subscribe(["topic"])
...     for msg in consumer:
...         process(msg)

commit(message=None, asynchronous=True)

Commit offsets to Kafka.

Parameters:

Name Type Description Default
message KafkaMessage | None

Specific message to commit. If None, commits all consumed messages.

None
asynchronous bool

If True, commit asynchronously (default). If False, wait for confirmation.

True

Raises:

Type Description
ConsumerError

If commit fails

Examples:

>>> # Commit after processing each message
>>> msg = consumer.poll()
>>> if msg:
...     process(msg)
...     consumer.commit(msg)
>>> # Commit all consumed messages
>>> consumer.commit()
>>> # Synchronous commit (wait for confirmation)
>>> consumer.commit(msg, asynchronous=False)

poll(timeout=1.0)

Poll for a single message.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for a message in seconds (default: 1.0)

1.0

Returns:

Type Description
KafkaMessage | None

KafkaMessage if a message was received, None if timeout expired

Raises:

Type Description
ConsumerError

If an error occurs during polling

Examples:

>>> # Poll with default 1 second timeout
>>> msg = consumer.poll()
>>> if msg:
...     print(f"Received: {msg.value_as_string()}")
>>> # Poll with longer timeout
>>> msg = consumer.poll(timeout=5.0)
>>> # Poll in a loop
>>> while True:
...     msg = consumer.poll(timeout=1.0)
...     if msg:
...         process(msg)
...         consumer.commit(msg)

poll_batch(max_messages=100, timeout=1.0)

Poll for a batch of messages.

Parameters:

Name Type Description Default
max_messages int

Maximum number of messages to return (default: 100)

100
timeout float

Maximum time to wait in seconds (default: 1.0)

1.0

Returns:

Type Description
list[KafkaMessage]

List of KafkaMessage objects (may be empty if timeout expires)

Raises:

Type Description
ConsumerError

If an error occurs during polling

Examples:

>>> messages = consumer.poll_batch(max_messages=50, timeout=2.0)
>>> for msg in messages:
...     process(msg)

position(partitions)

Get the current position (offset) for the given partitions.

Parameters:

Name Type Description Default
partitions list[Any]

List of TopicPartition objects to query.

required

Returns:

Type Description
list[Any]

List of TopicPartition objects with offset set to the current position.

Raises:

Type Description
ConsumerError

If retrieving the position fails

Examples:

>>> from confluent_kafka import TopicPartition
>>> positions = consumer.position([TopicPartition("my-topic", 0)])
>>> for tp in positions:
...     print(f"Offset: {tp.offset}")

seek(partition)

Seek to a specific offset on a partition.

Parameters:

Name Type Description Default
partition Any

A TopicPartition object with topic, partition, and offset set.

required

Raises:

Type Description
ConsumerError

If the seek fails

Examples:

>>> from confluent_kafka import TopicPartition
>>> consumer.seek(TopicPartition("my-topic", 0, 100))

subscribe(topics, on_assign=None, on_revoke=None, on_lost=None)

Subscribe to one or more topics.

Parameters:

Name Type Description Default
topics list[str]

List of topic names to subscribe to

required
on_assign Callable[[Any, Any], None] | None

Callback invoked when partitions are assigned. Signature: callback(consumer, partitions)

None
on_revoke Callable[[Any, Any], None] | None

Callback invoked when partitions are revoked. Signature: callback(consumer, partitions)

None
on_lost Callable[[Any, Any], None] | None

Callback invoked when partitions are lost (unclean). Signature: callback(consumer, partitions)

None

Raises:

Type Description
ConsumerError

If subscription fails

Examples:

>>> # Subscribe to a single topic
>>> consumer.subscribe(["my-topic"])
>>> # Subscribe to multiple topics
>>> consumer.subscribe(["orders", "payments", "shipments"])
>>> # Subscribe with rebalance callbacks
>>> def on_assign(consumer, partitions):
...     print(f"Assigned: {partitions}")
>>> def on_revoke(consumer, partitions):
...     print(f"Revoked: {partitions}")
>>> consumer.subscribe(["my-topic"], on_assign=on_assign, on_revoke=on_revoke)

typedkafka.consumer.KafkaMessage

A Kafka message with convenient access methods.

Wraps confluent-kafka's Message with better documentation and helper methods.

Attributes:

Name Type Description
topic

The topic this message came from

partition

The partition number

offset

The message offset

key

The message key as bytes (None if no key)

value

The message value as bytes

timestamp

Message timestamp (type, value) tuple

headers

Message headers as list of (key, value) tuples
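Since headers arrive as a list of (key, value) tuples and the timestamp as a (type, value) pair with a millisecond value, small helpers like the ones below (hypothetical names, not part of typedkafka) can make both more convenient to work with:

```python
from datetime import datetime, timezone


def headers_to_dict(headers):
    """Collapse a list of (key, value) header tuples into a dict (last value wins)."""
    return dict(headers or [])


def timestamp_to_datetime(timestamp):
    """Convert a (type, value_ms) timestamp tuple to a timezone-aware UTC datetime."""
    _ts_type, ts_ms = timestamp
    return datetime.fromtimestamp(ts_ms / 1000, tz=timezone.utc)


print(headers_to_dict([("trace-id", b"abc")]))
print(timestamp_to_datetime((1, 1700000000000)).isoformat())
```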

__init__(message)

Initialize from a confluent-kafka Message.

Parameters:

Name Type Description Default
message Any

A confluent_kafka.Message object

required

__repr__()

Return string representation of the message.

decode(topic)

Decode the message value using a typed topic's deserializer.

Parameters:

Name Type Description Default
topic TypedTopic[T]

A TypedTopic that specifies deserialization.

required

Returns:

Type Description
T

Deserialized value with the topic's type.

Raises:

Type Description
SerializationError

If deserialization fails.

Examples:

>>> from typedkafka.topics import json_topic
>>> events = json_topic("events")
>>> msg = consumer.poll()
>>> data = msg.decode(events)  # type: Any

key_as_string(encoding='utf-8')

Decode the message key as a string (UTF-8 by default).

Parameters:

Name Type Description Default
encoding str

Character encoding to use (default: utf-8)

'utf-8'

Returns:

Type Description
str | None

Decoded string key, or None if no key

Raises:

Type Description
SerializationError

If decoding fails

Examples:

>>> msg = consumer.poll()
>>> if msg.key_as_string():
...     print(f"Key: {msg.key_as_string()}")

value_as(deserializer)

Decode the message value using a custom deserializer function.

Parameters:

Name Type Description Default
deserializer Callable[[bytes], T]

A callable that takes bytes and returns the desired type.

required

Returns:

Type Description
T

Deserialized value

Raises:

Type Description
SerializationError

If deserialization fails

Examples:

>>> from dataclasses import dataclass
>>> @dataclass
... class UserEvent:
...     user_id: int
...     @classmethod
...     def from_bytes(cls, data: bytes) -> "UserEvent":
...         d = json.loads(data)
...         return cls(user_id=d["user_id"])
>>> event = msg.value_as(UserEvent.from_bytes)

value_as_json()

Deserialize the message value as JSON.

Returns:

Type Description
Any

Parsed JSON object (dict, list, str, int, etc.)

Raises:

Type Description
SerializationError

If JSON parsing fails

Examples:

>>> msg = consumer.poll()
>>> data = msg.value_as_json()
>>> print(f"User ID: {data['user_id']}")
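Presumably this is equivalent to running json.loads on the raw value bytes, with failures wrapped in SerializationError (an assumption about the implementation); a standalone sketch of that behavior:

```python
import json


def parse_json_value(raw: bytes):
    """Roughly what value_as_json does, minus the SerializationError wrapping (assumption)."""
    return json.loads(raw)


print(parse_json_value(b'{"user_id": 42}')["user_id"])
```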

value_as_string(encoding='utf-8')

Decode the message value as a string (UTF-8 by default).

Parameters:

Name Type Description Default
encoding str

Character encoding to use (default: utf-8)

'utf-8'

Returns:

Type Description
str

Decoded string value

Raises:

Type Description
SerializationError

If decoding fails

Examples:

>>> msg = consumer.poll()
>>> text = msg.value_as_string()
>>> print(f"Received: {text}")