tessl/pypi-luigi

tessl install tessl/pypi-luigi@2.8.0

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

Agent Success: 72% (agent success rate when using this tile)

Improvement: 1.31x (agent success rate improvement when using this tile compared to baseline)

Baseline: 55% (agent success rate without this tile)

evals/scenario-5/task.md

Recurring Range Runner

Wrap a per-day job class in a runner that fans out across a date range, caps backfills, and forwards shared parameters into each interval run. The runner should rely on the package's range helper rather than manual loops.

Capabilities

Fan-out across contiguous dates

  • With start=2024-01-01 and stop=2024-01-04, the runner prepares work for 2024-01-01, 2024-01-02, and 2024-01-03 only. @test
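The fan-out rule above is a half-open interval: start is included and stop is excluded. A minimal standard-library sketch of that rule follows; the helper name `dates_between` is hypothetical and is not part of the API in this spec or of luigi.

```python
from datetime import date, timedelta
from typing import List


def dates_between(start: date, stop: date) -> List[date]:
    """Return every date d with start <= d < stop, in ascending order."""
    span = (stop - start).days
    # A stop at or before start yields an empty plan rather than an error.
    return [start + timedelta(days=offset) for offset in range(max(span, 0))]


planned = dates_between(date(2024, 1, 1), date(2024, 1, 4))
print([d.isoformat() for d in planned])
# prints ['2024-01-01', '2024-01-02', '2024-01-03']
```

Returning an empty list for an inverted range is a choice made for this sketch; the spec does not say how that case should behave.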

Open-ended backfill to yesterday

  • When stop is omitted, start=2024-03-08, and the current date is 2024-03-10, planned_dates() returns 2024-03-08 and 2024-03-09 in order. @test
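One way to read the open-ended case is that a missing stop behaves like an exclusive stop equal to the current date, so the last planned date is yesterday. The sketch below assumes that reading; `open_ended_dates` is a hypothetical helper name, and the UTC default mirrors the `planned_dates()` docstring.

```python
from datetime import date, datetime, timedelta, timezone
from typing import List, Optional


def open_ended_dates(
    start: date,
    stop: Optional[date] = None,
    now: Optional[date] = None,
) -> List[date]:
    """Plan dates from start (inclusive) up to stop (exclusive).

    Assumption: when stop is omitted, the current date acts as the
    exclusive stop, which makes yesterday the last planned date.
    """
    today = now if now is not None else datetime.now(timezone.utc).date()
    effective_stop = stop if stop is not None else today
    span = (effective_stop - start).days
    return [start + timedelta(days=offset) for offset in range(max(span, 0))]


planned = open_ended_dates(date(2024, 3, 8), now=date(2024, 3, 10))
print([d.isoformat() for d in planned])
# prints ['2024-03-08', '2024-03-09']
```

Passing `now` explicitly keeps the behaviour deterministic in tests, which is presumably why the API accepts it.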

Sliding backfill cap

  • When start is more than max_backfill_days before "today", the runner clips the range, before generating tasks, to the most recent max_backfill_days days ending yesterday. Example: with current date 2024-04-10, start=2024-02-01, and max_backfill_days=5, the planned dates are 2024-04-05 through 2024-04-09. @test
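The cap can be expressed as moving the start forward so that at most max_backfill_days dates remain before "today". The following is a sketch under that interpretation; `clip_start` and `capped_dates` are hypothetical names used only for illustration.

```python
from datetime import date, timedelta
from typing import List


def clip_start(start: date, today: date, max_backfill_days: int) -> date:
    """Return the later of start and (today - max_backfill_days)."""
    earliest_allowed = today - timedelta(days=max_backfill_days)
    return max(start, earliest_allowed)


def capped_dates(start: date, today: date, max_backfill_days: int) -> List[date]:
    """Plan dates from the clipped start through yesterday, in order."""
    first = clip_start(start, today, max_backfill_days)
    span = (today - first).days
    return [first + timedelta(days=offset) for offset in range(max(span, 0))]


planned = capped_dates(date(2024, 2, 1), date(2024, 4, 10), max_backfill_days=5)
print([d.isoformat() for d in planned])
# prints ['2024-04-05', '2024-04-06', '2024-04-07', '2024-04-08', '2024-04-09']
```

A start that already falls inside the window is left unchanged, so the cap only affects runs that reach too far into the past.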

Parameter propagation to each job

  • When params={"region": "us", "kind": "full"} and the runner builds tasks for 2024-05-01 through 2024-05-02, each created interval job receives the same params plus its own date argument. @test
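Parameter propagation amounts to calling the factory once per date with the same keyword arguments. The sketch below assumes the factory takes the date as its first positional argument and the shared params as keywords; the spec does not fix that calling convention, and `build_jobs` and `fake_job` are hypothetical stand-ins rather than luigi tasks.

```python
from datetime import date
from typing import Any, Callable, Dict, List, Optional, Sequence


def build_jobs(
    job_factory: Callable[..., Any],
    dates: Sequence[date],
    params: Optional[Dict[str, Any]] = None,
) -> List[Any]:
    """Create one job per date, forwarding the same shared params to each."""
    # Copy the dict so a caller mutating params later cannot affect the plan.
    shared = dict(params or {})
    return [job_factory(day, **shared) for day in dates]


def fake_job(day: date, **kwargs: Any) -> Dict[str, Any]:
    """Stand-in factory that records what it was called with."""
    return {"date": day, **kwargs}


jobs = build_jobs(
    fake_job,
    [date(2024, 5, 1), date(2024, 5, 2)],
    {"region": "us", "kind": "full"},
)
for job in jobs:
    print(job["date"].isoformat(), job["region"], job["kind"])
# prints:
# 2024-05-01 us full
# 2024-05-02 us full
```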

Implementation

@generates

API

from datetime import date
from typing import Any, Callable, Dict, Optional, Sequence

class RangeRunner:
    def __init__(
        self,
        job_factory: Callable[..., Any],
        start: date,
        stop: Optional[date] = None,
        params: Optional[Dict[str, Any]] = None,
        max_backfill_days: int = 90,
    ):
        """
        job_factory: callable that returns a per-date job when invoked with a date and the optional shared params.
        start/stop: inclusive start and exclusive stop of the date range; when stop is omitted, the range runs through yesterday (the current date acts as the exclusive stop).
        params: extra keyword arguments forwarded to every job.
        max_backfill_days: for open-ended runs whose start is more than this many days before the current date, clip the range to the most recent max_backfill_days days ending yesterday.
        """

    def planned_dates(self, now: Optional[date] = None) -> Sequence[date]:
        """Return the dates that will be triggered, in ascending order; uses the current UTC date when now is not provided."""

    def planned_jobs(self, now: Optional[date] = None) -> Sequence[Any]:
        """Return instantiated interval jobs matching planned_dates(), each created via job_factory with its date and shared params."""

    def run(self, now: Optional[date] = None) -> None:
        """Dispatch the planned jobs using the package's recurring/backfill helper rather than manual loops."""

Dependencies { .dependencies }

luigi { .dependency }

Workflow orchestration with recurring range and backfill helpers. @satisfied-by

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/luigi@2.8.x