CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow

Programmatically author, schedule and monitor data pipelines

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/assets-scheduling.md

Assets and Scheduling

Asset-driven scheduling system for creating data-aware workflows, including asset definitions, timetables, and dependency management. Assets (formerly Datasets) enable data-driven orchestration where tasks trigger based on data availability rather than time.

Capabilities

Asset Definition

Define data assets that can trigger downstream DAGs when updated.

class Asset:
    """A data asset whose updates can trigger downstream, data-aware DAGs.

    NOTE(review): signature stub from package docs — no implementation shown.
    Assets replace the deprecated ``Dataset`` class (see Legacy section).
    """

    def __init__(
        self,
        name: Optional[str] = None,
        uri: Optional[Union[str, ObjectStoragePath]] = None,
        *,
        group: Optional[str] = None,
        extra: Optional[Dict] = None,
        watchers: Optional[List[Union[AssetWatcher, SerializedAssetWatcher]]] = None
    ):
        """
        Create a data asset definition.
        
        Args:
            name: Human-readable name for the asset
            uri: Unique resource identifier for the asset
            group: Logical grouping for the asset
            extra: Additional metadata
            watchers: List of asset watchers for monitoring
        """
    
    @property
    def uri(self) -> str:
        """Asset URI."""
    
    @property
    def name(self) -> Optional[str]:
        """Asset name."""
    
    @property
    def group(self) -> Optional[str]:
        """Asset group."""

Usage example:

# Fix: `datetime` was used below but never imported (NameError at start_date).
from datetime import datetime

from airflow.sdk.definitions.asset import Asset
from airflow.decorators import dag, task

# Define assets; updating an asset (via a task outlet) triggers consumers.
customer_data = Asset("s3://bucket/customer_data/")
processed_data = Asset("s3://bucket/processed/customer_metrics/")

@dag(
    dag_id='data_processing',
    start_date=datetime(2024, 1, 1),
    schedule=[customer_data],  # Triggered when customer_data is updated
    catchup=False
)
def data_processing():
    @task(outlets=[processed_data])  # Marks processed_data as updated
    def process_customer_data():
        # Process the data
        return "processed"
    
    process_customer_data()

# Calling the decorated function instantiates the DAG.
dag_instance = data_processing()

Asset Aliases and Collections

Create aliases and collections for complex asset relationships.

class AssetAlias(BaseAsset):
    """A named indirection over one or more assets.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(
        self,
        name: str,
        *,
        group: str = "asset"
    ):
        """
        Create an asset alias.
        
        Args:
            name: Alias name
            group: Asset group (default: "asset")
        """
    
    @property
    def name(self) -> str:
        """Alias name."""

class AssetAll:
    """AND-combination of assets: triggers only when every member has updated.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(self, *assets: Union[Asset, AssetAlias]):
        """
        Create an asset collection requiring ALL assets to be updated.
        
        Args:
            *assets: Assets that must all be updated
        """

class AssetAny:
    """OR-combination of assets: triggers when any one member has updated.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(self, *assets: Union[Asset, AssetAlias]):
        """
        Create an asset collection requiring ANY asset to be updated.
        
        Args:
            *assets: Assets where any one being updated triggers
        """

Usage example:

# Fix: `datetime`, `dag`, and `task` were used below but never imported.
from datetime import datetime

from airflow.decorators import dag, task
# Import Asset from the same module as the other examples in this doc.
from airflow.sdk.definitions.asset import Asset, AssetAll, AssetAny, AssetAlias

# Define assets
sales_data = Asset("s3://bucket/sales/")
inventory_data = Asset("s3://bucket/inventory/")
customer_data = Asset("s3://bucket/customers/")

# Asset collections
all_core_data = AssetAll(sales_data, inventory_data, customer_data)  # all must update
any_sales_data = AssetAny(sales_data, inventory_data)  # any one update triggers

# Asset alias (shown for completeness; not wired into the DAG below)
master_data = AssetAlias("master_data")

@dag(
    dag_id='comprehensive_report',
    schedule=all_core_data,  # Condition objects are passed directly, not wrapped in a list
    start_date=datetime(2024, 1, 1)
)
def comprehensive_report():
    @task
    def generate_report():
        return "comprehensive report generated"
    
    generate_report()

# Calling the decorated function instantiates the DAG (matches the first example).
comprehensive_report()

Asset Watchers

Monitor assets for changes and trigger based on conditions.

class AssetWatcher:
    """Watches an asset for external events via an event trigger.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(
        self,
        name: str,
        trigger: Union[BaseEventTrigger, Dict]
    ):
        """
        Watch an asset for changes using event triggers.
        
        Args:
            name: Watcher name
            trigger: Event trigger or trigger configuration
        """

