Pipeline management software for clusters.
Agent Success: 67% (agent success rate when using this tile)
Improvement: 1.05x (agent success rate improvement when using this tile compared to baseline)
Baseline: 64% (agent success rate without this tile)
Build a distributed worker coordination system that detects failed workers and automatically redistributes their assigned work to healthy workers.
Your system should coordinate multiple workers with these capabilities:
Worker Health Tracking:
Work Redistribution:
Shared State Coordination:
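As one way to picture the health-tracking and shared-state capabilities, the sketch below stores a last-seen timestamp per worker in SQLite and reports workers whose timestamp is older than a timeout. The table name `workers`, the column `last_heartbeat`, and the helper names `open_state` and `find_dead_workers` are assumptions made for illustration; they are not part of the task or of Toil. The optional `now` argument is also an assumption, added so the timeout logic can be exercised without sleeping.

```python
import sqlite3
import time
from typing import List, Optional


def open_state(db_path: str) -> sqlite3.Connection:
    """Open the shared database and create the (assumed) workers table."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS workers ("
        " worker_id TEXT PRIMARY KEY,"
        " last_heartbeat REAL NOT NULL)"
    )
    conn.commit()
    return conn


def record_heartbeat(conn: sqlite3.Connection, worker_id: str,
                     now: Optional[float] = None) -> None:
    """Insert the worker or overwrite its last-seen timestamp."""
    ts = time.time() if now is None else now
    with conn:  # commits on success, rolls back on exception
        conn.execute(
            "INSERT OR REPLACE INTO workers (worker_id, last_heartbeat)"
            " VALUES (?, ?)",
            (worker_id, ts),
        )


def find_dead_workers(conn: sqlite3.Connection, timeout: float,
                      now: Optional[float] = None) -> List[str]:
    """Return IDs of workers whose last heartbeat is older than `timeout` seconds."""
    ts = time.time() if now is None else now
    rows = conn.execute(
        "SELECT worker_id FROM workers"
        " WHERE ? - last_heartbeat > ? ORDER BY worker_id",
        (ts, timeout),
    ).fetchall()
    return [row[0] for row in rows]
```

Because every worker process would open the same database file, the timestamps act as the shared state: any process can call `find_dead_workers` and reach the same conclusion without talking to the others directly.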
from typing import List
import sqlite3
import time


class WorkerCoordinator:
    """Coordinates workers and detects failures for work redistribution."""

    def __init__(self, db_path: str, heartbeat_timeout: float = 5.0):
        """
        Initialize the coordinator.

        Args:
            db_path: Path to shared SQLite database
            heartbeat_timeout: Seconds without heartbeat before worker considered dead
        """
        pass

    def register_worker(self, worker_id: str) -> None:
        """
        Register a new worker with the coordinator.

        Args:
            worker_id: Unique identifier for the worker
        """
        pass

    def record_heartbeat(self, worker_id: str) -> None:
        """
        Record a heartbeat from a worker, updating its last-seen timestamp.

        Args:
            worker_id: Worker sending the heartbeat
        """
        pass

    def detect_dead_workers(self) -> List[str]:
        """
        Identify workers that haven't sent heartbeats within the timeout period.

        Returns:
            List of dead worker IDs
        """
        pass

    def assign_work(self, worker_id: str, file_id: str) -> None:
        """
        Assign a file to a worker for caching.

        Args:
            worker_id: Worker to assign to
            file_id: File identifier to assign
        """
        pass

    def redistribute_from_dead_worker(self, dead_worker_id: str,
                                      target_worker_id: str) -> List[str]:
        """
        Redistribute all work from a dead worker to a healthy worker.

        Args:
            dead_worker_id: Worker that has failed
            target_worker_id: Active worker to receive the work

        Returns:
            List of file IDs that were redistributed
        """
        pass

    def get_worker_assignments(self, worker_id: str) -> List[str]:
        """
        Get all files assigned to a specific worker.

        Args:
            worker_id: Worker to query

        Returns:
            List of file IDs assigned to this worker
        """
        pass

Provides the CachingFileStore coordination capabilities for multi-worker file caching with dead worker recovery and work stealing.
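The redistribution step in the interface above could be sketched as a single SQLite transaction that reads the dead worker's files and then moves them to the target worker. The `assignments` table and the function names `create_assignments_table` and `reassign_files` are hypothetical, chosen for illustration only. This is not how Toil's CachingFileStore is implemented; it only shows the general idea of handing work over atomically.

```python
import sqlite3
from typing import List


def create_assignments_table(conn: sqlite3.Connection) -> None:
    """Create the (assumed) table mapping each file to exactly one worker."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS assignments ("
        " file_id TEXT PRIMARY KEY,"
        " worker_id TEXT NOT NULL)"
    )
    conn.commit()


def reassign_files(conn: sqlite3.Connection, dead_worker_id: str,
                   target_worker_id: str) -> List[str]:
    """Move every file owned by the dead worker to the target worker.

    Returns the file IDs that were moved, sorted by ID.
    """
    with conn:  # commits on success, rolls back on exception
        # Take the write lock before reading, so the SELECT and the UPDATE
        # below operate on the same set of rows.
        conn.execute("BEGIN IMMEDIATE")
        rows = conn.execute(
            "SELECT file_id FROM assignments"
            " WHERE worker_id = ? ORDER BY file_id",
            (dead_worker_id,),
        ).fetchall()
        moved = [row[0] for row in rows]
        conn.execute(
            "UPDATE assignments SET worker_id = ? WHERE worker_id = ?",
            (target_worker_id, dead_worker_id),
        )
    return moved
```

Doing the read and the update inside one transaction means that two coordinators racing to recover the same dead worker cannot both claim the same files: the second call finds no rows left for that worker and returns an empty list.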
@describes
tessl i tessl/pypi-toil@9.0.0