tessl/pypi-apache-airflow-providers-airbyte

Apache Airflow provider for Airbyte data synchronization platform integration


Async Triggers

The AirbyteSyncTrigger provides asynchronous monitoring capabilities for Airbyte jobs in deferrable execution mode. It enables efficient resource utilization by releasing worker slots while monitoring long-running sync operations.

Capabilities

Trigger Initialization

Creates an async trigger for monitoring Airbyte job completion.

class AirbyteSyncTrigger(BaseTrigger):
    def __init__(
        self,
        job_id: int,
        conn_id: str,
        end_time: float,
        poll_interval: float,
    ) -> None:
        """
        Initialize Airbyte sync trigger.
        
        Args:
            job_id: Airbyte job ID to monitor
            conn_id: Airflow connection ID for Airbyte server
            end_time: Unix timestamp when monitoring should timeout
            poll_interval: Seconds between status checks
        """

Serialization

Serializes trigger state so Airflow can persist it and restore the trigger across triggerer restarts.

def serialize(self) -> tuple[str, dict[str, Any]]:
    """
    Serialize trigger state for persistence.
    
    Returns:
        Tuple of (class_path, serialized_arguments)
    """

Async Monitoring

Core asynchronous monitoring functionality.

async def run(self) -> AsyncIterator[TriggerEvent]:
    """
    Execute async monitoring loop until job completion or timeout.
    
    Yields:
        TriggerEvent with job completion status and details
        
    Events:
        - {"status": "success", "message": "...", "job_id": int}
        - {"status": "error", "message": "...", "job_id": int} 
        - {"status": "cancelled", "message": "...", "job_id": int}
    """

async def is_still_running(self, hook: AirbyteHook) -> bool:
    """
    Check if job is still in a running state.
    
    Args:
        hook: AirbyteHook instance for API communication
        
    Returns:
        True if job is RUNNING, PENDING, or INCOMPLETE
    """
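The running-state check above can be sketched in isolation. Here `FakeHook` stands in for the real `AirbyteHook`, and the lowercase status strings mirror the states named in the docstring (illustrative values, not the exact API contract):

```python
# Sketch: decide whether a job is still in a non-terminal state.
RUNNING_STATES = {"running", "pending", "incomplete"}

class FakeHook:
    """Stand-in for AirbyteHook; returns a canned job status."""
    def __init__(self, status: str):
        self._status = status

    def get_job_status(self, job_id: int) -> str:
        return self._status

def is_still_running(hook: FakeHook, job_id: int) -> bool:
    """True while the job has not reached a terminal state."""
    return hook.get_job_status(job_id) in RUNNING_STATES

print(is_still_running(FakeHook("pending"), 12345))    # → True
print(is_still_running(FakeHook("succeeded"), 12345))  # → False
```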

Usage Examples

Direct Trigger Usage

import asyncio
import time

from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger

async def monitor_job():
    """Example of direct trigger usage."""
    trigger = AirbyteSyncTrigger(
        job_id=12345,
        conn_id='airbyte_default',
        end_time=time.time() + 3600,  # 1 hour from now
        poll_interval=60  # Check every minute
    )
    
    async for event in trigger.run():
        print(f"Job event: {event}")
        break  # Exit after first event

# Run the monitoring
asyncio.run(monitor_job())

Integration with Deferrable Operator

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator

# Operator automatically uses trigger when deferrable=True
deferrable_sync = AirbyteTriggerSyncOperator(
    task_id='deferrable_sync',
    connection_id='connection-uuid-123',
    deferrable=True,  # Automatically creates and uses AirbyteSyncTrigger
    timeout=7200,     # 2 hours - converted to end_time
    dag=dag
)

Integration with Deferrable Sensor

from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor

# Sensor automatically uses trigger when deferrable=True
deferrable_sensor = AirbyteJobSensor(
    task_id='deferrable_monitor',
    airbyte_job_id=67890,
    deferrable=True,  # Automatically creates and uses AirbyteSyncTrigger
    timeout=3600,     # 1 hour - converted to end_time
    dag=dag
)

Custom Trigger Implementation

import time

from airflow.exceptions import AirflowException
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.version_compat import BaseOperator

class CustomAirbyteOperator(BaseOperator):
    """Custom operator with trigger usage."""
    
    def execute(self, context):
        """Execute with custom trigger configuration."""
        # Submit the job (placeholder: replace with your own submission logic)
        job_id = self.submit_job()
        
        if self.deferrable:
            # Custom trigger configuration
            self.defer(
                timeout=self.execution_timeout,
                trigger=AirbyteSyncTrigger(
                    job_id=job_id,
                    conn_id=self.airbyte_conn_id,
                    end_time=time.time() + 7200,  # Custom 2-hour timeout
                    poll_interval=30,  # Custom 30-second interval
                ),
                method_name="execute_complete",
            )
    
    def execute_complete(self, context, event=None):
        """Handle trigger completion."""
        if event["status"] == "success":
            self.log.info(f"Job {event['job_id']} completed successfully")
        else:
            raise AirflowException(f"Job failed: {event['message']}")

Event Types

The trigger yields different event types based on job outcomes:

Success Event

{
    "status": "success",
    "message": "Job run 12345 has completed successfully.",
    "job_id": 12345
}

Error Event

{
    "status": "error", 
    "message": "Job run 12345 has failed.",
    "job_id": 12345
}

Cancellation Event

{
    "status": "cancelled",
    "message": "Job run 12345 has been cancelled.", 
    "job_id": 12345
}

Timeout Event

{
    "status": "error",
    "message": "Job run 12345 has not reached a terminal status after 3600 seconds.",
    "job_id": 12345
}

Exception Event

{
    "status": "error",
    "message": "Connection timeout: Unable to reach Airbyte server",
    "job_id": 12345
}

Trigger Lifecycle

Initialization Phase

  1. Trigger receives job_id, connection info, and timing parameters
  2. Trigger serializes state for persistence
  3. Airflow schedules trigger for async execution

Monitoring Phase

  1. Trigger creates AirbyteHook for API communication
  2. Enters polling loop with specified interval
  3. Checks job status on each iteration
  4. Continues until terminal state or timeout
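The monitoring phase above can be sketched as a plain asyncio loop. This is a simplified stand-in, not the trigger's actual implementation: `get_job_status` is a stub for the hook's API call, and the event shape follows the Events listed earlier:

```python
import asyncio
import time

async def monitor(job_id: int, end_time: float, poll_interval: float,
                  get_job_status):
    """Simplified polling loop mirroring the trigger's monitoring phase."""
    while time.time() < end_time:
        status = get_job_status(job_id)
        if status == "succeeded":
            return {"status": "success",
                    "message": f"Job run {job_id} has completed successfully.",
                    "job_id": job_id}
        if status in ("failed", "cancelled"):
            return {"status": "error" if status == "failed" else "cancelled",
                    "message": f"Job run {job_id} terminated: {status}.",
                    "job_id": job_id}
        # Still running/pending/incomplete: wait before the next check.
        await asyncio.sleep(poll_interval)
    return {"status": "error",
            "message": f"Job run {job_id} has not reached a terminal status.",
            "job_id": job_id}

# Stub that reports "running" twice, then "succeeded".
statuses = iter(["running", "running", "succeeded"])
event = asyncio.run(monitor(1, time.time() + 10, 0.01, lambda _: next(statuses)))
print(event["status"])  # → success
```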

Completion Phase

  1. Trigger yields appropriate TriggerEvent
  2. Associated task receives event via execute_complete()
  3. Task completes or raises exception based on event status

Configuration

Timing Parameters

AirbyteSyncTrigger(
    job_id=12345,
    conn_id='airbyte_default',
    
    # Timeout configuration
    end_time=time.time() + 7200,  # 2 hours from now
    
    # Polling configuration  
    poll_interval=60,  # Check every 60 seconds
)

Connection Configuration

The trigger uses the same connection configuration as other Airbyte components:

# Connection parameters extracted from Airflow connection
{
    "host": "https://api.airbyte.com",
    "client_id": "oauth_client_id", 
    "client_secret": "oauth_client_secret",
    "token_url": "v1/applications/token",
    "proxies": {...}  # Optional proxy settings
}
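One common way to supply such a connection is Airflow's `AIRFLOW_CONN_<CONN_ID>` environment-variable convention (JSON form, Airflow 2.3+). The field mapping below is illustrative; consult the provider documentation for the exact login/password/extra layout your version expects:

```shell
# Defines the 'airbyte_default' connection via an environment variable.
export AIRFLOW_CONN_AIRBYTE_DEFAULT='{
  "conn_type": "airbyte",
  "host": "https://api.airbyte.com",
  "login": "oauth_client_id",
  "password": "oauth_client_secret"
}'
```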

Error Handling

The trigger handles various error scenarios:

Network Errors

  • Connection timeouts to Airbyte server
  • DNS resolution failures
  • Network connectivity issues

Authentication Errors

  • Invalid client credentials
  • Expired tokens
  • Authorization failures

API Errors

  • Invalid job IDs
  • Server internal errors
  • Rate limiting responses

Job State Errors

  • Unexpected job state transitions
  • Job not found scenarios
  • Malformed API responses

All errors are captured and yielded as error events with descriptive messages for debugging.

Best Practices

Timeout Configuration

  • Set reasonable timeouts based on expected job duration
  • Consider data volume and complexity when setting timeouts
  • Use longer timeouts for initial syncs, shorter for incremental
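This guidance maps onto the trigger's `end_time` parameter, which is a Unix timestamp. A sketch of deriving it from an expected job duration plus a safety margin (the 2x factor is an arbitrary example, not a provider default):

```python
import time

def compute_end_time(expected_duration_s: float,
                     safety_factor: float = 2.0) -> float:
    """Unix timestamp at which monitoring should give up."""
    return time.time() + expected_duration_s * safety_factor

# An incremental sync expected to take ~15 minutes gets a 30-minute budget.
end_time = compute_end_time(15 * 60)
print(end_time > time.time())  # → True
```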

Polling Intervals

  • Balance between responsiveness and API load
  • Use longer intervals (60+ seconds) for long-running jobs
  • Use shorter intervals (10-30 seconds) for quick jobs
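The trade-off can be quantified: the number of status calls a trigger makes is roughly the timeout divided by the poll interval, so shrinking the interval multiplies API load accordingly:

```python
def status_calls(timeout_s: float, poll_interval_s: float) -> int:
    """Approximate number of status checks over the monitoring window."""
    return int(timeout_s // poll_interval_s)

print(status_calls(7200, 60))  # 2 h at 60 s  → 120
print(status_calls(7200, 10))  # 2 h at 10 s  → 720
```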

Resource Management

  • Prefer deferrable triggers over polling for long jobs
  • Monitor trigger resource usage in large deployments
  • Consider connection pooling for high-frequency monitoring

Error Recovery

  • Implement appropriate retry logic in calling operators/sensors
  • Log sufficient detail for troubleshooting failed jobs
  • Set up alerting for persistent trigger failures

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-airbyte
