or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

builtins.md concurrency.md data-processing.md development.md essential-stdlib.md index.md networking.md security.md system-os.md
tile.json

docs/concurrency.md

Concurrency and Parallelism

Support for concurrent and parallel programming including threading, multiprocessing, asynchronous programming, and subprocess management.

Capabilities

Threading

Thread-based parallelism for I/O-bound tasks and concurrent execution within a single process.

import threading
import queue

# Thread class
class Thread:
    def __init__(self, group=None, target=None, name=None, args=(), 
                 kwargs=None, daemon=None) -> None: ...
    def start(self) -> None: ...
    def run(self) -> None: ...
    def join(self, timeout: float = None) -> None: ...
    def is_alive(self) -> bool: ...
    @property
    def name(self) -> str: ...
    @name.setter
    def name(self, value: str) -> None: ...
    @property
    def ident(self) -> int: ...
    @property
    def native_id(self) -> int: ...
    @property
    def daemon(self) -> bool: ...
    @daemon.setter
    def daemon(self, value: bool) -> None: ...

# Threading utilities
def current_thread() -> Thread: ...  # Thread object for the calling thread
def main_thread() -> Thread: ...  # Thread object for the main thread
def active_count() -> int: ...  # number of Thread objects currently alive
def enumerate() -> list: ...  # list of all Thread objects currently alive
def get_ident() -> int: ...  # thread identifier of the calling thread
def get_native_id() -> int: ...  # OS-assigned thread ID of the calling thread

# Synchronization primitives
class Lock:
    """Stub of ``threading.Lock``, a primitive (non-reentrant) lock."""

    # timeout=-1 means no time limit; the result says whether the lock was taken.
    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool: ...
    def release(self) -> None: ...
    def locked(self) -> bool: ...
    # Context-manager protocol, so the lock can be used in a ``with`` statement.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class RLock:
    """Stub of ``threading.RLock``, a lock the owning thread may re-acquire."""

    # timeout=-1 means no time limit; the result says whether the lock was taken.
    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool: ...
    def release(self) -> None: ...
    # Context-manager protocol, so the lock can be used in a ``with`` statement.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class Condition:
    """Stub of ``threading.Condition``, a condition variable tied to a lock."""

    # lock=None: presumably a new underlying lock is created — confirm in the stdlib docs.
    def __init__(self, lock=None) -> None: ...
    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool: ...
    def release(self) -> None: ...
    def wait(self, timeout: float = None) -> bool: ...
    # NOTE(review): the stdlib returns the last value of ``predicate`` here.
    def wait_for(self, predicate, timeout: float = None) -> bool: ...
    def notify(self, n: int = 1) -> None: ...
    def notify_all(self) -> None: ...
    # Context-manager protocol.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class Semaphore:
    """Stub of ``threading.Semaphore``, a counter-based synchronization primitive.

    ``release`` accepts ``n`` (Python 3.9+), the amount by which the
    internal counter is incremented; the default of 1 keeps the old call form.
    """

    def __init__(self, value: int = 1) -> None: ...
    # timeout=None means no time limit; the result says whether it was acquired.
    def acquire(self, blocking: bool = True, timeout: float = None) -> bool: ...
    def release(self, n: int = 1) -> None: ...
    # Context-manager protocol.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class Event:
    """Stub of ``threading.Event``, a flag that threads can wait on."""

    def is_set(self) -> bool: ...
    def set(self) -> None: ...
    def clear(self) -> None: ...
    # timeout=None waits without a time limit.
    def wait(self, timeout: float = None) -> bool: ...

class Barrier:
    """Stub of ``threading.Barrier``, a rendezvous point for ``parties`` threads."""

    def __init__(self, parties: int, action=None, timeout: float = None) -> None: ...
    def wait(self, timeout: float = None) -> int: ...
    def reset(self) -> None: ...
    def abort(self) -> None: ...
    # Read-only state of the barrier.
    @property
    def parties(self) -> int: ...
    @property
    def n_waiting(self) -> int: ...
    @property
    def broken(self) -> bool: ...

