Protobuf¶
Protobuf serialization support for Kafka messages.
Note
Requires protobuf package. Install with: pip install typedkafka[protobuf]
typedkafka.protobuf.ProtobufSerializer
¶
Bases: Serializer[Any]
Serializer for Protocol Buffer messages.
Examples:
>>> from myapp.proto import UserEvent_pb2
>>> serializer = ProtobufSerializer()
>>> event = UserEvent_pb2.UserEvent(user_id=123, action="click")
>>> data = serializer.serialize("topic", event)
serialize(topic, value)
¶
Serialize a protobuf message to bytes.
typedkafka.protobuf.ProtobufDeserializer
¶
Bases: Deserializer[Any]
Deserializer for Protocol Buffer messages.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_type
|
type[Any]
|
The protobuf message class to deserialize into. |
required |
Examples:
>>> from myapp.proto import UserEvent_pb2
>>> deserializer = ProtobufDeserializer(UserEvent_pb2.UserEvent)
>>> event = deserializer.deserialize("topic", raw_bytes)
deserialize(topic, data)
¶
Deserialize bytes to a protobuf message.
typedkafka.protobuf.protobuf_serializer_for(message_type)
¶
Get a simple serializer function for a specific protobuf message type.
Useful with KafkaConsumer(value_deserializer=...) patterns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_type
|
type[T]
|
The protobuf message class. |
required |
Returns:
| Type | Description |
|---|---|
Callable[[T], bytes]
|
A function that serializes instances to bytes. |
typedkafka.protobuf.protobuf_deserializer_for(message_type)
¶
Get a simple deserializer function for a specific protobuf message type.
Useful with KafkaConsumer(value_deserializer=...) patterns.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
message_type
|
type[T]
|
The protobuf message class. |
required |
Returns:
| Type | Description |
|---|---|
Callable[[bytes], T]
|
A function that deserializes bytes to the message type. |
typedkafka.protobuf.SchemaRegistryProtobufSerializer
¶
Protobuf serializer with Confluent Schema Registry integration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
schema_registry_url
|
str
|
URL of the Confluent Schema Registry. |
required |
schema_registry_config
|
dict[str, Any] | None
|
Optional additional config for the schema registry client. |
None
|
Examples:
>>> serializer = SchemaRegistryProtobufSerializer(
... schema_registry_url="http://localhost:8081",
... )
serialize(topic, message, message_type=None)
¶
Serialize a protobuf message with schema registry.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Target topic name. |
required |
message
|
Any
|
The protobuf message instance. |
required |
message_type
|
type[Any] | None
|
Optional message class (defaults to type(message)). |
None
|
Returns:
| Type | Description |
|---|---|
bytes
|
Serialized bytes with schema registry framing. |