# Testing
## typedkafka.testing.MockMessage
A mock Kafka message for testing.
Attributes:

| Name | Type | Description |
|---|---|---|
| `topic` | `str` | The topic this message was sent to |
| `value` | `bytes` | The message value |
| `key` | `Optional[bytes]` | The message key (optional) |
| `partition` | `int` | The partition number |
| `offset` | `int` | The message offset |
| `headers` | `Optional[list[tuple[str, bytes]]]` | Message headers |
### `__init__(topic, value, key=None, partition=0, offset=0, headers=None)`

Initialize a mock message.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic name | *required* |
| `value` | `bytes` | Message value as bytes | *required* |
| `key` | `Optional[bytes]` | Optional message key | `None` |
| `partition` | `int` | Partition number | `0` |
| `offset` | `int` | Message offset | `0` |
| `headers` | `Optional[list[tuple[str, bytes]]]` | Optional list of (key, value) header tuples | `None` |
### `__repr__()`

Return string representation of the message.
### `key_as_string(encoding='utf-8')`

Decode the message key as a string.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `encoding` | `str` | Character encoding to use | `'utf-8'` |

Returns:

| Type | Description |
|---|---|
| `Optional[str]` | Decoded string key, or `None` if no key |
### `value_as(deserializer)`

Decode the message value using a custom deserializer function.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `deserializer` | `Callable[[bytes], Any]` | A callable that takes bytes and returns the desired type | *required* |

Returns:

| Type | Description |
|---|---|
| `Any` | Deserialized value |
### `value_as_json()`

Deserialize the message value as JSON.

Returns:

| Type | Description |
|---|---|
| `Any` | Parsed JSON object |

Raises:

| Type | Description |
|---|---|
| `SerializationError` | If JSON parsing fails |
### `value_as_string(encoding='utf-8')`

Decode the message value as a string.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `encoding` | `str` | Character encoding to use | `'utf-8'` |

Returns:

| Type | Description |
|---|---|
| `str` | Decoded string value |

Raises:

| Type | Description |
|---|---|
| `SerializationError` | If decoding fails |
## typedkafka.testing.MockProducer
A mock Kafka producer for testing.
Records all messages sent to topics without actually sending to Kafka. Perfect for unit tests to verify your code sends the right messages.
Examples:
>>> producer = MockProducer()
>>> producer.send("my-topic", b"test message", key=b"test-key")
>>>
>>> # Verify the message was sent
>>> assert len(producer.messages["my-topic"]) == 1
>>> msg = producer.messages["my-topic"][0]
>>> assert msg.value == b"test message"
>>> assert msg.key == b"test-key"
Attributes:

| Name | Type | Description |
|---|---|---|
| `messages` | `dict[str, list[MockMessage]]` | Dict mapping topic names to lists of MockMessage objects |
| `call_count` | `int` | Number of times send() was called |
| `flushed` | `bool` | Whether flush() has been called |
### `metrics` (property)

Current metrics for this producer.
### `__enter__()`

Context manager entry.

### `__exit__(exc_type, exc_val, exc_tb)`

Context manager exit.
### `__init__(config=None, fail_on_topics=None)`

Initialize a mock producer.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `Optional[dict[str, Any]]` | Optional config dict (ignored, but accepted for compatibility) | `None` |
| `fail_on_topics` | `Optional[set[str]]` | Optional set of topic names that will cause send() to raise ProducerError. Useful for testing error handling paths. | `None` |
### `abort_transaction(timeout=30.0)`

Abort the mock transaction, discarding buffered messages.

### `begin_transaction()`

Begin a mock transaction.

### `close()`

Mark producer as closed.

### `commit_transaction(timeout=30.0)`

Commit the mock transaction, flushing buffered messages.
### `flush(timeout=-1)`

Mark producer as flushed (no-op in mock).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Ignored in mock | `-1` |

Returns:

| Type | Description |
|---|---|
| `int` | 0 (always successful in mock) |
### `get_json_messages(topic)`

Get all messages for a topic deserialized as JSON.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic name | *required* |

Returns:

| Type | Description |
|---|---|
| `list[Any]` | List of deserialized JSON values |
### `init_transactions(timeout=30.0)`

Initialize transactions (no-op in mock).

### `message_count(topic)`

Get count of messages sent to a topic.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic name | *required* |

Returns:

| Type | Description |
|---|---|
| `int` | Number of messages sent to the topic |
### `reset()`

Reset the mock producer's recorded state.
### `send(topic, value, key=None, partition=None, on_delivery=None, headers=None)`

