CtrlK
CommunityDocumentationLog inGet started
Tessl Logo

tessl/pypi-luigi

tessl install tessl/pypi-luigi@2.8.0

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

Agent Success

Agent success rate when using this tile

72%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.31x

Baseline

Agent success rate without this tile

55%

task.mdevals/scenario-7/

Resource-Limited Asset Transcoder

Build an orchestrated pipeline that processes a list of asset identifiers by fetching metadata, running a resource-intensive transcode, and packaging a manifest. The heavy stage must consume a named accelerator resource whose available slots are provided at runtime. Rely on the dependency's resource-aware scheduling to enforce limits; do not gate concurrency with manual semaphores or custom queues.

Capabilities

Configurable resource pool

  • Accepts a mapping of resource names to available slots and a worker count, then loads those limits into the scheduler before running. If no mapping is provided, default to a single accelerator slot. Timeline entries should record the resource-bound stage name and monotonic timestamps so concurrency can be measured. @test

Serialized heavy stage when single slot

  • For three asset ids and an accelerator limit of 1 with three workers, the heavy stage timeline never overlaps: at most one accelerator-bound task runs at a time. @test

Scaled heavy stage when multiple slots

  • With the same inputs and an accelerator limit of 2, the heavy stage shows overlap of up to two concurrent items and finishes faster than the single-slot run. @test

Ordered outputs

  • Returns packaged outputs matching the input ids in the original order (one output per id) regardless of concurrency. @test

Implementation

@generates

API

from typing import Any, Dict, List, Sequence, TypedDict

class TimelineEntry(TypedDict):
    item: str
    stage: str
    start: float
    end: float

class RunReport(TypedDict):
    outputs: List[str]
    timeline: List[TimelineEntry]
    runtime_seconds: float

def run_resource_limited_pipeline(
    item_ids: Sequence[str],
    resource_limits: Dict[str, int] | None = None,
    worker_count: int = 2
) -> RunReport:
    """
    Runs the orchestrated pipeline, using the dependency's resource-aware scheduling to bound concurrency of the accelerator-heavy stage.
    Timeline entries must cover the resource-bound stage (named "transcode") for each item using monotonic seconds, and each accelerator-bound unit of work should simulate at least ~0.15 seconds of active time so that overlap/serialization can be measured in tests.
    """

Dependencies { .dependencies }

Luigi { .dependency }

Workflow orchestration with task resources, dependency resolution, and scheduler-enforced resource limits. @satisfied-by

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/luigi@2.8.x
tile.json