Apache Airflow provider for Airbyte data synchronization platform integration
Quality: pending — best-practices conformance has not yet been assessed.
Impact: pending — no eval scenarios have been run.
The AirbyteTriggerSyncOperator provides Airflow task functionality for triggering Airbyte data synchronization jobs. It supports both synchronous and asynchronous execution modes, with optional deferrable execution for long-running jobs.
Creates a sync operator task with comprehensive configuration options:

from airflow.configuration import conf
class AirbyteTriggerSyncOperator(BaseOperator):
    # Operator that submits an Airbyte connection sync job from an Airflow task.
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs
    ) -> None:
        """
        Initialize Airbyte sync operator.

        Args:
            connection_id: Required. Airbyte connection UUID to sync.
            airbyte_conn_id: Airflow connection ID for the Airbyte server
                (default: "airbyte_default").
            asynchronous: Return job_id immediately without waiting for
                completion (default: False).
            deferrable: Use deferrable execution mode for long-running jobs
                (defaults to the "operators/default_deferrable" Airflow
                config setting).
            api_version: Airbyte API version to use (default: "v1").
            wait_seconds: Polling interval in seconds for synchronous mode
                (default: 3).
            timeout: Maximum wait time in seconds for job completion
                (default: 3600).
            **kwargs: Additional BaseOperator arguments
"""Template fields and UI configuration.
template_fields: Sequence[str] = ("connection_id",)
ui_color: str = "#6C51FD"

Core execution functionality for the different modes:
    def execute(self, context: Context) -> int:
        """
        Execute the sync operation.

        Submits a sync job for ``connection_id``. With ``asynchronous=True``
        the job_id is returned immediately; otherwise the operator waits for
        completion (deferring to a trigger when ``deferrable=True`` — see
        ``execute_complete``).

        Args:
            context: Airflow task execution context

        Returns:
            Job ID (int) for the submitted Airbyte job

        Raises:
            AirflowException: If job submission fails or job completes with error
        """
    def execute_complete(self, context: Context, event: Any = None) -> None:
        """
        Callback method for deferrable mode completion.

        Invoked by Airflow after the deferral trigger fires; surfaces a
        failed job status from the trigger event as an exception.

        Args:
            context: Airflow task execution context
            event: Trigger event data

        Returns:
            None

        Raises:
            AirflowException: If job completed with error status
        """
    def on_kill(self) -> None:
        """
        Cancel the Airbyte job when the task is killed.

        Called automatically by Airflow when the task is cancelled or times out.
"""from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
# Example DAG definition. Uses `schedule` — the `schedule_interval`
# parameter was deprecated in Airflow 2.4 and removed in Airflow 3.
dag = DAG(
    'sync_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily'
)
# Synchronous sync - waits for completion
sync_task = AirbyteTriggerSyncOperator(
task_id='sync_customer_data',
connection_id='{{ var.value.customer_connection_id }}',
airbyte_conn_id='airbyte_default',
timeout=1800, # 30 minutes
wait_seconds=10, # Check every 10 seconds
dag=dag
)

Example — asynchronous trigger monitored by a sensor:

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
# Fire-and-forget: the job_id is returned right away (and lands in XCom)
# instead of blocking the worker until the job completes.
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    asynchronous=True,  # do not wait for job completion
    connection_id='connection-uuid-123',
    dag=dag,
)
# A sensor picks the job_id up from XCom and waits for the job to finish.
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    dag=dag,
    timeout=3600,
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
)
trigger_sync >> monitor_sync

# Deferrable mode - uses async triggers
deferrable_sync = AirbyteTriggerSyncOperator(
task_id='deferrable_sync',
connection_id='connection-uuid-123',
deferrable=True, # Use async trigger mechanism
timeout=7200, # 2 hours
dag=dag
)

# Using templated connection_id
dynamic_sync = AirbyteTriggerSyncOperator(
task_id='dynamic_sync',
connection_id='{{ dag_run.conf["connection_id"] }}', # From DAG run config
airbyte_conn_id='{{ var.value.airbyte_conn }}', # From Airflow variable
dag=dag
)

Example — retry configuration and cleanup:

from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule
# Resilient sync: Airflow retries the task up to 3 times, 5 minutes apart.
robust_sync = AirbyteTriggerSyncOperator(
    task_id='robust_sync',
    connection_id='connection-uuid-123',
    dag=dag,
    retries=3,
    retry_delay=timedelta(minutes=5),
    timeout=3600,
)
# Always-on cleanup: ALL_DONE fires whether upstream succeeded or failed.
cleanup_task = BashOperator(
    task_id='cleanup',
    dag=dag,
    trigger_rule=TriggerRule.ALL_DONE,  # run regardless of upstream status
    bash_command='echo "Cleaning up after sync"',
)
robust_sync >> cleanup_task

The operator requires an Airflow connection of type "airbyte" with:
{
"conn_type": "airbyte",
"host": "https://api.airbyte.com", # Airbyte server URL
"login": "client_id", # OAuth client ID
"password": "client_secret", # OAuth client secret
"schema": "v1/applications/token", # Token endpoint
"extra": {
"proxies": { # Optional proxy configuration
"http": "http://proxy:8080",
"https": "https://proxy:8080"
}
}
}

The connection_id field supports Jinja templating for dynamic values:
# From DAG run configuration
connection_id='{{ dag_run.conf["connection_id"] }}'
# From Airflow variables
connection_id='{{ var.value.my_connection_id }}'
# From task context
connection_id='{{ ds }}_connection'  # Date-based connection

The operator handles various error scenarios:
All errors are logged with appropriate detail levels for debugging and monitoring.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-airbyte