Serializers

typedkafka.serializers.Serializer

Bases: ABC, Generic[T]

Abstract base class for message serializers.

Implement this interface to create custom serializers for use with KafkaProducer.

Examples:

>>> import json
>>> class MySerializer(Serializer[dict]):
...     def serialize(self, topic, value):
...         return json.dumps(value).encode()

serialize(topic, value) abstractmethod

Serialize a value to bytes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| topic | str | The topic the message will be sent to. | required |
| value | T | The value to serialize. | required |

Returns:

| Type | Description |
|------|-------------|
| bytes | Serialized bytes. |
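
As an illustration of the interface above, the following sketch serializes a dataclass to JSON bytes. The `Serializer` base class here is a local stand-in that mirrors the documented signature so the snippet runs without typedkafka installed; `UserEvent` and `UserEventSerializer` are hypothetical names. In real code you would presumably import `Serializer` from `typedkafka.serializers` instead.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import asdict, dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


class Serializer(ABC, Generic[T]):
    """Stand-in mirroring the documented interface (an assumption, not the library class)."""

    @abstractmethod
    def serialize(self, topic: str, value: T) -> bytes: ...


@dataclass
class UserEvent:  # hypothetical message type
    user_id: int
    action: str


class UserEventSerializer(Serializer[UserEvent]):
    def serialize(self, topic: str, value: UserEvent) -> bytes:
        # sort_keys keeps the byte output stable regardless of field order
        return json.dumps(asdict(value), sort_keys=True).encode("utf-8")


payload = UserEventSerializer().serialize("events", UserEvent(user_id=123, action="login"))
print(payload)
```

Parameterizing the base class with the message type (`Serializer[UserEvent]`) lets a type checker flag calls that pass the wrong value type.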

typedkafka.serializers.Deserializer

Bases: ABC, Generic[T]

Abstract base class for message deserializers.

Implement this interface to create custom deserializers for use with KafkaConsumer.

Examples:

>>> import json
>>> class MyDeserializer(Deserializer[dict]):
...     def deserialize(self, topic, data):
...         return json.loads(data.decode())

deserialize(topic, data) abstractmethod

Deserialize bytes to a value.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| topic | str | The topic the message came from. | required |
| data | bytes | Raw bytes to deserialize. | required |

Returns:

| Type | Description |
|------|-------------|
| T | Deserialized value. |
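
A typed deserializer can return something richer than a `dict`. The sketch below decodes JSON bytes into a dataclass. As with any standalone example, the `Deserializer` base class is a local stand-in mirroring the documented signature (an assumption, not the library class), and `UserEvent` and `UserEventDeserializer` are hypothetical names.

```python
import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


class Deserializer(ABC, Generic[T]):
    """Stand-in mirroring the documented interface (an assumption, not the library class)."""

    @abstractmethod
    def deserialize(self, topic: str, data: bytes) -> T: ...


@dataclass
class UserEvent:  # hypothetical message type
    user_id: int
    action: str


class UserEventDeserializer(Deserializer[UserEvent]):
    def deserialize(self, topic: str, data: bytes) -> UserEvent:
        fields = json.loads(data.decode("utf-8"))
        # Coerce explicitly so a malformed field fails here, not downstream
        return UserEvent(user_id=int(fields["user_id"]), action=str(fields["action"]))


event = UserEventDeserializer().deserialize(
    "events", b'{"user_id": 123, "action": "login"}'
)
print(event)
```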

typedkafka.serializers.JsonSerializer

Bases: Serializer[Any]

JSON serializer that encodes Python objects to UTF-8 JSON bytes.

Examples:

>>> ser = JsonSerializer()
>>> ser.serialize("topic", {"user_id": 123})
b'{"user_id": 123}'

serialize(topic, value)

Serialize a value to JSON bytes.
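
The documented output above matches what the standard library produces with `json.dumps(value).encode("utf-8")`. Whether `JsonSerializer` passes extra options to `json.dumps` is not stated on this page, so the sketch below uses plain stdlib calls and a hypothetical helper, `to_json_bytes`, to show one practical consequence: values that `json` cannot encode (such as `datetime`) need converting first.

```python
import json
from datetime import datetime, timezone


def to_json_bytes(value) -> bytes:
    """Hypothetical stdlib-only helper, not part of typedkafka."""
    return json.dumps(value).encode("utf-8")


encoded = to_json_bytes({"user_id": 123})

# datetime is not JSON-serializable by default, so json.dumps raises TypeError
event = {"user_id": 123, "at": datetime(2024, 1, 1, tzinfo=timezone.utc)}
try:
    to_json_bytes(event)
    failed = False
except TypeError:
    failed = True

# Converting the value to an ISO 8601 string first avoids the error
converted = to_json_bytes({**event, "at": event["at"].isoformat()})
```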

typedkafka.serializers.JsonDeserializer

Bases: Deserializer[Any]

JSON deserializer that decodes UTF-8 JSON bytes to Python objects.

Examples:

>>> deser = JsonDeserializer()
>>> deser.deserialize("topic", b'{"user_id": 123}')
{'user_id': 123}

deserialize(topic, data)

Deserialize JSON bytes to a Python object.
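
This page does not say how `JsonDeserializer` reports malformed input. As a minimal sketch of the failure modes of the underlying stdlib operations, the hypothetical helper `safe_json_loads` below catches both invalid UTF-8 and invalid JSON; a consumer loop might use something similar to skip or dead-letter bad messages.

```python
import json
from typing import Any, Optional


def safe_json_loads(data: bytes) -> Optional[Any]:
    """Hypothetical helper: return None when data is not valid UTF-8 JSON.

    Note that a valid JSON `null` payload also yields None.
    """
    try:
        return json.loads(data.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return None


good = safe_json_loads(b'{"user_id": 123}')
truncated = safe_json_loads(b'{"user_id": ')  # incomplete JSON document
not_utf8 = safe_json_loads(b"\xff\xfe")       # bytes that are not valid UTF-8
```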

typedkafka.serializers.StringSerializer

Bases: Serializer[str]

String serializer that encodes strings to bytes.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| encoding | str | Character encoding to use (default: utf-8). | 'utf-8' |

Examples:

>>> ser = StringSerializer()
>>> ser.serialize("topic", "hello")
b'hello'

serialize(topic, value)

Serialize a string to bytes.
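
The `encoding` parameter matters as soon as the text contains non-ASCII characters. Assuming `StringSerializer` behaves like `str.encode` with the configured encoding (an assumption; the page does not show the implementation), the stdlib calls below show how the same string yields different bytes under two encodings.

```python
text = "héllo"

# UTF-8 encodes "é" as two bytes (0xC3 0xA9)
utf8_bytes = text.encode("utf-8")

# Latin-1 encodes "é" as a single byte (0xE9)
latin1_bytes = text.encode("latin-1")

print(utf8_bytes, latin1_bytes)
```

Producer and consumer must agree on the encoding, since the bytes alone do not record which one was used.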

typedkafka.serializers.StringDeserializer

Bases: Deserializer[str]

String deserializer that decodes bytes to strings.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| encoding | str | Character encoding to use (default: utf-8). | 'utf-8' |

Examples:

>>> deser = StringDeserializer()
>>> deser.deserialize("topic", b"hello")
'hello'

deserialize(topic, data)

Deserialize bytes to a string.
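
Decoding with an encoding other than the one used to produce the bytes can fail outright. The sketch below uses `bytes.decode` directly, on the assumption that `StringDeserializer` does something equivalent with its configured `encoding`.

```python
# Bytes produced with Latin-1, where "é" is the single byte 0xE9
raw = "héllo".encode("latin-1")

# Decoding with the matching encoding restores the original text
decoded = raw.decode("latin-1")

# Decoding the same bytes as UTF-8 fails: 0xE9 starts a multi-byte
# sequence, but the next byte ("l") is not a continuation byte
try:
    raw.decode("utf-8")
    mismatch_error = None
except UnicodeDecodeError as exc:
    mismatch_error = exc
```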

typedkafka.serializers.AvroSerializer

Bases: Serializer[Any]

Avro serializer with Confluent Schema Registry support.

Requires the confluent-kafka[avro] or fastavro package and a running Schema Registry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| schema_registry_url | str | URL of the Confluent Schema Registry. | required |
| schema_str | str | Avro schema as a JSON string. | required |
| schema_registry_config | Optional[dict[str, Any]] | Optional additional config for the schema registry client. | None |

Raises:

| Type | Description |
|------|-------------|
| ImportError | If required schema registry dependencies are not installed. |

Examples:

>>> schema = '{"type": "record", "name": "User", "fields": [{"name": "id", "type": "int"}]}'
>>> ser = AvroSerializer("http://localhost:8081", schema)
>>> ser.serialize("users", {"id": 123})

serialize(topic, value)

Serialize a value using Avro with Schema Registry.
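
Because `schema_str` must be a JSON string, hand-written schemas are easy to break with a stray quote or comma. One option, sketched here with the standard library only, is to define the schema as a Python dict and let `json.dumps` produce the string. The variable names are illustrative.

```python
import json

# Same schema as the example above, written as a Python dict
user_schema = {
    "type": "record",
    "name": "User",
    "fields": [{"name": "id", "type": "int"}],
}

# json.dumps guarantees syntactically valid JSON for schema_str
schema_str = json.dumps(user_schema)
print(schema_str)
```

The resulting `schema_str` can then be passed as the second argument to `AvroSerializer`, as in the example above.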

typedkafka.serializers.AvroDeserializer

Bases: Deserializer[Any]

Avro deserializer with Confluent Schema Registry support.

Requires the confluent-kafka[avro] package and a running Schema Registry.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| schema_registry_url | str | URL of the Confluent Schema Registry. | required |
| schema_str | Optional[str] | Optional Avro schema as a JSON string. If not provided, the schema is fetched from the registry. | None |
| schema_registry_config | Optional[dict[str, Any]] | Optional additional config for the schema registry client. | None |

Raises:

| Type | Description |
|------|-------------|
| ImportError | If required schema registry dependencies are not installed. |

Examples:

>>> deser = AvroDeserializer("http://localhost:8081")
>>> data = deser.deserialize("users", raw_bytes)  # raw_bytes: the Avro-encoded message value

deserialize(topic, data)

Deserialize Avro bytes using Schema Registry.
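
Fetching the schema from the registry is possible because messages written by Confluent's Schema Registry serializers carry a small header: one magic byte (`0`) followed by a 4-byte big-endian schema ID, then the Avro-encoded body. The sketch below parses that header with the standard library. It assumes typedkafka delegates to Confluent's standard framing, which this page suggests but does not state; `split_confluent_frame` is a hypothetical helper, not part of the library.

```python
import struct
from typing import Tuple


def split_confluent_frame(data: bytes) -> Tuple[int, bytes]:
    """Hypothetical helper: split a Confluent-framed message into (schema_id, body)."""
    if len(data) < 5:
        raise ValueError("payload too short for a Confluent-framed message")
    # ">bI" = big-endian signed byte (magic) + unsigned 32-bit int (schema ID)
    magic, schema_id = struct.unpack(">bI", data[:5])
    if magic != 0:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, data[5:]


# Build a sample frame: schema ID 42, body is the Avro encoding of the int 123
frame = struct.pack(">bI", 0, 42) + b"\xf6\x01"
schema_id, body = split_confluent_frame(frame)
print(schema_id, body)
```

A check like this can help when debugging: a payload that fails it was probably not produced by a Schema Registry serializer.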