Topics
typedkafka.topics.TypedTopic
Bases: Generic[T]
A type-safe topic that binds a topic name to serializer/deserializer pairs.
Provides static type safety when used with KafkaProducer.send_typed()
and KafkaMessage.decode(): type checkers and IDEs can autocomplete and check
message values based on the topic's type parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Kafka topic name. | required |
| `value_serializer` | `Serializer[T]` | Serializer for message values. | required |
| `value_deserializer` | `Deserializer[T]` | Deserializer for message values. | required |
| `key_serializer` | `Serializer[Any] \| None` | Optional serializer for message keys. | `None` |
| `key_deserializer` | `Deserializer[Any] \| None` | Optional deserializer for message keys. | `None` |
Examples:
>>> from typedkafka.topics import TypedTopic, json_topic
>>> from typedkafka.serializers import JsonSerializer, JsonDeserializer
>>>
>>> # Using the factory function (assumes `producer` is an existing KafkaProducer)
>>> events = json_topic("user-events")
>>> producer.send_typed(events, {"user_id": 123})
>>>
>>> # Custom topic with explicit serializers
>>> topic = TypedTopic(
... "users",
... value_serializer=JsonSerializer(),
... value_deserializer=JsonDeserializer(),
... )
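To illustrate how binding a name to a serializer/deserializer pair lets a type checker validate message values, here is a minimal, self-contained sketch. It does not use typedkafka: `SketchTopic` and the module-level `send_typed` function are hypothetical stand-ins, and the library's real implementation may differ.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Dict, Generic, TypeVar

T = TypeVar("T")


@dataclass(frozen=True)
class SketchTopic(Generic[T]):
    """Hypothetical stand-in for TypedTopic; not the library's implementation."""

    name: str
    value_serializer: Callable[[T], bytes]
    value_deserializer: Callable[[bytes], T]


def send_typed(topic: SketchTopic[T], value: T) -> bytes:
    """Hypothetical stand-in for a typed send: returns the bytes that would be sent."""
    return topic.value_serializer(value)


# A topic whose values are dicts, serialized as UTF-8 JSON.
users: SketchTopic[Dict[str, Any]] = SketchTopic(
    name="users",
    value_serializer=lambda value: json.dumps(value).encode("utf-8"),
    value_deserializer=lambda raw: json.loads(raw.decode("utf-8")),
)

payload = send_typed(users, {"user_id": 123})
decoded = users.value_deserializer(payload)

# A static type checker such as mypy would reject the call below,
# because the value is a str rather than a dict:
# send_typed(users, "not a dict")
```

Because `T` appears in both the topic and the value parameter, the checker can tie the value's type to the topic's type parameter. Nothing is enforced at runtime in this sketch.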
typedkafka.topics.json_topic(name)
Create a TypedTopic for JSON messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Kafka topic name. | required |
Returns:
| Type | Description |
|---|---|
| `TypedTopic[Any]` | A TypedTopic configured with JSON serialization. |
Examples:
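The sketch below shows what a JSON round trip does to a message value, which helps explain why the returned topic is typed as `TypedTopic[Any]`. It uses only the standard-library `json` module. The helpers `to_json_bytes` and `from_json_bytes` are hypothetical, and UTF-8 encoded JSON is assumed as the wire format, which the library may or may not match.

```python
import json
from typing import Any


def to_json_bytes(value: Any) -> bytes:
    """Hypothetical helper: encode a JSON-compatible value as UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


def from_json_bytes(raw: bytes) -> Any:
    """Hypothetical helper: decode UTF-8 JSON bytes back to a Python value."""
    return json.loads(raw.decode("utf-8"))


event = {"user_id": 123, "tags": ("a", "b")}
round_tripped = from_json_bytes(to_json_bytes(event))

# JSON has no tuple type, so the tuple comes back as a list.
```

Since a decoded value can be any JSON-compatible structure, a topic that needs a narrower value type would likely be better served by a custom `TypedTopic` with explicit serializers.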
typedkafka.topics.string_topic(name, encoding='utf-8')
Create a TypedTopic for string messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `name` | `str` | Kafka topic name. | required |
| `encoding` | `str` | Character encoding (default: utf-8). | `'utf-8'` |
Returns:
| Type | Description |
|---|---|
| `TypedTopic[str]` | A TypedTopic configured with string serialization. |
Examples:
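To show why the `encoding` parameter matters, the sketch below encodes the same string with two encodings using only built-in `str.encode` and `bytes.decode`. The helpers `encode_text` and `decode_text` are hypothetical and only mirror the parameter described above; they are not part of typedkafka.

```python
def encode_text(value: str, encoding: str = "utf-8") -> bytes:
    """Hypothetical helper: turn a string into bytes with the given encoding."""
    return value.encode(encoding)


def decode_text(raw: bytes, encoding: str = "utf-8") -> str:
    """Hypothetical helper: turn bytes back into a string with the given encoding."""
    return raw.decode(encoding)


message = "héllo"
utf8_bytes = encode_text(message)                        # "é" takes two bytes
latin1_bytes = encode_text(message, encoding="latin-1")  # "é" takes one byte

# Decoding must use the same encoding that was used to encode.
restored = decode_text(latin1_bytes, encoding="latin-1")
```

Producers and consumers of the same topic need to agree on the encoding. In this sketch, decoding `latin1_bytes` as UTF-8 raises `UnicodeDecodeError`.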