Serializers¶
typedkafka.serializers.Serializer
¶
Bases: ABC, Generic[T]
Abstract base class for message serializers.
Implement this interface to create custom serializers for use with KafkaProducer.
Examples:
>>> class MySerializer(Serializer[dict]):
... def serialize(self, topic, value):
... return json.dumps(value).encode()
serialize(topic, value)
abstractmethod
¶
Serialize a value to bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
The topic the message will be sent to. |
required |
value
|
T
|
The value to serialize. |
required |
Returns:
| Type | Description |
|---|---|
bytes
|
Serialized bytes. |
typedkafka.serializers.Deserializer
¶
Bases: ABC, Generic[T]
Abstract base class for message deserializers.
Implement this interface to create custom deserializers for use with KafkaConsumer.
Examples:
>>> class MyDeserializer(Deserializer[dict]):
... def deserialize(self, topic, data):
... return json.loads(data.decode())
deserialize(topic, data)
abstractmethod
¶
Deserialize bytes to a value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
The topic the message came from. |
required |
data
|
bytes
|
Raw bytes to deserialize. |
required |
Returns:
| Type | Description |
|---|---|
T
|
Deserialized value. |
typedkafka.serializers.JsonSerializer
¶
Bases: Serializer[Any]
JSON serializer that encodes Python objects to UTF-8 JSON bytes.
Examples:
serialize(topic, value)
¶
Serialize a value to JSON bytes.
typedkafka.serializers.JsonDeserializer
¶
Bases: Deserializer[Any]
JSON deserializer that decodes UTF-8 JSON bytes to Python objects.
Examples:
deserialize(topic, data)
¶
Deserialize JSON bytes to a Python object.
typedkafka.serializers.StringSerializer
¶
Bases: Serializer[str]
String serializer that encodes strings to bytes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
encoding
|
str
|
Character encoding to use (default: utf-8). |
'utf-8'
|
Examples:
serialize(topic, value)
¶
Serialize a string to bytes.
typedkafka.serializers.StringDeserializer
¶
Bases: Deserializer[str]
String deserializer that decodes bytes to strings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
encoding
|
str
|
Character encoding to use (default: utf-8). |
'utf-8'
|
Examples:
deserialize(topic, data)
¶
Deserialize bytes to a string.
typedkafka.serializers.AvroSerializer
¶
Bases: Serializer[Any]
Avro serializer with Confluent Schema Registry support.
Requires the confluent-kafka[avro] or fastavro package
and a running Schema Registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schema_registry_url
|
str
|
URL of the Confluent Schema Registry. |
required |
schema_str
|
str
|
Avro schema as a JSON string. |
required |
schema_registry_config
|
Optional[dict[str, Any]]
|
Optional additional config for the schema registry client. |
None
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If required schema registry dependencies are not installed. |
Examples:
>>> schema = '{"type": "record", "name": "User", "fields": [{"name": "id", "type": "int"}]}'
>>> ser = AvroSerializer("http://localhost:8081", schema)
>>> ser.serialize("users", {"id": 123})
serialize(topic, value)
¶
Serialize a value using Avro with Schema Registry.
typedkafka.serializers.AvroDeserializer
¶
Bases: Deserializer[Any]
Avro deserializer with Confluent Schema Registry support.
Requires the confluent-kafka[avro] package and a running Schema Registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schema_registry_url
|
str
|
URL of the Confluent Schema Registry. |
required |
schema_str
|
Optional[str]
|
Optional Avro schema as a JSON string. If not provided, the schema is fetched from the registry. |
None
|
schema_registry_config
|
Optional[dict[str, Any]]
|
Optional additional config for the schema registry client. |
None
|
Raises:
| Type | Description |
|---|---|
ImportError
|
If required schema registry dependencies are not installed. |
Examples:
>>> deser = AvroDeserializer("http://localhost:8081")
>>> data = deser.deserialize("users", raw_bytes)
deserialize(topic, data)
¶
Deserialize Avro bytes using Schema Registry.