
tessl/pypi-toil

Pipeline management software for clusters.

Agent Success

Agent success rate when using this tile

67%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.05x

Baseline

Agent success rate without this tile

64%


evals/scenario-2/task.md

Worker Failure Recovery System

Build a distributed worker coordination system that detects failed workers and automatically redistributes their assigned work to healthy workers.

Requirements

Your system should coordinate multiple workers with these capabilities:

Worker Health Tracking:

  • Workers register themselves with a coordinator
  • Workers send periodic heartbeat signals to indicate they're alive
  • The coordinator detects when workers fail to send heartbeats within a timeout period
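
The timeout check described above can be sketched as a pure function. This is a minimal sketch, assuming heartbeats are kept as a mapping from worker ID to a timestamp in seconds; `find_dead_workers` and its parameters are hypothetical names used for illustration and are not part of the API specified in this task.

```python
from typing import Dict, List


def find_dead_workers(last_seen: Dict[str, float], now: float,
                      timeout: float) -> List[str]:
    """Return IDs of workers whose last heartbeat is older than `timeout` seconds."""
    return sorted(w for w, ts in last_seen.items() if now - ts > timeout)


# Hypothetical data: worker "a" last reported at t=100, worker "b" at t=108.
heartbeats = {"a": 100.0, "b": 108.0}
dead = find_dead_workers(heartbeats, now=110.0, timeout=5.0)
```

Passing `now` in explicitly, rather than calling `time.time()` inside the function, keeps the check deterministic and easy to test. Because the timestamps are shared between processes, a wall-clock source such as `time.time()` is probably the simplest choice for producing them.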

Work Redistribution:

  • Track which files/tasks are assigned to each worker
  • When a worker is detected as dead, automatically reassign its work to active workers
  • Ensure no assigned work is lost when workers fail
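
One way to satisfy the "no work is lost" requirement is to move every orphaned file in a single pass. The sketch below is an illustration only: it assumes assignments are held in a plain dictionary, and it spreads files round-robin across several active workers, whereas the API in this task hands everything to a single target worker. The name `redistribute` is hypothetical.

```python
from typing import Dict, List


def redistribute(assignments: Dict[str, List[str]], dead_worker: str,
                 active_workers: List[str]) -> Dict[str, List[str]]:
    """Move every file owned by `dead_worker` to active workers, round-robin."""
    if not active_workers:
        raise ValueError("no active workers available to take over the work")
    # Copy the mapping so the caller's data is not mutated.
    result = {w: list(files) for w, files in assignments.items()}
    orphaned = result.pop(dead_worker, [])
    for i, file_id in enumerate(orphaned):
        target = active_workers[i % len(active_workers)]
        result.setdefault(target, []).append(file_id)
    return result


# Hypothetical example: w1 dies while holding three files.
before = {"w1": ["f1", "f2", "f3"], "w2": ["f4"], "w3": []}
after = redistribute(before, dead_worker="w1", active_workers=["w2", "w3"])
```

Raising when there are no active workers is a deliberate choice in this sketch: dropping the dead worker's entry without a new owner would silently lose work.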

Shared State Coordination:

  • Use a SQLite database to maintain shared state between coordinator and workers
  • Store worker status, heartbeat timestamps, and work assignments
  • Support safe concurrent access from multiple processes
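
The shared state above could be laid out as two tables. The schema, table names, column names, and the `open_db` helper below are assumptions made for illustration; the task does not prescribe them. The sketch enables SQLite's write-ahead log and a busy timeout, two standard settings that tend to help when several processes open the same database file.

```python
import os
import sqlite3
import tempfile

# Hypothetical schema: one row per worker, one row per assigned file.
SCHEMA = """
CREATE TABLE IF NOT EXISTS workers (
    worker_id      TEXT PRIMARY KEY,
    status         TEXT NOT NULL DEFAULT 'active',
    last_heartbeat REAL NOT NULL
);
CREATE TABLE IF NOT EXISTS assignments (
    file_id   TEXT PRIMARY KEY,
    worker_id TEXT NOT NULL REFERENCES workers(worker_id)
);
"""


def open_db(db_path: str) -> sqlite3.Connection:
    """Open the shared database and create the tables if they are missing."""
    # timeout: how long to wait on a locked database before raising.
    conn = sqlite3.connect(db_path, timeout=10.0)
    # WAL lets readers proceed while another connection is writing.
    conn.execute("PRAGMA journal_mode=WAL").fetchall()
    conn.executescript(SCHEMA)
    return conn


# Two connections stand in for a coordinator and a worker process.
db_path = os.path.join(tempfile.mkdtemp(), "coordinator.db")
conn_a = open_db(db_path)
conn_b = open_db(db_path)
with conn_a:  # commits on success, rolls back on error
    conn_a.execute(
        "INSERT INTO workers (worker_id, last_heartbeat) VALUES (?, ?)",
        ("w1", 100.0),
    )
rows = conn_b.execute("SELECT worker_id, status FROM workers").fetchall()
```

Making `file_id` the primary key of `assignments` means a file can have only one owner at a time, which is one simple way to keep reassignment unambiguous.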

Test Cases

  • A worker can register and send heartbeats that update its timestamp @test
  • The coordinator detects workers as dead when they haven't sent heartbeats within the timeout period @test
  • When a worker is marked dead, its assigned files are redistributed to active workers @test

API

from typing import List
import sqlite3
import time

class WorkerCoordinator:
    """Coordinates workers and detects failures for work redistribution."""

    def __init__(self, db_path: str, heartbeat_timeout: float = 5.0):
        """
        Initialize the coordinator.

        Args:
            db_path: Path to shared SQLite database
            heartbeat_timeout: Seconds without a heartbeat before a worker is considered dead
        """
        pass

    def register_worker(self, worker_id: str) -> None:
        """
        Register a new worker with the coordinator.

        Args:
            worker_id: Unique identifier for the worker
        """
        pass

    def record_heartbeat(self, worker_id: str) -> None:
        """
        Record a heartbeat from a worker, updating its last-seen timestamp.

        Args:
            worker_id: Worker sending the heartbeat
        """
        pass

    def detect_dead_workers(self) -> List[str]:
        """
        Identify workers that haven't sent heartbeats within the timeout period.

        Returns:
            List of dead worker IDs
        """
        pass

    def assign_work(self, worker_id: str, file_id: str) -> None:
        """
        Assign a file to a worker for caching.

        Args:
            worker_id: Worker to assign to
            file_id: File identifier to assign
        """
        pass

    def redistribute_from_dead_worker(self, dead_worker_id: str,
                                       target_worker_id: str) -> List[str]:
        """
        Redistribute all work from a dead worker to a healthy worker.

        Args:
            dead_worker_id: Worker that has failed
            target_worker_id: Active worker to receive the work

        Returns:
            List of file IDs that were redistributed
        """
        pass

    def get_worker_assignments(self, worker_id: str) -> List[str]:
        """
        Get all files assigned to a specific worker.

        Args:
            worker_id: Worker to query

        Returns:
            List of file IDs assigned to this worker
        """
        pass
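
As a rough illustration of how `redistribute_from_dead_worker` might touch the database, the standalone sketch below moves a dead worker's rows inside one explicit transaction. Everything here is an assumption rather than the reference solution: the two-table schema, the `status` column, and the helper name `move_assignments` are hypothetical, and the helper takes a connection argument that the specified method does not.

```python
import sqlite3
from typing import List


def move_assignments(conn: sqlite3.Connection, dead_worker_id: str,
                     target_worker_id: str) -> List[str]:
    """Reassign all files of a dead worker to the target worker atomically."""
    # BEGIN IMMEDIATE takes the write lock up front, so the SELECT and the
    # UPDATEs see the same state even if other processes are writing.
    conn.execute("BEGIN IMMEDIATE")
    try:
        rows = conn.execute(
            "SELECT file_id FROM assignments WHERE worker_id = ? ORDER BY file_id",
            (dead_worker_id,),
        ).fetchall()
        conn.execute(
            "UPDATE assignments SET worker_id = ? WHERE worker_id = ?",
            (target_worker_id, dead_worker_id),
        )
        conn.execute(
            "UPDATE workers SET status = 'dead' WHERE worker_id = ?",
            (dead_worker_id,),
        )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return [r[0] for r in rows]


# isolation_level=None disables implicit transactions so BEGIN/COMMIT are ours.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.executescript("""
CREATE TABLE workers (worker_id TEXT PRIMARY KEY, status TEXT NOT NULL);
CREATE TABLE assignments (file_id TEXT PRIMARY KEY, worker_id TEXT NOT NULL);
INSERT INTO workers VALUES ('w1', 'active'), ('w2', 'active');
INSERT INTO assignments VALUES ('f1', 'w1'), ('f2', 'w1'), ('f3', 'w2');
""")
moved = move_assignments(conn, "w1", "w2")
owned = [r[0] for r in conn.execute(
    "SELECT file_id FROM assignments WHERE worker_id = 'w2' ORDER BY file_id")]
```

Doing the read and both writes in one transaction means a crash partway through leaves the assignments either fully moved or untouched, which matches the requirement that no assigned work is lost.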

Dependencies { .dependencies }

toil { .dependency }

Provides the CachingFileStore coordination capabilities for multi-worker file caching with dead worker recovery and work stealing.

@describes

tessl i tessl/pypi-toil@9.0.0
