# Asynchronous Operations

Deferrable sensors and triggers for event-driven processing, enabling non-blocking message monitoring in Kafka topics.

## Capabilities

### Await Message Sensor

Deferrable sensor that waits for specific messages in Kafka topics without blocking a worker slot. An apply function tests each message against your condition; when it is satisfied, the sensor completes.

```python { .api }
class AwaitMessageSensor(BaseOperator):
    """
    Sensor that waits for messages in Kafka topics using deferrable execution.

    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        commit_cadence: str = "end_of_batch",
        max_messages: int | None = None,
        max_batch_size: int = 1000,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the message sensor.

        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            commit_cadence: When to commit messages ("end_of_batch", "end_of_operator", "never")
            max_messages: Maximum number of messages to consume
            max_batch_size: Maximum batch size for message processing
            **kwargs: Additional operator arguments
        """

    def execute(self, context) -> Any:
        """
        Execute the sensor operation.

        Args:
            context: Airflow task context

        Returns:
            Any: Result when condition is met
        """

    def execute_complete(self, context, event=None):
        """
        Handle trigger completion.

        Args:
            context: Airflow task context
            event: Event data from trigger
        """
```
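
In practice, `execute` hands control to an `AwaitMessageTrigger` and the task resumes in `execute_complete` once the trigger fires. The following is a minimal sketch of that hand-off; the `DeferringKafkaSensor` class and the `my_functions.check_message` import string are illustrative assumptions, not part of the provider:

```python
# Sketch of the deferral pattern; the provider's internals may differ.
from typing import Any

from airflow.models import BaseOperator
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger


class DeferringKafkaSensor(BaseOperator):  # hypothetical illustration
    def execute(self, context) -> Any:
        # Hand off to the triggerer instead of holding a worker slot;
        # execution resumes in execute_complete when the trigger fires.
        self.defer(
            trigger=AwaitMessageTrigger(
                topics=["events"],
                apply_function="my_functions.check_message",  # assumed import string
            ),
            method_name="execute_complete",
        )

    def execute_complete(self, context, event=None) -> Any:
        # event carries the payload yielded by the trigger's TriggerEvent
        return event
```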

### Await Message Trigger Function Sensor

Sensor variant that adds a custom trigger function for complex event handling: when the apply function flags a message, the `event_triggered_function` is invoked to process it, going beyond basic condition checking.

```python { .api }
class AwaitMessageTriggerFunctionSensor(BaseOperator):
    """
    Sensor with custom trigger function for advanced message handling.

    Attributes:
        template_fields: tuple = ("topics", "apply_function", "kafka_config_id")
        ui_color: str = "#e8f4fd"
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str | Callable,
        event_triggered_function: Callable,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the trigger function sensor.

        Args:
            topics: List of topics to monitor
            apply_function: Function to test message conditions (callable or import string)
            event_triggered_function: Function to call when event triggers
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional operator arguments
        """

    def execute(self, context, event=None) -> Any:
        """
        Execute the sensor operation.

        Args:
            context: Airflow task context
            event: Event data

        Returns:
            Any: Result when condition is met
        """
```

### Await Message Trigger

Low-level trigger for asynchronous message monitoring in Kafka topics. Provides the core deferrable execution mechanism used by sensors.

```python { .api }
class AwaitMessageTrigger(BaseTrigger):
    """
    Trigger for asynchronous message monitoring in Kafka topics.
    """

    def __init__(
        self,
        topics: Sequence[str],
        apply_function: str,
        kafka_config_id: str = "kafka_default",
        apply_function_args: Sequence[Any] | None = None,
        apply_function_kwargs: dict[Any, Any] | None = None,
        poll_timeout: float = 1.0,
        poll_interval: float = 5.0,
        **kwargs: Any,
    ) -> None:
        """
        Initialize the message trigger.

        Args:
            topics: List of topics to monitor
            apply_function: Import string for function to test message conditions
            kafka_config_id: Airflow connection ID for Kafka configuration
            apply_function_args: Arguments to pass to apply function
            apply_function_kwargs: Keyword arguments to pass to apply function
            poll_timeout: Timeout for individual polls (seconds)
            poll_interval: Interval between polls (seconds)
            **kwargs: Additional trigger arguments
        """

    def serialize(self) -> tuple[str, dict[str, Any]]:
        """
        Serialize trigger state.

        Returns:
            tuple: (class_path, serialized_kwargs)
        """

    async def run(self):
        """
        Run the trigger asynchronously.

        Yields:
            TriggerEvent: Event when condition is met
        """
```
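
The `serialize` method is what lets the triggerer process rebuild the trigger. A quick sketch of the round trip (the class path shown is an assumption about the return value, not a documented constant, and `my_functions.check_message` is a hypothetical helper):

```python
# Sketch: serialize() returns a (class_path, kwargs) pair that the triggerer
# uses to re-instantiate the trigger; values shown are illustrative.
from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger

trigger = AwaitMessageTrigger(
    topics=["events"],
    apply_function="my_functions.check_message",  # assumed helper module
)
class_path, kwargs = trigger.serialize()
# class_path -> e.g. "airflow.providers.apache.kafka.triggers.await_message.AwaitMessageTrigger"
# kwargs     -> the constructor arguments, so AwaitMessageTrigger(**kwargs) recreates the trigger
```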

### Usage Examples

#### Basic Message Waiting

```python
from airflow import DAG
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
from datetime import datetime, timedelta
import json

def check_order_completion(message):
    """Check if message indicates order completion."""
    try:
        data = json.loads(message.value().decode('utf-8'))
        return data.get("status") == "completed" and data.get("order_id") == "12345"
    except Exception:
        return False

dag = DAG(
    "kafka_sensor_example",
    start_date=datetime(2023, 1, 1),
    schedule_interval=None,
    catchup=False
)

wait_for_order = AwaitMessageSensor(
    task_id="wait_for_order_completion",
    topics=["order-events"],
    apply_function=check_order_completion,
    poll_timeout=1.0,
    poll_interval=5.0,
    execution_timeout=timedelta(minutes=5),  # cap the total wait at 5 minutes
    kafka_config_id="kafka_default",
    dag=dag
)
```

#### Using Apply Function as Import String

```python
# Define function in a separate module (e.g. my_functions.py)
import json

def detect_critical_event(message):
    """Detect critical events in the message stream."""
    try:
        data = json.loads(message.value().decode('utf-8'))
        return data.get("priority") == "critical"
    except Exception:
        return False

# Use the import string in the sensor
critical_sensor = AwaitMessageSensor(
    task_id="detect_critical",
    topics=["system-events"],
    apply_function="my_functions.detect_critical_event",  # Import string
    poll_timeout=1.0,
    poll_interval=2.0,
    kafka_config_id="kafka_default"
)
```

#### Commit Cadence Configuration

```python
# Different commit strategies
sensor_end_of_batch = AwaitMessageSensor(
    task_id="sensor_batch_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_batch",  # Commit after each batch
    max_batch_size=100,
    kafka_config_id="kafka_default"
)

sensor_end_of_operator = AwaitMessageSensor(
    task_id="sensor_operator_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="end_of_operator",  # Commit at end of execution
    kafka_config_id="kafka_default"
)

sensor_never_commit = AwaitMessageSensor(
    task_id="sensor_no_commit",
    topics=["events"],
    apply_function="my_functions.process_event",
    commit_cadence="never",  # Never commit (external commit control)
    kafka_config_id="kafka_default"
)
```

#### Custom Trigger Function Sensor

```python
import json

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageTriggerFunctionSensor

def event_condition(message):
    """Check if message meets condition."""
    data = json.loads(message.value().decode('utf-8'))
    return data.get("event_type") == "alert"

def handle_alert(event_data):
    """Handle alert when triggered."""
    print(f"Alert triggered: {event_data}")
    # Custom alert handling logic
    return "Alert processed"

alert_sensor = AwaitMessageTriggerFunctionSensor(
    task_id="handle_alerts",
    topics=["alerts"],
    apply_function=event_condition,
    event_triggered_function=handle_alert,
    poll_timeout=0.5,
    poll_interval=1.0,
    kafka_config_id="kafka_default"
)
```

#### Using Trigger Directly

```python
import asyncio

from airflow.providers.apache.kafka.triggers.await_message import AwaitMessageTrigger
from airflow.sensors.base import BaseSensorOperator

class CustomKafkaSensor(BaseSensorOperator):
    def __init__(self, topics, apply_function, **kwargs):
        super().__init__(**kwargs)
        self.topics = topics
        self.apply_function = apply_function

    def poke(self, context):
        # Create and use the trigger directly
        trigger = AwaitMessageTrigger(
            topics=self.topics,
            apply_function=self.apply_function,
            kafka_config_id="kafka_default",
            poll_timeout=1.0,
            poll_interval=2.0
        )

        # Drive the trigger's async generator synchronously; run() yields its
        # first TriggerEvent once a message satisfies the apply function.
        # Note this blocks the worker, unlike deferrable execution.
        async def first_event():
            async for event in trigger.run():
                return event

        return asyncio.run(first_event()) is not None
```

### Error Handling

The provider includes an error callback hook for handling Kafka consumer errors:

```python { .api }
def error_callback(err):
    """
    Error callback function for Kafka consumer errors.

    Args:
        err: Kafka error object
    """
```
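
For example, a callback might log transient errors and raise on fatal ones. A hedged sketch (the fatal-error check relies on confluent-kafka's `KafkaError.fatal()`; adapt it to your client library):

```python
# Sketch: a consumer error callback that logs errors and surfaces fatal ones.
import logging

log = logging.getLogger(__name__)


def error_callback(err):
    """Log Kafka consumer errors and raise when they are fatal."""
    log.error("Kafka consumer error: %s", err)
    # confluent-kafka error objects expose fatal(); hedge with getattr
    if getattr(err, "fatal", lambda: False)():
        raise RuntimeError(f"Fatal Kafka consumer error: {err}")
```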

Custom exception for authentication failures:

```python { .api }
class KafkaAuthenticationError(Exception):
    """Custom exception for Kafka authentication failures."""
```
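
A task can catch this exception to fail fast on bad credentials instead of retrying. A sketch under assumptions (the `KafkaConsumerHook` usage and both provider import paths are illustrative; check your provider version):

```python
# Sketch: failing fast on authentication problems instead of retrying.
from airflow.exceptions import AirflowFailException
from airflow.providers.apache.kafka.hooks.base import KafkaAuthenticationError  # assumed path
from airflow.providers.apache.kafka.hooks.consume import KafkaConsumerHook  # assumed path


def open_consumer():
    hook = KafkaConsumerHook(topics=["events"], kafka_config_id="kafka_default")
    try:
        return hook.get_consumer()
    except KafkaAuthenticationError as exc:
        # Credential/SASL misconfiguration: automatic retries will not help
        raise AirflowFailException(f"Kafka authentication failed: {exc}") from exc
```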

### Configuration Constants

```python { .api }
VALID_COMMIT_CADENCE = ["end_of_batch", "end_of_operator", "never"]
```
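
A constructor can validate a user-supplied cadence against this list before running; a minimal sketch (the `check_commit_cadence` helper is hypothetical):

```python
# Sketch: validating a commit_cadence argument against the allowed values.
VALID_COMMIT_CADENCE = ["end_of_batch", "end_of_operator", "never"]


def check_commit_cadence(commit_cadence: str) -> str:
    if commit_cadence not in VALID_COMMIT_CADENCE:
        raise ValueError(
            f"commit_cadence must be one of {VALID_COMMIT_CADENCE}, got {commit_cadence!r}"
        )
    return commit_cadence
```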

### Best Practices

1. **Deferrable Execution**: Use sensors for long-running waits to free up worker slots
2. **Apply Function Design**: Keep apply functions lightweight and handle exceptions gracefully
3. **Commit Strategy**: Match the commit cadence to your delivery guarantees (e.g. `"never"` when offsets are committed externally)
4. **Error Handling**: Implement proper error handling in apply functions
5. **Resource Management**: Set reasonable batch sizes and timeouts
6. **Import Strings**: Use import strings for apply functions to enable proper serialization
7. **Connection Management**: Use appropriate Kafka connection configurations (a configuration sketch combining these practices follows the list)
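
Taken together, a sensor configured along these lines might look like the sketch below. `my_functions.is_ready` is an assumed helper, and the numbers are starting points to tune, not recommendations:

```python
# Sketch: an AwaitMessageSensor configured per the practices above.
from datetime import timedelta

from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor

wait_for_signal = AwaitMessageSensor(
    task_id="wait_for_signal",
    topics=["signals"],
    apply_function="my_functions.is_ready",  # import string for clean serialization
    kafka_config_id="kafka_default",         # dedicated Airflow connection
    commit_cadence="end_of_operator",        # commit once the wait completes
    poll_timeout=1.0,
    poll_interval=5.0,
    max_batch_size=500,                      # bound per-poll work
    execution_timeout=timedelta(hours=1),    # cap the total wait
)
```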