# Thread-local storage
class local:
    """Stub of ``threading.local``, a container for thread-local data."""

    def __init__(self) -> None: ...

# Timer
class Timer(Thread):
    """Stub of ``threading.Timer``, a Thread subclass taking an interval and a function."""

    def __init__(self, interval: float, function, args=None, kwargs=None) -> None: ...
    def cancel(self) -> None: ...

Queue (Thread-Safe Queues)

Thread-safe queue implementations for producer-consumer patterns.

import queue

class Queue:
    """Stub of ``queue.Queue``, a thread-safe queue for producer-consumer use."""

    # NOTE(review): maxsize=0 presumably means unbounded — confirm in the stdlib docs.
    def __init__(self, maxsize: int = 0) -> None: ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...
    def full(self) -> bool: ...
    def put(self, item, block: bool = True, timeout: float = None) -> None: ...
    def put_nowait(self, item) -> None: ...
    def get(self, block: bool = True, timeout: float = None): ...
    def get_nowait(self): ...
    # Task-tracking pair.
    def task_done(self) -> None: ...
    def join(self) -> None: ...

class LifoQueue(Queue):
    """Last-in, first-out queue (stack); same interface as ``Queue``."""

class PriorityQueue(Queue):
    """Priority queue; the lowest-valued entry is retrieved first."""

class SimpleQueue:
    """Stub of ``queue.SimpleQueue``; declares no ``full``, ``task_done`` or ``join``."""

    def __init__(self) -> None: ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...
    def put(self, item, block: bool = True, timeout: float = None) -> None: ...
    def put_nowait(self, item) -> None: ...
    def get(self, block: bool = True, timeout: float = None): ...
    def get_nowait(self): ...

# Exceptions
class Empty(Exception): ...  # presumably raised by non-blocking/timed-out get() — confirm
class Full(Exception): ...  # presumably raised by non-blocking/timed-out put() — confirm

Multiprocessing

Process-based parallelism for CPU-bound tasks and true parallel execution.

import multiprocessing

# Process class
class Process:
    def __init__(self, group=None, target=None, name=None, args=(), 
                 kwargs=None, daemon=None) -> None: ...
    def start(self) -> None: ...
    def run(self) -> None: ...
    def join(self, timeout: float = None) -> None: ...
    def is_alive(self) -> bool: ...
    def terminate(self) -> None: ...
    def kill(self) -> None: ...
    @property
    def name(self) -> str: ...
    @property
    def daemon(self) -> bool: ...
    @property
    def pid(self) -> int: ...
    @property
    def exitcode(self) -> int: ...
    @property
    def authkey(self) -> bytes: ...

# Process utilities
def current_process() -> Process: ...  # Process object for the calling process
def active_children() -> list: ...  # list of live children of the current process
def cpu_count() -> int: ...  # number of CPUs in the system

# Process pools
class Pool:
    """Stub of ``multiprocessing.Pool``, a pool of worker processes.

    The ``__enter__`` return annotation is a quoted forward reference:
    the name ``Pool`` is not yet bound while the class body is executing.
    """

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None, context=None) -> None: ...
    # Single-call variants.
    def apply(self, func, args=(), kwds=None): ...
    def apply_async(self, func, args=(), kwds=None, callback=None,
                    error_callback=None): ...
    # Map-style variants over an iterable.
    def map(self, func, iterable, chunksize=None) -> list: ...
    def map_async(self, func, iterable, chunksize=None, callback=None,
                  error_callback=None): ...
    def imap(self, func, iterable, chunksize=1): ...
    def imap_unordered(self, func, iterable, chunksize=1): ...
    def starmap(self, func, iterable, chunksize=None) -> list: ...
    def starmap_async(self, func, iterable, chunksize=None, callback=None,
                      error_callback=None): ...
    # Lifecycle.
    def close(self) -> None: ...
    def terminate(self) -> None: ...
    def join(self) -> None: ...
    def __enter__(self) -> "Pool": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