# Fix: the original was written as a decorator *call* containing parameter
# annotations and defaults, which is not valid Python. The decorator factory's
# signature is declared here as a proper function stub instead.
def asset(
    uri: str,
    name: Optional[str] = None,
    group: Optional[str] = None,
    **kwargs
) -> Callable:
    """
    Decorator to define an asset from a function.

    Args:
        uri: Asset URI
        name: Asset name
        group: Asset group
        **kwargs: Additional asset attributes, passed through

    Returns:
        Decorated function that defines asset behavior
    """

Timetables

Define custom scheduling patterns for DAGs.

class Timetable:
    """Base class for timetable implementations."""
    
    # NOTE(review): interface stub from package docs — concrete timetables
    # (cron-, delta-, and asset-based, below) provide the implementation.
    def next_dagrun_info(
        self,
        last_automated_dagrun: Optional[datetime],
        restriction: TimeRestriction
    ) -> Optional[DagRunInfo]:
        """
        Get information for the next DAG run.
        
        Args:
            last_automated_dagrun: Last automated DAG run datetime
            restriction: Time restriction configuration
            
        Returns:
            Information for next DAG run or None if no more runs
        """

class CronDataIntervalTimetable(Timetable):
    """Cron-based timetable where each run covers a data interval.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(
        self,
        cron: str,
        timezone: Union[str, timezone] = "UTC"
    ):
        """
        Cron-based timetable with data intervals.
        
        Args:
            cron: Cron expression
            timezone: Timezone for the schedule
        """

class CronTriggerTimetable(Timetable):
    """Cron-based timetable that triggers at the cron times themselves.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(
        self,
        cron: str,
        timezone: Union[str, timezone] = "UTC"
    ):
        """
        Cron-based trigger timetable.
        
        Args:
            cron: Cron expression
            timezone: Timezone for the schedule
        """

