tessl/pypi-apache-airflow-providers-airbyte

Apache Airflow provider for Airbyte data synchronization platform integration

Sync Operations

The AirbyteTriggerSyncOperator provides Airflow task functionality for triggering Airbyte data synchronization jobs. It supports both synchronous and asynchronous execution modes, with optional deferrable execution for long-running jobs.

Capabilities

Operator Initialization

Creates a sync operator task with comprehensive configuration options.

from airflow.configuration import conf
from airflow.models import BaseOperator

class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs
    ) -> None:
        """
        Initialize Airbyte sync operator.
        
        Args:
            connection_id: Required. Airbyte connection UUID to sync
            airbyte_conn_id: Airflow connection ID for Airbyte server
            asynchronous: Return job_id immediately without waiting for completion
            deferrable: Use deferrable execution mode for long-running jobs
            api_version: Airbyte API version to use
            wait_seconds: Polling interval for synchronous mode
            timeout: Maximum wait time for job completion
            **kwargs: Additional BaseOperator arguments
        """

Class Attributes

Template fields and UI configuration.

template_fields: Sequence[str] = ("connection_id",)
ui_color: str = "#6C51FD"

Execution Methods

Core execution functionality for different modes.

def execute(self, context: Context) -> int:
    """
    Execute the sync operation.
    
    Args:
        context: Airflow task execution context
        
    Returns:
        Job ID (int) for the submitted Airbyte job
        
    Raises:
        AirflowException: If job submission fails or job completes with error
    """

def execute_complete(self, context: Context, event: Any = None) -> None:
    """
    Callback method for deferrable mode completion.
    
    Args:
        context: Airflow task execution context
        event: Trigger event data
        
    Raises:
        AirflowException: If job completed with error status
    """

def on_kill(self) -> None:
    """
    Cancel the Airbyte job when task is killed.
    
    Called automatically by Airflow when task is cancelled or times out.
    """

Usage Examples

Synchronous Execution

from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

dag = DAG(
    'sync_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily'  # `schedule_interval` is deprecated as of Airflow 2.4
)

# Synchronous sync - waits for completion
sync_task = AirbyteTriggerSyncOperator(
    task_id='sync_customer_data',
    connection_id='{{ var.value.customer_connection_id }}',
    airbyte_conn_id='airbyte_default',
    timeout=1800,  # 30 minutes
    wait_seconds=10,  # Check every 10 seconds
    dag=dag
)

Asynchronous Execution

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Async sync - returns job_id immediately
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    connection_id='connection-uuid-123',
    asynchronous=True,  # Return job_id without waiting
    dag=dag
)

# Monitor completion with sensor
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    timeout=3600,
    dag=dag
)

trigger_sync >> monitor_sync

Deferrable Execution

# Deferrable mode - uses async triggers
deferrable_sync = AirbyteTriggerSyncOperator(
    task_id='deferrable_sync',
    connection_id='connection-uuid-123',
    deferrable=True,  # Use async trigger mechanism
    timeout=7200,  # 2 hours
    dag=dag
)

Dynamic Connection IDs

# Using the templated connection_id field
dynamic_sync = AirbyteTriggerSyncOperator(
    task_id='dynamic_sync',
    connection_id='{{ dag_run.conf["connection_id"] }}',  # Rendered from DAG run config at runtime
    airbyte_conn_id='airbyte_default',  # Not a template field; must be a literal value
    dag=dag
)

Error Handling and Retries

from datetime import timedelta
from airflow.operators.bash import BashOperator
from airflow.utils.trigger_rule import TriggerRule

# Sync with custom retry configuration
robust_sync = AirbyteTriggerSyncOperator(
    task_id='robust_sync',
    connection_id='connection-uuid-123',
    timeout=3600,
    retries=3,
    retry_delay=timedelta(minutes=5),
    dag=dag
)

# Cleanup task that runs even if sync fails
cleanup_task = BashOperator(
    task_id='cleanup',
    bash_command='echo "Cleaning up after sync"',
    trigger_rule=TriggerRule.ALL_DONE,  # Run regardless of upstream status
    dag=dag
)

robust_sync >> cleanup_task

Execution Modes

Synchronous Mode (Default)

  • asynchronous=False, deferrable=False
  • Task blocks until job completion
  • Uses polling with configurable intervals
  • Suitable for short to medium duration jobs
  • Consumes worker slot during entire execution

Asynchronous Mode

  • asynchronous=True
  • Returns job_id immediately
  • Requires separate monitoring (sensor/operator)
  • Frees worker slot immediately
  • Best for fire-and-forget scenarios

Deferrable Mode

  • deferrable=True
  • Uses async triggers for monitoring
  • Frees worker slot during job execution
  • Automatically resumes when job completes
  • Optimal for long-running jobs in resource-constrained environments
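The three modes differ only in the `asynchronous` and `deferrable` flags. A small helper (hypothetical, purely illustrative) can make the choice explicit when building operators:

```python
# Hypothetical helper: maps a mode name to the two flags that
# distinguish AirbyteTriggerSyncOperator's execution modes.
def sync_mode_kwargs(mode: str) -> dict:
    """Return operator keyword arguments for a named execution mode.

    mode: one of "sync", "async", "deferrable".
    """
    modes = {
        "sync": {"asynchronous": False, "deferrable": False},
        "async": {"asynchronous": True, "deferrable": False},
        "deferrable": {"asynchronous": False, "deferrable": True},
    }
    if mode not in modes:
        raise ValueError(f"unknown mode: {mode!r}")
    return modes[mode]

# Example: unpack into the operator for a long-running job
# AirbyteTriggerSyncOperator(task_id="t", connection_id="...",
#                            **sync_mode_kwargs("deferrable"))
```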

Configuration

Connection Requirements

The operator requires an Airflow connection of type "airbyte" with:

{
    "conn_type": "airbyte",
    "host": "https://api.airbyte.com",  # Airbyte server URL
    "login": "client_id",              # OAuth client ID
    "password": "client_secret",       # OAuth client secret
    "schema": "v1/applications/token", # Token endpoint
    "extra": {
        "proxies": {                   # Optional proxy configuration
            "http": "http://proxy:8080",
            "https": "https://proxy:8080"
        }
    }
}
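One way to supply this connection without touching the Airflow metadata database is an `AIRFLOW_CONN_<CONN_ID>` environment variable; since Airflow 2.3 the value may be a JSON document. A sketch with placeholder host and credentials:

```python
import json
import os

# Connection definition mirroring the structure above.
# Host and credential values are placeholders.
airbyte_conn = {
    "conn_type": "airbyte",
    "host": "https://api.airbyte.com",
    "login": "client_id",
    "password": "client_secret",
    "schema": "v1/applications/token",
    "extra": {
        "proxies": {
            "http": "http://proxy:8080",
            "https": "https://proxy:8080",
        }
    },
}

# Airflow resolves airbyte_conn_id="airbyte_default" from this variable
os.environ["AIRFLOW_CONN_AIRBYTE_DEFAULT"] = json.dumps(airbyte_conn)
```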

Template Fields

The connection_id field supports Jinja templating for dynamic values:

# From DAG run configuration
connection_id='{{ dag_run.conf["connection_id"] }}'

# From Airflow variables
connection_id='{{ var.value.my_connection_id }}'

# From task context
connection_id='{{ ds }}_connection'  # Date-based connection

Error Handling

The operator handles various error scenarios:

  • Job submission failures: Invalid connection_id, authentication errors
  • Job execution failures: Data sync errors, connection timeouts
  • Task cancellation: Automatically cancels Airbyte job via on_kill()
  • Timeout scenarios: Cancels job and raises AirflowException
  • Unexpected job states: Handles CANCELLED and unknown states appropriately

All errors are logged with appropriate detail levels for debugging and monitoring.
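Beyond retries, Airflow's standard `on_failure_callback` hook can surface these failures to external alerting. A minimal sketch (the callback name and the alert destination are illustrative, not part of the provider):

```python
# Hypothetical failure callback: reports the failed task and the
# Airbyte job id, if the operator pushed one to XCom before failing.
def notify_airbyte_failure(context: dict) -> str:
    ti = context["task_instance"]
    job_id = ti.xcom_pull(task_ids=ti.task_id)  # None if submission itself failed
    message = f"Airbyte sync {ti.task_id} failed (job_id={job_id})"
    print(message)  # replace with Slack/email/pager integration as needed
    return message

# Attach to the operator:
# AirbyteTriggerSyncOperator(task_id="robust_sync", connection_id="...",
#                            on_failure_callback=notify_airbyte_failure)
```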

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-airbyte