# Synchronization primitives (multiprocess)
class Lock:
    """Stub of ``multiprocessing.Lock``; note the first parameter is ``block``, not ``blocking``."""

    def acquire(self, block: bool = True, timeout: float = None) -> bool: ...
    def release(self) -> None: ...
    # Context-manager protocol.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class RLock:
    """Stub of ``multiprocessing.RLock``, the recursive lock for processes."""

    def acquire(self, block: bool = True, timeout: float = None) -> bool: ...
    def release(self) -> None: ...
    # Context-manager protocol.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class Semaphore:
    """Stub of ``multiprocessing.Semaphore``."""

    def __init__(self, value: int = 1) -> None: ...
    def acquire(self, block: bool = True, timeout: float = None) -> bool: ...
    def release(self) -> None: ...
    # Context-manager protocol.
    def __enter__(self) -> bool: ...
    def __exit__(self, type, value, traceback) -> None: ...

class Event:
    """Stub of ``multiprocessing.Event``; same methods as the threading Event stub."""

    def is_set(self) -> bool: ...
    def set(self) -> None: ...
    def clear(self) -> None: ...
    def wait(self, timeout: float = None) -> bool: ...

class Condition:
    """Stub of ``multiprocessing.Condition``."""

    def __init__(self, lock=None) -> None: ...
    def acquire(self, block: bool = True, timeout: float = None) -> bool: ...
    def release(self) -> None: ...
    def wait(self, timeout: float = None) -> bool: ...
    def wait_for(self, predicate, timeout: float = None) -> bool: ...
    def notify(self, n: int = 1) -> None: ...
    def notify_all(self) -> None: ...

# Shared memory objects
# NOTE(review): the stdlib exposes ``multiprocessing.Value`` as a factory
# function, ``Value(typecode_or_type, *args, lock=True)`` — confirm that the
# class form shown here is intentional.
class Value:
    def __init__(self, typecode: str, value, lock: bool = True) -> None: ...
    @property
    def value(self): ...

# NOTE(review): the stdlib exposes ``multiprocessing.Array`` as a factory
# function with a keyword-only ``lock`` argument — confirm that the class form
# shown here is intentional.
class Array:
    def __init__(self, typecode: str, size_or_initializer, lock: bool = True) -> None: ...
    # Sequence-style access to the elements.
    def __getitem__(self, index): ...
    def __setitem__(self, index, value) -> None: ...
    def __len__(self) -> int: ...

# Communication between processes
class Queue:
    """Stub of ``multiprocessing.Queue``, a queue for passing objects between processes."""

    def __init__(self, maxsize: int = 0) -> None: ...
    def put(self, obj, block: bool = True, timeout: float = None) -> None: ...
    def get(self, block: bool = True, timeout: float = None): ...
    def put_nowait(self, obj) -> None: ...
    def get_nowait(self): ...
    def empty(self) -> bool: ...
    def full(self) -> bool: ...
    def qsize(self) -> int: ...
    # Shutdown helpers.
    def close(self) -> None: ...
    def join_thread(self) -> None: ...
    def cancel_join_thread(self) -> None: ...

def Pipe(duplex: bool = True) -> tuple:
    """Return a pair ``(conn1, conn2)`` of connected ``Connection`` objects.

    ``multiprocessing.Pipe`` is a factory function rather than a class: an
    ``__init__`` cannot return a tuple. With ``duplex=True`` (the default)
    the pipe is bidirectional; otherwise ``conn1`` only receives and
    ``conn2`` only sends.
    """
    ...

class Connection:
    """Stub of ``multiprocessing.connection.Connection``, one end of a pipe."""

    # Object transfer.
    def send(self, obj) -> None: ...
    def recv(self): ...
    def fileno(self) -> int: ...
    def close(self) -> None: ...
    def poll(self, timeout: float = 0.0) -> bool: ...
    # Raw-bytes transfer.
    def send_bytes(self, buffer, offset: int = 0, size: int = None) -> None: ...
    def recv_bytes(self, maxlength: int = None) -> bytes: ...
    def recv_bytes_into(self, buffer, offset: int = 0) -> int: ...

Asyncio (Asynchronous Programming)

Event loop-based asynchronous programming for concurrent I/O operations.

import asyncio
import contextvars

