# Apache Airflow Providers - Airbyte
Apache Airflow provider package for integrating with Airbyte, an open-source data integration platform. This provider enables Airflow DAGs to trigger, monitor, and manage Airbyte data synchronization jobs through hooks, operators, sensors, and triggers.

## Package Information

- **Package Name**: apache-airflow-providers-airbyte
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-airbyte`

## Core Imports

```python
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.get_provider_info import get_provider_info
```

## Basic Usage

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Define the DAG
dag = DAG(
    'airbyte_sync_example',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='@daily',
)

# Trigger a sync job; asynchronous mode returns immediately and
# pushes the Airbyte job id to XCom for downstream tasks
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_airbyte_sync',
    connection_id='your-airbyte-connection-id',
    airbyte_conn_id='airbyte_default',
    asynchronous=True,
    dag=dag,
)

# Monitor the job until completion (required when using asynchronous mode)
wait_for_completion = AirbyteJobSensor(
    task_id='wait_for_sync_completion',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_airbyte_sync') }}",
    airbyte_conn_id='airbyte_default',
    dag=dag,
)

trigger_sync >> wait_for_completion
```

## Architecture

The provider follows Airflow's plugin architecture with four main components:

- **Hook**: Manages API communication with the Airbyte server
- **Operator**: Triggers sync jobs and manages their execution
- **Sensor**: Monitors job status until completion
- **Trigger**: Provides asynchronous monitoring for deferrable execution

Connection management is handled through Airflow's connection system, with support for client-credentials authentication.

## Capabilities
### Hook API

Provides direct access to the Airbyte API for job management, status checks, and connection testing.

```python { .api }
class AirbyteHook(BaseHook):
    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None: ...
    def get_job_details(self, job_id: int) -> Any: ...
    def get_job_status(self, job_id: int) -> str: ...
    def submit_sync_connection(self, connection_id: str) -> Any: ...
    def cancel_job(self, job_id: int) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
```
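
For ad-hoc scripts or custom operators, the hook can be used directly. A minimal sketch, assuming the default `airbyte_default` connection; the exact shape of the `submit_sync_connection` response varies across provider versions, so treat the `job_id` extraction as an assumption:

```python
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook(airbyte_conn_id='airbyte_default')

# Verify credentials before submitting work
ok, message = hook.test_connection()

# Kick off a sync and poll its status
job = hook.submit_sync_connection(connection_id='your-airbyte-connection-id')
job_id = job.job_id  # assumption: adjust to the response shape of your provider version
print(hook.get_job_status(job_id=job_id))
```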
[Hook API](./hook-api.md)
### Sync Operations

Operator for triggering Airbyte sync jobs, with support for both synchronous and asynchronous execution modes.

```python { .api }
class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = False,
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs
    ) -> None: ...
    def execute(self, context: Context) -> int: ...
```
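
An illustrative sketch of the three execution styles (the connection id values are placeholders):

```python
# Default (synchronous): the task blocks until the sync finishes.
sync_blocking = AirbyteTriggerSyncOperator(
    task_id='sync_blocking',
    connection_id='your-airbyte-connection-id',
)

# Deferrable: the worker slot is released while the triggerer polls Airbyte.
sync_deferrable = AirbyteTriggerSyncOperator(
    task_id='sync_deferrable',
    connection_id='your-airbyte-connection-id',
    deferrable=True,
    timeout=3600,
)

# Asynchronous: returns the job id immediately (available via XCom)
# so a downstream AirbyteJobSensor can wait for completion.
sync_async = AirbyteTriggerSyncOperator(
    task_id='sync_async',
    connection_id='your-airbyte-connection-id',
    asynchronous=True,
)
```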
[Sync Operations](./sync-operations.md)
### Job Monitoring

Sensor for monitoring Airbyte job status, with support for both polling and deferrable modes.

```python { .api }
class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = False,
        airbyte_conn_id: str = "airbyte_default",
        **kwargs
    ) -> None: ...
```
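
A sketch of pairing the sensor with an asynchronous sync. The upstream task id is a placeholder, and `airbyte_job_id` is templated in practice, so a Jinja expression works despite the `int` annotation:

```python
wait_for_sync = AirbyteJobSensor(
    task_id='wait_for_sync',
    airbyte_job_id="{{ ti.xcom_pull(task_ids='sync_async') }}",
    airbyte_conn_id='airbyte_default',
    deferrable=True,  # poll from the triggerer instead of holding a worker slot
)
```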
[Job Monitoring](./job-monitoring.md)
### Async Triggers

Trigger for asynchronous job monitoring in deferrable mode.

```python { .api }
class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None: ...
```
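
The trigger is normally constructed by the operator or sensor when it defers, rather than used directly. A minimal sketch of a hypothetical custom operator deferring to it; the event payload keys are an assumption based on the usual success/error convention:

```python
import time

from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger


class MyAirbyteWaitOperator(BaseOperator):
    """Hypothetical operator that defers until an Airbyte job finishes."""

    def __init__(self, job_id: int, **kwargs) -> None:
        super().__init__(**kwargs)
        self.job_id = job_id

    def execute(self, context):
        # Hand control to the triggerer; the worker slot is freed until the event fires.
        self.defer(
            trigger=AirbyteSyncTrigger(
                job_id=self.job_id,
                conn_id='airbyte_default',
                end_time=time.time() + 3600,  # absolute deadline (UNIX timestamp)
                poll_interval=3.0,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        # Execution resumes here once the trigger emits its event.
        if event.get('status') != 'success':
            raise AirflowException(event.get('message', 'Airbyte job failed'))
        return self.job_id
```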
[Async Triggers](./async-triggers.md)
### Provider Information

Function for retrieving provider metadata and configuration details.

```python { .api }
def get_provider_info() -> dict[str, Any]:
    """
    Get provider metadata including integrations, operators, hooks, sensors, and triggers.

    Returns:
        Dictionary containing provider configuration and component information.
    """
```
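
A quick way to inspect the returned metadata; the `package-name` key follows the convention used across Airflow provider packages, but treat specific keys as assumptions:

```python
from airflow.providers.airbyte.get_provider_info import get_provider_info

info = get_provider_info()
print(info.get('package-name'))  # expected: 'apache-airflow-providers-airbyte'
print(sorted(info))              # list the available metadata sections
```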
## Types
```python { .api }
# Job status enumeration from airbyte-api
JobStatusEnum = Literal[
    "RUNNING",
    "PENDING",
    "INCOMPLETE",
    "SUCCEEDED",
    "FAILED",
    "CANCELLED",
]

# Airflow context type
Context = dict[str, Any]
```
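
The monitoring components treat a subset of these statuses as final. A small helper, sketched purely against the literals above (the grouping is an assumption, not provider API):

```python
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}

def is_terminal(status: str) -> bool:
    """Return True once a job can no longer change state."""
    return status in TERMINAL_STATES
```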
## Connection Configuration
The provider uses Airflow connections with the connection type "airbyte" (a configuration sketch follows this list):

- **Host**: Airbyte server URL
- **Login**: Client ID for authentication
- **Password**: Client Secret for authentication
- **Schema**: Token URL (defaults to "v1/applications/token")
- **Extra**: Additional options such as proxies
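
A hedged sketch of defining the connection in code; the host and credentials are placeholders, and connections can equally be created in the UI or supplied via the `AIRFLOW_CONN_AIRBYTE_DEFAULT` environment variable:

```python
from airflow.models.connection import Connection

conn = Connection(
    conn_id='airbyte_default',
    conn_type='airbyte',
    host='https://airbyte.example.com',  # Airbyte server URL (placeholder)
    login='your-client-id',              # Client ID
    password='your-client-secret',       # Client Secret
)

# Serialize to a URI, e.g. for use as an environment variable
print(conn.get_uri())
```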
## Error Handling
All methods raise `AirflowException` for API errors, timeouts, and job failures. The provider includes error handling for the following scenarios (see the sketch after this list):

- Connection failures
- Job timeout scenarios
- API authentication errors
- Invalid job states
- Network connectivity issues
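
A minimal sketch of catching these failures around hook calls (the job id is a placeholder):

```python
from airflow.exceptions import AirflowException
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook(airbyte_conn_id='airbyte_default')

try:
    ok, message = hook.test_connection()
    if not ok:
        raise AirflowException(f'Airbyte connection test failed: {message}')
    status = hook.get_job_status(job_id=123)  # placeholder job id
except AirflowException as exc:
    # Timeouts, authentication errors, and failed jobs all surface here
    print(f'Airbyte call failed: {exc}')
```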