Apache Airflow provider for Airbyte data synchronization platform integration
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
The AirbyteSyncTrigger provides asynchronous monitoring capabilities for Airbyte jobs in deferrable execution mode. It enables efficient resource utilization by releasing worker slots while monitoring long-running sync operations.
Creates an async trigger for monitoring Airbyte job completion.
class AirbyteSyncTrigger(BaseTrigger):
def __init__(
self,
job_id: int,
conn_id: str,
end_time: float,
poll_interval: float,
) -> None:
"""
Initialize Airbyte sync trigger.
Args:
job_id: Airbyte job ID to monitor
conn_id: Airflow connection ID for Airbyte server
end_time: Unix timestamp when monitoring should timeout
poll_interval: Seconds between status checks
"""Methods for trigger persistence and restoration.
def serialize(self) -> tuple[str, dict[str, Any]]:
"""
Serialize trigger state for persistence.
Returns:
Tuple of (class_path, serialized_arguments)
"""Core asynchronous monitoring functionality.
async def run(self) -> AsyncIterator[TriggerEvent]:
"""
Execute async monitoring loop until job completion or timeout.
Yields:
TriggerEvent with job completion status and details
Events:
- {"status": "success", "message": "...", "job_id": int}
- {"status": "error", "message": "...", "job_id": int}
- {"status": "cancelled", "message": "...", "job_id": int}
"""
async def is_still_running(self, hook: AirbyteHook) -> bool:
"""
Check if job is still in a running state.
Args:
hook: AirbyteHook instance for API communication
Returns:
True if job is RUNNING, PENDING, or INCOMPLETE
"""import asyncio
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
async def monitor_job():
"""Example of direct trigger usage."""
trigger = AirbyteSyncTrigger(
job_id=12345,
conn_id='airbyte_default',
end_time=time.time() + 3600, # 1 hour from now
poll_interval=60 # Check every minute
)
async for event in trigger.run():
print(f"Job event: {event}")
break # Exit after first event
# Run the monitoring
asyncio.run(monitor_job())

from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
# Operator automatically uses trigger when deferrable=True
deferrable_sync = AirbyteTriggerSyncOperator(
task_id='deferrable_sync',
connection_id='connection-uuid-123',
deferrable=True, # Automatically creates and uses AirbyteSyncTrigger
timeout=7200, # 2 hours - converted to end_time
dag=dag
)

from airflow.providers.airbyte.sensors.airbyte import AirbyteJobSensor
# Sensor automatically uses trigger when deferrable=True
deferrable_sensor = AirbyteJobSensor(
task_id='deferrable_monitor',
airbyte_job_id=67890,
deferrable=True, # Automatically creates and uses AirbyteSyncTrigger
timeout=3600, # 1 hour - converted to end_time
dag=dag
)

import time
from datetime import timedelta
from airflow.providers.airbyte.triggers.airbyte import AirbyteSyncTrigger
from airflow.providers.airbyte.version_compat import BaseOperator
class CustomAirbyteOperator(BaseOperator):
"""Custom operator with trigger usage."""
def execute(self, context):
"""Execute with custom trigger configuration."""
# Submit job logic here
job_id = self.submit_job()
if self.deferrable:
# Custom trigger configuration
self.defer(
timeout=self.execution_timeout,
trigger=AirbyteSyncTrigger(
job_id=job_id,
conn_id=self.airbyte_conn_id,
end_time=time.time() + 7200, # Custom 2-hour timeout
poll_interval=30, # Custom 30-second interval
),
method_name="execute_complete",
)
def execute_complete(self, context, event=None):
"""Handle trigger completion."""
if event["status"] == "success":
self.log.info(f"Job {event['job_id']} completed successfully")
else:
raise AirflowException(f"Job failed: {event['message']}")

The trigger yields different event types based on job outcomes:
{
"status": "success",
"message": "Job run 12345 has completed successfully.",
"job_id": 12345
}

{
"status": "error",
"message": "Job run 12345 has failed.",
"job_id": 12345
}

{
"status": "cancelled",
"message": "Job run 12345 has been cancelled.",
"job_id": 12345
}

{
"status": "error",
"message": "Job run 12345 has not reached a terminal status after 3600 seconds.",
"job_id": 12345
}

{
"status": "error",
"message": "Connection timeout: Unable to reach Airbyte server",
"job_id": 12345
}

AirbyteSyncTrigger(
job_id=12345,
conn_id='airbyte_default',
# Timeout configuration
end_time=time.time() + 7200, # 2 hours from now
# Polling configuration
poll_interval=60, # Check every 60 seconds
)

The trigger uses the same connection configuration as other Airbyte components:
# Connection parameters extracted from Airflow connection
{
"host": "https://api.airbyte.com",
"client_id": "oauth_client_id",
"client_secret": "oauth_client_secret",
"token_url": "v1/applications/token",
"proxies": {...} # Optional proxy settings
}

The trigger handles various error scenarios:
All errors are captured and yielded as error events with descriptive messages for debugging.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-airbyte