# Context variable support for asyncio
class ContextVar:
    """Stub of ``contextvars.ContextVar``, a named context variable."""

    # NOTE(review): in the stdlib ``default`` is keyword-only and optional
    # (no default is distinct from ``default=None``) — confirm.
    def __init__(self, name: str, default=None) -> None: ...
    def get(self, default=None): ...
    # NOTE(review): the stdlib returns a Token usable with ``reset()``; no
    # ``reset`` method is listed in this stub — confirm.
    def set(self, value): ...

class Context:
    """Stub of ``contextvars.Context``, a mapping of context variables to values.

    The ``copy`` return annotation is a quoted forward reference: the name
    ``Context`` is not yet bound while the class body is executing.
    """

    def copy(self) -> "Context": ...
    # ``callable`` mirrors the stdlib parameter name, hence the builtin shadowing.
    def run(self, callable, *args, **kwargs): ...

def copy_context() -> Context: ...  # copy of the current Context

# Event loop management
# ``asyncio.run`` returns whatever the ``main`` coroutine returns, so it
# carries no ``-> None`` annotation.
def run(main, debug: bool = None): ...
# ``AbstractEventLoop`` is declared later on this page, so it is referenced
# through quoted (forward-reference) annotations.
def get_event_loop() -> "AbstractEventLoop": ...
def set_event_loop(loop) -> None: ...
def new_event_loop() -> "AbstractEventLoop": ...
def get_running_loop() -> "AbstractEventLoop": ...

# Coroutine utilities
# NOTE(review): the ``loop`` parameters below were deprecated in Python 3.8
# and removed in 3.10 — confirm which Python version this page targets.
def create_task(coro, name=None) -> Task: ...
def gather(*aws, return_exceptions: bool = False): ...
def wait_for(aw, timeout: float, loop=None): ...
# The returned tuple is presumably (done, pending) — confirm in the asyncio docs.
def wait(aws, loop=None, timeout: float = None, return_when=ALL_COMPLETED) -> tuple: ...
def as_completed(aws, loop=None, timeout: float = None): ...
def shield(aw, loop=None): ...

# Sleep and timing
# NOTE(review): ``loop`` was removed from asyncio.sleep in Python 3.10 — confirm target version.
async def sleep(delay: float, result=None, loop=None): ...

# Synchronization primitives (async)
class Lock:
    """Stub of ``asyncio.Lock``, a mutex for asyncio tasks.

    ``acquire`` waits until the lock is free, takes it and returns ``True``,
    so it is annotated ``-> bool`` rather than ``-> None``.
    """

    async def acquire(self) -> bool: ...
    def release(self) -> None: ...
    def locked(self) -> bool: ...
    # ``async with lock:`` support.
    async def __aenter__(self) -> None: ...
    async def __aexit__(self, exc_type, exc, tb) -> None: ...

class Event:
    """Stub of ``asyncio.Event``, a flag that tasks can wait on.

    ``wait`` returns ``True`` once the event is set, so it is annotated
    ``-> bool`` rather than ``-> None``.
    """

    def is_set(self) -> bool: ...
    def set(self) -> None: ...
    def clear(self) -> None: ...
    async def wait(self) -> bool: ...

class Condition:
    """Stub of ``asyncio.Condition``, a condition variable for asyncio tasks.

    ``acquire`` and ``wait`` return ``True``; ``wait_for`` returns the final
    value of ``predicate``, so it carries no fixed return annotation.
    """

    def __init__(self, lock=None) -> None: ...
    async def acquire(self) -> bool: ...
    def release(self) -> None: ...
    def locked(self) -> bool: ...
    async def wait(self) -> bool: ...
    async def wait_for(self, predicate): ...
    def notify(self, n: int = 1) -> None: ...
    def notify_all(self) -> None: ...
    # ``async with condition:`` support.
    async def __aenter__(self) -> None: ...
    async def __aexit__(self, exc_type, exc, tb) -> None: ...

