# Async Triggers

The AirbyteSyncTrigger provides asynchronous monitoring capabilities for Airbyte jobs in deferrable execution mode. It enables efficient resource utilization by releasing worker slots while monitoring long-running sync operations.

## Capabilities

### Trigger Initialization

Creates an async trigger for monitoring Airbyte job completion.

```python { .api }
class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None:
        """
        Initialize Airbyte sync trigger.

        Args:
            job_id: Airbyte job ID to monitor
            conn_id: Airflow connection ID for Airbyte server
            end_time: Unix timestamp when monitoring should time out
            poll_interval: Seconds between status checks
        """
```

### Serialization

Methods for trigger persistence and restoration.

```python { .api }
def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize trigger state for persistence.

    Returns:
        Tuple of (class_path, serialized_arguments)
    """
```

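For orientation, here is a sketch of what serialization yields at runtime; the classpath string shown is an assumption inferred from the import path used in the examples below:

```python
import time

from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger

trigger = AirbyteSyncTrigger(
    job_id=12345,
    conn_id="airbyte_default",
    end_time=time.time() + 3600,
    poll_interval=60,
)

classpath, kwargs = trigger.serialize()
# classpath -> "airflow.providers.airbyte.triggers.airbyte.AirbyteSyncTrigger" (assumed)
# kwargs    -> {"job_id": 12345, "conn_id": "airbyte_default",
#               "end_time": <timestamp>, "poll_interval": 60}
# Airflow persists this tuple and rebuilds the trigger inside the triggerer process.
```
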
### Async Monitoring

Core asynchronous monitoring functionality.

```python { .api }
async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Execute async monitoring loop until job completion or timeout.

    Yields:
        TriggerEvent with job completion status and details

    Events:
        - {"status": "success", "message": "...", "job_id": int}
        - {"status": "error", "message": "...", "job_id": int}
        - {"status": "cancelled", "message": "...", "job_id": int}
    """

async def is_still_running(self, hook: AirbyteHook) -> bool:
    """
    Check if job is still in a running state.

    Args:
        hook: AirbyteHook instance for API communication

    Returns:
        True if job is RUNNING, PENDING, or INCOMPLETE
    """
```

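Conceptually, run() is a polling loop built around is_still_running(). Below is a simplified sketch of that loop, written as a free function for readability; the hook construction and the message strings are assumptions, not the provider's exact source:

```python
import asyncio
import time

from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.triggers.base import TriggerEvent

async def poll_until_done(trigger) -> TriggerEvent:
    """Sketch of the loop run() implements (assumed behavior, not the real source)."""
    hook = AirbyteHook(airbyte_conn_id=trigger.conn_id)
    try:
        while await trigger.is_still_running(hook):
            if trigger.end_time < time.time():
                # Deadline passed before the job reached a terminal state
                return TriggerEvent({
                    "status": "error",
                    "message": f"Job run {trigger.job_id} has not reached a terminal "
                               "status within the allotted time.",
                    "job_id": trigger.job_id,
                })
            await asyncio.sleep(trigger.poll_interval)
        # The job left the running states; the real run() maps the final status
        # (succeeded/failed/cancelled) to one of the events documented below.
        return TriggerEvent({"status": "success", "message": "...", "job_id": trigger.job_id})
    except Exception as e:
        # Any exception is surfaced as an error event rather than crashing the triggerer
        return TriggerEvent({"status": "error", "message": str(e), "job_id": trigger.job_id})
```
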
## Usage Examples

### Direct Trigger Usage

```python
import asyncio
import time

from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger

async def monitor_job():
    """Example of direct trigger usage."""
    trigger = AirbyteSyncTrigger(
        job_id=12345,
        conn_id='airbyte_default',
        end_time=time.time() + 3600,  # 1 hour from now
        poll_interval=60,  # Check every minute
    )

    async for event in trigger.run():
        print(f"Job event: {event}")
        break  # Exit after the first event

# Run the monitoring
asyncio.run(monitor_job())
```

### Integration with Deferrable Operator

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Operator automatically uses trigger when deferrable=True
deferrable_sync = AirbyteTriggerSyncOperator(
    task_id='deferrable_sync',
    connection_id='connection-uuid-123',
    deferrable=True,  # Automatically creates and uses AirbyteSyncTrigger
    timeout=7200,  # 2 hours - converted to end_time
    dag=dag,
)
```

### Integration with Deferrable Sensor

```python
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Sensor automatically uses trigger when deferrable=True
deferrable_sensor = AirbyteJobSensor(
    task_id='deferrable_monitor',
    airbyte_job_id=67890,
    deferrable=True,  # Automatically creates and uses AirbyteSyncTrigger
    timeout=3600,  # 1 hour - converted to end_time
    dag=dag,
)
```

### Custom Trigger Implementation

```python
import time

from airflow.exceptions import AirflowException
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.version_compat import BaseOperator

class CustomAirbyteOperator(BaseOperator):
    """Custom operator with trigger usage."""

    def execute(self, context):
        """Execute with custom trigger configuration."""
        # Submit job logic here
        job_id = self.submit_job()

        if self.deferrable:
            # Custom trigger configuration
            self.defer(
                timeout=self.execution_timeout,
                trigger=AirbyteSyncTrigger(
                    job_id=job_id,
                    conn_id=self.airbyte_conn_id,
                    end_time=time.time() + 7200,  # Custom 2-hour timeout
                    poll_interval=30,  # Custom 30-second interval
                ),
                method_name="execute_complete",
            )
        # (synchronous fallback omitted in this sketch)

    def execute_complete(self, context, event=None):
        """Handle trigger completion."""
        if event["status"] == "success":
            self.log.info("Job %s completed successfully", event["job_id"])
        else:
            raise AirflowException(f"Job failed: {event['message']}")
```

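Note that the name passed as method_name must be a method on the operator itself: defer() suspends the task, and when the trigger fires, Airflow resumes the task in that method with the trigger's payload supplied as the event argument.
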
## Event Types

The trigger yields different event types based on job outcomes:

### Success Event

```python
{
    "status": "success",
    "message": "Job run 12345 has completed successfully.",
    "job_id": 12345
}
```

### Error Event

```python
{
    "status": "error",
    "message": "Job run 12345 has failed.",
    "job_id": 12345
}
```

### Cancellation Event

```python
{
    "status": "cancelled",
    "message": "Job run 12345 has been cancelled.",
    "job_id": 12345
}
```

### Timeout Event

```python
{
    "status": "error",
    "message": "Job run 12345 has not reached a terminal status after 3600 seconds.",
    "job_id": 12345
}
```

### Exception Event

```python
{
    "status": "error",
    "message": "Connection timeout: Unable to reach Airbyte server",
    "job_id": 12345
}
```

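Because every event carries the same status/message/job_id shape, downstream code can branch on the status key alone. A minimal sketch of such a dispatcher (handle_event is an illustrative helper, not part of the provider):

```python
from airflow.exceptions import AirflowException

def handle_event(event: dict) -> None:
    """Illustrative helper: branch on a trigger event's status field."""
    status = event["status"]
    if status == "success":
        print(f"Job {event['job_id']} finished: {event['message']}")
    elif status == "cancelled":
        # Distinguish cancellation if your pipeline treats it differently from failure
        raise AirflowException(f"Job {event['job_id']} was cancelled: {event['message']}")
    else:  # "error" covers failures, timeouts, and raised exceptions
        raise AirflowException(f"Job {event['job_id']} failed: {event['message']}")
```
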
## Trigger Lifecycle

### Initialization Phase
1. Trigger receives job_id, connection info, and timing parameters
2. Trigger serializes state for persistence
3. Airflow schedules trigger for async execution

### Monitoring Phase
1. Trigger creates AirbyteHook for API communication
2. Enters polling loop with specified interval
3. Checks job status on each iteration
4. Continues until terminal state or timeout

### Completion Phase
1. Trigger yields appropriate TriggerEvent
2. Associated task receives event via execute_complete()
3. Task completes or raises exception based on event status

## Configuration

### Timing Parameters

```python
import time

from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger

AirbyteSyncTrigger(
    job_id=12345,
    conn_id='airbyte_default',

    # Timeout configuration
    end_time=time.time() + 7200,  # 2 hours from now

    # Polling configuration
    poll_interval=60,  # Check every 60 seconds
)
```

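Because end_time is an absolute Unix timestamp rather than a duration, relative timeouts must be converted. A small hypothetical helper (end_time_from is illustrative, not part of the provider) makes the conversion explicit:

```python
import time
from datetime import timedelta

def end_time_from(delta: timedelta) -> float:
    """Convert a relative timeout into the absolute timestamp end_time expects."""
    return time.time() + delta.total_seconds()

# Usage: end_time=end_time_from(timedelta(hours=2))
```
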
### Connection Configuration

The trigger uses the same connection configuration as other Airbyte components:

```python
# Connection parameters extracted from Airflow connection
{
    "host": "https://api.airbyte.com",
    "client_id": "oauth_client_id",
    "client_secret": "oauth_client_secret",
    "token_url": "v1/applications/token",
    "proxies": {...}  # Optional proxy settings
}
```

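To check what a given conn_id actually resolves to, you can inspect the Airflow connection directly; which field holds which value (host vs. login vs. the extra JSON) depends on how the connection was configured, so treat the comments below as assumptions:

```python
from airflow.hooks.base import BaseHook

conn = BaseHook.get_connection("airbyte_default")
print(conn.host)   # e.g. "https://api.airbyte.com"
print(conn.extra)  # JSON string; typically holds fields such as client_id/client_secret
```
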
## Error Handling

The trigger handles various error scenarios:

### Network Errors
- Connection timeouts to Airbyte server
- DNS resolution failures
- Network connectivity issues

### Authentication Errors
- Invalid client credentials
- Expired tokens
- Authorization failures

### API Errors
- Invalid job IDs
- Server internal errors
- Rate limiting responses

### Job State Errors
- Unexpected job state transitions
- Job not found scenarios
- Malformed API responses

All errors are captured and yielded as error events with descriptive messages for debugging.

## Best Practices

### Timeout Configuration
- Set reasonable timeouts based on expected job duration
- Consider data volume and complexity when setting timeouts
- Use longer timeouts for initial syncs, shorter for incremental

### Polling Intervals
- Balance between responsiveness and API load
- Use longer intervals (60+ seconds) for long-running jobs
- Use shorter intervals (10-30 seconds) for quick jobs

### Resource Management
- Prefer deferrable triggers over polling for long jobs
- Monitor trigger resource usage in large deployments
- Consider connection pooling for high-frequency monitoring

### Error Recovery
- Implement appropriate retry logic in calling operators/sensors (see the sketch after this list)
- Log sufficient detail for troubleshooting failed jobs
- Set up alerting for persistent trigger failures

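For the first bullet, retries are usually configured on the calling task rather than inside the trigger: a failed trigger event fails the task, and Airflow's normal retry machinery re-submits it. A sketch using standard BaseOperator retry parameters:

```python
from datetime import timedelta

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

retrying_sync = AirbyteTriggerSyncOperator(
    task_id="retrying_sync",
    connection_id="connection-uuid-123",
    deferrable=True,
    retries=3,                         # re-submit the whole sync up to 3 times
    retry_delay=timedelta(minutes=5),  # wait between attempts
)
```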