or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

admin-operations.md, async-operations.md, connection-management.md, index.md, message-consumption.md, message-production.md

admin-operations.mddocs/

# Administrative Operations

Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.

## Capabilities

### Kafka Admin Client Hook

Provides administrative operations for Kafka clusters using the Kafka AdminClient API. Extends the base hook with cluster management capabilities.

```python { .api }
class KafkaAdminClientHook(KafkaBaseHook):
    """
    A hook for administrative Kafka operations.

    Inherits from KafkaBaseHook and provides methods for topic management
    and other administrative tasks.
    """

    def __init__(self, kafka_config_id: str = "kafka_default") -> None:
        """
        Initialize the Kafka admin client hook.

        Args:
            kafka_config_id: The connection object to use
        """

    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None:
        """
        Create Kafka topics.

        Args:
            topics: Sequence of topic configurations, where each topic is a sequence
                containing [topic_name, partition_count, replication_factor]

        Raises:
            KafkaException: If topic creation fails
        """

    def delete_topic(self, topics: Sequence[str]) -> None:
        """
        Delete Kafka topics.

        Args:
            topics: Sequence of topic names to delete

        Raises:
            KafkaException: If topic deletion fails
        """

    def _get_client(self, config) -> AdminClient:
        """
        Get a Kafka AdminClient with the given configuration.

        Args:
            config: Kafka client configuration dictionary

        Returns:
            AdminClient: Configured confluent-kafka AdminClient instance
        """
```

### Usage Examples

#### Creating Topics

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Create a single topic with default settings
topics_to_create = [
    ["my-topic", 3, 1]  # topic_name, partitions, replication_factor
]

admin_hook.create_topic(topics_to_create)
```

#### Creating Topics with Configuration

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook()

# Create topics with standard configuration
topics_config = [
    ["high-throughput-topic", 12, 3],  # topic_name, partitions, replication_factor
    ["log-compaction-topic", 6, 2]
]

admin_hook.create_topic(topics_config)
```

#### Deleting Topics

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

admin_hook = KafkaAdminClientHook()

# Delete single topic
admin_hook.delete_topic(["old-topic"])

# Delete multiple topics
topics_to_delete = [
    "temp-topic-1",
    "temp-topic-2",
    "test-topic"
]
admin_hook.delete_topic(topics_to_delete)
```

### Administrative DAG Example

```python
from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

def create_kafka_topics():
    """Create required Kafka topics for the data pipeline."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")

    topics = [
        ["user-events", 12, 3],  # topic, partitions, replication_factor
        ["processed-events", 6, 2],
        ["error-events", 3, 2]
    ]

    admin_hook.create_topic(topics)
    return "Topics created successfully"

def cleanup_old_topics():
    """Remove temporary or test topics."""
    admin_hook = KafkaAdminClientHook(kafka_config_id="kafka_prod")

    topics_to_remove = [
        "test-topic-2023",
        "temp-processing-topic",
        "old-user-data"
    ]

    admin_hook.delete_topic(topics_to_remove)
    return "Old topics cleaned up"

dag = DAG(
    "kafka_admin_operations",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False,
    description="Kafka administrative operations"
)

create_topics_task = PythonOperator(
    task_id="create_topics",
    python_callable=create_kafka_topics,
    dag=dag
)

cleanup_topics_task = PythonOperator(
    task_id="cleanup_topics",
    python_callable=cleanup_old_topics,
    dag=dag
)

create_topics_task >> cleanup_topics_task
```

### Topic Configuration Options

Common Kafka topic-level configuration parameters. Note that the `create_topic` method documented above accepts only topic name, partition count, and replication factor; the settings below are standard Kafka topic configurations applied through the broker's topic configuration:

#### Retention Settings

```python
{
    "retention.ms": "604800000",  # 7 days in milliseconds
    "retention.bytes": "1073741824",  # 1GB per partition
    "cleanup.policy": "delete"  # or "compact" for log compaction
}
```

#### Performance Settings

```python
{
    "segment.ms": "86400000",  # 1 day segment roll time
    "max.message.bytes": "1000000",  # 1MB max message size
    "compression.type": "snappy",  # or "gzip", "lz4", "zstd"
    "min.insync.replicas": "2"  # Minimum in-sync replicas
}
```

#### Log Compaction Settings

```python
{
    "cleanup.policy": "compact",
    "min.cleanable.dirty.ratio": "0.5",
    "delete.retention.ms": "86400000",  # Tombstone retention
    "min.compaction.lag.ms": "0"
}
```

### Error Handling

Administrative operations may fail for various reasons. Handle exceptions appropriately:

```python
from confluent_kafka import KafkaException
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

def safe_topic_creation():
    admin_hook = KafkaAdminClientHook()

    try:
        topics = [["new-topic", 3, 1]]  # topic_name, partitions, replication_factor
        admin_hook.create_topic(topics)
        print("Topic created successfully")
    except KafkaException as e:
        if "already exists" in str(e):
            print("Topic already exists, continuing...")
        else:
            print(f"Failed to create topic: {e}")
            raise
    except Exception as e:
        print(f"Unexpected error: {e}")
        raise
```

### Best Practices

1. **Partition Planning**: Choose partition count based on expected throughput and consumer parallelism
2. **Replication Factor**: Use at least 2 for important topics, 3 for critical data
3. **Retention Policy**: Set appropriate retention based on data lifecycle requirements
4. **Error Handling**: Always handle topic creation/deletion failures gracefully
5. **Monitoring**: Log administrative operations for audit and troubleshooting purposes