class DeltaDataIntervalTimetable(Timetable):
    """Fixed-interval timetable driven by a ``timedelta`` between runs.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(self, delta: timedelta):
        """
        Interval-based timetable with data intervals.
        
        Args:
            delta: Time interval between runs
        """

class AssetTriggeredTimetable(Timetable):
    """Timetable whose runs are driven by asset updates instead of clock time.

    NOTE(review): signature stub from package docs — no implementation shown.
    """

    def __init__(self, assets: List[Asset]):
        """
        Asset-driven timetable.
        
        Args:
            assets: Assets that trigger the DAG
        """

Usage example:

# Fix: `datetime`, `dag`, and `task` were used below but never imported; the
# interval timetables live in airflow.timetables.interval, not airflow.timetables.
from datetime import datetime, timedelta

from airflow.decorators import dag, task
from airflow.timetables.interval import CronDataIntervalTimetable, DeltaDataIntervalTimetable

# Custom timetables
every_15_minutes = DeltaDataIntervalTimetable(timedelta(minutes=15))
# Runs at the top of each hour, 09:00 through 17:00, Monday-Friday.
business_hours = CronDataIntervalTimetable("0 9-17 * * 1-5")

@dag(
    dag_id='custom_schedule',
    # Timetables are passed via `schedule` (there is no `timetable` parameter
    # on @dag in asset-era Airflow).
    schedule=every_15_minutes,
    start_date=datetime(2024, 1, 1)
)
def custom_schedule():
    @task
    def regular_task():
        return "executed every 15 minutes"
    
    regular_task()

# Calling the decorated function instantiates the DAG.
custom_schedule()

Datasets (Legacy)

Legacy dataset functionality maintained for backward compatibility.

class Dataset:
    """
    **Deprecated**: Use Asset instead.
    
    Legacy dataset definition for backward compatibility.
    """
    # NOTE(review): kept only so pre-Asset code keeps importing; new code
    # should construct Asset objects instead.
    def __init__(
        self,
        uri: str,
        extra: Optional[Dict[str, Any]] = None
    ):
        """
        Create a dataset (deprecated).
        
        Args:
            uri: Dataset URI
            extra: Additional metadata
        """
    
    @property
    def uri(self) -> str:
        """Dataset URI."""

Schedule Intervals

Common scheduling patterns and utilities.

# Common schedule intervals
from airflow.utils.dates import days_ago
from datetime import timedelta

# Pre-defined schedules
# Pre-defined schedule preset aliases mapped to their cron equivalents.
# Fix: the original mixed timedelta values ('@hourly', '@daily', '@weekly')
# with cron strings ('@monthly', '@yearly'); Airflow documents each preset as
# a cron expression, so all values now share one type.
SCHEDULE_INTERVALS = {
    '@once': None,            # Run once and never again (no cron equivalent)
    '@hourly': '0 * * * *',   # Top of every hour
    '@daily': '0 0 * * *',    # Midnight every day
    '@weekly': '0 0 * * 0',   # Midnight every Sunday
    '@monthly': '0 0 1 * *',  # First day of month
    '@yearly': '0 0 1 1 *',   # First day of year
}

# NOTE(review): signature stub from package docs — no implementation shown.
def schedule_interval_to_cron(interval: Union[str, timedelta]) -> Optional[str]:
    """
    Convert schedule interval to cron expression.
    
    Args:
        interval: Schedule interval
        
    Returns:
        Cron expression or None
    """

Time Restrictions

Define time-based restrictions for DAG execution.

class TimeRestriction:
    """Bounds on when a timetable may schedule runs.

    NOTE(review): signature stub from package docs — no implementation shown.
    Passed to Timetable.next_dagrun_info (see Timetables section).
    """

    def __init__(
        self,
        earliest: Optional[datetime] = None,
        latest: Optional[datetime] = None,
        catchup: bool = True
    ):
        """
        Define time restrictions for DAG scheduling.
        
        Args:
            earliest: Earliest allowed execution time
            latest: Latest allowed execution time
            catchup: Whether to perform catchup runs
        """

class DagRunInfo:
    """Describes one scheduled DAG run: its id, logical date, and data interval.

    NOTE(review): signature stub from package docs — no implementation shown.
    Returned by Timetable.next_dagrun_info (see Timetables section).
    """

    def __init__(
        self,
        run_id: str,
        logical_date: datetime,
        data_interval: DataInterval
    ):
        """
        Information about a DAG run.
        
        Args:
            run_id: Unique run identifier
            logical_date: Logical execution date
            data_interval: Data interval for the run
        """

class DataInterval:
    """A half-open [start, end) span of data covered by one DAG run.

    NOTE(review): signature stub from package docs — the half-open convention
    is Airflow's documented behavior; confirm against the installed version.
    """

    def __init__(self, start: datetime, end: datetime):
        """
        Data interval for a DAG run.
        
        Args:
            start: Start of data interval
            end: End of data interval
        """

Types

from typing import Union, Optional, List, Dict, Callable, Any
from datetime import datetime, timedelta, timezone
from airflow.utils.types import DagRunType

# A single condition that can gate an asset-triggered schedule.
AssetCondition = Union[Asset, AssetAll, AssetAny, AssetAlias]
# Everything accepted by the DAG-level `schedule` argument.
ScheduleInterval = Union[str, timedelta, Timetable, List[Asset], None]

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow

docs

assets-scheduling.md

cli-utilities.md

configuration.md

dag-management.md

database-models.md

exceptions.md

executors.md

extensions.md

index.md

task-operators.md

xcom.md

tile.json