Admin¶
typedkafka.admin.KafkaAdmin
¶
A well-documented Kafka admin client with full type hints.
Provides methods for managing topics, configurations, and cluster operations with comprehensive documentation and better error messages.
Examples:
>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>>
>>> # Create a topic
>>> admin.create_topic("events", num_partitions=3, replication_factor=2)
>>>
>>> # List all topics
>>> topics = admin.list_topics()
>>> print(topics)
>>>
>>> # Delete a topic
>>> admin.delete_topic("old-topic")
Attributes:
| Name | Type | Description |
|---|---|---|
config |
The configuration dictionary used to initialize the admin client |
__init__(config)
¶
Initialize a Kafka admin client.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
config
|
dict[str, Any]
|
Configuration dictionary. Required option: - bootstrap.servers (str): Comma-separated broker addresses |
required |
Raises:
| Type | Description |
|---|---|
AdminError
|
If the admin client cannot be initialized |
Examples:
create_topic(topic, num_partitions=1, replication_factor=1, config=None, timeout=30.0)
¶
Create a new Kafka topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Topic name to create |
required |
num_partitions
|
int
|
Number of partitions (default: 1) |
1
|
replication_factor
|
int
|
Replication factor (default: 1, recommended: 2-3) |
1
|
config
|
Optional[dict[str, str]]
|
Optional topic configuration dict |
None
|
timeout
|
float
|
Operation timeout in seconds (default: 30.0) |
30.0
|
Raises:
| Type | Description |
|---|---|
AdminError
|
If topic creation fails |
Examples:
>>> admin = KafkaAdmin({"bootstrap.servers": "localhost:9092"})
>>>
>>> # Simple topic creation
>>> admin.create_topic("my-topic")
>>>
>>> # Topic with multiple partitions and replication
>>> admin.create_topic("events", num_partitions=10, replication_factor=3)
>>>
>>> # Topic with custom configuration
>>> admin.create_topic(
... "logs",
... num_partitions=5,
... config={"retention.ms": "604800000"} # 7 days
... )
delete_topic(topic, timeout=30.0)
¶
Delete a Kafka topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Topic name to delete |
required |
timeout
|
float
|
Operation timeout in seconds (default: 30.0) |
30.0
|
Raises:
| Type | Description |
|---|---|
AdminError
|
If topic deletion fails |
Examples:
describe_topic(topic, timeout=10.0)
¶
Get detailed information about a topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Topic name |
required |
timeout
|
float
|
Request timeout in seconds (default: 10.0) |
10.0
|
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Dict containing topic metadata (partitions, replication, etc.) |
Raises:
| Type | Description |
|---|---|
AdminError
|
If describing the topic fails |
Examples:
list_topics(timeout=10.0)
¶
List all topics in the Kafka cluster.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
timeout
|
float
|
Request timeout in seconds (default: 10.0) |
10.0
|
Returns:
| Type | Description |
|---|---|
list[str]
|
List of topic names |
Raises:
| Type | Description |
|---|---|
AdminError
|
If listing topics fails |
Examples:
topic_exists(topic, timeout=10.0)
¶
Check if a topic exists.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
topic
|
str
|
Topic name to check |
required |
timeout
|
float
|
Request timeout in seconds (default: 10.0) |
10.0
|
Returns:
| Type | Description |
|---|---|
bool
|
True if topic exists, False otherwise |
Raises:
| Type | Description |
|---|---|
AdminError
|
If the check fails |
Examples:
typedkafka.admin.TopicConfig
¶
Configuration for creating a new Kafka topic.
Examples:
>>> config = (TopicConfig("my-topic")
... .partitions(3)
... .replication_factor(2)
... .config("retention.ms", "86400000")) # 1 day retention
__init__(name)
¶
Initialize topic configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Topic name |
required |
config(key, value)
¶
Set a topic configuration parameter.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
key
|
str
|
Configuration key (e.g., "retention.ms", "compression.type") |
required |
value
|
str
|
Configuration value |
required |
Returns:
| Type | Description |
|---|---|
TopicConfig
|
Self for method chaining |
Examples:
partitions(count)
¶
Set number of partitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
count
|
int
|
Number of partitions (must be >= 1) |
required |
Returns:
| Type | Description |
|---|---|
TopicConfig
|
Self for method chaining |
Examples:
replication_factor(factor)
¶
Set replication factor.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
factor
|
int
|
Replication factor (typically 2 or 3) |
required |
Returns:
| Type | Description |
|---|---|
TopicConfig
|
Self for method chaining |
Examples: