or run

tessl search
Log in

Version

Workspace
tessl
Visibility
Public
Created
Last updated
Describes

pkg:github/python/cpython@v3.13.2

tile.json

tessl/github-python--cpython

tessl install tessl/github-python--cpython@3.13.0

CPython is the reference implementation of the Python programming language providing the core interpreter, runtime system, and comprehensive standard library.

Agent Success

Agent success rate when using this tile

96%

Improvement

Agent success rate improvement when using this tile compared to baseline

1.07x

Baseline

Agent success rate without this tile

90%

task.mdevals/scenario-8/

Async Job Orchestration

Coordinate asynchronous jobs with bounded concurrency, per-job timeouts, and clean loop management so that workloads can be triggered from both async and synchronous entrypoints.

Capabilities

Bounded concurrent runs

  • With three jobs that each wait roughly 0.05s and a max concurrency of 2, the total wall time stays under 0.11s while every job result appears in submission order with status "ok" and the returned payloads. @test

Per-job timeouts

  • When per_job_timeout is 0.05s, a job that waits ~0.1s is marked "timeout", cancelled, and returns no value, while a 0.01s job in the same batch still yields its payload as "ok". @test

Stop request handling

  • When a stop request is triggered while jobs are running, any incomplete unprotected jobs are cancelled and reported as "cancelled" within 0.02s of the request, while completed jobs keep their "ok" results. @test

Loop lifecycle for sync entrypoint

  • Calling run_sync twice in a row (each with a fast job) returns results both times without leaving a running loop or leaking tasks between invocations. @test

Implementation

@generates

API

from dataclasses import dataclass
from typing import Awaitable, Callable, Generic, Iterable, List, Optional, TypeVar

T = TypeVar("T")

@dataclass
class Job(Generic[T]):
    name: str
    coroutine_factory: Callable[[], Awaitable[T]]
    protected: bool = False  # run-to-completion even when a stop is requested

@dataclass
class JobResult(Generic[T]):
    name: str
    status: str  # "ok" | "timeout" | "cancelled" | "error"
    value: Optional[T] = None
    error: Optional[BaseException] = None

async def orchestrate_jobs(
    jobs: Iterable[Job[T]],
    max_concurrency: int,
    per_job_timeout: float,
    stop_signal: Optional[Awaitable[object]] = None
) -> List[JobResult[T]]:
    """
    Schedule jobs with bounded concurrency and per-job timeouts.
    Returns results in submission order while respecting stop requests.
    """

def run_sync(
    jobs: Iterable[Job[T]],
    max_concurrency: int,
    per_job_timeout: float,
    stop_after: Optional[float] = None
) -> List[JobResult[T]]:
    """
    Synchronous entrypoint that spins up and tears down an event loop
    to run orchestrate_jobs, respecting optional stop-after seconds.
    """

Dependencies { .dependencies }

asyncio { .dependency }

Provides asynchronous event loop, task orchestration, and timing primitives.