tessl/pypi-apache-airflow-providers-airbyte

Apache Airflow provider for Airbyte data synchronization platform integration

Job Monitoring

The AirbyteJobSensor provides Airflow sensor functionality for monitoring the status of Airbyte jobs. It supports both traditional polling and deferrable execution modes, making it suitable for monitoring long-running sync operations.

The default for the deferrable parameter is read from Airflow configuration:

from airflow.configuration import conf

Capabilities

Sensor Initialization

Creates a sensor to monitor specific Airbyte job completion.

class AirbyteJobSensor(BaseSensorOperator):
    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        airbyte_conn_id: str = "airbyte_default",
        api_version: str = "v1",
        **kwargs
    ) -> None:
        """
        Initialize Airbyte job sensor.
        
        Args:
            airbyte_job_id: Required. Airbyte job ID to monitor
            deferrable: Use deferrable execution mode
            airbyte_conn_id: Airflow connection ID for Airbyte server  
            api_version: Airbyte API version to use
            **kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """

Class Attributes

Template fields and UI configuration.

template_fields: Sequence[str] = ("airbyte_job_id",)
ui_color: str = "#6C51FD"

Monitoring Methods

Core sensor functionality for job status checking.

def poke(self, context: Context) -> bool:
    """
    Check job status and determine if sensor condition is satisfied.
    
    Args:
        context: Airflow task execution context
        
    Returns:
        True if job completed successfully, False if still running
        
    Raises:
        AirflowException: If job failed or was cancelled
    """

def execute(self, context: Context) -> Any:
    """
    Execute sensor logic with support for both polling and deferrable modes.
    
    Args:
        context: Airflow task execution context
        
    Returns:
        None when job completes successfully
        
    Raises:
        AirflowException: If job fails, is cancelled, or times out
    """

def execute_complete(self, context: Context, event: Any = None) -> None:
    """
    Callback method for deferrable mode completion.
    
    Args:
        context: Airflow task execution context
        event: Trigger event data
        
    Raises:
        AirflowException: If job completed with error status
    """

Usage Examples

Basic Job Monitoring

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

dag = DAG(
    'monitor_example',
    start_date=datetime(2024, 1, 1),
    schedule=None  # 'schedule_interval' on Airflow < 2.4
)

# Monitor specific job ID
monitor_job = AirbyteJobSensor(
    task_id='wait_for_sync',
    airbyte_job_id=12345,
    airbyte_conn_id='airbyte_default',
    poke_interval=30,  # Check every 30 seconds
    timeout=3600,      # 1 hour timeout
    dag=dag
)

Monitoring Async Jobs

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Trigger job without waiting for completion
trigger_sync = AirbyteTriggerSyncOperator(
    task_id='trigger_sync',
    connection_id='connection-uuid-123',
    asynchronous=True,  # Return immediately; the job_id is pushed to XCom
    dag=dag
)

# Monitor the triggered job
monitor_sync = AirbyteJobSensor(
    task_id='monitor_sync',
    airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
    poke_interval=60,  # Check every minute
    timeout=7200,      # 2 hour timeout
    dag=dag
)

trigger_sync >> monitor_sync

Deferrable Monitoring

# Deferrable sensor - releases worker slot while waiting
deferrable_monitor = AirbyteJobSensor(
    task_id='deferrable_monitor',
    airbyte_job_id=67890,
    deferrable=True,   # Use async trigger
    timeout=24*3600,   # 24 hour timeout
    dag=dag
)

Dynamic Job ID Monitoring

# Monitor job ID from DAG configuration
dynamic_monitor = AirbyteJobSensor(
    task_id='dynamic_monitor', 
    airbyte_job_id="{{ dag_run.conf['job_id'] }}",
    poke_interval=45,
    dag=dag
)

# Monitor job ID from Airflow variable
variable_monitor = AirbyteJobSensor(
    task_id='variable_monitor',
    airbyte_job_id="{{ var.value.current_job_id }}",
    timeout=1800,
    dag=dag
)

Multiple Job Monitoring

from airflow.utils.task_group import TaskGroup

# Monitor multiple jobs in parallel
with TaskGroup('monitor_jobs', dag=dag) as job_group:
    for i, job_id in enumerate([111, 222, 333]):
        AirbyteJobSensor(
            task_id=f'monitor_job_{i}',
            airbyte_job_id=job_id,
            poke_interval=30,
            timeout=3600,
        )

Conditional Monitoring with Branching

from airflow.operators.python import BranchPythonOperator
from airflow.operators.empty import EmptyOperator  # DummyOperator is deprecated

def check_job_status(**context):
    """Decide whether to monitor or skip based on DAG run configuration."""
    if context['dag_run'].conf.get('monitor_job', True):
        return 'monitor_job'
    return 'skip_monitoring'

branch_task = BranchPythonOperator(
    task_id='check_monitoring_needed',
    python_callable=check_job_status,
    dag=dag
)

monitor_job = AirbyteJobSensor(
    task_id='monitor_job',
    airbyte_job_id="{{ dag_run.conf['job_id'] }}",
    poke_interval=60,
    dag=dag
)

skip_task = EmptyOperator(
    task_id='skip_monitoring',
    dag=dag
)

branch_task >> [monitor_job, skip_task]

Configuration Options

Sensor-Specific Configuration

AirbyteJobSensor(
    task_id='configured_monitor',

    # Required parameters
    airbyte_job_id=12345,
    
    # Connection configuration
    airbyte_conn_id='my_airbyte_conn',
    api_version='v1',
    
    # Execution mode
    deferrable=True,  # Use async triggers
    
    # Timing configuration (inherited from BaseSensorOperator)
    poke_interval=30,          # Seconds between status checks
    timeout=3600,              # Maximum total wait time in seconds
    exponential_backoff=True,  # Progressively longer waits between pokes
    max_wait=timedelta(seconds=60),  # Cap on the poke backoff interval
    
    # Retry configuration (inherited from BaseOperator)
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
)
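With retry settings like those above, the worst-case wall-clock time before the task finally fails is simple arithmetic. This sketch assumes every attempt runs to its full timeout and ignores retry_exponential_backoff:

```python
from datetime import timedelta

# Worst-case wall clock for the configuration above (illustrative;
# assumes each attempt runs to its full timeout and fixed retry_delay).
timeout = 3600                      # seconds per attempt
retries = 3
retry_delay = timedelta(minutes=5)

attempts = retries + 1              # initial run plus retries
worst_case_seconds = attempts * timeout + retries * int(retry_delay.total_seconds())
print(worst_case_seconds)  # 15300 seconds, i.e. 4h15m
```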

Deferrable Mode Configuration

When deferrable=True, the sensor applies these defaults unless you override them explicitly:

# Default deferrable settings (applied automatically)
poke_interval = 5      # Quick initial check
timeout = 60*60*24*7   # 7 days default timeout
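These defaults might be applied along these lines (an illustrative sketch of the documented behavior; the helper name is hypothetical and the actual provider code may differ):

```python
def apply_deferrable_defaults(kwargs: dict, deferrable: bool) -> dict:
    """Fill in deferrable-mode defaults without overriding user values."""
    if deferrable:
        kwargs.setdefault("poke_interval", 5)           # quick initial check
        kwargs.setdefault("timeout", 60 * 60 * 24 * 7)  # 7 days
    return kwargs
```

Using setdefault means an explicitly passed poke_interval or timeout always wins over the deferrable defaults.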

Template Fields

The airbyte_job_id field supports Jinja templating:

# From XCom (previous task output)
airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_task') }}"

# From DAG run configuration
airbyte_job_id="{{ dag_run.conf['job_id'] }}"

# From Airflow variables
airbyte_job_id="{{ var.value.job_to_monitor }}"

# From task instance context
airbyte_job_id="{{ ti.xcom_pull(key='job_id') }}"

Execution Modes

Polling Mode (Default)

  • deferrable=False
  • Continuously occupies worker slot
  • Suitable for short to medium duration jobs
  • Uses configurable poke_interval for status checks
  • Traditional Airflow sensor behavior
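With a fixed poke_interval, the maximum number of status checks before the sensor times out is easy to estimate (this assumes no exponential backoff):

```python
# Upper bound on status checks in polling mode (assumes a fixed
# poke_interval and no exponential backoff).
poke_interval = 30   # seconds between checks
timeout = 3600       # maximum total wait in seconds

max_checks = timeout // poke_interval
print(max_checks)  # 120
```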

Deferrable Mode

  • deferrable=True
  • Releases worker slot while waiting
  • Uses async triggers for monitoring
  • Optimal for long-running jobs
  • Automatically resumes when job completes
  • Better resource utilization in large deployments

Job Status Handling

The sensor handles all Airbyte job statuses:

Success States

  • SUCCEEDED: Job completed successfully, sensor returns True

Waiting States

  • RUNNING: Job actively executing, sensor continues waiting
  • PENDING: Job queued for execution, sensor continues waiting
  • INCOMPLETE: Job partially completed, sensor continues waiting

Error States

  • FAILED: Job execution failed, raises AirflowException
  • CANCELLED: Job was cancelled, raises AirflowException

Unknown States

Any unexpected job status raises AirflowException with detailed information.
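The decision table above can be sketched as a plain function. This is illustrative only: the real poke() raises AirflowException, and the state groupings here follow this document rather than the provider's actual implementation:

```python
# Illustrative sketch of the sensor's status decision logic.
FAILURE_STATES = {"FAILED", "CANCELLED"}
WAITING_STATES = {"RUNNING", "PENDING", "INCOMPLETE"}

class JobFailedError(Exception):
    """Stand-in for AirflowException in this sketch."""

def evaluate_status(status: str) -> bool:
    """Return True on success, False while waiting; raise on failure or unknown."""
    if status == "SUCCEEDED":
        return True
    if status in WAITING_STATES:
        return False
    if status in FAILURE_STATES:
        raise JobFailedError(f"Job ended with status {status}")
    raise JobFailedError(f"Unexpected job status: {status}")
```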

Error Handling

The sensor provides comprehensive error handling:

  • Connection errors: Network issues, authentication failures
  • Job not found: Invalid job_id or job expired
  • Timeout errors: Job exceeds specified timeout duration
  • API errors: Airbyte server errors, rate limiting
  • State transition errors: Unexpected job state changes

All errors include detailed logging for troubleshooting and monitoring purposes.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-airbyte
