Apache Airflow provider for integrating with the Airbyte data synchronization platform
```bash
npx @tessl/cli install tessl/pypi-apache-airflow-providers-airbyte@5.2.0
```

Apache Airflow provider package for integrating with Airbyte, an open-source data integration platform. This provider enables Airflow DAGs to trigger, monitor, and manage Airbyte data synchronization jobs through hooks, operators, sensors, and triggers.

```bash
pip install apache-airflow-providers-airbyte
```

```python
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.get_provider_info import get_provider_info
```

A basic DAG that triggers a sync and then waits for it to complete:

```python
from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
# Define DAG
dag = DAG(
    'airbyte_sync_example',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='@daily'  # schedule_interval is deprecated since Airflow 2.4
)

# Trigger a sync job
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_airbyte_sync',
    connection_id='your-airbyte-connection-id',
    airbyte_conn_id='airbyte_default',
    asynchronous=True,  # return the job id immediately; the sensor below waits
    dag=dag
)

# Monitor job completion (required because the operator runs asynchronously)
wait_for_completion = AirbyteJobSensor(
    task_id='wait_for_sync_completion',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_airbyte_sync') }}",
    airbyte_conn_id='airbyte_default',
    dag=dag
)

trigger_sync >> wait_for_completion
```

The provider follows Airflow's plugin architecture with four main components: hooks, operators, sensors, and triggers.
Connection management is handled through Airflow's connection system with support for client credentials authentication.
The AirbyteHook provides direct access to the Airbyte API for job management, status checking, and connection testing.

```python
class AirbyteHook(BaseHook):
    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None: ...
    def get_job_details(self, job_id: int) -> Any: ...
    def get_job_status(self, job_id: int) -> str: ...
    def submit_sync_connection(self, connection_id: str) -> Any: ...
    def cancel_job(self, job_id: int) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
```
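A minimal sketch of driving the API through the hook directly. The job id here is a placeholder; in practice it comes from the operator's XCom:

```python
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook(airbyte_conn_id="airbyte_default")

# Verify the Airbyte API is reachable before doing anything else.
ok, message = hook.test_connection()
print(f"reachable={ok}: {message}")

# Check on an existing job; 12345 is a placeholder id.
status = hook.get_job_status(job_id=12345)
if status == "RUNNING":
    hook.cancel_job(job_id=12345)  # stop a job that should not be running
```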
The AirbyteTriggerSyncOperator triggers Airbyte sync jobs and supports both synchronous and asynchronous execution modes.

```python
class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = False,
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs
    ) -> None: ...
    def execute(self, context: Context) -> int: ...
```
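For long-running syncs, a deferrable sketch frees the worker slot while the job runs (parameter values here are illustrative):

```python
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Deferrable mode: the task submits the sync, then defers to the triggerer,
# which polls the job instead of occupying a worker slot.
sync_deferred = AirbyteTriggerSyncOperator(
    task_id="trigger_airbyte_sync_deferrable",
    connection_id="your-airbyte-connection-id",
    airbyte_conn_id="airbyte_default",
    deferrable=True,
    wait_seconds=10,  # seconds between status polls
    timeout=3600,     # fail if the job exceeds one hour
)
```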
The AirbyteJobSensor monitors Airbyte job status and supports both polling and deferrable modes.

```python
class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = False,
        airbyte_conn_id: str = "airbyte_default",
        **kwargs
    ) -> None: ...
```
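A deferrable sensor paired with the asynchronous operator from the usage example above (the XCom pull assumes that operator's task_id):

```python
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Deferrable sensor: hands polling off to the triggerer via AirbyteSyncTrigger.
wait_deferred = AirbyteJobSensor(
    task_id="wait_for_sync_deferrable",
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_airbyte_sync') }}",
    airbyte_conn_id="airbyte_default",
    deferrable=True,
)
```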
The AirbyteSyncTrigger handles asynchronous job monitoring in deferrable mode.

```python
class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None: ...
```
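Triggers are normally created by the deferring operator or sensor rather than by user code; constructing one directly looks like this (values are illustrative):

```python
import time

from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger

trigger = AirbyteSyncTrigger(
    job_id=12345,                 # placeholder job id
    conn_id="airbyte_default",
    end_time=time.time() + 3600,  # absolute deadline for polling
    poll_interval=10.0,           # seconds between status checks
)
```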
The get_provider_info function returns provider metadata and configuration details.

```python
def get_provider_info() -> dict[str, Any]:
"""
Get provider metadata including integrations, operators, hooks, sensors, and triggers.
Returns:
Dictionary containing provider configuration and component information
"""# Job status enumeration from airbyte-api
```python
# Job status enumeration from airbyte-api
JobStatusEnum = Literal[
    "RUNNING",
    "PENDING",
    "INCOMPLETE",
    "SUCCEEDED",
    "FAILED",
    "CANCELLED",
]
# Airflow context type
Context = dict[str, Any]
```

The provider uses Airflow connections with connection type "airbyte"; credentials for client credentials authentication are stored on the connection.
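One way to define such a connection is via an environment variable. The mapping of client id and secret onto the URI's login and password fields is an assumption here, so verify it against your provider version:

```python
import os

# Hypothetical connection setup: Airflow resolves AIRFLOW_CONN_<CONN_ID>
# environment variables into connections; the URI scheme sets the conn type.
os.environ["AIRFLOW_CONN_AIRBYTE_DEFAULT"] = (
    "airbyte://my-client-id:my-client-secret@localhost:8000"  # assumed field mapping
)
```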
All methods raise AirflowException for API errors, timeouts, and job failures, so catching that single exception type covers the provider's error conditions.
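A sketch of catching these failures around a hook call (the job id is a placeholder):

```python
from airflow.exceptions import AirflowException
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook(airbyte_conn_id="airbyte_default")
try:
    status = hook.get_job_status(job_id=12345)  # placeholder id
except AirflowException as exc:
    # API errors, timeouts, and failed or cancelled jobs surface here.
    print(f"Airbyte call failed: {exc}")
```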