Apache Airflow provider for Airbyte data synchronization platform integration
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The AirbyteJobSensor provides Airflow sensor functionality for monitoring the status of Airbyte jobs. It supports both traditional polling and deferrable execution modes, making it suitable for monitoring long-running sync operations.
from airflow.configuration import conf

Creates a sensor to monitor specific Airbyte job completion.
class AirbyteJobSensor(BaseSensorOperator):
    """
    Sensor that waits for an Airbyte job to reach a terminal state.

    Supports both traditional polling and deferrable execution modes, making
    it suitable for monitoring long-running sync operations.
    """

    # Template fields and UI configuration.
    template_fields: Sequence[str] = ("airbyte_job_id",)
    ui_color: str = "#6C51FD"

    def __init__(
        self,
        *,
        airbyte_job_id: int,
        deferrable: bool = conf.getboolean("operators", "default_deferrable", fallback=False),
        airbyte_conn_id: str = "airbyte_default",
        api_version: str = "v1",
        **kwargs: Any,
    ) -> None:
        """
        Initialize Airbyte job sensor.

        Args:
            airbyte_job_id: Required. Airbyte job ID to monitor
            deferrable: Use deferrable execution mode
            airbyte_conn_id: Airflow connection ID for Airbyte server
            api_version: Airbyte API version to use
            **kwargs: Additional BaseSensorOperator arguments (poke_interval, timeout, etc.)
        """
        if deferrable:
            # Deferrable-mode defaults, applied only when the caller did not
            # set them: quick initial check, 7-day maximum wait.
            kwargs.setdefault("poke_interval", 5)
            kwargs.setdefault("timeout", 60 * 60 * 24 * 7)
        super().__init__(**kwargs)
        self.airbyte_job_id = airbyte_job_id
        self.deferrable = deferrable
        self.airbyte_conn_id = airbyte_conn_id
        self.api_version = api_version

    # Core sensor functionality for job status checking.
    def poke(self, context: Context) -> bool:
        """
        Check job status and determine if sensor condition is satisfied.

        Args:
            context: Airflow task execution context

        Returns:
            True if job completed successfully, False if still running

        Raises:
            AirflowException: If job failed or was cancelled
        """
        # NOTE(review): body omitted in this reference skeleton; the installed
        # provider supplies the concrete status check.
        ...

    def execute(self, context: Context) -> Any:
        """
        Execute sensor logic with support for both polling and deferrable modes.

        Args:
            context: Airflow task execution context

        Returns:
            None when job completes successfully

        Raises:
            AirflowException: If job fails, is cancelled, or times out
        """
        ...

    def execute_complete(self, context: Context, event: Any = None) -> None:
        """
        Callback method for deferrable mode completion.

        Args:
            context: Airflow task execution context
            event: Trigger event data

        Raises:
            AirflowException: If job completed with error status
        """
        ...

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
# Example: minimal DAG that waits on a known Airbyte job ID.
dag = DAG(
'monitor_example',
start_date=datetime(2024, 1, 1),
schedule_interval=None
)
# Monitor specific job ID
monitor_job = AirbyteJobSensor(
task_id='wait_for_sync',
airbyte_job_id=12345,
airbyte_conn_id='airbyte_default',
poke_interval=30, # Check every 30 seconds
timeout=3600, # 1 hour timeout
dag=dag
)
)

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
# Example: trigger a sync asynchronously, then monitor it via XCom job id.
# Trigger async job
trigger_sync = AirbyteTriggerSyncOperator(
task_id='trigger_sync',
connection_id='connection-uuid-123',
asynchronous=True, # Returns job_id
dag=dag
)
# Monitor the triggered job
monitor_sync = AirbyteJobSensor(
task_id='monitor_sync',
airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_sync') }}",
poke_interval=60, # Check every minute
timeout=7200, # 2 hour timeout
dag=dag
)
trigger_sync >> monitor_sync
# Deferrable sensor - releases worker slot while waiting
# Example: deferrable sensor frees the worker slot while waiting.
deferrable_monitor = AirbyteJobSensor(
task_id='deferrable_monitor',
airbyte_job_id=67890,
deferrable=True, # Use async trigger
timeout=24*3600, # 24 hour timeout
dag=dag
)
# Monitor job ID from DAG configuration
# Example: job IDs resolved at runtime through Jinja templating.
dynamic_monitor = AirbyteJobSensor(
task_id='dynamic_monitor',
airbyte_job_id="{{ dag_run.conf['job_id'] }}",
poke_interval=45,
dag=dag
)
# Monitor job ID from Airflow variable
variable_monitor = AirbyteJobSensor(
task_id='variable_monitor',
airbyte_job_id="{{ var.value.current_job_id }}",
timeout=1800,
dag=dag
)

from airflow.utils.task_group import TaskGroup
# Example: fan out one sensor per job inside a TaskGroup.
# Monitor multiple jobs in parallel
with TaskGroup('monitor_jobs', dag=dag) as job_group:
for i, job_id in enumerate([111, 222, 333]):
AirbyteJobSensor(
task_id=f'monitor_job_{i}',
airbyte_job_id=job_id,
poke_interval=30,
timeout=3600,
)

from airflow.operators.python import BranchPythonOperator
from airflow.operators.dummy import DummyOperator
def check_job_status(**context):
    """Return the task_id of the branch to follow: monitor or skip."""
    # Branch on the DAG-run configuration flag; monitoring is the default.
    wants_monitoring = context['dag_run'].conf.get('monitor_job', True)
    return 'monitor_job' if wants_monitoring else 'skip_monitoring'
# Wire the branch: run the sensor only when monitoring was requested.
branch_task = BranchPythonOperator(
task_id='check_monitoring_needed',
python_callable=check_job_status,
dag=dag
)
monitor_job = AirbyteJobSensor(
task_id='monitor_job',
airbyte_job_id="{{ dag_run.conf['job_id'] }}",
poke_interval=60,
dag=dag
)
skip_task = DummyOperator(
task_id='skip_monitoring',
dag=dag
)
branch_task >> [monitor_job, skip_task]

AirbyteJobSensor(
# Required parameters
airbyte_job_id=12345,
# Connection configuration
airbyte_conn_id='my_airbyte_conn',
api_version='v1',
# Execution mode
deferrable=True, # Use async triggers
# Timing configuration (inherited from BaseSensorOperator)
poke_interval=30, # Seconds between status checks
timeout=3600, # Maximum wait time
exponential_backoff=True, # Increase intervals on failures
max_retry_delay=60, # Maximum backoff interval
# Retry configuration
retries=3,
retry_delay=timedelta(minutes=5),
retry_exponential_backoff=True,
)

When deferrable=True, the sensor automatically configures:
# Default deferrable settings (applied automatically)
poke_interval = 5 # Quick initial check
timeout = 60*60*24*7 # 7 days default timeout

The airbyte_job_id field supports Jinja templating:
# From XCom (previous task output)
airbyte_job_id="{{ task_instance.xcom_pull(task_ids='trigger_task') }}"
# From DAG run configuration
airbyte_job_id="{{ dag_run.conf['job_id'] }}"
# From Airflow variables
airbyte_job_id="{{ var.value.job_to_monitor }}"
# From task instance context
airbyte_job_id="{{ ti.xcom_pull(key='job_id') }}"

The sensor handles all Airbyte job statuses:
Any unexpected job status raises AirflowException with detailed information.
The sensor provides comprehensive error handling:
All errors include detailed logging for troubleshooting and monitoring purposes.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-airbyte