Skip to content

Protobuf

Protobuf serialization support for Kafka messages.

Note

Requires protobuf package. Install with: pip install typedkafka[protobuf]

typedkafka.protobuf.ProtobufSerializer

Bases: Serializer[Any]

Serializer for Protocol Buffer messages.

Examples:

>>> from myapp.proto import UserEvent_pb2
>>> serializer = ProtobufSerializer()
>>> event = UserEvent_pb2.UserEvent(user_id=123, action="click")
>>> data = serializer.serialize("topic", event)

serialize(topic, value)

Serialize a protobuf message to bytes.

typedkafka.protobuf.ProtobufDeserializer

Bases: Deserializer[Any]

Deserializer for Protocol Buffer messages.

Parameters:

Name Type Description Default
message_type type[Any]

The protobuf message class to deserialize into.

required

Examples:

>>> from myapp.proto import UserEvent_pb2
>>> deserializer = ProtobufDeserializer(UserEvent_pb2.UserEvent)
>>> event = deserializer.deserialize("topic", raw_bytes)

deserialize(topic, data)

Deserialize bytes to a protobuf message.

typedkafka.protobuf.protobuf_serializer_for(message_type)

Get a simple serializer function for a specific protobuf message type.

Useful with KafkaConsumer(value_deserializer=...) patterns.

Parameters:

Name Type Description Default
message_type type[T]

The protobuf message class.

required

Returns:

Type Description
Callable[[T], bytes]

A function that serializes instances to bytes.

typedkafka.protobuf.protobuf_deserializer_for(message_type)

Get a simple deserializer function for a specific protobuf message type.

Useful with KafkaConsumer(value_deserializer=...) patterns.

Parameters:

Name Type Description Default
message_type type[T]

The protobuf message class.

required

Returns:

Type Description
Callable[[bytes], T]

A function that deserializes bytes to the message type.

typedkafka.protobuf.SchemaRegistryProtobufSerializer

Protobuf serializer with Confluent Schema Registry integration.

Parameters:

Name Type Description Default
schema_registry_url str

URL of the Confluent Schema Registry.

required
schema_registry_config dict[str, Any] | None

Optional additional config for the schema registry client.

None

Examples:

>>> serializer = SchemaRegistryProtobufSerializer(
...     schema_registry_url="http://localhost:8081",
... )

serialize(topic, message, message_type=None)

Serialize a protobuf message with schema registry.

Parameters:

Name Type Description Default
topic str

Target topic name.

required
message Any

The protobuf message instance.

required
message_type type[Any] | None

Optional message class (defaults to type(message)).

None

Returns:

Type Description
bytes

Serialized bytes with schema registry framing.