
# Async Triggers

The `AirbyteSyncTrigger` provides asynchronous monitoring capabilities for Airbyte jobs in deferrable execution mode. It enables efficient resource utilization by releasing worker slots while monitoring long-running sync operations.

## Capabilities

### Trigger Initialization

Creates an async trigger for monitoring Airbyte job completion.

9

10

```python { .api }
class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None:
        """
        Initialize the Airbyte sync trigger.

        Args:
            job_id: Airbyte job ID to monitor
            conn_id: Airflow connection ID for the Airbyte server
            end_time: Unix timestamp when monitoring should time out
            poll_interval: Seconds between status checks
        """
```

### Serialization

Methods for trigger persistence and restoration.

33

34

```python { .api }
def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize trigger state for persistence.

    Returns:
        Tuple of (class_path, serialized_arguments)
    """
```
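The `(class_path, serialized_arguments)` pair is what lets Airflow's triggerer process rebuild a trigger after a restart: the class path is imported and the instance reconstructed from the kwargs. A toy stand-in (the class and dotted path below are invented for illustration, not the real provider code) shows the contract:

```python
class ToyTrigger:
    """Toy stand-in illustrating the serialize contract (not the real class)."""

    def __init__(self, job_id: int, conn_id: str, end_time: float, poll_interval: float):
        self.kwargs = {
            "job_id": job_id,
            "conn_id": conn_id,
            "end_time": end_time,
            "poll_interval": poll_interval,
        }

    def serialize(self) -> tuple[str, dict]:
        # Dotted class path plus constructor kwargs: everything the triggerer
        # process needs to re-import the class and rebuild the instance
        return ("example.triggers.ToyTrigger", self.kwargs)


classpath, kwargs = ToyTrigger(12345, "airbyte_default", 1700000000.0, 60.0).serialize()
restored = ToyTrigger(**kwargs)  # Airflow does this by importing classpath
print(restored.kwargs["job_id"])  # 12345
```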

### Async Monitoring

Core asynchronous monitoring functionality.

47

48

```python { .api }
async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Execute the async monitoring loop until job completion or timeout.

    Yields:
        TriggerEvent with job completion status and details

    Events:
        - {"status": "success", "message": "...", "job_id": int}
        - {"status": "error", "message": "...", "job_id": int}
        - {"status": "cancelled", "message": "...", "job_id": int}
    """

async def is_still_running(self, hook: AirbyteHook) -> bool:
    """
    Check whether the job is still in a running state.

    Args:
        hook: AirbyteHook instance for API communication

    Returns:
        True if the job is RUNNING, PENDING, or INCOMPLETE
    """
```

## Usage Examples

### Direct Trigger Usage

77

78

```python
import asyncio
import time

from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger


async def monitor_job():
    """Example of direct trigger usage."""
    trigger = AirbyteSyncTrigger(
        job_id=12345,
        conn_id="airbyte_default",
        end_time=time.time() + 3600,  # 1 hour from now
        poll_interval=60,  # check every minute
    )

    async for event in trigger.run():
        print(f"Job event: {event}")
        break  # Exit after first event


# Run the monitoring
asyncio.run(monitor_job())
```

### Integration with Deferrable Operator

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Operator automatically uses the trigger when deferrable=True
deferrable_sync = AirbyteTriggerSyncOperator(
    task_id="deferrable_sync",
    connection_id="connection-uuid-123",
    deferrable=True,  # Automatically creates and uses AirbyteSyncTrigger
    timeout=7200,  # 2 hours, converted to end_time
    dag=dag,
)
```

### Integration with Deferrable Sensor

```python
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Sensor automatically uses the trigger when deferrable=True
deferrable_sensor = AirbyteJobSensor(
    task_id="deferrable_monitor",
    airbyte_job_id=67890,
    deferrable=True,  # Automatically creates and uses AirbyteSyncTrigger
    timeout=3600,  # 1 hour, converted to end_time
    dag=dag,
)
```

### Custom Trigger Implementation

```python
import time

from airflow.exceptions import AirflowException
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.version_compat import BaseOperator


class CustomAirbyteOperator(BaseOperator):
    """Custom operator with explicit trigger usage."""

    def execute(self, context):
        """Execute with a custom trigger configuration."""
        # Submit job logic here
        job_id = self.submit_job()

        if self.deferrable:
            # Custom trigger configuration
            self.defer(
                timeout=self.execution_timeout,
                trigger=AirbyteSyncTrigger(
                    job_id=job_id,
                    conn_id=self.airbyte_conn_id,
                    end_time=time.time() + 7200,  # Custom 2-hour timeout
                    poll_interval=30,  # Custom 30-second interval
                ),
                method_name="execute_complete",
            )

    def execute_complete(self, context, event=None):
        """Handle trigger completion."""
        if event["status"] == "success":
            self.log.info("Job %s completed successfully", event["job_id"])
        else:
            raise AirflowException(f"Job failed: {event['message']}")
```

## Event Types

The trigger yields different event types based on the job outcome:

### Success Event

171

172

```python
{
    "status": "success",
    "message": "Job run 12345 has completed successfully.",
    "job_id": 12345
}
```

### Error Event

```python
{
    "status": "error",
    "message": "Job run 12345 has failed.",
    "job_id": 12345
}
```

### Cancellation Event

```python
{
    "status": "cancelled",
    "message": "Job run 12345 has been cancelled.",
    "job_id": 12345
}
```

### Timeout Event

```python
{
    "status": "error",
    "message": "Job run 12345 has not reached a terminal status after 3600 seconds.",
    "job_id": 12345
}
```

### Exception Event

```python
{
    "status": "error",
    "message": "Connection timeout: Unable to reach Airbyte server",
    "job_id": 12345
}
```

## Trigger Lifecycle

### Initialization Phase

1. Trigger receives job_id, connection info, and timing parameters
2. Trigger serializes state for persistence
3. Airflow schedules the trigger for async execution

### Monitoring Phase

1. Trigger creates an AirbyteHook for API communication
2. Enters a polling loop with the specified interval
3. Checks job status on each iteration
4. Continues until a terminal state or timeout

### Completion Phase

1. Trigger yields the appropriate TriggerEvent
2. The associated task receives the event via execute_complete()
3. The task completes or raises an exception based on the event status
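The monitoring phase can be sketched as a plain asyncio loop. This is a simplified, self-contained illustration of the pattern, not the provider's actual code: the `get_job_status` callable and the state names stand in for the real AirbyteHook calls.

```python
import asyncio
import time

RUNNING_STATES = {"running", "pending", "incomplete"}


async def monitor(job_id: int, end_time: float, poll_interval: float,
                  get_job_status) -> dict:
    """Poll job status until a terminal state or timeout, then return an event."""
    while time.time() < end_time:
        status = get_job_status(job_id)
        if status not in RUNNING_STATES:
            # Terminal state reached: map it to an event payload
            outcome = "success" if status == "succeeded" else "error"
            return {"status": outcome,
                    "message": f"Job run {job_id} finished with status {status}.",
                    "job_id": job_id}
        await asyncio.sleep(poll_interval)
    # Deadline passed without reaching a terminal state
    return {"status": "error",
            "message": f"Job run {job_id} has not reached a terminal status in time.",
            "job_id": job_id}


# Stub that reports "running" twice, then "succeeded"
calls = {"n": 0}

def fake_status(job_id):
    calls["n"] += 1
    return "running" if calls["n"] < 3 else "succeeded"


event = asyncio.run(monitor(12345, time.time() + 10, 0.01, fake_status))
print(event["status"])  # success
```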

## Configuration

### Timing Parameters

```python
import time

AirbyteSyncTrigger(
    job_id=12345,
    conn_id="airbyte_default",
    # Timeout configuration
    end_time=time.time() + 7200,  # 2 hours from now
    # Polling configuration
    poll_interval=60,  # Check every 60 seconds
)
```

### Connection Configuration

The trigger uses the same connection configuration as other Airbyte components:

```python
# Connection parameters extracted from the Airflow connection
{
    "host": "https://api.airbyte.com",
    "client_id": "oauth_client_id",
    "client_secret": "oauth_client_secret",
    "token_url": "v1/applications/token",
    "proxies": {...}  # Optional proxy settings
}
```

## Error Handling

The trigger handles various error scenarios:

### Network Errors

- Connection timeouts to the Airbyte server
- DNS resolution failures
- Network connectivity issues

### Authentication Errors

- Invalid client credentials
- Expired tokens
- Authorization failures

### API Errors

- Invalid job IDs
- Server internal errors
- Rate-limiting responses

### Job State Errors

- Unexpected job state transitions
- Job-not-found scenarios
- Malformed API responses

All errors are captured and yielded as error events with descriptive messages for debugging.
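That capture-and-yield behavior can be sketched with a wrapper that converts any exception into an error event. This is an illustrative pattern, not the provider's implementation; the `monitor` coroutine and payload shape are assumptions matching the event types above.

```python
import asyncio


async def run_safely(monitor, job_id: int) -> dict:
    """Run a monitoring coroutine, converting exceptions into error events."""
    try:
        return await monitor(job_id)
    except Exception as exc:
        # Any failure (network, auth, API, parsing) becomes an error event
        return {"status": "error", "message": str(exc), "job_id": job_id}


async def failing_monitor(job_id: int) -> dict:
    """Stub that simulates a network failure."""
    raise ConnectionError("Connection timeout: Unable to reach Airbyte server")


event = asyncio.run(run_safely(failing_monitor, 12345))
print(event["status"])  # error
```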

## Best Practices

### Timeout Configuration

- Set reasonable timeouts based on expected job duration
- Consider data volume and complexity when setting timeouts
- Use longer timeouts for initial syncs, shorter for incremental syncs
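One common way to derive `end_time` is from the expected duration plus headroom; the helper name `compute_end_time` and the 1.5x margin below are illustrative choices, not part of the provider API.

```python
import time
from datetime import timedelta


def compute_end_time(expected_duration: timedelta, margin: float = 1.5) -> float:
    """Unix timestamp for a deadline with headroom over the expected duration."""
    return time.time() + expected_duration.total_seconds() * margin


# A sync expected to take 1 hour gets a 90-minute deadline
deadline = compute_end_time(timedelta(hours=1))
```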

### Polling Intervals

- Balance between responsiveness and API load
- Use longer intervals (60+ seconds) for long-running jobs
- Use shorter intervals (10-30 seconds) for quick jobs

### Resource Management

- Prefer deferrable triggers over polling for long jobs
- Monitor trigger resource usage in large deployments
- Consider connection pooling for high-frequency monitoring

### Error Recovery

- Implement appropriate retry logic in calling operators/sensors
- Log sufficient detail for troubleshooting failed jobs
- Set up alerting for persistent trigger failures