
Topics

typedkafka.topics.TypedTopic

Bases: Generic[T]

A type-safe topic that binds a topic name to serializer/deserializer pairs.

Provides static type safety when used with KafkaProducer.send_typed() and KafkaMessage.decode(). Type checkers and IDEs can autocomplete and check message values against the topic's type parameter; nothing is enforced at runtime by the type parameter itself.

Parameters:

name (str): Kafka topic name. Required.
value_serializer (Serializer[T]): Serializer for message values. Required.
value_deserializer (Deserializer[T]): Deserializer for message values. Required.
key_serializer (Serializer[Any] | None): Optional serializer for message keys. Default: None.
key_deserializer (Deserializer[Any] | None): Optional deserializer for message keys. Default: None.

Examples:

>>> from typedkafka.topics import TypedTopic, json_topic
>>> from typedkafka.serializers import JsonSerializer, JsonDeserializer
>>>
>>> # Using the factory function (assumes `producer` is an existing KafkaProducer)
>>> events = json_topic("user-events")
>>> producer.send_typed(events, {"user_id": 123})
>>>
>>> # Custom topic with explicit serializers
>>> topic = TypedTopic(
...     "users",
...     value_serializer=JsonSerializer(),
...     value_deserializer=JsonDeserializer(),
... )
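The binding between a topic name and its serializer/deserializer pair can be illustrated without a broker. The following is a minimal sketch, not typedkafka's implementation: `SketchTopic`, `JsonSer`, `JsonDe`, and the free function `send_typed` are hypothetical stand-ins, and the `serialize`/`deserialize` method names are assumptions about the serializer interface.

```python
import json
from dataclasses import dataclass
from typing import Any, Generic, Optional, Protocol, TypeVar

T = TypeVar("T")
T_contra = TypeVar("T_contra", contravariant=True)
T_co = TypeVar("T_co", covariant=True)


class Serializer(Protocol[T_contra]):
    # Assumed method name; the real interface may differ.
    def serialize(self, value: T_contra) -> bytes: ...


class Deserializer(Protocol[T_co]):
    # Assumed method name; the real interface may differ.
    def deserialize(self, data: bytes) -> T_co: ...


@dataclass(frozen=True)
class SketchTopic(Generic[T]):
    """Hypothetical stand-in: ties a topic name to its value/key codecs."""
    name: str
    value_serializer: Serializer[T]
    value_deserializer: Deserializer[T]
    key_serializer: Optional[Serializer[Any]] = None
    key_deserializer: Optional[Deserializer[Any]] = None


class JsonSer:
    def serialize(self, value: Any) -> bytes:
        return json.dumps(value).encode("utf-8")


class JsonDe:
    def deserialize(self, data: bytes) -> Any:
        return json.loads(data.decode("utf-8"))


def send_typed(topic: SketchTopic[T], value: T) -> bytes:
    """Stand-in for a producer call: returns the bytes that would be sent.

    Because `value` shares the type variable T with `topic`, a type checker
    can reject a value that does not match the topic's type parameter.
    """
    return topic.value_serializer.serialize(value)


users = SketchTopic("users", JsonSer(), JsonDe())
payload = send_typed(users, {"user_id": 123})
decoded = users.value_deserializer.deserialize(payload)
```

The point of the pattern is that the topic object, rather than each call site, decides how values are encoded, so producers and consumers that share the same topic definition cannot disagree about the wire format.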

typedkafka.topics.json_topic(name)

Create a TypedTopic for JSON messages.

Parameters:

name (str): Kafka topic name. Required.

Returns:

TypedTopic[Any]: A TypedTopic configured with JSON serialization.

Examples:

>>> from typedkafka.topics import json_topic
>>> # Assumes `producer` is an existing KafkaProducer
>>> events = json_topic("events")
>>> producer.send_typed(events, {"user_id": 123})
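Because the return type is TypedTopic[Any], a type checker will not validate the shape of JSON values, and a JSON round trip does not always return the exact Python object that was sent. The sketch below uses only the standard `json` module; it assumes the library's JSON serializers behave like `json.dumps`/`json.loads`, which this page does not confirm. `json_round_trip` is a hypothetical helper.

```python
import json
from typing import Any


def json_round_trip(value: Any) -> Any:
    """Encode to UTF-8 JSON bytes and decode again, as a consumer would."""
    data = json.dumps(value).encode("utf-8")
    return json.loads(data.decode("utf-8"))


original = {"user_id": 123, "tags": ("new", "trial")}
restored = json_round_trip(original)

# JSON has no tuple type, so the tuple comes back as a list.
tags_type = type(restored["tags"]).__name__
```

If consumers rely on specific container types, it is safer to send lists and dicts with string keys in the first place.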

typedkafka.topics.string_topic(name, encoding='utf-8')

Create a TypedTopic for string messages.

Parameters:

name (str): Kafka topic name. Required.
encoding (str): Character encoding. Default: 'utf-8'.

Returns:

TypedTopic[str]: A TypedTopic configured with string serialization.

Examples:

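>>> from typedkafka.topics import string_topic
>>> # Assumes `producer` is an existing KafkaProducer
>>> logs = string_topic("logs")
>>> producer.send_typed(logs, "Application started")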
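The `encoding` parameter matters as soon as messages contain non-ASCII text, because the producing and consuming sides must agree on it. The sketch below shows the effect with plain `str.encode` and `bytes.decode`; it assumes string serialization amounts to these calls, and `encode_text`/`decode_text` are hypothetical helpers rather than library functions.

```python
def encode_text(value: str, encoding: str = "utf-8") -> bytes:
    """Turn a string into the bytes that would be written to the topic."""
    return value.encode(encoding)


def decode_text(data: bytes, encoding: str = "utf-8") -> str:
    """Turn bytes read from the topic back into a string."""
    return data.decode(encoding)


message = "café started"

utf8_bytes = encode_text(message)                        # 'é' takes two bytes
latin1_bytes = encode_text(message, encoding="latin-1")  # 'é' takes one byte

# Matching encodings on both sides restore the original text.
round_trip = decode_text(utf8_bytes)

# Mismatched encodings decode without error here but corrupt the text.
mismatched = decode_text(utf8_bytes, encoding="latin-1")
```

Defining the topic once and sharing that definition between producer and consumer code avoids this kind of silent mismatch.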