Apache Airflow provider for Airbyte data synchronization platform integration
Apache Airflow provider package for integrating with Airbyte, an open-source data integration platform. This provider enables Airflow DAGs to trigger, monitor, and manage Airbyte data synchronization jobs through hooks, operators, sensors, and triggers.
```shell
pip install apache-airflow-providers-airbyte
```

```python
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.get_provider_info import get_provider_info
```

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Define the DAG
dag = DAG(
    'airbyte_sync_example',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule_interval='@daily'
)

# Trigger a sync job; asynchronous=True makes the operator return the job id
# immediately instead of blocking, so the sensor below can wait for completion
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_airbyte_sync',
    connection_id='your-airbyte-connection-id',
    airbyte_conn_id='airbyte_default',
    asynchronous=True,
    dag=dag
)

# Monitor job completion (needed only in asynchronous mode)
wait_for_completion = AirbyteJobSensor(
    task_id='wait_for_sync_completion',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_airbyte_sync') }}",
    airbyte_conn_id='airbyte_default',
    dag=dag
)

trigger_sync >> wait_for_completion
```

The provider follows Airflow's plugin architecture with four main components: hooks, operators, sensors, and triggers.
Connection management is handled through Airflow's connection system with support for client credentials authentication.
Provides direct access to the Airbyte API for job management, status checking, and connection testing.
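These hook methods combine into a submit-and-poll flow. The sketch below uses a hypothetical `StubAirbyteHook` in place of the real `AirbyteHook` (so it runs without a live Airbyte server); the method names mirror the hook's API, but the return shape of `submit_sync_connection` is an assumption for illustration.

```python
import time

class StubAirbyteHook:
    """Hypothetical stand-in: reports RUNNING twice, then SUCCEEDED."""
    def __init__(self) -> None:
        self._polls = 0

    def submit_sync_connection(self, connection_id: str) -> dict:
        # Assumed response shape; the real hook returns the API response.
        return {"job": {"id": 42}}

    def get_job_status(self, job_id: int) -> str:
        self._polls += 1
        return "SUCCEEDED" if self._polls >= 3 else "RUNNING"

def run_sync(hook, connection_id: str, wait_seconds: float = 0.01,
             timeout: float = 5.0) -> str:
    """Submit a sync and poll until a terminal status, as the operator does."""
    job = hook.submit_sync_connection(connection_id)
    job_id = job["job"]["id"]
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = hook.get_job_status(job_id)
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return status
        time.sleep(wait_seconds)
    raise TimeoutError(f"Airbyte job {job_id} did not finish in {timeout}s")

print(run_sync(StubAirbyteHook(), "my-connection"))  # SUCCEEDED
```

With the real hook, the same loop is what `wait_seconds` and `timeout` on the operator control.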
```python
class AirbyteHook(BaseHook):
    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None: ...
    def get_job_details(self, job_id: int) -> Any: ...
    def get_job_status(self, job_id: int) -> str: ...
    def submit_sync_connection(self, connection_id: str) -> Any: ...
    def cancel_job(self, job_id: int) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
```

Operator for triggering Airbyte sync jobs with support for both synchronous and asynchronous execution modes.
```python
class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = False,
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs
    ) -> None: ...
    def execute(self, context: Context) -> int: ...
```

Sensor for monitoring Airbyte job status with support for both polling and deferrable modes.
```python
class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = False,
        airbyte_conn_id: str = "airbyte_default",
        **kwargs
    ) -> None: ...
```

Trigger for asynchronous job monitoring in deferrable mode.
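In deferrable mode the trigger polls the job asynchronously until it reaches a terminal status or the deadline (`end_time`) passes. A minimal sketch of that loop, assuming a stubbed async status check (`fake_status`) in place of the real API call:

```python
import asyncio
import time

async def wait_for_job(get_status, job_id: int, end_time: float,
                       poll_interval: float) -> dict:
    """Poll until the job reaches a terminal status or end_time passes."""
    while time.monotonic() < end_time:
        status = await get_status(job_id)
        if status in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return {"status": status, "job_id": job_id}
        await asyncio.sleep(poll_interval)
    return {"status": "timeout", "job_id": job_id}

async def fake_status(job_id: int) -> str:
    # Stub: a real trigger would query the Airbyte jobs API here.
    return "SUCCEEDED"

result = asyncio.run(wait_for_job(fake_status, 7, time.monotonic() + 5, 0.5))
print(result)  # {'status': 'SUCCEEDED', 'job_id': 7}
```

Because the loop awaits between polls, the Airflow triggerer can multiplex many such waits on one event loop instead of occupying a worker slot per job.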
```python
class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None: ...
```

Function for retrieving provider metadata and configuration details.
```python
def get_provider_info() -> dict[str, Any]:
    """
    Get provider metadata including integrations, operators, hooks, sensors, and triggers.

    Returns:
        Dictionary containing provider configuration and component information.
    """
```

```python
# Job status enumeration from airbyte-api
JobStatusEnum = Literal[
    "RUNNING",
    "PENDING",
    "INCOMPLETE",
    "SUCCEEDED",
    "FAILED",
    "CANCELLED",
]
```
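For illustration, a small helper that splits these statuses into terminal and non-terminal groups. The grouping is an assumption drawn from the enum values themselves, not from the provider's API:

```python
# Hypothetical helper: classify Airbyte job statuses.
# The terminal/non-terminal split is an assumption for illustration.
TERMINAL_STATUSES = {"SUCCEEDED", "FAILED", "CANCELLED"}
NON_TERMINAL_STATUSES = {"RUNNING", "PENDING", "INCOMPLETE"}

def is_terminal(status: str) -> bool:
    """Return True when a job has finished and polling can stop."""
    if status not in TERMINAL_STATUSES | NON_TERMINAL_STATUSES:
        raise ValueError(f"Unknown Airbyte job status: {status}")
    return status in TERMINAL_STATUSES

print(is_terminal("RUNNING"))    # False
print(is_terminal("SUCCEEDED"))  # True
```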
```python
# Airflow context type
Context = dict[str, Any]
```

The provider uses Airflow connections with the connection type "airbyte".
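As a hedged example, such a connection can be registered with the Airflow CLI. The host and credentials below are placeholders, and mapping the client id/secret onto the login/password fields is an assumption about the connection schema:

```shell
# Hypothetical connection setup; replace host and credentials with real values.
airflow connections add 'airbyte_default' \
    --conn-type 'airbyte' \
    --conn-host 'http://localhost:8000' \
    --conn-login 'my-client-id' \
    --conn-password 'my-client-secret'
```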
All methods raise AirflowException for API errors, timeouts, and job failures.