tessl/pypi-toil

Pipeline management software for clusters.

Agent Success: 67% (agent success rate when using this tile)
Improvement: 1.05x (improvement over the baseline success rate)
Baseline: 64% (agent success rate without this tile)


evals/scenario-7/task.md

Data Processing Pipeline

Problem Statement

You need to build a data processing pipeline that runs a list of data files through multiple stages. The pipeline should:

  1. Download Stage: Download each input file in parallel
  2. Processing Stage: After all files are downloaded, process each file independently in parallel
  3. Aggregation Stage: After all files are processed, combine the results into a single output
  4. Cleanup Stage: After aggregation completes, perform cleanup tasks

Each stage must wait for the previous stage to fully complete before starting.
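
Toil expresses these ordering constraints with two primitives on its Job class: children, which run in parallel once their parent's run method returns, and follow-ons, which run only after the job and all of its successors have finished. The following is a minimal sketch of that distinction (not one of the task's required files), assuming Toil's standard Job and Toil classes:

from toil.common import Toil
from toil.job import Job

class StageJob(Job):
    """Toy job that reports which stage it belongs to."""
    def __init__(self, name):
        Job.__init__(self)
        self.name = name

    def run(self, fileStore):
        print(f"Running stage: {self.name}")

if __name__ == "__main__":
    parser = Job.Runner.getDefaultArgumentParser()
    options = parser.parse_args()

    first = StageJob("first")
    # Children are siblings: they run in parallel after `first` returns.
    first.addChild(StageJob("child-a"))
    first.addChild(StageJob("child-b"))
    # A follow-on starts only after `first` and all of its
    # children (and their descendants) have completed.
    first.addFollowOn(StageJob("second"))

    with Toil(options) as toil:
        toil.start(first)

The follow-on semantics is what lets each stage act as a barrier for the stage before it.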

Requirements

Implement a workflow that:

  1. Takes a list of input file URLs as command-line arguments
  2. Creates a job hierarchy where:
    • A root job coordinates the entire workflow
    • Download jobs run in parallel as children of the root job
    • Processing jobs run in parallel after all downloads complete
    • An aggregation job runs after all processing jobs complete
    • A cleanup job runs after aggregation completes
  3. Each job should print its stage and the file it's working on (if applicable)
  4. The workflow should execute all jobs in the correct order based on dependencies

Implementation Details

File Structure

Create the following files:

  • pipeline.py - Main implementation file
  • test_pipeline.py - Test file

Expected Workflow Structure

The workflow should be organized as:

  • A root job that adds download jobs as children and a processing coordinator as a follow-on (sketched below)
  • Download jobs (one per URL) run in parallel
  • After downloads complete, processing jobs run in parallel
  • After processing completes, aggregation runs
  • After aggregation completes, cleanup runs
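
One hedged way to realize this shape is sketched below. RootJob matches the class the tests import; ProcessCoordinatorJob is a hypothetical name for the spec's "processing coordinator". The leaf job classes (DownloadJob, ProcessJob, AggregateJob, CleanupJob) are sketched under Job Behavior below.

from toil.job import Job

class RootJob(Job):
    """Coordinates the whole pipeline."""
    def __init__(self, urls):
        Job.__init__(self)
        self.urls = urls

    def run(self, fileStore):
        # Sibling children: all downloads run in parallel.
        for url in self.urls:
            self.addChild(DownloadJob(url))
        # Follow-on: starts only after every download child has finished.
        self.addFollowOn(ProcessCoordinatorJob(self.urls))

class ProcessCoordinatorJob(Job):
    """Hypothetical coordinator: fans out processing, then chains the tail stages."""
    def __init__(self, urls):
        Job.__init__(self)
        self.urls = urls

    def run(self, fileStore):
        # Sibling children: all processing jobs run in parallel.
        for url in self.urls:
            self.addChild(ProcessJob(url.rsplit("/", 1)[-1]))
        # Aggregation waits for all processing children; cleanup is
        # chained as a follow-on of aggregation.
        aggregate = AggregateJob(len(self.urls))
        aggregate.addFollowOn(CleanupJob())
        self.addFollowOn(aggregate)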

Job Behavior

  • DownloadJob: Simulates downloading a file (print "Downloading [url]")
  • ProcessJob: Simulates processing a file (print "Processing [filename]")
  • AggregateJob: Simulates aggregating results (print "Aggregating N results", where N is the number of processed files)
  • CleanupJob: Simulates cleanup (print "Cleaning up")
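
Hedged sketches of the four leaf jobs, printing exactly the messages above; resource arguments (memory, cores, disk) are omitted for brevity:

from toil.job import Job

class DownloadJob(Job):
    def __init__(self, url):
        Job.__init__(self)
        self.url = url

    def run(self, fileStore):
        print(f"Downloading {self.url}")

class ProcessJob(Job):
    def __init__(self, filename):
        Job.__init__(self)
        self.filename = filename

    def run(self, fileStore):
        print(f"Processing {self.filename}")

class AggregateJob(Job):
    def __init__(self, count):
        Job.__init__(self)
        self.count = count

    def run(self, fileStore):
        print(f"Aggregating {self.count} results")

class CleanupJob(Job):
    def run(self, fileStore):
        print("Cleaning up")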

Command-Line Interface

The pipeline should be executable via:

python pipeline.py <job-store> <url1> [<url2> ...]

Example:

python pipeline.py file:my-jobstore http://example.com/data1.csv http://example.com/data2.csv
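
A sketch of the entry point, assuming the classes above. Toil's default argument parser already accepts the job store as its first positional argument, so only the URL list needs to be added:

from toil.common import Toil
from toil.job import Job

if __name__ == "__main__":
    # Toil's default parser handles <job-store> plus the standard Toil
    # flags; the remaining positionals are taken as input URLs.
    parser = Job.Runner.getDefaultArgumentParser()
    parser.add_argument("urls", nargs="+", help="Input file URLs")
    options = parser.parse_args()

    with Toil(options) as toil:
        toil.start(RootJob(options.urls))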

Dependencies { .dependencies }

toil { .dependency }

Provides workflow orchestration and job dependency management.

Test Cases

Test 1: Three File Pipeline @test

Test File: test_pipeline.py

Description: Verify that a pipeline with three input files executes jobs in the correct order.

Test Code:

import tempfile
from pipeline import RootJob

def test_three_file_pipeline():
    """Test pipeline execution order with three files."""
    # Create a temporary job store
    with tempfile.TemporaryDirectory() as tmpdir:
        jobstore = f"file:{tmpdir}"
        urls = [
            "http://example.com/file1.csv",
            "http://example.com/file2.csv",
            "http://example.com/file3.csv"
        ]

        # Create and run the root job
        root_job = RootJob(urls)

        # In a real test, you would start the workflow with Toil.start()
        # and capture output to verify execution order
        # For this example, we just verify the job was created
        assert root_job is not None
        assert len(urls) == 3

Expected Behavior:

  • All download jobs should print before any process jobs
  • All process jobs should print before the aggregation job
  • The cleanup job should print last
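
The provided tests only check construction. To verify ordering end to end, one option is to run the pipeline as a subprocess and inspect its combined output. This is a hedged sketch: it assumes the job print statements surface in the leader's output, which depends on Toil's logging configuration (worker output is normally captured into per-job logs, so raising verbosity with --logDebug may be needed):

import subprocess
import sys
import tempfile

def test_stage_ordering():
    """Run the pipeline end to end and check stage ordering in its output."""
    with tempfile.TemporaryDirectory() as tmpdir:
        result = subprocess.run(
            [sys.executable, "pipeline.py", "--logDebug",
             f"file:{tmpdir}/store",
             "http://example.com/a.csv", "http://example.com/b.csv"],
            capture_output=True, text=True,
        )
        out = result.stdout + result.stderr
        for message in ("Downloading", "Processing", "Aggregating", "Cleaning up"):
            assert message in out
        # Every download must precede every processing message, and so on.
        assert out.rfind("Downloading") < out.find("Processing")
        assert out.rfind("Processing") < out.find("Aggregating")
        assert out.find("Aggregating") < out.find("Cleaning up")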

Test 2: Single File Pipeline @test

Test File: test_pipeline.py

Description: Verify that a pipeline works correctly with a single input file.

Test Code:

import tempfile
from pipeline import RootJob

def test_single_file_pipeline():
    """Test pipeline execution with a single file."""
    with tempfile.TemporaryDirectory() as tmpdir:
        jobstore = f"file:{tmpdir}"
        urls = ["http://example.com/single.csv"]

        root_job = RootJob(urls)

        assert root_job is not None
        assert len(urls) == 1

Expected Behavior:

  • Pipeline should execute all stages even with one file
  • Order should be: download → process → aggregate → cleanup

Success Criteria

  1. Jobs execute in the correct dependency order
  2. Parallel jobs (downloads, processing) can run concurrently
  3. Sequential stages (download → process → aggregate → cleanup) execute in order
  4. The workflow completes successfully with the test cases