tessl install tessl/pypi-luigi@2.8.0Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.
Agent Success
Agent success rate when using this tile
72%
Improvement
Agent success rate improvement when using this tile compared to baseline
1.31x
Baseline
Agent success rate without this tile
55%
Wrap a per-day job class with automation that fans out across a date range, caps backfills, and forwards shared parameters into each interval run using the package's range helper instead of manual loops.
start=2024-01-01 and stop=2024-01-04, the runner prepares work for 2024-01-01, 2024-01-02, and 2024-01-03 only. @teststop is omitted, start=2024-03-08, and the current date is 2024-03-10, planned_dates() returns 2024-03-08 and 2024-03-09 in order. @teststart is more than max_backfill_days before "today", the runner clips to the latest max_backfill_days window ending yesterday before generating tasks. Example: current date 2024-04-10, start=2024-02-01, max_backfill_days=5 yields dates 2024-04-05 through 2024-04-09. @testparams={"region": "us", "kind": "full"} and the runner builds tasks for 2024-05-01 through 2024-05-02, each created interval job receives the same params plus its own date argument. @test@generates
from datetime import date
from typing import Any, Callable, Dict, Optional, Sequence
class RangeRunner:
def __init__(
self,
job_factory: Callable[..., Any],
start: date,
stop: Optional[date] = None,
params: Optional[Dict[str, Any]] = None,
max_backfill_days: int = 90,
):
"""
job_factory: callable returning a per-date job when invoked with a date and optional shared params.
start/stop: inclusive/exclusive boundaries for the date range; stop defaults to the day after the latest date to run.
params: extra keyword arguments forwarded to every job.
max_backfill_days: clamp open-ended runs to this many days counting back from yesterday when start is too far in the past.
"""
def planned_dates(self, now: Optional[date] = None) -> Sequence[date]:
"""Return the ordered set of dates that will be triggered; defaults to using current UTC date when now is not provided."""
def planned_jobs(self, now: Optional[date] = None) -> Sequence[Any]:
"""Return instantiated interval jobs matching planned_dates(), each created via job_factory with its date and shared params."""
def run(self, now: Optional[date] = None) -> None:
"""Dispatch the planned jobs using the package's recurring/backfill helper rather than manual loops."""Workflow orchestration with recurring range and backfill helpers. @satisfied-by