class Semaphore:
    """Stub of ``asyncio.Semaphore``, a counter-based primitive for asyncio tasks.

    ``acquire`` returns ``True`` once the semaphore has been taken, so it is
    annotated ``-> bool`` rather than ``-> None``.
    """

    def __init__(self, value: int = 1) -> None: ...
    async def acquire(self) -> bool: ...
    def release(self) -> None: ...
    def locked(self) -> bool: ...
    # ``async with semaphore:`` support.
    async def __aenter__(self) -> None: ...
    async def __aexit__(self, exc_type, exc, tb) -> None: ...

# Queues (async)
class Queue:
    """Stub of ``asyncio.Queue``; ``put``, ``get`` and ``join`` are coroutines."""

    def __init__(self, maxsize: int = 0) -> None: ...
    def qsize(self) -> int: ...
    def empty(self) -> bool: ...
    def full(self) -> bool: ...
    async def put(self, item) -> None: ...
    def put_nowait(self, item) -> None: ...
    async def get(self): ...
    def get_nowait(self): ...
    # Task-tracking pair.
    def task_done(self) -> None: ...
    async def join(self) -> None: ...

class LifoQueue(Queue): ...  # asyncio counterpart of the last-in, first-out queue
class PriorityQueue(Queue): ...  # asyncio counterpart of the priority queue

# Task and Future
class Task:
    """Stub of ``asyncio.Task``, which wraps a coroutine (see ``get_coro``)."""

    # NOTE(review): since Python 3.9 the stdlib signature is ``cancel(msg=None)`` — confirm.
    def cancel(self) -> bool: ...
    def cancelled(self) -> bool: ...
    def done(self) -> bool: ...
    def result(self): ...
    # NOTE(review): presumably None when the task finished without raising — confirm.
    def exception(self) -> BaseException: ...
    def add_done_callback(self, fn, context=None) -> None: ...
    def remove_done_callback(self, fn) -> int: ...
    # Introspection helpers.
    def get_stack(self, limit=None) -> list: ...
    def print_stack(self, limit=None, file=None) -> None: ...
    def get_coro(self): ...
    def get_name(self) -> str: ...
    def set_name(self, value: str) -> None: ...

class Future:
    """Stub of ``asyncio.Future``; adds ``set_result``/``set_exception`` to the Task-like API."""

    def cancel(self) -> bool: ...
    def cancelled(self) -> bool: ...
    def done(self) -> bool: ...
    def result(self): ...
    # NOTE(review): presumably None when no exception was set — confirm.
    def exception(self) -> BaseException: ...
    def add_done_callback(self, fn, context=None) -> None: ...
    def remove_done_callback(self, fn) -> int: ...
    def set_result(self, result) -> None: ...
    def set_exception(self, exception) -> None: ...

# Constants
# Accepted values for the ``return_when`` argument of ``wait()``.
ALL_COMPLETED: str
FIRST_COMPLETED: str  
FIRST_EXCEPTION: str

Subprocess Management

Spawning and managing subprocesses for running external commands.

import subprocess

# Main subprocess functions
# Adds the ``input`` argument accepted by ``subprocess.run`` (appended last
# so existing positional use is unaffected). ``CompletedProcess`` is declared
# further down, hence the quoted forward reference.
# NOTE(review): in the stdlib everything after ``args`` is keyword-only.
def run(args, bufsize: int = -1, executable=None, stdin=None, stdout=None,
        stderr=None, preexec_fn=None, close_fds: bool = True, shell: bool = False,
        cwd=None, env=None, universal_newlines=None, startupinfo=None,
        creationflags: int = 0, restore_signals: bool = True,
        start_new_session: bool = False, pass_fds=(), encoding=None,
        errors=None, text=None, timeout: float = None, check: bool = False,
        capture_output: bool = False,
        input=None) -> "CompletedProcess": ...

# Returns an int — presumably the child's return code; confirm in the subprocess docs.
def call(args, bufsize: int = -1, executable=None, stdin=None, stdout=None, 
         stderr=None, preexec_fn=None, close_fds: bool = True, shell: bool = False, 
         cwd=None, env=None, universal_newlines=None, startupinfo=None, 
         creationflags: int = 0, restore_signals: bool = True, 
         start_new_session: bool = False, pass_fds=(), timeout: float = None) -> int: ...

