tessl/pypi-cocotb

Cocotb is a coroutine-based cosimulation library for writing VHDL and Verilog testbenches in Python.

Task Management

cocotb runs coroutines concurrently as tasks. start_soon schedules a task without yielding control, start schedules a task and then yields so that it can begin running, and create_task builds a task without scheduling it. The returned Task object provides methods to inspect, await, and terminate the running coroutine.

Capabilities

Immediate Task Scheduling

Schedule coroutines to run concurrently without yielding control from the calling context.

def start_soon(coro: Union[Task, Coroutine]) -> Task:
    """
    Schedule a coroutine to be run concurrently.

    Note: This is not an async function, and the new task will not execute 
    until the calling task yields control.

    Parameters:
    - coro: Coroutine or Task to schedule

    Returns:
    Task object representing the scheduled coroutine
    """

Usage Examples:

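import cocotb
from cocotb.triggers import Edge, Timer

@cocotb.test()
async def concurrent_test(dut):
    """Test demonstrating concurrent task execution."""
    
    # Schedule multiple tasks; none of them runs until this coroutine yields.
    # check_responses is assumed to be defined elsewhere in the testbench.
    monitor_task = cocotb.start_soon(monitor_signals(dut))
    stimulus_task = cocotb.start_soon(generate_stimulus(dut))
    checker_task = cocotb.start_soon(check_responses(dut))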
    
    # Tasks start executing when this coroutine yields
    await Timer(1000, units="ns")
    
    # Clean up tasks
    monitor_task.kill()
    stimulus_task.kill() 
    checker_task.kill()

async def monitor_signals(dut):
    """Monitor signal changes continuously."""
    while True:
        await Edge(dut.data_bus)
        cocotb.log.info(f"Data bus changed to: {dut.data_bus.value}")

async def generate_stimulus(dut):
    """Generate test stimulus."""
    for i in range(10):
        dut.input_data.value = i
        await Timer(50, units="ns")
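
The cleanup at the end of the test above repeats one kill() call per task. A minimal sketch of a helper that does the same for any number of tasks follows. The name kill_pending is hypothetical and not part of cocotb; it relies only on the done() and kill() methods described in the Task Class section.

```python
from typing import Iterable


def kill_pending(tasks: Iterable) -> int:
    """Kill every task that has not finished yet and return how many were killed.

    `tasks` is any iterable of objects exposing done() and kill(), such as
    the Task objects returned by cocotb.start_soon(). This helper is an
    illustration, not a cocotb API.
    """
    killed = 0
    for task in tasks:
        if not task.done():  # finished tasks need no cleanup
            task.kill()
            killed += 1
    return killed
```

With this helper, the three kill() calls in concurrent_test could be written as kill_pending([monitor_task, stimulus_task, checker_task]).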

Yielding Task Scheduling

Schedule coroutines and immediately yield control to allow pending tasks to execute.

async def start(coro: Union[Task, Coroutine]) -> Task:
    """
    Schedule a coroutine to be run concurrently, then yield control to allow pending tasks to execute.

    The calling task will resume execution before control is returned to the simulator.

    Parameters:
    - coro: Coroutine or Task to schedule

    Returns:
    Task object representing the scheduled coroutine
    """

Usage Examples:

import cocotb
from cocotb.triggers import RisingEdge, Timer

@cocotb.test()
async def yielding_test(dut):
    """Test demonstrating yielding task scheduling."""
    
    # Schedule and immediately allow execution
    background_task = await cocotb.start(background_monitor(dut))
    
    # Background task has opportunity to start before continuing
    cocotb.log.info("Background task started")
    
    # Continue with main test logic
    for i in range(5):
        dut.control.value = i
        await Timer(100, units="ns")
    
    # Clean up
    background_task.kill()

async def background_monitor(dut):
    """Background monitoring task."""
    cocotb.log.info("Background monitor started")
    while True:
        await RisingEdge(dut.clk)
        if dut.error_flag.value:
            cocotb.log.error("Error flag detected!")
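
The difference between start_soon and start is one of ordering: whether the new task gets to run before the caller continues. The sketch below illustrates that ordering with Python's asyncio, used purely as a stand-in so that it runs without a simulator. It is an analogy, not cocotb's scheduler: asyncio.create_task plays the role of start_soon, and the extra asyncio.sleep(0) plays the role of the yield performed by start. The function name scheduling_order is made up for this illustration.

```python
import asyncio


def scheduling_order(yield_after_scheduling: bool) -> list:
    """Record the order in which a parent and a child coroutine run.

    asyncio is used here only as a stand-in for the cocotb scheduler.
    """
    events = []

    async def child():
        events.append("child ran")

    async def parent():
        # Stand-in for cocotb.start_soon(): the child is queued, not run.
        task = asyncio.create_task(child())
        if yield_after_scheduling:
            # Stand-in for the yield that cocotb.start() performs.
            await asyncio.sleep(0)
        events.append("parent continued")
        await task  # stand-in for `await task.join()`

    asyncio.run(parent())
    return events
```

Calling scheduling_order(False) returns ["parent continued", "child ran"], which mirrors start_soon; scheduling_order(True) returns ["child ran", "parent continued"], which mirrors start.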

Task Creation

Create a task without scheduling it, so that it can be started later at a point of your choosing.

def create_task(coro: Union[Task, Coroutine]) -> Task:
    """
    Construct a coroutine into a Task without scheduling the Task.

    The Task can later be scheduled with fork, start, or start_soon.

    Parameters:
    - coro: Coroutine to convert to Task

    Returns:
    Task object that can be scheduled later
    """

Usage Examples:

import cocotb
from cocotb.triggers import RisingEdge, Timer

@cocotb.test()
async def task_creation_test(dut):
    """Test demonstrating task creation and delayed scheduling."""
    
    # Create tasks without scheduling them.
    # write_sequence is assumed to be defined elsewhere in the testbench.
    read_task = cocotb.create_task(read_sequence(dut, 0x100, 10))
    write_task = cocotb.create_task(write_sequence(dut, 0x200, [1, 2, 3, 4]))
    
    # Schedule tasks when needed
    cocotb.start_soon(read_task)
    await Timer(50, units="ns")  # Delay between operations
    
    cocotb.start_soon(write_task)
    
    # Wait for completion
    await read_task.join()
    await write_task.join()

async def read_sequence(dut, base_addr, count):
    """Perform a sequence of read operations."""
    results = []
    for i in range(count):
        dut.addr.value = base_addr + i
        dut.read_enable.value = 1
        await RisingEdge(dut.clk)
        
        dut.read_enable.value = 0
        await RisingEdge(dut.clk)
        
        results.append(dut.data_out.value)
        cocotb.log.info(f"Read {base_addr + i}: {dut.data_out.value}")
    
    return results

Deprecated Fork Function

Legacy function for task scheduling, replaced by start_soon and start.

def fork(coro: Union[Task, Coroutine]) -> Task:
    """
    Schedule a coroutine to be run concurrently.

    DEPRECATED: This function has been deprecated in favor of start_soon and start.
    In most cases you can simply substitute cocotb.fork with cocotb.start_soon.

    Parameters:
    - coro: Coroutine or Task to schedule

    Returns:
    Task object representing the scheduled coroutine
    """

Migration Examples:

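import cocotb
from cocotb.triggers import Timer

# background_process is assumed to be defined elsewhere in the testbench.

# OLD: Using deprecated fork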
@cocotb.test()
async def old_style_test(dut):
    task = cocotb.fork(background_process(dut))  # DEPRECATED
    await Timer(100, units="ns")
    task.kill()

# NEW: Using start_soon
@cocotb.test()
async def new_style_test(dut):
    task = cocotb.start_soon(background_process(dut))  # PREFERRED
    await Timer(100, units="ns")
    task.kill()

# NEW: Using start for immediate execution
@cocotb.test()
async def immediate_test(dut):
    task = await cocotb.start(background_process(dut))  # PREFERRED
    await Timer(100, units="ns")
    task.kill()
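
A testbench that has to run on several cocotb releases during a migration can choose the scheduling function at run time. The following is a minimal sketch of that idea under the assumption that older releases expose only fork while newer ones expose start_soon. The helper name pick_scheduler is hypothetical and not part of cocotb.

```python
def pick_scheduler(cocotb_module):
    """Return start_soon when the module provides it, otherwise fall back to fork.

    `cocotb_module` is normally the imported `cocotb` package; it is passed
    in as an argument so the helper can be exercised without a simulator.
    """
    scheduler = getattr(cocotb_module, "start_soon", None)
    if scheduler is None:
        # Only reached on releases that predate start_soon.
        scheduler = cocotb_module.fork
    return scheduler
```

In a testbench this might be used as schedule = pick_scheduler(cocotb), followed by task = schedule(background_process(dut)). Once every supported release provides start_soon, the helper can be dropped and start_soon called directly.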

Task Class

Task Lifecycle Management

Tasks provide methods for monitoring and controlling coroutine execution.

class Task:
    """
    Represents a concurrently executing coroutine.
    """
    
    def result(self):
        """
        Get the result of the task.
        
        Returns:
        The return value of the coroutine
        
        Raises:
        Exception if task completed with error
        """
    
    def exception(self):
        """
        Get the exception that caused the task to fail.
        
        Returns:
        Exception object or None if task succeeded
        """
    
    def done(self) -> bool:
        """
        Check if the task is complete.
        
        Returns:
        True if task has finished (success or failure)
        """
    
    def cancel(self, msg=None):
        """
        Cancel the task's further execution.
        
        Parameters:
        - msg: Optional message describing why the task was cancelled
        
        Returns:
        None
        """
    
    def cancelled(self) -> bool:
        """
        Check if the task was cancelled.
        
        Returns:
        True if the task was cancelled
        """
    
    def kill(self):
        """
        Terminate the task immediately.
        
        The task does not run any further, and done() returns True afterwards.
        """
    
    def join(self):
        """
        Return a trigger that fires when the task completes.
        
        Use it as `await task.join()` to wait for the task to finish.
        
        Returns:
        When awaited, the return value of the coroutine
        
        Raises:
        Exception if task failed
        """
    
    def has_started(self) -> bool:
        """
        Check if the task has started executing.
        
        Returns:
        True if task execution has begun
        """

Usage Examples:

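import cocotb
from cocotb.result import SimTimeoutError
from cocotb.triggers import Timer, with_timeout

@cocotb.test()
async def task_control_test(dut):
    """Test demonstrating task lifecycle management."""
    
    # Start a long-running task with start() so that it begins executing
    # before the status checks below; a task scheduled with start_soon()
    # would not have started yet.
    long_task = await cocotb.start(long_running_process(dut))
    
    # Check task status
    assert not long_task.done()
    assert long_task.has_started()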
    
    # Wait with timeout
    try:
        await with_timeout(long_task.join(), 500, "ns")
        result = long_task.result()
        cocotb.log.info(f"Task completed with result: {result}")
    except SimTimeoutError:
        cocotb.log.warning("Task timed out, terminating")
        long_task.kill()
        assert long_task.done()

@cocotb.test()
async def task_error_handling_test(dut):
    """Test demonstrating task error handling."""
    
    # Start task that might fail
    risky_task = cocotb.start_soon(risky_operation(dut))
    
    # Wait for completion and handle errors
    try:
        await risky_task.join()
        cocotb.log.info("Risky operation succeeded")
    except Exception as e:
        cocotb.log.error(f"Task failed with: {e}")
        
        # Check error details
        if risky_task.exception():
            cocotb.log.error(f"Exception was: {risky_task.exception()}")

async def long_running_process(dut):
    """Simulate a long-running process."""
    for i in range(100):
        dut.counter.value = i
        await Timer(10, units="ns")
    return "completed"

async def risky_operation(dut):
    """Operation that might fail."""
    await Timer(50, units="ns")
    if dut.error_inject.value:
        raise ValueError("Injected error occurred")
    return "success"

Advanced Task Patterns

Task Synchronization

import cocotb
from cocotb.triggers import Timer

@cocotb.test()
async def synchronized_test(dut):
    """Test showing task synchronization patterns."""
    
    # Start multiple tasks
    tasks = [
        cocotb.start_soon(worker_task(dut, i)) 
        for i in range(4)
    ]
    
    # Wait for all to complete
    results = []
    for task in tasks:
        result = await task.join()
        results.append(result)
    
    cocotb.log.info(f"All tasks completed: {results}")

async def worker_task(dut, worker_id):
    """Individual worker task."""
    await Timer(worker_id * 10, units="ns")  # Staggered start
    
    # Do work
    for i in range(5):
        dut._id(f"worker_{worker_id}_output", extended=False).value = i
        await Timer(20, units="ns")
    
    return f"worker_{worker_id}_done"

Task Utilities

Task Factory Pattern

import cocotb
from cocotb.triggers import Edge, Timer

def create_monitor_task(dut, signal_name, callback):
    """Factory function for creating monitor tasks."""
    
    async def monitor_coroutine():
        signal = dut._id(signal_name, extended=False)
        while True:
            await Edge(signal)
            callback(signal.value)
    
    return cocotb.create_task(monitor_coroutine())

@cocotb.test()
async def factory_test(dut):
    """Test using task factory pattern."""
    
    def log_change(value):
        cocotb.log.info(f"Signal changed to: {value}")
    
    # Create monitors using factory
    data_monitor = create_monitor_task(dut, "data_bus", log_change)
    addr_monitor = create_monitor_task(dut, "addr_bus", log_change)
    
    # Start monitors
    cocotb.start_soon(data_monitor)
    cocotb.start_soon(addr_monitor)
    
    # Run test
    await Timer(1000, units="ns")
    
    # Clean up
    data_monitor.kill()
    addr_monitor.kill()
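
The factory accepts any callable as its callback, so a monitor can collect values for later checking instead of logging them. The sketch below shows one such callback factory. The name make_recorder and its convert parameter are hypothetical, introduced here only for illustration.

```python
def make_recorder(convert=None):
    """Return (callback, values); the callback appends each value to the list.

    `convert`, if given, is applied to every value before it is stored,
    for example `int` to store plain integers. Illustration only.
    """
    values = []

    def callback(value):
        values.append(convert(value) if convert is not None else value)

    return callback, values
```

In factory_test this could be used as record, seen = make_recorder(), followed by create_monitor_task(dut, "data_bus", record); after the monitor is killed, seen holds every value the callback received, in order.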

Install with Tessl CLI

npx tessl i tessl/pypi-cocotb
