# Sync Operations

The AirbyteTriggerSyncOperator provides Airflow task functionality for triggering Airbyte data synchronization jobs. It supports both synchronous and asynchronous execution modes, with optional deferrable execution for long-running jobs.

The `deferrable` default is read from Airflow configuration, so the signatures below assume:

```python
from airflow.configuration import conf
```

## Capabilities

### Operator Initialization

Creates a sync operator task with comprehensive configuration options.

```python { .api }
class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs,
    ) -> None:
        """
        Initialize the Airbyte sync operator.

        Args:
            connection_id: Required. Airbyte connection UUID to sync.
            airbyte_conn_id: Airflow connection ID for the Airbyte server.
            asynchronous: Return the job_id immediately without waiting for completion.
            deferrable: Use deferrable execution mode for long-running jobs.
            api_version: Airbyte API version to use.
            wait_seconds: Polling interval, in seconds, for synchronous mode.
            timeout: Maximum time, in seconds, to wait for job completion.
            **kwargs: Additional BaseOperator arguments.
        """
```
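
Because the `deferrable` default comes from the `[operators] default_deferrable` setting, it can be flipped globally in `airflow.cfg`. A minimal sketch of inspecting the effective default, using the same call as the signature above:

```python
from airflow.configuration import conf

# Reads the same setting the operator consults for its deferrable default.
default_deferrable = conf.getboolean("operators", "default_deferrable", fallback=False)
print(f"Operators default to deferrable: {default_deferrable}")
```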

### Class Attributes

Template fields and UI configuration.

```python { .api }
template_fields: Sequence[str] = ("connection_id",)
ui_color: str = "#6C51FD"
```

### Execution Methods

Core execution functionality for the different modes.

```python { .api }
def execute(self, context: Context) -> int:
    """
    Execute the sync operation.

    Args:
        context: Airflow task execution context.

    Returns:
        Job ID (int) of the submitted Airbyte job.

    Raises:
        AirflowException: If job submission fails or the job completes with an error.
    """

def execute_complete(self, context: Context, event: Any = None) -> None:
    """
    Callback invoked when deferrable-mode execution completes.

    Args:
        context: Airflow task execution context.
        event: Trigger event data.

    Raises:
        AirflowException: If the job completed with an error status.
    """

def on_kill(self) -> None:
    """
    Cancel the running Airbyte job when the task is killed.

    Called automatically by Airflow when the task is cancelled or times out.
    """
```

## Usage Examples

### Synchronous Execution

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

dag = DAG(
    'sync_example',
    start_date=datetime(2024, 1, 1),
    schedule_interval='@daily'
)

# Synchronous sync - waits for completion
sync_task = AirbyteTriggerSyncOperator(
    task_id='sync_customer_data',
    connection_id='{{ var.value.customer_connection_id }}',
    airbyte_conn_id='airbyte_default',
    timeout=1800,     # 30 minutes
    wait_seconds=10,  # Check every 10 seconds
    dag=dag
)
```

### Asynchronous Execution

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Async sync - returns job_id immediately (reuses the `dag` defined above)
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    connection_id='connection-uuid-123',
    asynchronous=True,  # Return job_id without waiting
    dag=dag
)

# Monitor completion with a sensor
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    timeout=3600,
    dag=dag
)

trigger_sync >> monitor_sync
```
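
Note that the monitoring sensor itself occupies a worker slot while poking. The standard sensor argument `mode='reschedule'` releases the slot between pokes; a minimal sketch as an alternative to the sensor above:

```python
monitor_sync_reschedule = AirbyteJobSensor(
    task_id='monitor_sync_reschedule',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    mode='reschedule',  # release the worker slot between pokes
    poke_interval=60,   # check once a minute
    timeout=3600,
    dag=dag
)
```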

### Deferrable Execution

```python
# Deferrable mode - uses async triggers (requires a running triggerer process)
deferrable_sync = AirbyteTriggerSyncOperator(
    task_id='deferrable_sync',
    connection_id='connection-uuid-123',
    deferrable=True,  # Use async trigger mechanism
    timeout=7200,     # 2 hours
    dag=dag
)
```

### Dynamic Connection IDs

```python
# Using a templated connection_id. Note that only connection_id is a
# template field; airbyte_conn_id is not rendered by Jinja.
dynamic_sync = AirbyteTriggerSyncOperator(
    task_id='dynamic_sync',
    connection_id='{{ dag_run.conf["connection_id"] }}',  # From DAG run config
    airbyte_conn_id='airbyte_default',
    dag=dag
)
```

### Error Handling and Retries

```python
from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

# Sync with custom retry configuration
robust_sync = AirbyteTriggerSyncOperator(
    task_id='robust_sync',
    connection_id='connection-uuid-123',
    timeout=3600,
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

# Cleanup task that runs even if sync fails
cleanup_task = BashOperator(
    task_id='cleanup',
    bash_command='echo "Cleaning up after sync"',
    trigger_rule=TriggerRule.ALL_DONE,  # Run regardless of upstream status
    dag=dag
)

robust_sync >> cleanup_task
```

## Execution Modes

### Synchronous Mode (Default)
- **asynchronous=False, deferrable=False**
- Task blocks until job completion
- Uses polling with a configurable interval
- Suitable for short- to medium-duration jobs
- Occupies a worker slot for the entire execution

### Asynchronous Mode
- **asynchronous=True**
- Returns job_id immediately
- Requires separate monitoring (sensor or operator)
- Frees the worker slot immediately
- Best for fire-and-forget scenarios

### Deferrable Mode
- **deferrable=True**
- Uses async triggers for monitoring
- Frees the worker slot during job execution
- Automatically resumes when the job completes
- Optimal for long-running jobs in resource-constrained environments (see the sketch below)
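
The two non-blocking modes can be combined: trigger asynchronously, then monitor with a deferrable sensor so that neither task holds a worker slot while the job runs. A minimal sketch, assuming a provider version whose AirbyteJobSensor accepts `deferrable=True` and the `dag` object from the earlier examples:

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

fire = AirbyteTriggerSyncOperator(
    task_id='fire',
    connection_id='connection-uuid-123',
    asynchronous=True,  # returns job_id via XCom and frees the slot
    dag=dag
)

watch = AirbyteJobSensor(
    task_id='watch',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='fire') }}",
    deferrable=True,  # assumed flag: hands waiting off to the triggerer
    timeout=7200,
    dag=dag
)

fire >> watch
```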

## Configuration

### Connection Requirements

The operator requires an Airflow connection of type "airbyte" with:

```python
{
    "conn_type": "airbyte",
    "host": "https://api.airbyte.com",  # Airbyte server URL
    "login": "client_id",               # OAuth client ID
    "password": "client_secret",        # OAuth client secret
    "schema": "v1/applications/token",  # Token endpoint
    "extra": {
        "proxies": {                    # Optional proxy configuration
            "http": "http://proxy:8080",
            "https": "https://proxy:8080"
        }
    }
}
```
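
One way to register such a connection for local testing is Airflow's environment-variable mechanism, which accepts a JSON-serialized connection in Airflow 2.3+. A minimal sketch; the credential values are placeholders:

```python
import json
import os

# Airflow resolves AIRFLOW_CONN_<CONN_ID> environment variables as
# connections; set this before the scheduler/worker processes start.
os.environ["AIRFLOW_CONN_AIRBYTE_DEFAULT"] = json.dumps({
    "conn_type": "airbyte",
    "host": "https://api.airbyte.com",
    "login": "client_id",         # placeholder OAuth client ID
    "password": "client_secret",  # placeholder OAuth client secret
    "schema": "v1/applications/token",
})
```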

### Template Fields

The `connection_id` field supports Jinja templating for dynamic values:

```python
# From DAG run configuration
connection_id='{{ dag_run.conf["connection_id"] }}'

# From Airflow variables
connection_id='{{ var.value.my_connection_id }}'

# From task context (illustrative only - real Airbyte connection IDs are UUIDs)
connection_id='{{ ds }}_connection'
```

## Error Handling

The operator handles several error scenarios:

- **Job submission failures**: Invalid connection_id, authentication errors
- **Job execution failures**: Data sync errors, connection timeouts
- **Task cancellation**: Automatically cancels the Airbyte job via on_kill()
- **Timeout scenarios**: Cancels the job and raises AirflowException
- **Unexpected job states**: Treats CANCELLED and unknown states as failures

All errors are logged with appropriate detail for debugging and monitoring.
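
If you need custom alerting on top of the built-in logging, the standard BaseOperator failure hook is one place to attach it. A minimal sketch, assuming the AirflowException raised on job failure described above; `notify_on_failure` is a hypothetical helper, not part of the provider:

```python
# Called by Airflow with the task context when the task fails.
def notify_on_failure(context):
    ti = context["task_instance"]
    print(f"Airbyte sync failed: task={ti.task_id}, try={ti.try_number}")

alerting_sync = AirbyteTriggerSyncOperator(
    task_id='alerting_sync',
    connection_id='connection-uuid-123',
    on_failure_callback=notify_on_failure,  # standard BaseOperator hook
    dag=dag
)
```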