# Job Monitoring

The `AirbyteJobSensor` provides Airflow sensor functionality for monitoring the status of Airbyte jobs. It supports both traditional polling and deferrable execution modes, making it suitable for monitoring long-running sync operations.

The sensor reads its `deferrable` default from Airflow configuration, which requires this import:

```python
from airflow.configuration import conf
```

## Capabilities

### Sensor Initialization

Creates a sensor to monitor specific Airbyte job completion.

```python { .api }
class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        airbyte_conn_id: str = "airbyte_default",
        api_version: str = "v1",
        **kwargs,
    ) -> None:
        """
        Initialize Airbyte job sensor.

        Args:
            airbyte_job_id: Required. Airbyte job ID to monitor
            deferrable: Use deferrable execution mode
            airbyte_conn_id: Airflow connection ID for Airbyte server
            api_version: Airbyte API version to use
            **kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """
```

### Class Attributes

Template fields and UI configuration.

```python { .api }
template_fields: Sequence[str] = ("airbyte_job_id",)
ui_color: str = "#6C51FD"
```

### Monitoring Methods

Core sensor functionality for job status checking.

```python { .api }
def poke(self, context: Context) -> bool:
    """
    Check job status and determine if sensor condition is satisfied.

    Args:
        context: Airflow task execution context

    Returns:
        True if job completed successfully, False if still running

    Raises:
        AirflowException: If job failed or was cancelled
    """

def execute(self, context: Context) -> Any:
    """
    Execute sensor logic with support for both polling and deferrable modes.

    Args:
        context: Airflow task execution context

    Returns:
        None when job completes successfully

    Raises:
        AirflowException: If job fails, is cancelled, or times out
    """

def execute_complete(self, context: Context, event: Any = None) -> None:
    """
    Callback method for deferrable mode completion.

    Args:
        context: Airflow task execution context
        event: Trigger event data

    Raises:
        AirflowException: If job completed with error status
    """
```

## Usage Examples

### Basic Job Monitoring

```python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

dag = DAG(
    'monitor_example',
    start_date=datetime(2024, 1, 1),
    schedule=None,
)

# Monitor specific job ID
monitor_job = AirbyteJobSensor(
    task_id='wait_for_sync',
    airbyte_job_id=12345,
    airbyte_conn_id='airbyte_default',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # 1 hour timeout
    dag=dag
)
```

### Monitoring Async Jobs

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Trigger async job
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    connection_id='connection-uuid-123',
    asynchronous=True,  # Returns job_id
    dag=dag
)

# Monitor the triggered job
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    poke_interval=60,  # Check every minute
    timeout=7200,      # 2 hour timeout
    dag=dag
)

trigger_sync >> monitor_sync
```

### Deferrable Monitoring

```python
# Deferrable sensor - releases worker slot while waiting
deferrable_monitor = AirbyteJobSensor(
    task_id='deferrable_monitor',
    airbyte_job_id=67890,
    deferrable=True,    # Use async trigger
    timeout=24 * 3600,  # 24 hour timeout
    dag=dag
)
```

### Dynamic Job ID Monitoring

```python
# Monitor job ID from DAG configuration
dynamic_monitor = AirbyteJobSensor(
    task_id='dynamic_monitor',
    airbyte_job_id="{{ dag_run.conf['job_id'] }}",
    poke_interval=45,
    dag=dag
)

# Monitor job ID from Airflow variable
variable_monitor = AirbyteJobSensor(
    task_id='variable_monitor',
    airbyte_job_id="{{ var.value.current_job_id }}",
    timeout=1800,
    dag=dag
)
```

### Multiple Job Monitoring

```python
from airflow.utils.task_group import TaskGroup

# Monitor multiple jobs in parallel
with TaskGroup('monitor_jobs', dag=dag) as job_group:
    for i, job_id in enumerate([111, 222, 333]):
        AirbyteJobSensor(
            task_id=f'monitor_job_{i}',
            airbyte_job_id=job_id,
            poke_interval=30,
            timeout=3600,
        )
```

### Conditional Monitoring with Branching

```python
from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator

def check_job_status(**context):
    """Decide whether to monitor or skip based on conditions."""
    # Custom logic to determine if monitoring is needed
    if context['dag_run'].conf.get('monitor_job', True):
        return 'monitor_job'
    else:
        return 'skip_monitoring'

branch_task = BranchPythonOperator(
    task_id='check_monitoring_needed',
    python_callable=check_job_status,
    dag=dag
)

monitor_job = AirbyteJobSensor(
    task_id='monitor_job',
    airbyte_job_id="{{ dag_run.conf['job_id'] }}",
    poke_interval=60,
    dag=dag
)

skip_task = EmptyOperator(
    task_id='skip_monitoring',
    dag=dag
)

branch_task >> [monitor_job, skip_task]
```

## Configuration Options

### Sensor-Specific Configuration

```python
AirbyteJobSensor(
    # Required parameters
    airbyte_job_id=12345,

    # Connection configuration
    airbyte_conn_id='my_airbyte_conn',
    api_version='v1',

    # Execution mode
    deferrable=True,  # Use async triggers

    # Timing configuration (inherited from BaseSensorOperator)
    poke_interval=30,          # Seconds between status checks
    timeout=3600,              # Maximum wait time in seconds
    exponential_backoff=True,  # Progressively lengthen poke intervals

    # Retry configuration (inherited from BaseOperator)
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    max_retry_delay=timedelta(seconds=60),  # Cap on retry backoff delay
)
```

### Deferrable Mode Configuration

When `deferrable=True`, the sensor automatically configures:

```python
# Default deferrable settings (applied automatically)
poke_interval = 5           # Quick initial check
timeout = 60 * 60 * 24 * 7  # 7 days default timeout
```
266
267
### Template Fields
268
269
The `airbyte_job_id` field supports Jinja templating:
270
271
```python
272
# From XCom (previous task output)
273
airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_task') }}"
274
275
# From DAG run configuration
276
airbyte_job_id="{{ dag_run.conf['job_id'] }}"
277
278
# From Airflow variables
279
airbyte_job_id="{{ var.value.job_to_monitor }}"
280
281
# From task instance context
282
airbyte_job_id="{{ ti.xcom_pull(key='job_id') }}"
283
```

## Execution Modes

### Polling Mode (Default)
- **deferrable=False**
- Continuously occupies a worker slot
- Suitable for short- to medium-duration jobs
- Uses the configurable `poke_interval` between status checks
- Traditional Airflow sensor behavior

### Deferrable Mode
- **deferrable=True**
- Releases the worker slot while waiting
- Uses async triggers for monitoring
- Optimal for long-running jobs
- Automatically resumes when the job completes
- Better resource utilization in large deployments

## Job Status Handling

The sensor handles all Airbyte job statuses:

### Success States
- **SUCCEEDED**: Job completed successfully, sensor returns True

### Waiting States
- **RUNNING**: Job actively executing, sensor continues waiting
- **PENDING**: Job queued for execution, sensor continues waiting
- **INCOMPLETE**: Job partially completed, sensor continues waiting

### Error States
- **FAILED**: Job execution failed, raises AirflowException
- **CANCELLED**: Job was cancelled, raises AirflowException

### Unknown States
Any unexpected job status raises AirflowException with detailed information.

## Error Handling

The sensor provides comprehensive error handling:

- **Connection errors**: Network issues, authentication failures
- **Job not found**: Invalid `job_id` or expired job
- **Timeout errors**: Job exceeds the specified timeout duration
- **API errors**: Airbyte server errors, rate limiting
- **State transition errors**: Unexpected job state changes

All errors include detailed logging for troubleshooting and monitoring purposes.
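
Transient failures such as network blips or rate limiting are typically absorbed by retrying the status check before failing the task. A minimal sketch of that pattern follows; the `fetch_status` callable, retry counts, and backoff schedule are illustrative assumptions, not provider API.

```python
import time

def fetch_with_retries(fetch_status, retries=3, base_delay=1.0):
    """Retry a flaky status check with exponential backoff before giving up."""
    for attempt in range(retries + 1):
        try:
            return fetch_status()
        except ConnectionError as exc:
            if attempt == retries:
                raise RuntimeError(
                    f"status check failed after {retries} retries"
                ) from exc
            time.sleep(base_delay * (2 ** attempt))  # back off between attempts
```

In practice the sensor's own `retries`/`retry_delay` settings (shown in the configuration section) provide this behavior at the task level.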