Record a message send (doesn't actually send to Kafka).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic to send to | *required* |
| `value` | `bytes` | Message value | *required* |
| `key` | `Optional[bytes]` | Optional message key | `None` |
| `partition` | `Optional[int]` | Optional partition (default: 0) | `None` |
| `on_delivery` | `Optional[DeliveryCallback]` | Optional callback (will be called immediately with success) | `None` |
### `send_batch(topic, messages, on_delivery=None)`

Record a batch of message sends.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic to send to | *required* |
| `messages` | `list[tuple[bytes, Optional[bytes]]]` | List of (value, key) tuples | *required* |
| `on_delivery` | `Optional[DeliveryCallback]` | Optional delivery callback | `None` |
### `send_json(topic, value, key=None, partition=None, on_delivery=None)`

Record a JSON message send.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic to send to | *required* |
| `value` | `Any` | JSON-serializable value | *required* |
| `key` | `Optional[str]` | Optional string key | `None` |
| `partition` | `Optional[int]` | Optional partition | `None` |
| `on_delivery` | `Optional[DeliveryCallback]` | Optional delivery callback | `None` |
### `send_string(topic, value, key=None, partition=None, on_delivery=None)`

Record a string message send.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic to send to | *required* |
| `value` | `str` | String value | *required* |
| `key` | `Optional[str]` | Optional string key | `None` |
| `partition` | `Optional[int]` | Optional partition | `None` |
| `on_delivery` | `Optional[DeliveryCallback]` | Optional delivery callback | `None` |
### `transaction()`

Return a mock transaction context manager.
## typedkafka.testing.MockConsumer
A mock Kafka consumer for testing.
Allows you to inject predefined messages for testing code that consumes from Kafka.
Examples:
>>> consumer = MockConsumer()
>>> consumer.add_message("my-topic", b"test message", key=b"test-key")
>>> consumer.subscribe(["my-topic"])
>>>
>>> msg = consumer.poll()
>>> assert msg.value == b"test message"
>>> assert msg.key == b"test-key"
Attributes:

| Name | Type | Description |
|---|---|---|
| `messages` | `list[MockMessage]` | Queue of MockMessage objects to be consumed |
| `subscribed_topics` | `list[str]` | List of subscribed topics |
| `committed_offsets` | `dict[tuple[str, int], int]` | Dict of committed offsets by topic/partition |
| `poll_timeout` | `float` | Timeout used by `__iter__` (matches KafkaConsumer) |
### `metrics` (property)

Current metrics for this consumer.

### `__enter__()`

Context manager entry.

### `__exit__(exc_type, exc_val, exc_tb)`

Context manager exit.
### `__init__(config=None)`

Initialize a mock consumer.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `config` | `Optional[dict[str, Any]]` | Optional config dict (ignored, but accepted for compatibility) | `None` |
### `__iter__()`

Iterate over queued messages.

Yields all queued messages, then stops. In tests, use poll() in a loop or add all messages before iterating.

Yields:

| Type | Description |
|---|---|
| `MockMessage` | MockMessage objects until queue is exhausted |
### `add_json_message(topic, value, key=None, partition=0)`

Add a JSON message to be consumed.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic name | *required* |
| `value` | `Any` | JSON-serializable value | *required* |
| `key` | `Optional[str]` | Optional string key | `None` |
| `partition` | `int` | Partition number | `0` |
### `add_message(topic, value, key=None, partition=0, offset=None, headers=None)`

Add a message to be consumed.

Call this in your tests to inject messages that your code will consume.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic name | *required* |
| `value` | `bytes` | Message value | *required* |
| `key` | `Optional[bytes]` | Optional message key | `None` |
| `partition` | `int` | Partition number (default: 0) | `0` |
| `offset` | `Optional[int]` | Message offset (auto-generated if None) | `None` |
| `headers` | `Optional[list[tuple[str, bytes]]]` | Optional message headers | `None` |
### `add_string_message(topic, value, key=None, partition=0)`

Add a string message to be consumed.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topic` | `str` | Topic name | *required* |
| `value` | `str` | String value | *required* |
| `key` | `Optional[str]` | Optional string key | `None` |
| `partition` | `int` | Partition number | `0` |
### `assign(partitions)`

Manually assign partitions.

### `assignment()`

Get the current partition assignment.

### `close()`

Mark consumer as closed.
### `commit(message=None, asynchronous=True)`

Record a commit (doesn't actually commit to Kafka).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `message` | `Optional[MockMessage]` | Message to commit offset for | `None` |
| `asynchronous` | `bool` | Ignored in mock | `True` |
### `poll(timeout=1.0)`

Poll for the next message.

Returns messages in the order they were added with add_message().

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Ignored in mock | `1.0` |

Returns:

| Type | Description |
|---|---|
| `Optional[MockMessage]` | Next MockMessage or None if no more messages |
### `poll_batch(max_messages=100, timeout=1.0)`

Poll for a batch of messages.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `max_messages` | `int` | Maximum number of messages to return | `100` |
| `timeout` | `float` | Ignored in mock | `1.0` |

Returns:

| Type | Description |
|---|---|
| `list[MockMessage]` | List of MockMessage objects |
### `position(partitions)`

Get current position for partitions (returns input unchanged in mock).
### `reset()`

Reset the mock consumer's recorded state.
### `seek(partition)`

Seek to a specific offset (recorded but not enforced in mock).

### `subscribe(topics, on_assign=None, on_revoke=None, on_lost=None)`

Subscribe to topics (recorded but not enforced in mock).

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `topics` | `list[str]` | List of topic names | *required* |
| `on_assign` | `Optional[DeliveryCallback]` | Optional rebalance callback (stored but not called in mock) | `None` |
| `on_revoke` | `Optional[DeliveryCallback]` | Optional rebalance callback (stored but not called in mock) | `None` |
| `on_lost` | `Optional[DeliveryCallback]` | Optional rebalance callback (stored but not called in mock) | `None` |
## typedkafka.testing.MockDeadLetterQueue
In-memory mock of DeadLetterQueue for testing.
Records messages sent to the DLQ without requiring a real producer.
Attributes:

| Name | Type | Description |
|---|---|---|
| `messages` | `list[tuple[str, MockMessage]]` | List of (topic, MockMessage) tuples sent to the DLQ |