# Same parameters as ``call``; presumably raises CalledProcessError on a
# non-zero return code — confirm in the subprocess docs.
def check_call(args, bufsize: int = -1, executable=None, stdin=None, stdout=None, 
               stderr=None, preexec_fn=None, close_fds: bool = True, 
               shell: bool = False, cwd=None, env=None, universal_newlines=None, 
               startupinfo=None, creationflags: int = 0, restore_signals: bool = True, 
               start_new_session: bool = False, pass_fds=(), timeout: float = None) -> int: ...

# There is no ``stdout`` parameter here, unlike ``call``/``check_call``.
# NOTE(review): the stdlib returns ``str`` instead of ``bytes`` when text mode
# is requested via text/encoding/errors/universal_newlines — confirm.
def check_output(args, bufsize: int = -1, executable=None, stdin=None, 
                 stderr=None, preexec_fn=None, close_fds: bool = True, 
                 shell: bool = False, cwd=None, env=None, universal_newlines=None, 
                 startupinfo=None, creationflags: int = 0, restore_signals: bool = True, 
                 start_new_session: bool = False, pass_fds=(), timeout: float = None, 
                 encoding=None, errors=None, text=None) -> bytes: ...

# Popen class for advanced subprocess control
class Popen:
    """Stub of ``subprocess.Popen``, a handle on a spawned child process."""

    def __init__(self, args, bufsize: int = -1, executable=None, stdin=None, 
                 stdout=None, stderr=None, preexec_fn=None, close_fds: bool = True, 
                 shell: bool = False, cwd=None, env=None, universal_newlines=None, 
                 startupinfo=None, creationflags: int = 0, restore_signals: bool = True, 
                 start_new_session: bool = False, pass_fds=(), encoding=None, 
                 errors=None, text=None, user=None, group=None, 
                 extra_groups=None, umask: int = -1) -> None: ...
    
    # NOTE(review): presumably None while the child is still running — confirm.
    def poll(self) -> int: ...
    def wait(self, timeout: float = None) -> int: ...
    # The returned tuple is presumably (stdout_data, stderr_data) — confirm.
    def communicate(self, input=None, timeout: float = None) -> tuple: ...
    def send_signal(self, signal: int) -> None: ...
    def terminate(self) -> None: ...
    def kill(self) -> None: ...
    
    # Read-only views of the constructor arguments, stdio streams and status.
    @property
    def args(self): ...
    @property
    def stdin(self): ...
    @property
    def stdout(self): ...
    @property  
    def stderr(self): ...
    @property
    def pid(self) -> int: ...
    @property
    def returncode(self) -> int: ...

# Result classes
class CompletedProcess:
    """Stub of ``subprocess.CompletedProcess``, the return type of ``run()``."""

    def __init__(self, args, returncode: int, stdout=None, stderr=None) -> None: ...
    # Presumably raises CalledProcessError for a non-zero returncode — confirm.
    def check_returncode(self) -> None: ...
    # Read-only views of the values given to the constructor.
    @property
    def args(self): ...
    @property
    def returncode(self) -> int: ...
    @property
    def stdout(self): ...
    @property
    def stderr(self): ...

# Constants for stdio redirection
# Special values for the stdin/stdout/stderr arguments.
PIPE: int       # Create a new pipe to the child
STDOUT: int     # Redirect stderr to stdout (pass as the stderr argument)
DEVNULL: int    # Redirect to os.devnull

# Exceptions
class SubprocessError(Exception): ...  # base class for the two exceptions below
class CalledProcessError(SubprocessError): ...  # presumably for non-zero exit status — confirm
class TimeoutExpired(SubprocessError): ...  # presumably for an elapsed timeout — confirm

Concurrent Futures

High-level interface for asynchronously executing callables.

import concurrent.futures

# Executor base class
class Executor:
    """Stub of ``concurrent.futures.Executor``, the abstract executor base class.

    ``Future`` is declared further down this section and ``Executor`` is not
    yet bound inside its own class body, so both return annotations are
    quoted forward references.
    """

    def submit(self, fn, *args, **kwargs) -> "Future": ...
    def map(self, func, *iterables, timeout: float = None, chunksize: int = 1): ...
    def shutdown(self, wait: bool = True, cancel_futures: bool = False) -> None: ...
    # Context-manager protocol.
    def __enter__(self) -> "Executor": ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

