Programmatically author, schedule and monitor data pipelines
ORM models for DAGs, tasks, runs, connections, and metadata storage with SQLAlchemy integration. These models represent Airflow's database schema and provide programmatic access to workflow metadata.
Base ORM models and database configuration.
class Base:
    """SQLAlchemy base class for all Airflow models."""
    __tablename__: str
    __table_args__: Dict[str, Any]

ID_LEN: int = 250  # Standard ID field length
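ID_LEN is described above as the standard ID field length. As a rough illustration, a caller could check an identifier against that limit before it reaches the database. This is a stdlib-only sketch: `fits_id_column` is a hypothetical helper, not an Airflow function, and it assumes the limit applies to string IDs such as `dag_id` and `task_id`.

```python
ID_LEN = 250  # mirrors the documented standard ID field length


def fits_id_column(identifier: str, max_len: int = ID_LEN) -> bool:
    """Return True if a non-empty identifier fits in an ID column (hypothetical helper)."""
    return 0 < len(identifier) <= max_len


print(fits_id_column("daily_sales_report"))  # short ID, within the limit
print(fits_id_column("x" * 251))             # one character over the limit
```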
class DagModel(Base):
    """ORM model for DAG metadata."""
    dag_id: str
    is_active: bool
    is_paused: bool
    last_parsed_time: Optional[datetime]
    last_pickled: Optional[datetime]
    last_expired: Optional[datetime]
    scheduler_lock: Optional[bool]
    pickle_id: Optional[int]
    fileloc: str
    owners: str
    description: Optional[str]
    default_view: str
    schedule_interval: Optional[str]
    tags: List[str]

Models for task instances and DAG runs.
class TaskInstance(Base):
    """ORM model for task instance execution."""
    task_id: str
    dag_id: str
    run_id: str
    execution_date: datetime
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    duration: Optional[float]
    state: Optional[str]
    try_number: int
    max_tries: int
    hostname: str
    unixname: str
    job_id: Optional[int]
    pool: str
    pool_slots: int
    queue: str
    priority_weight: int
    operator: str
    queued_dttm: Optional[datetime]
    pid: Optional[int]
    executor_config: Optional[Dict]
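Several TaskInstance fields are optional because a task may not have started or finished yet. The sketch below shows one way to handle that when summarising rows. It uses only the standard library: `TaskInstanceRow`, `elapsed_seconds`, and `count_by_state` are hypothetical names, not Airflow APIs. It assumes `duration` corresponds to the seconds between `start_date` and `end_date`, and the state strings are illustrative values.

```python
from collections import Counter
from dataclasses import dataclass
from datetime import datetime
from typing import Iterable, Optional


@dataclass
class TaskInstanceRow:
    """Hypothetical stand-in holding a few of the documented TaskInstance fields."""
    task_id: str
    state: Optional[str]
    start_date: Optional[datetime] = None
    end_date: Optional[datetime] = None


def elapsed_seconds(row: TaskInstanceRow) -> Optional[float]:
    """Seconds between start_date and end_date, or None if either is missing."""
    if row.start_date is None or row.end_date is None:
        return None
    return (row.end_date - row.start_date).total_seconds()


def count_by_state(rows: Iterable[TaskInstanceRow]) -> Counter:
    """Tally rows per state; a missing state is counted under 'none'."""
    return Counter(row.state or "none" for row in rows)


rows = [
    TaskInstanceRow("extract", "success",
                    datetime(2024, 1, 1, 12, 0), datetime(2024, 1, 1, 12, 1, 30)),
    TaskInstanceRow("transform", "running", datetime(2024, 1, 1, 12, 2)),
    TaskInstanceRow("load", None),
]
print(elapsed_seconds(rows[0]))  # finished task: 90.0 seconds
print(elapsed_seconds(rows[1]))  # still running: None
print(count_by_state(rows))
```

Returning None instead of 0.0 for unfinished tasks keeps "no duration yet" distinct from "finished instantly", which matches the Optional typing of the documented fields.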
class DagRun(Base):
    """ORM model for DAG run instances."""
    dag_id: str
    execution_date: datetime
    run_id: str
    state: str
    run_type: str
    external_trigger: bool
    start_date: Optional[datetime]
    end_date: Optional[datetime]
    creating_job_id: Optional[int]

Models for resource pools, connections, and variables.
class Pool(Base):
    """ORM model for resource pools."""
    pool: str
    slots: int
    description: str
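A pool's `slots` value pairs naturally with the `pool` and `pool_slots` fields on TaskInstance. The sketch below estimates the remaining capacity per pool. It is a stdlib-only illustration: `open_slots` here is a hypothetical function, the pool names and sizes are made-up sample data, and it assumes that only the task instances passed in (for example, those currently running) occupy slots.

```python
from typing import Dict, Iterable, Tuple


def open_slots(pool_sizes: Dict[str, int],
               occupying: Iterable[Tuple[str, int]]) -> Dict[str, int]:
    """Subtract each (pool, pool_slots) pair from the matching pool's slots."""
    remaining = dict(pool_sizes)  # copy so the input mapping is left untouched
    for pool_name, pool_slots in occupying:
        if pool_name in remaining:
            remaining[pool_name] -= pool_slots
    return remaining


pools = {"default_pool": 128, "db_pool": 4}
running_tasks = [("db_pool", 1), ("db_pool", 2), ("default_pool", 1)]
print(open_slots(pools, running_tasks))
```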
class Connection(Base):
    """ORM model for connection configurations."""
    conn_id: str
    conn_type: str
    description: Optional[str]
    host: Optional[str]
    schema: Optional[str]
    login: Optional[str]
    password: Optional[str]
    port: Optional[int]
    extra: Optional[str]
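The Connection fields map closely onto the parts of a URI, which is a common way to pass connection settings around. The following stdlib-only sketch assembles one from those fields. `ConnectionFields` and `build_uri` are hypothetical names used for illustration, not Airflow's own URI handling, and the sample host and credentials are made up.

```python
from dataclasses import dataclass
from typing import Optional
from urllib.parse import quote


@dataclass
class ConnectionFields:
    """Hypothetical stand-in mirroring a subset of the documented Connection fields."""
    conn_id: str
    conn_type: str
    host: Optional[str] = None
    schema: Optional[str] = None
    login: Optional[str] = None
    password: Optional[str] = None
    port: Optional[int] = None


def build_uri(conn: ConnectionFields) -> str:
    """Assemble conn_type://login:password@host:port/schema, skipping unset parts."""
    uri = f"{conn.conn_type}://"
    if conn.login is not None:
        uri += quote(conn.login, safe="")
        if conn.password is not None:
            # Percent-encode so characters like '@' and '/' cannot break the URI.
            uri += ":" + quote(conn.password, safe="")
        uri += "@"
    if conn.host:
        uri += conn.host
    if conn.port is not None:
        uri += f":{conn.port}"
    if conn.schema:
        uri += "/" + quote(conn.schema, safe="")
    return uri


example = ConnectionFields(
    conn_id="my_pg", conn_type="postgres", host="db.example.com",
    schema="analytics", login="etl", password="p@ss/word", port=5432,
)
print(build_uri(example))
# postgres://etl:p%40ss%2Fword@db.example.com:5432/analytics
```

Percent-encoding the login and password is the important design choice here: without it, a password containing `@` or `/` would be misread as a host or path separator.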
class Variable(Base):
    """ORM model for Airflow variables."""
    key: str
    val: Optional[str]
    description: Optional[str]
    is_encrypted: bool

from typing import Optional, Dict, Any, List
from datetime import datetime
from sqlalchemy.ext.declarative import declarative_base
ModelType = type[Base]

Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow