
tessl/pypi-luigi

tessl install tessl/pypi-luigi@2.8.0

Python workflow management framework for building complex pipelines of batch jobs with dependency resolution and task scheduling.

Agent Success: 72% (agent success rate when using this tile)

Improvement: 1.31x (agent success rate improvement when using this tile compared to baseline)

Baseline: 55% (agent success rate without this tile)

evals/scenario-4/task.md

Instrumented Workflow Metrics

Build a small two-step data pipeline that copies newline-separated integers from a source file and emits instrumentation about task lifecycle and results.

Capabilities

Produces summary output

  • When the source file contains the integers 1\n2\n3\n, running the pipeline writes summary.json in output_dir with line_count: 3, total: 6, and average: 2.0, and run_instrumented_pipeline returns the same values @test
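
The arithmetic behind this capability can be sketched without any workflow engine. The helper name `summarize_integers` and its two path arguments are hypothetical, and the handling of an empty file is an assumption, since the spec does not define that case.

```python
import json
from pathlib import Path
from typing import Dict, Union


def summarize_integers(raw_path: str, summary_path: str) -> Dict[str, Union[int, float]]:
    """Read newline-separated integers, skip blank lines, write and return the summary."""
    lines = Path(raw_path).read_text().splitlines()
    values = [int(line) for line in lines if line.strip()]
    total = sum(values)
    summary = {
        "line_count": len(values),
        "total": total,
        # Assumption: an empty input yields 0.0 rather than a division error.
        "average": total / len(values) if values else 0.0,
    }
    Path(summary_path).write_text(json.dumps(summary))
    return summary
```

For the input `1\n2\n3\n` this gives a line count of 3, a total of 6, and an average of 6 / 3 = 2.0, matching the values the capability names.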

Tracks lifecycle metrics

  • The metrics written to metrics_path and returned by run_instrumented_pipeline include counts of task-started and task-completed events covering both pipeline tasks. The counts match the number of tasks executed, and the metrics also carry per-task timestamps formatted as ISO-8601 strings @test
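
One possible shape for such a metrics block is sketched below using only the standard library. The class name `MetricsRecorder`, the event labels, and the `tasks` key are illustrative assumptions; the spec only requires counts plus per-task ISO-8601 timestamps.

```python
from collections import Counter
from datetime import datetime, timezone
from typing import Any, Dict


class MetricsRecorder:
    """Hypothetical in-memory store for lifecycle counts and per-task timestamps."""

    def __init__(self) -> None:
        self.counts: Counter = Counter()
        self.timestamps: Dict[str, Dict[str, str]] = {}

    def record(self, task_name: str, event: str) -> None:
        # Count the event and remember when it last happened for this task.
        self.counts[event] += 1
        stamp = datetime.now(timezone.utc).isoformat()  # ISO-8601 string with UTC offset
        self.timestamps.setdefault(task_name, {})[event] = stamp

    def snapshot(self) -> Dict[str, Any]:
        # Counter returns 0 for events that were never recorded.
        return {
            "started": self.counts["started"],
            "completed": self.counts["completed"],
            "failed": self.counts["failed"],
            "tasks": self.timestamps,
        }
```

Recording `started` and `completed` once for each of the two pipeline tasks would yield counts of 2 and 2, which is the "counts match tasks executed" condition.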

Propagates failure instrumentation

  • If the source file contains the line FAIL, the pipeline stops before writing summary output, records in the metrics output a failure count and the last error message for the task that encountered the marker, and returns a flag indicating failure @test
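
The failure path could be modelled roughly as follows. Everything here is a standard-library illustration: `FailMarkerError`, `parse_integers`, `run_step`, and the keys of the returned dictionary are hypothetical names, and the error message format is an assumption.

```python
from typing import Any, Dict, Iterable, List

FAIL_MARKER = "FAIL"


class FailMarkerError(ValueError):
    """Raised when the input contains the FAIL marker line."""


def parse_integers(lines: Iterable[str]) -> List[int]:
    values = []
    for number, line in enumerate(lines, start=1):
        text = line.strip()
        if not text:
            continue  # blank lines are ignored
        if text == FAIL_MARKER:
            raise FailMarkerError(f"FAIL marker found on line {number}")
        values.append(int(text))
    return values


def run_step(task_name: str, lines: Iterable[str]) -> Dict[str, Any]:
    """Run the parse step and capture failure details instead of propagating them."""
    outcome: Dict[str, Any] = {
        "task": task_name,
        "failed": False,
        "failure_count": 0,
        "last_error": None,
        "values": None,
    }
    try:
        outcome["values"] = parse_integers(lines)
    except FailMarkerError as exc:
        # No values are produced, so no summary would be written downstream.
        outcome.update(failed=True, failure_count=1, last_error=str(exc))
    return outcome
```

In a real pipeline the exception would be raised inside the task and observed by the engine's failure event rather than caught in a wrapper like `run_step`.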

Flushes metrics on worker teardown

  • metrics_path is only created after the workflow worker shuts down, ensuring the file reflects final counts even when tasks emit intermediate events @test
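
The "write only at teardown" behaviour is essentially a deferred flush. The sketch below shows the pattern with a plain context manager; `DeferredMetricsWriter` is a hypothetical name and is not part of luigi. In luigi itself the same idea would likely live in a `Worker` subclass, on the assumption that the worker's exit hook runs after all tasks finish.

```python
import json
from pathlib import Path
from typing import Dict


class DeferredMetricsWriter:
    """Buffers counts in memory and writes the file only when the block exits."""

    def __init__(self, metrics_path: str) -> None:
        self.metrics_path = Path(metrics_path)
        self.metrics: Dict[str, int] = {"started": 0, "completed": 0, "failed": 0}

    def __enter__(self) -> "DeferredMetricsWriter":
        return self

    def record(self, event: str) -> None:
        # Intermediate events only touch the in-memory counts.
        self.metrics[event] += 1

    def __exit__(self, exc_type, exc_value, traceback) -> bool:
        # Teardown: the file appears here, so it always holds the final counts.
        self.metrics_path.write_text(json.dumps(self.metrics))
        return False  # never suppress exceptions from the block
```

Because nothing is written before `__exit__`, a check that the file is absent while work is in progress and present afterwards passes by construction.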

Implementation

  • The pipeline has a first task that copies source_file into output_dir/raw.txt.
  • A second task depends on the copy, reads integers from raw.txt (ignoring blank lines), and writes output_dir/summary.json with line_count, total, and average fields.
  • Instrumentation should be attached via the workflow engine's event system so metrics are emitted on task start, completion, and failure.
  • A custom worker should be used so metrics are flushed to metrics_path after tasks finish.
  • run_instrumented_pipeline should return a dictionary containing the summary values and a metrics block with per-task timestamps plus started, completed, and failed counts along with a boolean failed flag.

@generates

API

from typing import Dict, Any

def run_instrumented_pipeline(source_file: str, output_dir: str, metrics_path: str) -> Dict[str, Any]:
    """Execute the instrumented pipeline, return summary and metrics (with a failure flag), and write metrics_path."""

Dependencies { .dependencies }

luigi { .dependency }

Provides workflow orchestration with event hooks, metrics collection, and customizable workers.

Workspace: tessl
Visibility: Public
Describes: pkg:pypi/luigi@2.8.x