# Thread pool executor  
class ThreadPoolExecutor(Executor):
    """Stub of ``concurrent.futures.ThreadPoolExecutor``, an Executor subclass."""

    def __init__(self, max_workers=None, thread_name_prefix: str = '', 
                 initializer=None, initargs=()) -> None: ...

# Process pool executor
class ProcessPoolExecutor(Executor):
    """Stub of ``concurrent.futures.ProcessPoolExecutor``, an Executor subclass."""

    def __init__(self, max_workers=None, mp_context=None, initializer=None, 
                 initargs=()) -> None: ...

# Future object
class Future:
    """Stub of ``concurrent.futures.Future``, the type ``Executor.submit`` is annotated to return."""

    def cancel(self) -> bool: ...
    def cancelled(self) -> bool: ...
    def running(self) -> bool: ...
    def done(self) -> bool: ...
    # Unlike the asyncio Future stub, result() and exception() take a timeout.
    def result(self, timeout: float = None): ...
    def exception(self, timeout: float = None) -> BaseException: ...
    def add_done_callback(self, fn) -> None: ...

# Utility functions
# ``fs`` is presumably an iterable of Future objects — confirm in the docs.
def as_completed(fs, timeout: float = None): ...
# The returned tuple is presumably (done, not_done) — confirm in the docs.
def wait(fs, timeout: float = None, return_when=ALL_COMPLETED) -> tuple: ...

# Constants
# Accepted values for the ``return_when`` argument of ``wait()``.
ALL_COMPLETED: str
FIRST_COMPLETED: str
FIRST_EXCEPTION: str

# Exceptions
# NOTE(review): when each exception is raised is not shown here — see the
# concurrent.futures "Exception classes" documentation.
class CancelledError(Exception): ...
class TimeoutError(Exception): ...
class BrokenExecutor(RuntimeError): ...
class InvalidStateError(Exception): ...

Types

Threading Types

# Thread identification: an integer, matching the int annotations of
# get_ident() and get_native_id()
ThreadID = int

# Lock types (can be used as type annotations)
LockType = threading.Lock | threading.RLock

Multiprocessing Types

# Process identification: an integer
ProcessID = int

# Shared memory typecodes: the ``typecode`` string taken by Value and Array
TypeCode = str  # 'b', 'h', 'i', 'l', 'q', 'f', 'd', etc.

# Pipe connection pair: the (conn1, conn2) tuple returned by Pipe
PipeConnection = tuple[Connection, Connection]

Asyncio Types

# Coroutine type (placeholder alias: any object)
Coroutine = object

# Awaitable type (placeholder alias: any object)
Awaitable = object

# Event loop types
class AbstractEventLoop:
    """Abstract base class for event loops."""
    # Running and stopping the loop.
    def run_until_complete(self, future): ...
    def run_forever(self) -> None: ...
    def stop(self) -> None: ...
    def is_running(self) -> bool: ...
    def is_closed(self) -> bool: ...
    def close(self) -> None: ...
    # Scheduling tasks and callbacks.
    def create_task(self, coro, name=None) -> Task: ...
    def call_soon(self, callback, *args, context=None): ...
    def call_later(self, delay: float, callback, *args, context=None): ...
    def call_at(self, when: float, callback, *args, context=None): ...

Subprocess Types

# Process arguments (the ``args`` parameter of run/call/Popen)
ProcessArgs = str | list[str]  # Command as string or list

# File descriptors for stdio (the stdin/stdout/stderr parameters)
StdioType = int | None  # File descriptor, PIPE, STDOUT, DEVNULL, or None

# Environment variables (the ``env`` parameter)
EnvType = dict[str, str] | None

Future Types

# Future state constants (string names)
FutureState = str  # 'PENDING', 'RUNNING', 'CANCELLED', 'FINISHED'

# Future result type: any object, or None
FutureResult = object | None