Metrics

typedkafka.metrics.KafkaStats dataclass

Parsed Kafka statistics from confluent-kafka's stats_cb callback.

This is a subset of the most useful fields from the full statistics JSON. Access the complete data via the raw attribute.

Attributes:

name (str): Client name/id.
client_type (str): Client type ("producer" or "consumer").
ts (int): Timestamp in microseconds (librdkafka's internal monotonic clock, not wall-clock time).
time_seconds (int): Wall-clock time in seconds since the epoch.
replyq (int): Number of operations waiting in the reply queue for the application to serve.
msg_cnt (int): Current number of messages in producer queues.
msg_size (int): Current total size, in bytes, of messages in producer queues.
tx (int): Total number of requests sent to brokers.
rx (int): Total number of responses received from brokers.
txbytes (int): Total number of bytes transmitted to brokers.
rxbytes (int): Total number of bytes received from brokers.
raw (dict[str, Any]): The full raw statistics dict, for advanced use.

Examples:

>>> stats = KafkaStats.from_json(json_string)
>>> print(f"Bytes sent: {stats.txbytes}")
>>> print(f"Messages queued: {stats.msg_cnt}")
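Since txbytes and the other totals are cumulative, a throughput figure needs two snapshots. The following is a minimal sketch of that arithmetic, not part of typedkafka: `Snapshot` and `tx_bytes_per_second` are hypothetical names, and `Snapshot` only stands in for the two KafkaStats fields it uses.

```python
from dataclasses import dataclass


@dataclass
class Snapshot:
    """Hypothetical stand-in exposing the two KafkaStats fields used here."""
    ts: int       # timestamp in microseconds
    txbytes: int  # cumulative bytes transmitted to brokers


def tx_bytes_per_second(prev: Snapshot, curr: Snapshot) -> float:
    """Average transmit rate between two snapshots, in bytes per second."""
    elapsed_us = curr.ts - prev.ts
    if elapsed_us <= 0:
        return 0.0  # identical or out-of-order snapshots: no meaningful rate
    # ts is in microseconds, so scale the byte delta to a per-second rate.
    return (curr.txbytes - prev.txbytes) * 1_000_000 / elapsed_us


earlier = Snapshot(ts=1_000_000, txbytes=0)
later = Snapshot(ts=6_000_000, txbytes=10_000)
print(tx_bytes_per_second(earlier, later))  # 10,000 bytes over 5 seconds
```

The same calculation applies to rxbytes, tx, and rx, since each is documented as a running total.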

from_json(json_str) classmethod

Parse a KafkaStats from the JSON string provided by stats_cb.

Parameters:

json_str (str, required): JSON string from confluent-kafka's statistics callback.

Returns:

KafkaStats: Parsed KafkaStats instance.
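To illustrate the general shape of such a classmethod, here is a minimal sketch that parses a JSON string into a dataclass and keeps the full payload in raw. It is not typedkafka's implementation: `StatsSketch` is a hypothetical class covering only a few fields, and the key names in the sample payload (including the assumption that the client type arrives under "type") are illustrative.

```python
import json
from dataclasses import dataclass, field
from typing import Any


@dataclass
class StatsSketch:
    """Hypothetical stand-in for KafkaStats with a handful of its fields."""
    name: str = ""
    client_type: str = ""
    msg_cnt: int = 0
    tx: int = 0
    rx: int = 0
    raw: dict[str, Any] = field(default_factory=dict)

    @classmethod
    def from_json(cls, json_str: str) -> "StatsSketch":
        data = json.loads(json_str)
        return cls(
            name=data.get("name", ""),
            client_type=data.get("type", ""),  # assumed key name
            msg_cnt=data.get("msg_cnt", 0),
            tx=data.get("tx", 0),
            rx=data.get("rx", 0),
            raw=data,  # keep everything, including keys not mapped above
        )


sample = '{"name": "producer-1", "type": "producer", "msg_cnt": 3, "tx": 7, "brokers": {}}'
stats = StatsSketch.from_json(sample)
print(stats.client_type, stats.msg_cnt, stats.tx, stats.rx)
print("brokers" in stats.raw)
```

Missing keys fall back to defaults in this sketch, so a partial payload still parses; anything not mapped to a field remains reachable through raw.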

typedkafka.metrics.KafkaMetrics dataclass

Simple counters tracked by the producer or consumer.

Updated automatically during normal operations (send, poll). When statistics reporting is enabled via statistics.interval.ms, byte counters and last_stats are also populated.

Attributes:

messages_sent (int): Total messages successfully queued (producer only).
messages_received (int): Total messages received (consumer only).
errors (int): Total error count.
bytes_sent (int): Total bytes transmitted (from the stats callback).
bytes_received (int): Total bytes received (from the stats callback).
last_stats (Optional[KafkaStats]): Most recent KafkaStats snapshot (None if stats are not enabled).

Examples:

>>> producer = KafkaProducer(config)
>>> producer.send("topic", b"hello")
>>> print(producer.metrics.messages_sent)  # 1

reset()

Reset all counters to zero.
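One way a dataclass of counters can implement this kind of reset is sketched below. `CountersSketch` is a hypothetical stand-in, not the library's class; it covers only the integer counters and does not model last_stats, whose behavior on reset is not specified here.

```python
from dataclasses import dataclass, fields


@dataclass
class CountersSketch:
    """Hypothetical stand-in for the integer counters of KafkaMetrics."""
    messages_sent: int = 0
    messages_received: int = 0
    errors: int = 0
    bytes_sent: int = 0
    bytes_received: int = 0

    def reset(self) -> None:
        # Restore every field to its declared default (zero for each counter).
        for f in fields(self):
            setattr(self, f.name, f.default)


counters = CountersSketch()
counters.messages_sent += 2
counters.errors += 1
counters.reset()
print(counters)
```

Iterating over `fields()` means a counter added later is reset automatically, provided it declares a default.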

typedkafka.metrics.make_stats_cb(metrics, user_callback=None)

Create a stats_cb function for confluent-kafka that updates metrics.

Parameters:

metrics (KafkaMetrics, required): The KafkaMetrics instance to update with byte counters.
user_callback (Optional[StatsCallback], default None): Optional user callback that receives parsed KafkaStats.

Returns:

Callable[[str], None]: A callback suitable for confluent-kafka's stats_cb config option.

Examples:

>>> metrics = KafkaMetrics()
>>> cb = make_stats_cb(metrics, user_callback=my_handler)
>>> config["stats_cb"] = cb
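The factory pattern described above can be sketched as a closure that parses each statistics payload, copies the byte totals onto a metrics object, and then forwards the data to an optional user callback. This is an illustration under stated assumptions, not typedkafka's code: `ByteCounters` and `make_stats_cb_sketch` are hypothetical names, the JSON keys "txbytes" and "rxbytes" are assumed to mirror the KafkaStats attribute names, and the user callback receives a plain dict here rather than a parsed KafkaStats.

```python
import json
from dataclasses import dataclass
from typing import Any, Callable, Optional


@dataclass
class ByteCounters:
    """Hypothetical stand-in for the byte counters on KafkaMetrics."""
    bytes_sent: int = 0
    bytes_received: int = 0


def make_stats_cb_sketch(
    counters: ByteCounters,
    user_callback: Optional[Callable[[dict[str, Any]], None]] = None,
) -> Callable[[str], None]:
    def _cb(json_str: str) -> None:
        data = json.loads(json_str)
        # The reported totals are cumulative, so assign rather than add.
        # Key names are assumptions mirroring the KafkaStats attributes.
        counters.bytes_sent = data.get("txbytes", 0)
        counters.bytes_received = data.get("rxbytes", 0)
        if user_callback is not None:
            user_callback(data)

    return _cb


seen: list[dict[str, Any]] = []
counters = ByteCounters()
cb = make_stats_cb_sketch(counters, user_callback=seen.append)

# Simulate two statistics emissions from the client.
cb('{"txbytes": 512, "rxbytes": 128}')
cb('{"txbytes": 900, "rxbytes": 300}')
print(counters, len(seen))

# Wiring into a client config; 5000 ms is an arbitrary example interval.
config = {"statistics.interval.ms": 5000, "stats_cb": cb}
```

Without a non-zero statistics.interval.ms the client does not emit statistics, so the callback is never invoked and the byte counters stay at zero.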