Skip to content

Admin

typedkafka.admin.KafkaAdmin

A well-documented Kafka admin client with full type hints.

Provides methods for managing topics, configurations, and cluster operations with comprehensive documentation and better error messages.

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>>
>>> # Create a topic
>>> admin.create_topic("events", num_partitions=3, replication_factor=2)
>>>
>>> # List all topics
>>> topics = admin.list_topics()
>>> print(topics)
>>>
>>> # Delete a topic
>>> admin.delete_topic("old-topic")

Attributes:

Name Type Description
config

The configuration dictionary used to initialize the admin client

__init__(config)

Initialize a Kafka admin client.

Parameters:

Name Type Description Default
config dict[str, Any]

Configuration dictionary. Required option: - bootstrap.servers (str): Comma-separated broker addresses

required

Raises:

Type Description
AdminError

If the admin client cannot be initialized

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>>
>>> # With multiple brokers
>>> admin = KafkaAdmin({
...     "bootstrap.servers": "broker1:9092,broker2:9092,broker3:9092"
... })

create_topic(topic, num_partitions=1, replication_factor=1, config=None, timeout=30.0)

Create a new Kafka topic.

Parameters:

Name Type Description Default
topic str

Topic name to create

required
num_partitions int

Number of partitions (default: 1)

1
replication_factor int

Replication factor (default: 1, recommended: 2-3)

1
config Optional[dict[str, str]]

Optional topic configuration dict

None
timeout float

Operation timeout in seconds (default: 30.0)

30.0

Raises:

Type Description
AdminError

If topic creation fails

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>>
>>> # Simple topic creation
>>> admin.create_topic("my-topic")
>>>
>>> # Topic with multiple partitions and replication
>>> admin.create_topic("events", num_partitions=10, replication_factor=3)
>>>
>>> # Topic with custom configuration
>>> admin.create_topic(
...     "logs",
...     num_partitions=5,
...     config={"retention.ms": "604800000"}  # 7 days
... )

delete_topic(topic, timeout=30.0)

Delete a Kafka topic.

Parameters:

Name Type Description Default
topic str

Topic name to delete

required
timeout float

Operation timeout in seconds (default: 30.0)

30.0

Raises:

Type Description
AdminError

If topic deletion fails

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>> admin.delete_topic("old-topic")

describe_topic(topic, timeout=10.0)

Get detailed information about a topic.

Parameters:

Name Type Description Default
topic str

Topic name

required
timeout float

Request timeout in seconds (default: 10.0)

10.0

Returns:

Type Description
dict[str, Any]

Dict containing topic metadata (partitions, replication, etc.)

Raises:

Type Description
AdminError

If describing the topic fails

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>> info = admin.describe_topic("my-topic")
>>> print(f"Partitions: {len(info['partitions'])}")

list_topics(timeout=10.0)

List all topics in the Kafka cluster.

Parameters:

Name Type Description Default
timeout float

Request timeout in seconds (default: 10.0)

10.0

Returns:

Type Description
list[str]

List of topic names

Raises:

Type Description
AdminError

If listing topics fails

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>> topics = admin.list_topics()
>>> for topic in topics:
...     print(f"Topic: {topic}")

topic_exists(topic, timeout=10.0)

Check if a topic exists.

Parameters:

Name Type Description Default
topic str

Topic name to check

required
timeout float

Request timeout in seconds (default: 10.0)

10.0

Returns:

Type Description
bool

True if topic exists, False otherwise

Raises:

Type Description
AdminError

If the check fails

Examples:

>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>> if admin.topic_exists("my-topic"):
...     print("Topic exists!")
... else:
...     admin.create_topic("my-topic")

typedkafka.admin.TopicConfig

Configuration for creating a new Kafka topic.

Examples:

>>> config = (TopicConfig("my-topic")
...     .partitions(3)
...     .replication_factor(2)
...     .config("retention.ms", "86400000"))  # 1 day retention

__init__(name)

Initialize topic configuration.

Parameters:

Name Type Description Default
name str

Topic name

required

config(key, value)

Set a topic configuration parameter.

Parameters:

Name Type Description Default
key str

Configuration key (e.g., "retention.ms", "compression.type")

required
value str

Configuration value

required

Returns:

Type Description
TopicConfig

Self for method chaining

Examples:

>>> config = (TopicConfig("logs")
...     .config("retention.ms", "604800000")  # 7 days
...     .config("compression.type", "gzip"))

partitions(count)

Set number of partitions.

Parameters:

Name Type Description Default
count int

Number of partitions (must be >= 1)

required

Returns:

Type Description
TopicConfig

Self for method chaining

Examples:

>>> config = TopicConfig("my-topic").partitions(10)

replication_factor(factor)

Set replication factor.

Parameters:

Name Type Description Default
factor int

Replication factor (typically 2 or 3)

required

Returns:

Type Description
TopicConfig

Self for method chaining

Examples:

>>> config = TopicConfig("my-topic").replication_factor(3)

typedkafka.admin.AdminError

Bases: KafkaError

Raised when an admin operation fails.