Consumer
typedkafka.consumer.KafkaConsumer
A well-documented Kafka consumer with full type hints.
This class wraps confluent-kafka's Consumer with:
- Comprehensive docstrings on every method
- Full type hints for IDE autocomplete
- Better error messages
- Convenient message deserialization methods
- Context manager support for automatic cleanup
- Iterator protocol for easy message consumption
Basic Usage
```python
from typedkafka.consumer import KafkaConsumer

consumer = KafkaConsumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["my-topic"])
for msg in consumer:
    print(f"Received: {msg.value_as_string()}")
```
With Context Manager
```python
# `config` is a configuration dict and `process` is your own handler.
with KafkaConsumer(config) as consumer:
    consumer.subscribe(["topic"])
    for msg in consumer:
        process(msg)
```
Attributes:
| Name | Type | Description |
|---|---|---|
| config | | The configuration dictionary used to initialize the consumer |
metrics (property)
Current metrics for this consumer.
Tracks messages received, errors, and (if stats enabled) byte throughput.
__enter__()
Enter context manager.
__exit__(exc_type, exc_val, exc_tb)
Exit context manager and cleanup resources.
__init__(config, on_stats=None, value_deserializer=None, logger=None)
Initialize a Kafka consumer with the given configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | dict[str, Any] | Configuration dictionary for the consumer. Common options are listed after this table. | required |
| on_stats | StatsCallback \| None | Optional callback receiving parsed KafkaStats each reporting interval. Requires | None |
| logger | KafkaLogger \| None | Optional KafkaLogger for structured logging of consumer operations. | None |
| value_deserializer | Callable[[bytes], Any] \| None | Optional function to automatically deserialize message values. When set, use | None |

Common config options:
- bootstrap.servers (str): Comma-separated list of broker addresses
- group.id (str): Consumer group ID (required for subscribe())
- client.id (str): An identifier for this client
- auto.offset.reset (str): What to do when there is no initial offset: "earliest" starts from the beginning, "latest" starts from the end
- enable.auto.commit (bool): Automatically commit offsets (default: True)
- auto.commit.interval.ms (int): Frequency of offset commits in milliseconds
- max.poll.interval.ms (int): Maximum time between polls before the consumer is removed from the group
- session.timeout.ms (int): Timeout for detecting consumer failures
- statistics.interval.ms (int): Stats reporting interval in milliseconds

Raises:
| Type | Description |
|---|---|
| ConsumerError | If the consumer cannot be initialized |
Examples:
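A minimal sketch of a configuration dictionary and a value deserializer, using only options named in the parameter description above. The broker address, group ID, interval value, and the `json_value_deserializer` helper are placeholders chosen for illustration, not part of typedkafka.

```python
import json
from typing import Any

# Placeholder values; adjust the broker address and group ID for your cluster.
config: dict[str, Any] = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "my-group",
    "auto.offset.reset": "earliest",
    "enable.auto.commit": False,     # offsets are then committed via commit()
    "statistics.interval.ms": 5000,  # stats reporting interval in milliseconds
}


def json_value_deserializer(raw: bytes) -> Any:
    """Hypothetical helper matching Callable[[bytes], Any]: UTF-8 JSON to Python."""
    return json.loads(raw.decode("utf-8"))
```

With typedkafka installed, these would be passed as `KafkaConsumer(config, value_deserializer=json_value_deserializer)`, following the signature shown above.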
__iter__()
Iterate over messages indefinitely.
Uses the configured poll_timeout (default 1.0s). Configure via the poll_timeout property.
Yields:
| Type | Description |
|---|---|
| KafkaMessage | KafkaMessage objects as they arrive |
Examples:
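Because iteration never ends on its own, a bounded run needs an explicit stop. One possible approach, sketched here with the standard library, is to wrap the iterator in `itertools.islice`. The `take` helper and the `endless_messages` generator are illustrative stand-ins; the generator only imitates an endless consumer so the snippet runs without a broker.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def take(messages: Iterable[T], limit: int) -> Iterator[T]:
    """Yield at most `limit` items, then stop pulling from the source."""
    return islice(messages, limit)


def endless_messages() -> Iterator[str]:
    """Stand-in for iterating a consumer: yields forever."""
    n = 0
    while True:
        yield f"message-{n}"
        n += 1


first_three = list(take(endless_messages(), 3))
```

With a real consumer the same helper would be used as `for msg in take(consumer, 100): ...`.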
assign(partitions)
Manually assign partitions to this consumer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | list[Any] | List of TopicPartition objects to assign. | required |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If the assignment fails |
assignment()
Get the current partition assignment.
Returns:
| Type | Description |
|---|---|
| list[Any] | List of TopicPartition objects currently assigned to this consumer. |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If retrieving the assignment fails |
close()
Close the consumer and leave the consumer group.
It's recommended to use the consumer as a context manager instead of calling this method directly.
commit(message=None, asynchronous=True)
Commit offsets to Kafka.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | KafkaMessage \| None | Specific message to commit. If None, commits all consumed messages. | None |
| asynchronous | bool | If True, commit asynchronously (default). If False, wait for confirmation. | True |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If commit fails |
poll(timeout=1.0)
Poll for a single message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum time to wait for a message in seconds (default: 1.0) | 1.0 |

Returns:
| Type | Description |
|---|---|
| KafkaMessage \| None | KafkaMessage if a message was received, None if timeout expired |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If an error occurs during polling |
Examples:
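Since `poll()` returns None when the timeout expires, a manual loop has to treat None as "nothing yet" rather than as a message. The sketch below shows one way to do that, stopping after a few consecutive empty polls. `consume_until_idle`, `Pollable`, and `FakeConsumer` are hypothetical names introduced for this illustration; the fake only replays a scripted sequence so the loop can be exercised without a broker.

```python
from typing import Any, Callable, Protocol


class Pollable(Protocol):
    """Anything with poll(timeout) returning a message or None."""

    def poll(self, timeout: float = 1.0) -> Any: ...


def consume_until_idle(
    consumer: Pollable,
    handle: Callable[[Any], None],
    timeout: float = 1.0,
    max_idle_polls: int = 3,
) -> int:
    """Handle messages until `max_idle_polls` consecutive polls return None."""
    handled = 0
    idle = 0
    while idle < max_idle_polls:
        msg = consumer.poll(timeout=timeout)
        if msg is None:
            idle += 1  # timeout expired with nothing to read
            continue
        idle = 0  # any message resets the idle streak
        handle(msg)
        handled += 1
    return handled


class FakeConsumer:
    """Test double (not part of typedkafka): replays a scripted sequence."""

    def __init__(self, script: list) -> None:
        self._script = list(script)

    def poll(self, timeout: float = 1.0) -> Any:
        return self._script.pop(0) if self._script else None


seen: list = []
count = consume_until_idle(FakeConsumer(["a", None, "b"]), seen.append)
```

A real KafkaConsumer could be passed in place of the fake, since the helper relies only on the `poll(timeout)` signature documented above.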
poll_batch(max_messages=100, timeout=1.0)
Poll for a batch of messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| max_messages | int | Maximum number of messages to return (default: 100) | 100 |
| timeout | float | Maximum time to wait in seconds (default: 1.0) | 1.0 |

Returns:
| Type | Description |
|---|---|
| list[KafkaMessage] | List of KafkaMessage objects (may be empty if timeout expires) |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If an error occurs during polling |
Examples:
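Batch polling pairs naturally with one commit per batch. The sketch below processes each non-empty batch and then calls `commit(asynchronous=False)`, so offsets are only committed after the handler returns. It assumes `enable.auto.commit` is disabled in the consumer config. `process_batches` and `FakeBatchConsumer` are hypothetical names for this illustration; the fake mirrors the documented `poll_batch` and `commit` signatures so the flow can be checked offline.

```python
from typing import Any, Callable


def process_batches(
    consumer: Any,
    handle_batch: Callable[[list], None],
    max_batches: int,
    max_messages: int = 100,
    timeout: float = 1.0,
) -> int:
    """Poll up to `max_batches` times; commit synchronously after each non-empty batch."""
    total = 0
    for _ in range(max_batches):
        batch = consumer.poll_batch(max_messages=max_messages, timeout=timeout)
        if not batch:
            continue  # timeout expired, nothing to commit
        handle_batch(batch)  # if this raises, the commit below is skipped
        consumer.commit(asynchronous=False)  # wait for confirmation
        total += len(batch)
    return total


class FakeBatchConsumer:
    """Test double (not part of typedkafka) that counts commit calls."""

    def __init__(self, batches: list) -> None:
        self._batches = list(batches)
        self.commits = 0

    def poll_batch(self, max_messages: int = 100, timeout: float = 1.0) -> list:
        return self._batches.pop(0) if self._batches else []

    def commit(self, message: Any = None, asynchronous: bool = True) -> None:
        self.commits += 1


fake = FakeBatchConsumer([["m1", "m2"], [], ["m3"]])
processed = process_batches(fake, lambda batch: None, max_batches=4)
```

Committing after the handler gives at-least-once behaviour: a crash between handling and committing means the batch is delivered again on restart.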
position(partitions)
Get the current position (offset) for the given partitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partitions | list[Any] | List of TopicPartition objects to query. | required |

Returns:
| Type | Description |
|---|---|
| list[Any] | List of TopicPartition objects with offset set to the current position. |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If retrieving the position fails |
seek(partition)
Seek to a specific offset on a partition.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| partition | Any | A TopicPartition object with topic, partition, and offset set. | required |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If the seek fails |
Examples:
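One way `position()` and `seek()` might be combined is a small "rewind" helper that moves each partition back by a fixed number of messages. This is a sketch under stated assumptions: the `TopicPartition` class below is a local stand-in with the three fields named above (the real class comes from confluent-kafka), `rewind` and `FakeSeekConsumer` are hypothetical, and treating a negative offset as "no position yet" follows confluent-kafka's `OFFSET_INVALID` sentinel rather than anything this page states.

```python
from typing import Any

OFFSET_INVALID = -1001  # confluent-kafka's sentinel for "no offset"


class TopicPartition:
    """Stand-in with topic, partition, and offset; swap in the real class."""

    def __init__(self, topic: str, partition: int, offset: int = OFFSET_INVALID) -> None:
        self.topic = topic
        self.partition = partition
        self.offset = offset


def rewind(consumer: Any, partitions: list, count: int) -> list:
    """Seek each partition back by `count` messages; return (topic, partition, offset)."""
    targets = []
    for tp in consumer.position(partitions):
        if tp.offset < 0:
            continue  # assumed to mean no valid position yet
        target = max(tp.offset - count, 0)
        consumer.seek(type(tp)(tp.topic, tp.partition, target))
        targets.append((tp.topic, tp.partition, target))
    return targets


class FakeSeekConsumer:
    """Test double (not part of typedkafka) holding positions in a dict."""

    def __init__(self, positions: dict) -> None:
        self._positions = dict(positions)  # (topic, partition) -> offset

    def position(self, partitions: list) -> list:
        return [
            TopicPartition(
                tp.topic,
                tp.partition,
                self._positions.get((tp.topic, tp.partition), OFFSET_INVALID),
            )
            for tp in partitions
        ]

    def seek(self, partition: Any) -> None:
        self._positions[(partition.topic, partition.partition)] = partition.offset


fake = FakeSeekConsumer({("orders", 0): 42, ("orders", 1): 3})
moved = rewind(fake, [TopicPartition("orders", p) for p in range(3)], count=10)
```

Clamping at 0 is a simplification: on a topic with retention, the earliest available offset may be higher than 0.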
subscribe(topics, on_assign=None, on_revoke=None, on_lost=None)
Subscribe to one or more topics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topics | list[str] | List of topic names to subscribe to | required |
| on_assign | Callable[[Any, Any], None] \| None | Callback invoked when partitions are assigned. Signature: callback(consumer, partitions) | None |
| on_revoke | Callable[[Any, Any], None] \| None | Callback invoked when partitions are revoked. Signature: callback(consumer, partitions) | None |
| on_lost | Callable[[Any, Any], None] \| None | Callback invoked when partitions are lost (unclean). Signature: callback(consumer, partitions) | None |

Raises:
| Type | Description |
|---|---|
| ConsumerError | If subscription fails |
Examples:
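The three rebalance callbacks share the `callback(consumer, partitions)` signature, so they can be plain functions. The sketch below logs each event. It assumes every element of `partitions` exposes `topic` and `partition` attributes, as TopicPartition objects do; the `describe` helper, the logger name, and the `SimpleNamespace` sample data are illustrative only.

```python
import logging
from types import SimpleNamespace
from typing import Any

logger = logging.getLogger("rebalance")


def describe(partitions: Any) -> list:
    """Render partitions as 'topic[partition]' strings for logging."""
    return [f"{tp.topic}[{tp.partition}]" for tp in partitions]


def on_assign(consumer: Any, partitions: Any) -> None:
    logger.info("assigned: %s", describe(partitions))


def on_revoke(consumer: Any, partitions: Any) -> None:
    # A natural place to finish in-flight work before ownership moves.
    logger.info("revoked: %s", describe(partitions))


def on_lost(consumer: Any, partitions: Any) -> None:
    logger.warning("lost: %s", describe(partitions))


# Sample objects standing in for TopicPartition, for a broker-free check.
sample = [
    SimpleNamespace(topic="orders", partition=0),
    SimpleNamespace(topic="orders", partition=1),
]
labels = describe(sample)
```

The callbacks would then be registered with `consumer.subscribe(["my-topic"], on_assign=on_assign, on_revoke=on_revoke, on_lost=on_lost)`.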
typedkafka.consumer.KafkaMessage
A Kafka message with convenient access methods.
Wraps confluent-kafka's Message with better documentation and helper methods.
Attributes:
| Name | Type | Description |
|---|---|---|
| topic | | The topic this message came from |
| partition | | The partition number |
| offset | | The message offset |
| key | | The message key as bytes (None if no key) |
| value | | The message value as bytes |
| timestamp | | Message timestamp (type, value) tuple |
| headers | | Message headers as list of (key, value) tuples |
__init__(message)
Initialize from a confluent-kafka Message.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| message | Any | A confluent_kafka.Message object | required |
__repr__()
Return string representation of the message.
decode(topic)
Decode the message value using a typed topic's deserializer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | TypedTopic[T] | A TypedTopic that specifies deserialization. | required |

Returns:
| Type | Description |
|---|---|
| T | Deserialized value with the topic's type. |

Raises:
| Type | Description |
|---|---|
| SerializationError | If deserialization fails. |
key_as_string(encoding='utf-8')
Decode the message key as a string (UTF-8 by default).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| encoding | str | Character encoding to use (default: utf-8) | 'utf-8' |

Returns:
| Type | Description |
|---|---|
| str \| None | Decoded string key, or None if no key |

Raises:
| Type | Description |
|---|---|
| SerializationError | If decoding fails |
Examples:
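The contract described above (None when there is no key, otherwise the bytes decoded with the given encoding) can be illustrated with a small stand-in. `decode_key` is a hypothetical function written for this example and is not the library's implementation; it raises Python's own `UnicodeDecodeError` on bad input, whereas the documented method reports failures as SerializationError.

```python
from typing import Optional


def decode_key(key: Optional[bytes], encoding: str = "utf-8") -> Optional[str]:
    """Stand-in mirroring the documented contract: None in, None out."""
    if key is None:
        return None
    return key.decode(encoding)


utf8_key = decode_key("café".encode("utf-8"))
latin1_key = decode_key(b"caf\xe9", encoding="latin-1")
missing_key = decode_key(None)
```

The `encoding` argument matters when producers do not write UTF-8: the Latin-1 bytes above are not valid UTF-8 and would fail to decode with the default.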
value_as(deserializer)
Decode the message value using a custom deserializer function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| deserializer | Callable[[bytes], T] | A callable that takes bytes and returns the desired type. | required |

Returns:
| Type | Description |
|---|---|
| T | Deserialized value |

Raises:
| Type | Description |
|---|---|
| SerializationError | If deserialization fails |
Examples:
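Any function from bytes to a value fits the `Callable[[bytes], T]` parameter. As a sketch, the deserializer below turns a UTF-8 JSON payload into a small dataclass. The `Order` type, its fields, and `parse_order` are hypothetical and exist only for this illustration.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class Order:
    """Hypothetical payload type used only for this illustration."""

    order_id: int
    sku: str


def parse_order(raw: bytes) -> Order:
    """Callable[[bytes], Order]: decode UTF-8 JSON and build an Order."""
    data = json.loads(raw.decode("utf-8"))
    return Order(order_id=int(data["order_id"]), sku=str(data["sku"]))


order = parse_order(b'{"order_id": 12, "sku": "A-100"}')
```

On a received message this would be called as `msg.value_as(parse_order)`, which per the table above returns the deserialized value.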
value_as_json()
Deserialize the message value as JSON.
Returns:
| Type | Description |
|---|---|
| Any | Parsed JSON object (dict, list, str, int, etc.) |

Raises:
| Type | Description |
|---|---|
| SerializationError | If JSON parsing fails |
value_as_string(encoding='utf-8')
Decode the message value as a string (UTF-8 by default).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| encoding | str | Character encoding to use (default: utf-8) | 'utf-8' |

Returns:
| Type | Description |
|---|---|
| str | Decoded string value |

Raises:
| Type | Description |
|---|---|
| SerializationError | If decoding fails |