# Apache Airflow Kafka Provider

Apache Kafka integration provider for Apache Airflow, enabling data engineers to build workflows that interact with Kafka message queues and streaming data platforms. This provider offers a comprehensive set of operators for producing and consuming messages, hooks for low-level Kafka client operations, sensors for monitoring message availability, and triggers for asynchronous message processing.

## Package Information

- **Package Name**: apache-airflow-providers-apache-kafka
- **Package Type**: pip
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-kafka`
- **Minimum Airflow Version**: 2.10.0
- **Dependencies**: confluent-kafka>=2.6.0, asgiref>=2.3.0

## Core Imports

```python
# Basic imports for common usage
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
```

Hook imports for advanced usage:

```python
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook
```

Sensor and trigger imports:

```python
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor, AwaitMessageTriggerFunctionSensor
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator


# Producer function: yields (key, value) pairs; the operator publishes each
# pair to the configured topic and flushes the producer for you.
def produce_messages():
    for i in range(10):
        yield (str(i), f"Message {i}")


# Consumer processing function: applied to each consumed message.
def process_message(message):
    print(f"Processing: {message.value().decode('utf-8')}")
    return True


dag = DAG(
    "kafka_example",
    start_date=datetime(2023, 1, 1),
    schedule=None,  # schedule_interval is deprecated; schedule works on Airflow 2.4+
    catchup=False,
)

# Produce messages to a Kafka topic
produce_task = ProduceToTopicOperator(
    task_id="produce_messages",
    topic="my-topic",
    producer_function=produce_messages,
    kafka_config_id="kafka_default",
    dag=dag,
)

# Consume messages from a Kafka topic
consume_task = ConsumeFromTopicOperator(
    task_id="consume_messages",
    topics=["my-topic"],
    apply_function=process_message,
    max_messages=10,
    kafka_config_id="kafka_default",
    dag=dag,
)

produce_task >> consume_task
```

## Architecture

The provider is organized into five main capability areas:

- **Connection Management**: Centralized Kafka connection configuration and authentication
- **Administrative Operations**: Topic management and cluster administration
- **Message Production**: Publishing messages to Kafka topics with flexible delivery options
- **Message Consumption**: Consuming and processing messages with configurable commit strategies
- **Asynchronous Operations**: Event-driven sensors and triggers for real-time workflows

## Capabilities

### Connection Management

Base connection functionality providing authentication, configuration management, and Google Cloud Managed Kafka integration with automatic token generation.

```python { .api }
class KafkaBaseHook(BaseHook):
    def __init__(self, kafka_config_id: str = "kafka_default") -> None: ...
    def get_conn(self) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
```
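
As a quick usage sketch grounded in the methods above, the hook can validate a configured connection directly:

```python
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook

# test_connection() returns a (success, message) tuple, as declared above.
hook = KafkaBaseHook(kafka_config_id="kafka_default")
ok, message = hook.test_connection()
print(f"Kafka connection OK: {ok} ({message})")
```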

[Connection Management](./connection-management.md)

### Administrative Operations

Administrative capabilities for managing Kafka clusters including topic creation and deletion with partition and replication configuration.

```python { .api }
class KafkaAdminClientHook(KafkaBaseHook):
    def create_topic(self, topics: Sequence[Sequence[Any]]) -> None: ...
    def delete_topic(self, topics: Sequence[str]) -> None: ...
```
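
A minimal sketch of these methods; the inner layout `(topic_name, num_partitions, replication_factor)` reflects current provider behavior but is worth verifying against your installed version:

```python
from airflow.providers.apache.kafka.hooks.client import KafkaAdminClientHook

hook = KafkaAdminClientHook(kafka_config_id="kafka_default")

# Each inner sequence is assumed to be (topic_name, num_partitions, replication_factor).
hook.create_topic(topics=[("events", 3, 2)])
hook.delete_topic(topics=["stale-events"])
```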

[Administrative Operations](./admin-operations.md)

### Message Production

Message publishing capabilities with support for synchronous and asynchronous operations, custom delivery callbacks, and templated producer functions.

```python { .api }
class ProduceToTopicOperator(BaseOperator):
    def __init__(self, topic: str, producer_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaProducerHook(KafkaBaseHook):
    def get_producer(self) -> Producer: ...
```
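
When the operator's callable interface is too coarse, the hook hands back the underlying `confluent_kafka.Producer`; a minimal sketch:

```python
from airflow.providers.apache.kafka.hooks.produce import KafkaProducerHook


def publish_event(payload: bytes) -> None:
    # get_producer() returns a confluent_kafka.Producer configured from the
    # Airflow connection.
    producer = KafkaProducerHook(kafka_config_id="kafka_default").get_producer()
    producer.produce("my-topic", value=payload)
    producer.flush()  # block until outstanding messages are delivered
```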

[Message Production](./message-production.md)

### Message Consumption

Message consumption capabilities with batch and individual processing, configurable commit strategies, and message filtering functions.

```python { .api }
class ConsumeFromTopicOperator(BaseOperator):
    def __init__(self, topics: str | Sequence[str], kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class KafkaConsumerHook(KafkaBaseHook):
    def get_consumer(self) -> Consumer: ...
```
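
A batch-oriented sketch; `apply_function_batch`, `commit_cadence`, `max_messages`, and `max_batch_size` are documented operator arguments, though their exact names should be checked against your installed provider version:

```python
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator


def handle_batch(messages):
    # Called once per polled batch instead of once per message.
    for message in messages:
        print(message.value())


consume_batch = ConsumeFromTopicOperator(
    task_id="consume_batch",
    topics=["my-topic"],
    apply_function_batch=handle_batch,
    commit_cadence="end_of_batch",  # alternatives: "end_of_operator", "never"
    max_messages=100,
    max_batch_size=20,
    kafka_config_id="kafka_default",
)
```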

[Message Consumption](./message-consumption.md)

### Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.

```python { .api }
class AwaitMessageSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...

class AwaitMessageTriggerFunctionSensor(BaseOperator):
    def __init__(self, topics: Sequence[str], apply_function: str | Callable, event_triggered_function: Callable, **kwargs) -> None: ...

class AwaitMessageTrigger(BaseTrigger):
    def __init__(self, topics: Sequence[str], apply_function: str, kafka_config_id: str = "kafka_default", **kwargs) -> None: ...
```
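
Because the sensor defers to a trigger running in the triggerer process, `apply_function` is typically given as an importable dotted-path string rather than a local callable; a sketch using a hypothetical `my_dag_utils.check_message` helper:

```python
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

# my_dag_utils.check_message is a hypothetical module-level function that
# returns a truthy value for the message that should complete the sensor.
wait_for_message = AwaitMessageSensor(
    task_id="wait_for_message",
    topics=["my-topic"],
    apply_function="my_dag_utils.check_message",
    kafka_config_id="kafka_default",
)
```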

[Asynchronous Operations](./async-operations.md)

## Configuration

The provider uses Airflow connections for configuration. Set up a connection with:

- **Connection Type**: kafka
- **Connection ID**: kafka_default (or custom)
- **Extra**: JSON configuration with `bootstrap.servers` and other Kafka client settings

Example connection configuration:

```json
{
    "bootstrap.servers": "localhost:9092",
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "PLAIN",
    "sasl.username": "your-username",
    "sasl.password": "your-password"
}
```
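
The same settings can also be supplied without the UI; as a sketch, Airflow accepts JSON-serialized connections through `AIRFLOW_CONN_<CONN_ID>` environment variables:

```python
import json
import os

# The "extra" object carries the Kafka client settings shown above.
os.environ["AIRFLOW_CONN_KAFKA_DEFAULT"] = json.dumps(
    {
        "conn_type": "kafka",
        "extra": {"bootstrap.servers": "localhost:9092"},
    }
)
```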

## Error Handling

The provider includes specialized exception handling:

```python { .api }
class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""
```
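
A minimal sketch of handling it around a connection attempt; the import path `hooks.base` is an assumption and may differ by provider version:

```python
from airflow.providers.apache.kafka.hooks.base import KafkaBaseHook

try:
    # Assumed location of the exception; adjust if your provider version
    # defines it elsewhere.
    from airflow.providers.apache.kafka.hooks.base import KafkaAuthenticationError
except ImportError:
    KafkaAuthenticationError = Exception  # fallback for older versions

try:
    KafkaBaseHook(kafka_config_id="kafka_default").get_conn()
except KafkaAuthenticationError as err:
    print(f"Kafka authentication failed: {err}")
```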

## Version Compatibility

The provider includes version compatibility helpers for different Airflow versions:

```python { .api }
def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    """
    Get the base Airflow version as a tuple.

    Returns:
        tuple: (major, minor, patch) version numbers
    """

# Version flags for conditional imports and behavior
AIRFLOW_V_3_0_PLUS: bool  # True if Airflow 3.0+
AIRFLOW_V_3_1_PLUS: bool  # True if Airflow 3.1+

# Version-compatible base class imports
BaseHook  # Imported from airflow.sdk (3.1+) or airflow.hooks.base
BaseOperator  # Imported from airflow.sdk (3.0+) or airflow.models
```

### Version-Specific Import Patterns

```python
# Recommended: use the provider's version-compatible imports
from airflow.providers.apache.kafka.version_compat import BaseHook, BaseOperator

# These imports resolve across Airflow versions:
# - Airflow 3.1+: BaseHook comes from airflow.sdk
# - Airflow 3.0+: BaseOperator comes from airflow.sdk
# - Airflow 2.10+: both fall back to airflow.hooks.base and airflow.models
```

These utilities enable the provider to work across multiple Airflow versions by automatically importing the correct base classes and handling version-specific differences.
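
For instance, DAG or plugin code can branch on these flags; a minimal sketch using only the names declared above:

```python
from airflow.providers.apache.kafka.version_compat import (
    AIRFLOW_V_3_0_PLUS,
    get_base_airflow_version_tuple,
)

if AIRFLOW_V_3_0_PLUS:
    print("Running on Airflow 3.x")
else:
    major, minor, patch = get_base_airflow_version_tuple()
    print(f"Running on Airflow {major}.{minor}.{patch}")
```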