Testing

typedkafka.testing.MockMessage

A mock Kafka message for testing.

Attributes:

Name Type Description
topic

The topic this message was sent to

value

The message value

key

The message key (optional)

partition

The partition number

offset

The message offset

headers

Message headers

__init__(topic, value, key=None, partition=0, offset=0, headers=None)

Initialize a mock message.

Parameters:

Name Type Description Default
topic str

Topic name

required
value bytes

Message value as bytes

required
key Optional[bytes]

Optional message key

None
partition int

Partition number (default: 0)

0
offset int

Message offset (default: 0)

0
headers Optional[list[tuple[str, bytes]]]

Optional list of (key, value) header tuples

None
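
Headers are documented as a list of (key, value) tuples with string keys and bytes values. As a small illustration of that shape, test code can decode such a list into a dict for easier assertions. The header names below are made up for the example.

```python
# Header list in the documented shape: list[tuple[str, bytes]].
# The header names and values are illustrative only.
headers = [("trace-id", b"abc123"), ("content-type", b"application/json")]

# Decode values for convenient assertions. Kafka allows repeated header keys,
# so a dict keeps only the last value seen for each key.
decoded = {name: value.decode("utf-8") for name, value in headers}
```

The same list could be passed as the headers argument when constructing a MockMessage.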

__repr__()

Return string representation of the message.

key_as_string(encoding='utf-8')

Decode the message key as a string.

Parameters:

Name Type Description Default
encoding str

Character encoding to use (default: utf-8)

'utf-8'

Returns:

Type Description
Optional[str]

Decoded string key, or None if no key

value_as(deserializer)

Decode the message value using a custom deserializer function.

Parameters:

Name Type Description Default
deserializer

A callable that takes bytes and returns the desired type.

required

Returns:

Type Description

Deserialized value
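
A minimal sketch of a deserializer that fits the description above: a callable that takes bytes and returns the desired type. The Order dataclass and parse_order function are hypothetical names introduced for illustration, and the block calls the function directly so that it runs without typedkafka installed.

```python
import json
from dataclasses import dataclass


@dataclass
class Order:
    """Hypothetical domain type used only for this example."""
    order_id: int
    sku: str


def parse_order(raw: bytes) -> Order:
    """Deserializer: takes the raw message bytes and returns an Order."""
    data = json.loads(raw.decode("utf-8"))
    return Order(order_id=int(data["order_id"]), sku=str(data["sku"]))


raw = b'{"order_id": 7, "sku": "A-1"}'
order = parse_order(raw)
# With a MockMessage holding the same bytes, the callable would be
# passed as msg.value_as(parse_order).
```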

value_as_json()

Deserialize the message value as JSON.

Returns:

Type Description
Any

Parsed JSON object

Raises:

Type Description
SerializationError

If JSON parsing fails

value_as_string(encoding='utf-8')

Decode the message value as a string.

Parameters:

Name Type Description Default
encoding str

Character encoding to use (default: utf-8)

'utf-8'

Returns:

Type Description
str

Decoded string value

Raises:

Type Description
SerializationError

If decoding fails

typedkafka.testing.MockProducer

A mock Kafka producer for testing.

Records all messages sent to topics without actually sending them to Kafka. Useful in unit tests for verifying that your code sends the expected messages.

Examples:

>>> producer = MockProducer()
>>> producer.send("my-topic", b"test message", key=b"test-key")
>>>
>>> # Verify the message was sent
>>> assert len(producer.messages["my-topic"]) == 1
>>> msg = producer.messages["my-topic"][0]
>>> assert msg.value == b"test message"
>>> assert msg.key == b"test-key"

Attributes:

Name Type Description
messages dict[str, list[MockMessage]]

Dict mapping topic names to lists of MockMessage objects

call_count

Number of times send() was called

flushed

Whether flush() has been called

metrics property

Current metrics for this producer.

__enter__()

Context manager entry.

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

__init__(config=None, fail_on_topics=None)

Initialize a mock producer.

Parameters:

Name Type Description Default
config Optional[dict[str, Any]]

Optional config dict (ignored, but accepted for compatibility)

None
fail_on_topics Optional[set[str]]

Optional set of topic names that will cause send() to raise ProducerError. Useful for testing error handling paths.

None
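
The sketch below illustrates how fail_on_topics can drive an error-handling test. FailingProducer is a hypothetical stand-in that mimics only the documented behavior (send() raising for listed topics) so the block runs without typedkafka installed. It raises RuntimeError as a placeholder, whereas the real mock is documented to raise ProducerError. publish_with_fallback is likewise an invented function under test.

```python
class FailingProducer:
    """Hypothetical stand-in mimicking the documented fail_on_topics behavior."""

    def __init__(self, fail_on_topics=None):
        self.fail_on_topics = set(fail_on_topics or ())
        self.messages = {}

    def send(self, topic, value, key=None):
        if topic in self.fail_on_topics:
            # Placeholder exception; the real mock is documented to raise ProducerError.
            raise RuntimeError(f"simulated failure for topic {topic!r}")
        self.messages.setdefault(topic, []).append((value, key))


def publish_with_fallback(producer, topic, fallback_topic, value):
    """Code under test: try the primary topic, fall back if the send fails."""
    try:
        producer.send(topic, value)
        return topic
    except RuntimeError:
        producer.send(fallback_topic, value)
        return fallback_topic


producer = FailingProducer(fail_on_topics={"orders"})
used = publish_with_fallback(producer, "orders", "orders-fallback", b"payload")
```

In a real test, MockProducer(fail_on_topics={"orders"}) would take the place of the stand-in, and the except clause would catch ProducerError.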

abort_transaction(timeout=30.0)

Abort the mock transaction, discarding buffered messages.

begin_transaction()

Begin a mock transaction.

close()

Mark producer as closed.

commit_transaction(timeout=30.0)

Commit the mock transaction, flushing buffered messages.
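
The transaction methods describe buffering semantics: abort discards buffered messages and commit flushes them. The following sketch shows one way such semantics can be modeled. BufferingProducer is a hypothetical stand-in written for illustration, not the library's implementation, and its internals are assumptions.

```python
class BufferingProducer:
    """Hypothetical stand-in illustrating the documented transaction semantics."""

    def __init__(self):
        self.messages = {}           # recorded (committed) messages by topic
        self._buffer = []            # messages sent inside an open transaction
        self._in_transaction = False

    def begin_transaction(self):
        self._in_transaction = True

    def send(self, topic, value):
        if self._in_transaction:
            self._buffer.append((topic, value))
        else:
            self.messages.setdefault(topic, []).append(value)

    def commit_transaction(self):
        # Flush buffered messages into the recorded set.
        for topic, value in self._buffer:
            self.messages.setdefault(topic, []).append(value)
        self._buffer.clear()
        self._in_transaction = False

    def abort_transaction(self):
        # Discard everything buffered since begin_transaction().
        self._buffer.clear()
        self._in_transaction = False


producer = BufferingProducer()
producer.begin_transaction()
producer.send("events", b"dropped")
producer.abort_transaction()

producer.begin_transaction()
producer.send("events", b"kept")
producer.commit_transaction()
```

A test against the real mock would follow the same begin, send, commit or abort sequence and then inspect producer.messages.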

flush(timeout=-1)

Mark producer as flushed (no-op in mock).

Parameters:

Name Type Description Default
timeout float

Ignored in mock

-1

Returns:

Type Description
int

0 (always successful in mock)

Examples:

>>> producer = MockProducer()
>>> producer.send("topic", b"msg")
>>> remaining = producer.flush()
>>> assert remaining == 0
>>> assert producer.flushed is True

get_json_messages(topic)

Get all messages for a topic deserialized as JSON.

Parameters:

Name Type Description Default
topic str

Topic name

required

Returns:

Type Description
list[Any]

List of deserialized JSON values

init_transactions(timeout=30.0)

Initialize transactions (no-op in mock).

message_count(topic)

Get count of messages sent to a topic.

Parameters:

Name Type Description Default
topic str

Topic name

required

Returns:

Type Description
int

Number of messages sent to the topic

reset()

Clear all recorded messages and reset state.

Useful for reusing the same mock across multiple test cases.

Examples:

>>> producer = MockProducer()
>>> producer.send("topic", b"msg1")
>>> producer.reset()
>>> assert len(producer.messages) == 0
>>> assert producer.call_count == 0

send(topic, value, key=None, partition=None, on_delivery=None, headers=None)

Record a message send (doesn't actually send to Kafka).

Parameters:

Name Type Description Default
topic str

Topic to send to

required
value bytes

Message value

required
key Optional[bytes]

Optional message key

None
partition Optional[int]

Optional partition (recorded as partition 0 when omitted)

None
on_delivery Optional[DeliveryCallback]

Optional callback (will be called immediately with success)

None
headers Optional[list[tuple[str, bytes]]]

Optional list of (key, value) header tuples

None

Examples:

>>> producer = MockProducer()
>>> producer.send("events", b"data", key=b"key-1")
>>> assert len(producer.messages["events"]) == 1

send_batch(topic, messages, on_delivery=None)

Record a batch of message sends.

Parameters:

Name Type Description Default
topic str

Topic to send to

required
messages list[tuple[bytes, Optional[bytes]]]

List of (value, key) tuples

required
on_delivery Optional[DeliveryCallback]

Optional delivery callback

None
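
As an illustration of the messages argument, the sketch below builds a list of (value, key) tuples from a few records. The records, the topic name in the comment, and the choice of the id field as the key are assumptions made for the example.

```python
import json
from typing import Optional

# Illustrative records; in a real test these would come from your fixtures.
records = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]

# Each entry is (value, key): the JSON-encoded record and its id as the key.
batch: list[tuple[bytes, Optional[bytes]]] = [
    (json.dumps(record).encode("utf-8"), str(record["id"]).encode("utf-8"))
    for record in records
]
# The list would then be passed as producer.send_batch("users", batch).
```

Note the order inside each tuple: value first, key second, which is the reverse of the common key/value convention.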

send_json(topic, value, key=None, partition=None, on_delivery=None)

Record a JSON message send.

Parameters:

Name Type Description Default
topic str

Topic to send to

required
value Any

JSON-serializable value

required
key Optional[str]

Optional string key

None
partition Optional[int]

Optional partition

None
on_delivery Optional[DeliveryCallback]

Optional delivery callback

None

Examples:

>>> producer = MockProducer()
>>> producer.send_json("events", {"user_id": 123})
>>> import json
>>> data = json.loads(producer.messages["events"][0].value)
>>> assert data["user_id"] == 123

send_string(topic, value, key=None, partition=None, on_delivery=None)

Record a string message send.

Parameters:

Name Type Description Default
topic str

Topic to send to

required
value str

String value

required
key Optional[str]

Optional string key

None
partition Optional[int]

Optional partition

None
on_delivery Optional[DeliveryCallback]

Optional delivery callback

None

transaction()

Return a mock transaction context manager.

typedkafka.testing.MockConsumer

A mock Kafka consumer for testing.

Allows you to inject predefined messages for testing code that consumes from Kafka.

Examples:

>>> consumer = MockConsumer()
>>> consumer.add_message("my-topic", b"test message", key=b"test-key")
>>> consumer.subscribe(["my-topic"])
>>>
>>> msg = consumer.poll()
>>> assert msg.value == b"test message"
>>> assert msg.key == b"test-key"

Attributes:

Name Type Description
messages list[MockMessage]

Queue of MockMessage objects to be consumed

subscribed_topics list[str]

List of subscribed topics

committed_offsets dict[tuple[str, int], int]

Dict mapping (topic, partition) tuples to committed offsets

poll_timeout float

Timeout used by __iter__ (matches KafkaConsumer)

metrics property

Current metrics for this consumer.

__enter__()

Context manager entry.

__exit__(exc_type, exc_val, exc_tb)

Context manager exit.

__init__(config=None)

Initialize a mock consumer.

Parameters:

Name Type Description Default
config Optional[dict[str, Any]]

Optional config dict (ignored, but accepted for compatibility)

None

__iter__()

Iterate over queued messages.

Yields all queued messages, then stops. In tests, either call poll() in a loop or add all messages before iterating.

Yields:

Type Description

MockMessage objects until queue is exhausted
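
Because iteration ends once the queue is exhausted, code written against any iterable of messages can be exercised in a test. The sketch below uses a plain list of FakeMessage tuples as a stand-in for a pre-loaded MockConsumer. FakeMessage and count_by_topic are hypothetical names introduced for the example.

```python
from collections import Counter
from typing import Iterable, NamedTuple


class FakeMessage(NamedTuple):
    """Hypothetical stand-in exposing the topic and value attributes."""
    topic: str
    value: bytes


def count_by_topic(consumer: Iterable) -> Counter:
    """Code under test: consume every message and tally messages per topic."""
    counts = Counter()
    for msg in consumer:
        counts[msg.topic] += 1
    return counts


queued = [FakeMessage("a", b"1"), FakeMessage("a", b"2"), FakeMessage("b", b"3")]
counts = count_by_topic(queued)
```

With the real mock, the list would be replaced by a MockConsumer whose messages were all added via add_message() before the loop starts.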

add_json_message(topic, value, key=None, partition=0)

Add a JSON message to be consumed.

Parameters:

Name Type Description Default
topic str

Topic name

required
value Any

JSON-serializable value

required
key Optional[str]

Optional string key

None
partition int

Partition number

0

Examples:

>>> consumer = MockConsumer()
>>> consumer.add_json_message("events", {"user_id": 123, "action": "click"})

add_message(topic, value, key=None, partition=0, offset=None, headers=None)

Add a message to be consumed.

Call this in your tests to inject messages that your code will consume.

Parameters:

Name Type Description Default
topic str

Topic name

required
value bytes

Message value

required
key Optional[bytes]

Optional message key

None
partition int

Partition number (default: 0)

0
offset Optional[int]

Message offset (auto-generated if None)

None
headers Optional[list[tuple[str, bytes]]]

Optional message headers

None

Examples:

>>> consumer = MockConsumer()
>>> consumer.add_message("events", b'{"user_id": 123}')
>>> consumer.add_message("events", b'{"user_id": 456}')
>>> assert len(consumer.messages) == 2

add_string_message(topic, value, key=None, partition=0)

Add a string message to be consumed.

Parameters:

Name Type Description Default
topic str

Topic name

required
value str

String value

required
key Optional[str]

Optional string key

None
partition int

Partition number

0

assign(partitions)

Manually assign partitions.

assignment()

Get the current partition assignment.

close()

Mark consumer as closed.

commit(message=None, asynchronous=True)

Record a commit (doesn't actually commit to Kafka).

Parameters:

Name Type Description Default
message Optional[MockMessage]

Message to commit offset for

None
asynchronous bool

Ignored in mock

True

Examples:

>>> consumer = MockConsumer()
>>> consumer.add_message("topic", b"msg", partition=0, offset=42)
>>> msg = consumer.poll()
>>> consumer.commit(msg)
>>> assert consumer.committed_offsets[("topic", 0)] == 42

poll(timeout=1.0)

Poll for the next message.

Returns messages in the order they were added with add_message().

Parameters:

Name Type Description Default
timeout float

Ignored in mock

1.0

Returns:

Type Description
Optional[MockMessage]

Next MockMessage or None if no more messages

Examples:

>>> consumer = MockConsumer()
>>> consumer.add_message("topic", b"msg1")
>>> consumer.add_message("topic", b"msg2")
>>>
>>> msg1 = consumer.poll()
>>> assert msg1.value == b"msg1"
>>> msg2 = consumer.poll()
>>> assert msg2.value == b"msg2"
>>> msg3 = consumer.poll()
>>> assert msg3 is None

poll_batch(max_messages=100, timeout=1.0)

Poll for a batch of messages.

Parameters:

Name Type Description Default
max_messages int

Maximum number of messages to return

100
timeout float

Ignored in mock

1.0

Returns:

Type Description
list[MockMessage]

List of MockMessage objects
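
A minimal sketch of draining a consumer in batches. QueueConsumer is a hypothetical stand-in exposing only a poll_batch method with the signature shown above. The sketch assumes that an exhausted queue yields an empty list, which this page does not state explicitly.

```python
class QueueConsumer:
    """Hypothetical stand-in exposing only poll_batch(max_messages, timeout)."""

    def __init__(self, items):
        self._items = list(items)

    def poll_batch(self, max_messages=100, timeout=1.0):
        # Return up to max_messages items and keep the rest queued.
        batch = self._items[:max_messages]
        self._items = self._items[max_messages:]
        return batch


def drain(consumer, batch_size):
    """Record batch sizes until an empty batch signals the queue is exhausted."""
    sizes = []
    while True:
        batch = consumer.poll_batch(max_messages=batch_size)
        if not batch:  # assumption: an empty list means no more messages
            break
        sizes.append(len(batch))
    return sizes


sizes = drain(QueueConsumer(range(5)), batch_size=2)
```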

position(partitions)

Get current position for partitions (returns input unchanged in mock).

reset()

Clear all messages and reset state.

Examples:

>>> consumer = MockConsumer()
>>> consumer.add_message("topic", b"msg")
>>> consumer.reset()
>>> assert len(consumer.messages) == 0

seek(partition)

Seek to a specific offset (recorded but not enforced in mock).

subscribe(topics, on_assign=None, on_revoke=None, on_lost=None)

Subscribe to topics (recorded but not enforced in mock).

Parameters:

Name Type Description Default
topics list[str]

List of topic names

required
on_assign Optional[DeliveryCallback]

Optional rebalance callback (stored but not called in mock)

None
on_revoke Optional[DeliveryCallback]

Optional rebalance callback (stored but not called in mock)

None
on_lost Optional[DeliveryCallback]

Optional rebalance callback (stored but not called in mock)

None

Examples:

>>> consumer = MockConsumer()
>>> consumer.subscribe(["topic1", "topic2"])
>>> assert "topic1" in consumer.subscribed_topics

typedkafka.testing.MockDeadLetterQueue

In-memory mock of DeadLetterQueue for testing.

Records messages sent to the DLQ without requiring a real producer.

Attributes:

Name Type Description
messages list[tuple[str, MockMessage]]

List of (topic, MockMessage) tuples sent to the DLQ.

Examples:

>>> dlq = MockDeadLetterQueue()
>>> msg = MockMessage("events", b"bad payload")
>>> dlq.send(msg, error=ValueError("bad"))
>>> assert dlq.send_count == 1

send_count property

Number of messages sent to the DLQ.

reset()

Clear all recorded DLQ messages.

send(message, error=None, extra_headers=None)

Record a message sent to the DLQ.
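
The sketch below shows how a handler that routes failures to a dead letter queue might be tested. RecordingDLQ is a hypothetical stand-in that mirrors only the documented send() signature and send_count property, so the block runs without typedkafka installed. The handle function is invented for the example and passes raw bytes where real code would pass the consumed message object.

```python
import json


class RecordingDLQ:
    """Hypothetical stand-in mirroring the documented send() and send_count."""

    def __init__(self):
        self.sent = []

    @property
    def send_count(self):
        return len(self.sent)

    def send(self, message, error=None, extra_headers=None):
        self.sent.append((message, error))


def handle(raw_values, dlq):
    """Code under test: parse each payload and route failures to the DLQ."""
    parsed = []
    for raw in raw_values:
        try:
            parsed.append(json.loads(raw))
        except json.JSONDecodeError as exc:
            # Real code would pass the consumed message object here.
            dlq.send(raw, error=exc)
    return parsed


dlq = RecordingDLQ()
parsed = handle([b'{"ok": true}', b"not json"], dlq)
```

In a real test, MockDeadLetterQueue() would replace the stand-in and the assertions would check send_count and the recorded messages.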