
tessl/pypi-apache-airflow-providers-airbyte

Apache Airflow provider for Airbyte data synchronization platform integration




Hook API

The AirbyteHook wraps the Airbyte API so that Airflow tasks can trigger connection syncs, inspect, wait for, and cancel jobs, and check that the server is reachable. It reads credentials from an Airflow connection and creates the authenticated API session used by every other method.

Capabilities

Hook Initialization

Creates a new hook instance with connection and API configuration.

class AirbyteHook(BaseHook):
    def __init__(
        self,
        airbyte_conn_id: str = "airbyte_default",
        api_version: str = "v1"
    ) -> None:
        """
        Initialize AirbyteHook.
        
        Args:
            airbyte_conn_id: Airflow connection ID for Airbyte server
            api_version: Airbyte API version to use
        """

Connection Management

Methods for managing connection parameters and API session creation.

def get_conn_params(self, conn_id: str) -> Any:
    """
    Extract connection parameters from Airflow connection.
    
    Args:
        conn_id: Airflow connection identifier
        
    Returns:
        Dictionary containing connection parameters
    """

def create_api_session(self) -> AirbyteAPI:
    """
    Create authenticated Airbyte API session.
    
    Returns:
        Configured AirbyteAPI client instance
    """

def test_connection(self) -> tuple[bool, str]:
    """
    Test connection to Airbyte server.
    
    Returns:
        Tuple of (success_status, message)
    """

Job Management

Core methods for managing Airbyte sync jobs.

def submit_sync_connection(self, connection_id: str) -> Any:
    """
    Submit a new sync job for the specified connection.
    
    Args:
        connection_id: Airbyte connection UUID
        
    Returns:
        Job response object with job_id and status
        
    Raises:
        AirflowException: If job submission fails
    """

def get_job_details(self, job_id: int) -> Any:
    """
    Retrieve detailed information about a specific job.
    
    Args:
        job_id: Airbyte job identifier
        
    Returns:
        Job response object with complete job metadata
        
    Raises:
        AirflowException: If job retrieval fails
    """

def get_job_status(self, job_id: int) -> str:
    """
    Get the current status of a job.
    
    Args:
        job_id: Airbyte job identifier
        
    Returns:
        Job status as reported by the Airbyte API, in lower case
        (pending, running, incomplete, succeeded, failed, cancelled)
    """

def cancel_job(self, job_id: int) -> Any:
    """
    Cancel a running or pending job.
    
    Args:
        job_id: Airbyte job identifier
        
    Returns:
        Job response object with cancellation details
        
    Raises:
        AirflowException: If job cancellation fails
    """

Job Monitoring

Blocking helper that polls a job until it reaches a final state or the timeout elapses.

def wait_for_job(
    self,
    job_id: str | int,
    wait_seconds: float = 3,
    timeout: float | None = 3600
) -> None:
    """
    Poll job until completion or timeout.
    
    Args:
        job_id: Airbyte job identifier
        wait_seconds: Polling interval in seconds
        timeout: Maximum wait time in seconds (None for no timeout)
        
    Raises:
        AirflowException: If job fails, is cancelled, times out, or encounters unexpected state
    """

UI Integration

Class method for Airflow UI field configuration.

@classmethod
def get_ui_field_behaviour(cls) -> dict[str, Any]:
    """
    Define UI field behavior for connection forms.
    
    Returns:
        Dictionary specifying hidden fields, labels, and placeholders
    """

Usage Examples

Basic Hook Usage

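from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

# Initialize the hook from the 'my_airbyte_conn' Airflow connection
hook = AirbyteHook(airbyte_conn_id='my_airbyte_conn')

# Check that the Airbyte server is reachable before submitting work
success, message = hook.test_connection()
if success:
    print("Connection successful")

    # Submit a sync job for an existing Airbyte connection (placeholder UUID)
    job_response = hook.submit_sync_connection('connection-uuid-123')
    job_id = job_response.job_id

    # Block until the job succeeds; raises AirflowException if the job fails,
    # is cancelled, or is still running after 30 minutes
    hook.wait_for_job(job_id, wait_seconds=5, timeout=1800)
    print("Job completed successfully")
else:
    print(f"Connection test failed: {message}")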

Manual Job Monitoring

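import time
from airflow.providers.airbyte.hooks.airbyte import AirbyteHook

hook = AirbyteHook()  # uses the 'airbyte_default' connection

# Submit job
job_response = hook.submit_sync_connection('connection-uuid-123')
job_id = job_response.job_id

# The Airbyte API reports statuses in lower case (e.g. "succeeded") and the
# value may be an enum member, so compare on its lower-cased string value
TERMINAL_STATES = {'succeeded', 'failed', 'cancelled'}
deadline = time.monotonic() + 3600  # give up after one hour

# Manual polling loop
while True:
    status = hook.get_job_status(job_id)
    status_value = str(getattr(status, 'value', status)).lower()
    print(f"Job status: {status_value}")

    if status_value in TERMINAL_STATES:
        break
    if time.monotonic() > deadline:
        hook.cancel_job(job_id)
        raise TimeoutError(f"Airbyte job {job_id} did not finish within one hour")

    time.sleep(10)

# Get final job details
job_details = hook.get_job_details(job_id)
print(f"Final job state: {job_details}")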

Error Handling

The hook raises AirflowException for the following error conditions. test_connection is the exception: it reports failures through its (success_status, message) return value instead of raising.

  • Connection errors: invalid credentials, network issues, server unavailable
  • Job submission errors: invalid connection ID, insufficient permissions
  • Timeout errors (wait_for_job): the job exceeds the specified timeout
  • Job state errors (wait_for_job): the job fails, is cancelled, or reaches an unexpected state
  • API errors: malformed requests, server errors, rate limiting

Hook methods also write log messages that help when debugging failed task runs.
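
In Airflow, task-level retries are the usual way to recover from transient failures, but code that calls the hook directly can also retry a single call. The sketch below retries a submission callable with exponential backoff when it raises AirflowException; submit_with_retries is a hypothetical helper, and the local AirflowException class is a stand-in so the example runs without Airflow (import airflow.exceptions.AirflowException in real code). Retrying submissions blindly can start a duplicate sync if the first request actually reached the server, so restrict retries to errors you know occurred before the job was created.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class AirflowException(Exception):
    """Stand-in for airflow.exceptions.AirflowException so this sketch runs standalone."""


def submit_with_retries(
    submit: Callable[[str], Any],
    connection_id: str,
    attempts: int = 3,
    backoff_seconds: float = 2.0,
) -> Any:
    """Call submit(connection_id), retrying on AirflowException with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return submit(connection_id)
        except AirflowException:
            if attempt == attempts:
                raise  # out of attempts: surface the last error
            # Wait 1x, 2x, 4x ... backoff_seconds between attempts.
            time.sleep(backoff_seconds * 2 ** (attempt - 1))
```

With a real hook this would be called as submit_with_retries(hook.submit_sync_connection, "connection-uuid-123").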
