Programmatically author, schedule and monitor data pipelines
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Asset-driven scheduling system for creating data-aware workflows, including asset definitions, timetables, and dependency management. Assets (formerly Datasets) enable data-driven orchestration where tasks trigger based on data availability rather than time.
Define data assets that can trigger downstream DAGs when updated.
class Asset:
    """A data asset whose updates can trigger downstream DAGs (formerly Dataset)."""

    def __init__(
        self,
        name: Optional[str] = None,
        uri: Optional[Union[str, "ObjectStoragePath"]] = None,
        *,
        group: Optional[str] = None,
        extra: Optional[Dict] = None,
        # Forward references quoted: AssetWatcher/SerializedAssetWatcher are
        # defined later on this page, so unquoted names would raise NameError
        # when this class body is executed.
        watchers: Optional[List[Union["AssetWatcher", "SerializedAssetWatcher"]]] = None,
    ):
        """
        Create a data asset definition.

        Args:
            name: Human-readable name for the asset
            uri: Unique resource identifier for the asset
            group: Logical grouping for the asset
            extra: Additional metadata
            watchers: List of asset watchers for monitoring
        """

    @property
    def uri(self) -> str:
        """Asset URI."""

    @property
    def name(self) -> Optional[str]:
        """Asset name."""

    @property
    def group(self) -> Optional[str]:
        """Asset group."""

# Usage example:
from airflow.sdk.definitions.asset import Asset
from airflow.decorators import dag, task
from datetime import datetime  # required by start_date below; missing in original snippet

# Define assets
customer_data = Asset("s3://bucket/customer_data/")
processed_data = Asset("s3://bucket/processed/customer_metrics/")

@dag(
    dag_id='data_processing',
    start_date=datetime(2024, 1, 1),
    schedule=[customer_data],  # Triggered when customer_data is updated
    catchup=False,
)
def data_processing():
    @task(outlets=[processed_data])  # Marks processed_data as updated
    def process_customer_data():
        # Process the data
        return "processed"

    process_customer_data()

dag_instance = data_processing()

# Create aliases and collections for complex asset relationships.
# NOTE(review): BaseAsset is not defined or imported anywhere on this page —
# presumably provided by airflow.sdk; confirm against the real module.
class AssetAlias(BaseAsset):
    """Named alias that lets multiple producers publish under one asset reference."""

    def __init__(
        self,
        name: str,
        *,
        group: str = "asset",
    ):
        """
        Create an asset alias.

        Args:
            name: Alias name
            group: Asset group (default: "asset")
        """

    @property
    def name(self) -> str:
        """Alias name."""
class AssetAll:
    """Asset condition that fires only when every member asset has been updated."""

    # Forward references quoted so the annotation does not require Asset and
    # AssetAlias to already be bound when this class body executes.
    def __init__(self, *assets: Union["Asset", "AssetAlias"]):
        """
        Create an asset collection requiring ALL assets to be updated.

        Args:
            *assets: Assets that must all be updated
        """
class AssetAny:
    """Asset condition that fires when any one member asset has been updated."""

    # Forward references quoted so the annotation does not require Asset and
    # AssetAlias to already be bound when this class body executes.
    def __init__(self, *assets: Union["Asset", "AssetAlias"]):
        """
        Create an asset collection requiring ANY asset to be updated.

        Args:
            *assets: Assets where any one being updated triggers
        """

# Usage example:
from airflow import Asset
from airflow.sdk.definitions.asset import AssetAll, AssetAny, AssetAlias
from airflow.decorators import dag, task  # required by the decorators below; missing in original snippet
from datetime import datetime  # required by start_date below; missing in original snippet

# Define assets
sales_data = Asset("s3://bucket/sales/")
inventory_data = Asset("s3://bucket/inventory/")
customer_data = Asset("s3://bucket/customers/")

# Asset collections
all_core_data = AssetAll(sales_data, inventory_data, customer_data)
any_sales_data = AssetAny(sales_data, inventory_data)

# Asset alias
master_data = AssetAlias("master_data")

@dag(
    dag_id='comprehensive_report',
    schedule=[all_core_data],  # Wait for all three datasets
    start_date=datetime(2024, 1, 1),
)
def comprehensive_report():
    @task
    def generate_report():
        return "comprehensive report generated"

    generate_report()

# Monitor assets for changes and trigger based on conditions.
class AssetWatcher:
    """Watches an asset for changes via an event trigger."""

    def __init__(
        self,
        name: str,
        # BaseEventTrigger quoted: not defined on this page, presumably an
        # airflow trigger base class — confirm against the real module.
        trigger: Union["BaseEventTrigger", Dict],
    ):
        """
        Watch an asset for changes using event triggers.

        Args:
            name: Watcher name
            trigger: Event trigger or trigger configuration
        """
# Declared as a function: the original page wrote this as `@asset(uri: str, ...)`,
# which is a decorator *application* with parameter syntax and not valid Python.
# This is the decorator factory's actual signature.
def asset(
    uri: str,
    name: Optional[str] = None,
    group: Optional[str] = None,
    **kwargs,
) -> Callable:
    """
    Decorator to define an asset from a function.

    Args:
        uri: Asset URI
        name: Asset name
        group: Asset group

    Returns:
        Decorated function that defines asset behavior
    """

# Define custom scheduling patterns for DAGs.
class Timetable:
    """Base class for timetable implementations."""

    # TimeRestriction and DagRunInfo are defined later on this page, so the
    # references are quoted to avoid NameError when this method is defined.
    def next_dagrun_info(
        self,
        last_automated_dagrun: Optional[datetime],
        restriction: "TimeRestriction",
    ) -> Optional["DagRunInfo"]:
        """
        Get information for the next DAG run.

        Args:
            last_automated_dagrun: Last automated DAG run datetime
            restriction: Time restriction configuration

        Returns:
            Information for next DAG run or None if no more runs
        """
