tessl/pypi-apache-airflow-providers-airbyte

Apache Airflow provider for Airbyte data synchronization platform integration

Describes: pkg:pypi/apache-airflow-providers-airbyte@5.2.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-airbyte@5.2.0


Apache Airflow Providers - Airbyte

Apache Airflow provider package for integrating with Airbyte, an open-source data integration platform. This provider enables Airflow DAGs to trigger, monitor, and manage Airbyte data synchronization jobs through hooks, operators, sensors, and triggers.

Package Information

  • Package Name: apache-airflow-providers-airbyte
  • Language: Python
  • Installation: pip install apache-airflow-providers-airbyte

Core Imports

from airflow.providers.airbyte.hooks.airbyte import AirbyteHook
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.get_provider_info import get_provider_info

Basic Usage

from datetime import datetime
from airflow import DAG
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Define DAG
dag = DAG(
    'airbyte_sync_example',
    start_date=datetime(2024, 1, 1),
    catchup=False,
    schedule='@daily'
)

# Trigger a sync job asynchronously: execute() returns the job id via XCom
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_airbyte_sync',
    connection_id='your-airbyte-connection-id',
    airbyte_conn_id='airbyte_default',
    asynchronous=True,
    dag=dag
)

# Monitor the job for completion
wait_for_completion = AirbyteJobSensor(
    task_id='wait_for_sync_completion',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_airbyte_sync') }}",
    airbyte_conn_id='airbyte_default',
    dag=dag
)

trigger_sync >> wait_for_completion

Architecture

The provider follows Airflow's plugin architecture with four main components:

  • Hook: Manages API communication with Airbyte server
  • Operator: Triggers sync jobs and manages execution
  • Sensor: Monitors job status for completion
  • Trigger: Provides async monitoring capabilities for deferrable execution

Connection management is handled through Airflow's connection system with support for client credentials authentication.
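
For example, setting deferrable=True hands monitoring to the trigger so the worker slot is freed while Airbyte runs the job. A minimal sketch of that mode (the connection_id value is a placeholder):

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Deferrable execution: the operator submits the job, then defers to
# AirbyteSyncTrigger, which polls the Airbyte API from the triggerer process
sync_deferred = AirbyteTriggerSyncOperator(
    task_id='trigger_airbyte_sync_deferrable',
    connection_id='your-airbyte-connection-id',  # placeholder
    airbyte_conn_id='airbyte_default',
    deferrable=True,
    timeout=3600,
)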

Capabilities

Hook API

Provides direct access to the Airbyte API for job management, status checking, and connection testing.

class AirbyteHook(BaseHook):
    def __init__(self, airbyte_conn_id: str = "airbyte_default", api_version: str = "v1") -> None: ...
    def get_job_details(self, job_id: int) -> Any: ...
    def get_job_status(self, job_id: int) -> str: ...
    def submit_sync_connection(self, connection_id: str) -> Any: ...
    def cancel_job(self, job_id: int) -> Any: ...
    def test_connection(self) -> tuple[bool, str]: ...
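
A brief usage sketch against the signatures above (the job id 1234 is a placeholder):

from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook(airbyte_conn_id='airbyte_default')

# Verify credentials before doing any work
ok, message = hook.test_connection()
if not ok:
    raise RuntimeError(f'Airbyte connection failed: {message}')

# Inspect a job and cancel it if it is still running
status = hook.get_job_status(1234)
if status == 'RUNNING':
    hook.cancel_job(1234)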


Sync Operations

Operator for triggering Airbyte sync jobs with support for both synchronous and asynchronous execution modes.

class AirbyteTriggerSyncOperator(BaseOperator):
    def __init__(
        self,
        connection_id: str,
        airbyte_conn_id: str = "airbyte_default",
        asynchronous: bool = False,
        deferrable: bool = False,
        api_version: str = "v1",
        wait_seconds: float = 3,
        timeout: float = 3600,
        **kwargs
    ) -> None: ...
    def execute(self, context: Context) -> int: ...
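
For instance, a synchronous run blocks the task until the job reaches a terminal state, while asynchronous mode returns the job id immediately for later monitoring (connection ids are placeholders):

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Synchronous: poll every 10 seconds, fail after 30 minutes
sync_blocking = AirbyteTriggerSyncOperator(
    task_id='sync_blocking',
    connection_id='your-airbyte-connection-id',
    airbyte_conn_id='airbyte_default',
    wait_seconds=10,
    timeout=1800,
)

# Asynchronous: execute() returns the job id (pushed to XCom),
# to be monitored later by AirbyteJobSensor
sync_async = AirbyteTriggerSyncOperator(
    task_id='sync_async',
    connection_id='your-airbyte-connection-id',
    airbyte_conn_id='airbyte_default',
    asynchronous=True,
)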


Job Monitoring

Sensor for monitoring Airbyte job status with support for both polling and deferrable modes.

class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = False,
        airbyte_conn_id: str = "airbyte_default",
        **kwargs
    ) -> None: ...
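
A sketch of the deferrable variant, pulling the job id from the asynchronous operator's XCom (the upstream task id is a placeholder):

from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Deferrable sensor: defers to the async trigger instead of holding
# a worker slot while polling
wait_deferred = AirbyteJobSensor(
    task_id='wait_for_sync',
    airbyte_job_id="{{ ti.xcom_pull(task_ids='trigger_airbyte_sync') }}",
    airbyte_conn_id='airbyte_default',
    deferrable=True,
)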


Async Triggers

Trigger for asynchronous job monitoring in deferrable mode.

class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None: ...
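
The trigger is normally constructed by the operator or sensor when it defers; the sketch below shows that hand-off inside a hypothetical deferrable operator. The example class and the event payload keys ('status', 'message') are assumptions for illustration, not the provider's exact internals:

import time

from airflow.models import BaseOperator
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger


class ExampleDeferrableOperator(BaseOperator):  # hypothetical example class
    def execute(self, context):
        job_id = 1234  # placeholder: id of a job submitted earlier
        # Suspend the task; the triggerer process awaits the trigger's events
        self.defer(
            trigger=AirbyteSyncTrigger(
                job_id=job_id,
                conn_id='airbyte_default',
                end_time=time.time() + 3600,  # absolute deadline
                poll_interval=10.0,
            ),
            method_name='execute_complete',
        )

    def execute_complete(self, context, event):
        # Resumed here with the event emitted by the trigger
        if event and event.get('status') == 'error':  # assumed payload shape
            raise RuntimeError(event.get('message'))
        return event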


Provider Information

Function for retrieving provider metadata and configuration details.

def get_provider_info() -> dict[str, Any]:
    """
    Get provider metadata including integrations, operators, hooks, sensors, and triggers.
    
    Returns:
        Dictionary containing provider configuration and component information
    """

Types

# Job status enumeration from airbyte-api
JobStatusEnum = Literal[
    "RUNNING",
    "PENDING", 
    "INCOMPLETE",
    "SUCCEEDED",
    "FAILED",
    "CANCELLED"
]

# Airflow context type
Context = dict[str, Any]
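
A short sketch of consuming the status string, treating SUCCEEDED, FAILED, and CANCELLED as terminal states for illustration (the job id is a placeholder):

import time

from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

TERMINAL = {'SUCCEEDED', 'FAILED', 'CANCELLED'}

hook = AirbyteHook(airbyte_conn_id='airbyte_default')
while (status := hook.get_job_status(1234)) not in TERMINAL:
    time.sleep(5)  # simple polling loop; prefer the sensor in real DAGs
print(f'Job finished with status {status}')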

Connection Configuration

The provider uses Airflow connections with connection type "airbyte":

  • Host: Airbyte server URL
  • Login: Client ID for authentication
  • Password: Client Secret for authentication
  • Schema: Token URL (defaults to "v1/applications/token")
  • Extra: Additional options like proxies
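
A minimal sketch of the expected connection shape, built with Airflow's Connection model (all values are placeholders):

import json

from airflow.models.connection import Connection

conn = Connection(
    conn_id='airbyte_default',
    conn_type='airbyte',
    host='https://airbyte.example.com',   # Airbyte server URL
    login='my-client-id',                 # Client ID
    password='my-client-secret',          # Client Secret
    schema='v1/applications/token',       # Token URL
    extra=json.dumps({'proxies': {'https': 'http://proxy.example.com:8080'}}),
)
print(conn.get_uri())  # usable as the AIRFLOW_CONN_AIRBYTE_DEFAULT env var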

Error Handling

All methods raise AirflowException for API errors, timeouts, and job failures. The provider includes error handling for:

  • Connection failures
  • Job timeout scenarios
  • API authentication errors
  • Invalid job states
  • Network connectivity issues
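
For example, wrapping a hook call (the connection id is a placeholder):

from airflow.exceptions import AirflowException
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook(airbyte_conn_id='airbyte_default')
try:
    job = hook.submit_sync_connection('your-airbyte-connection-id')
except AirflowException as err:
    # Covers connection failures, auth errors, and invalid job states
    print(f'Airbyte sync could not be started: {err}')
    raise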