class CronDataIntervalTimetable(Timetable):
    """Cron-based timetable where each run covers the preceding data interval."""

    def __init__(
        self,
        cron: str,
        # NOTE: the parameter name shadows datetime.timezone used in its own annotation.
        timezone: Union[str, timezone] = "UTC",
    ):
        """
        Cron-based timetable with data intervals.

        Args:
            cron: Cron expression
            timezone: Timezone for the schedule
        """
class CronTriggerTimetable(Timetable):
    """Cron-based timetable that triggers at the cron time without a trailing data interval."""

    def __init__(
        self,
        cron: str,
        # NOTE: the parameter name shadows datetime.timezone used in its own annotation.
        timezone: Union[str, timezone] = "UTC",
    ):
        """
        Cron-based trigger timetable.

        Args:
            cron: Cron expression
            timezone: Timezone for the schedule
        """
class DeltaDataIntervalTimetable(Timetable):
    """Timetable producing runs at a fixed timedelta, each covering one data interval."""

    def __init__(self, delta: timedelta):
        """
        Interval-based timetable with data intervals.

        Args:
            delta: Time interval between runs
        """
class AssetTriggeredTimetable(Timetable):
    """Timetable driven by asset updates rather than wall-clock time."""

    # "Asset" quoted as a forward reference so this block does not depend on
    # definition order within the module.
    def __init__(self, assets: List["Asset"]):
        """
        Asset-driven timetable.

        Args:
            assets: Assets that trigger the DAG
        """

# Usage example:
from airflow.timetables import CronDataIntervalTimetable, DeltaDataIntervalTimetable
from airflow.decorators import dag, task  # required by the decorators below; missing in original snippet
from datetime import datetime, timedelta  # datetime required by start_date below

# Custom timetables
every_15_minutes = DeltaDataIntervalTimetable(timedelta(minutes=15))
business_hours = CronDataIntervalTimetable("0 9-17 * * 1-5")  # Weekdays 9 AM to 5 PM

@dag(
    dag_id='custom_schedule',
    timetable=every_15_minutes,
    start_date=datetime(2024, 1, 1),
)
def custom_schedule():
    @task
    def regular_task():
        return "executed every 15 minutes"

    regular_task()

# Legacy dataset functionality maintained for backward compatibility.
class Dataset:
    """
    **Deprecated**: Use Asset instead.

    Legacy dataset definition for backward compatibility.
    """

    def __init__(
        self,
        uri: str,
        extra: Optional[Dict[str, Any]] = None,
    ):
        """
        Create a dataset (deprecated).

        Args:
            uri: Dataset URI
            extra: Additional metadata
        """

    @property
    def uri(self) -> str:
        """Dataset URI."""

# Common scheduling patterns and utilities.
# Common schedule intervals
from airflow.utils.dates import days_ago
from datetime import timedelta
# Pre-defined schedules
# NOTE(review): values deliberately mix timedelta objects ('@hourly'..'@weekly')
# with cron strings ('@monthly'/'@yearly') — TODO confirm callers accept both
# forms (see schedule_interval_to_cron below, which takes Union[str, timedelta]).
SCHEDULE_INTERVALS = {
'@once': None, # Run once; no recurring interval
'@hourly': timedelta(hours=1), # Every hour
'@daily': timedelta(days=1), # Every day
'@weekly': timedelta(weeks=1), # Every week
'@monthly': '0 0 1 * *', # First day of month (cron form)
'@yearly': '0 0 1 1 *', # First day of year (cron form)
}
def schedule_interval_to_cron(interval: Union[str, timedelta]) -> Optional[str]:
    """
    Convert schedule interval to cron expression.

    Args:
        interval: Schedule interval (alias string such as '@daily', or a timedelta)

    Returns:
        Cron expression or None
    """

# Define time-based restrictions for DAG execution.
class TimeRestriction:
    """Earliest/latest bounds and catchup policy restricting when a DAG may run."""

    def __init__(
        self,
        earliest: Optional[datetime] = None,
        latest: Optional[datetime] = None,
        catchup: bool = True,
    ):
        """
        Define time restrictions for DAG scheduling.

        Args:
            earliest: Earliest allowed execution time
            latest: Latest allowed execution time
            catchup: Whether to perform catchup runs
        """
class DagRunInfo:
    """Identifier and timing information describing a single DAG run."""

    def __init__(
        self,
        run_id: str,
        logical_date: datetime,
        # DataInterval quoted: it is defined after this class in the module.
        data_interval: "DataInterval",
    ):
        """
        Information about a DAG run.

        Args:
            run_id: Unique run identifier
            logical_date: Logical execution date
            data_interval: Data interval for the run
        """
class DataInterval:
    """Time span of source data covered by one DAG run."""

    def __init__(self, start: datetime, end: datetime):
        """
        Data interval for a DAG run.

        Args:
            start: Start of data interval
            end: End of data interval
        """

# NOTE(review): this import appears at the bottom of the scraped page; in a
# real module it belongs at the top, before the definitions that use it.
from typing import Union, Optional, List, Dict, Callable, Any
# NOTE(review): these imports appear at the bottom of the scraped page; in a
# real module they belong at the top, before the definitions that use them.
from datetime import datetime, timedelta, timezone
from airflow.utils.types import DagRunType

# Compound asset conditions and the union of values accepted as a DAG schedule.
AssetCondition = Union[Asset, AssetAll, AssetAny, AssetAlias]
ScheduleInterval = Union[str, timedelta, Timetable, List[Asset], None]

# Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow