CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cocotb

Cocotb is a coroutine-based cosimulation library for writing VHDL and Verilog testbenches in Python.

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

triggers-timing.mddocs/

Triggers and Timing

Cocotb's trigger system provides comprehensive time-based and event-based synchronization with the HDL simulator. Triggers enable coroutines to wait for specific simulation events, time delays, signal changes, and custom conditions.

Capabilities

Time-Based Triggers

Control simulation time progression with precise timing control and simulator phase awareness.

class Timer(time, units="step", round_mode=None):
    """
    Fire after specified simulation time duration.

    Parameters:
    - time: Time value (int, float, or decimal.Decimal)
    - units: Time units ("fs", "ps", "ns", "us", "ms", "sec", "step")
    - round_mode: Rounding mode for time conversion

    Usage:
    await Timer(100, units="ns")
    """

class ReadOnly():
    """
    Fire during read-only phase of current timestep.
    
    Useful for reading signal values after all combinatorial updates.
    
    Usage:
    await ReadOnly()
    """

class ReadWrite():
    """
    Fire during read-write phase of current timestep.
    
    Allows signal modifications within the current timestep.
    
    Usage:
    await ReadWrite()
    """

class NextTimeStep():
    """
    Fire at the next simulation timestep.
    
    Minimal time advancement to next simulator event.
    
    Usage:
    await NextTimeStep()
    """

Usage Examples:

@cocotb.test()
async def timing_test(dut):
    """Test demonstrating time-based triggers."""
    
    # Basic time delays
    await Timer(100, units="ns")
    dut.signal_a.value = 1
    
    await Timer(50, units="ns")
    dut.signal_b.value = 0
    
    # Simulator phase control
    dut.combo_input.value = 5
    await ReadOnly()  # Wait for combinatorial settling
    result = dut.combo_output.value  # Safe to read
    
    # Minimal time step
    await NextTimeStep()
    
    # Precise timing with decimal
    from decimal import Decimal
    await Timer(Decimal("33.333"), units="ns")

@cocotb.test()
async def phase_test(dut):
    """Test demonstrating simulator phase awareness."""
    
    # Set input and wait for combinatorial update
    dut.addr.value = 0x100
    await ReadOnly()
    
    # Read the result after combinatorial settling
    decoded_value = dut.chip_select.value
    cocotb.log.info(f"Address 0x100 decoded to: {decoded_value}")
    
    # Move to read-write phase for modifications
    await ReadWrite()
    dut.write_enable.value = 1

Signal Edge Triggers

Synchronize with HDL signal transitions for event-driven testbench behavior.

class RisingEdge(signal):
    """
    Fire on signal rising edge (0/Z to 1/X transition).

    Parameters:
    - signal: HDL signal to monitor

    Usage:
    await RisingEdge(dut.clk)
    """

class FallingEdge(signal):
    """
    Fire on signal falling edge (1/X to 0/Z transition).

    Parameters:
    - signal: HDL signal to monitor

    Usage:
    await FallingEdge(dut.reset)
    """

class Edge(signal):
    """
    Fire on any signal change (any value change).

    Parameters:
    - signal: HDL signal to monitor

    Usage:
    await Edge(dut.data_bus)
    """

Usage Examples:

@cocotb.test()
async def edge_test(dut):
    """Test demonstrating signal edge triggers."""
    
    # Clock-synchronous operations
    for i in range(10):
        await RisingEdge(dut.clk)
        dut.data_in.value = i
        cocotb.log.info(f"Cycle {i}: Applied data {i}")
    
    # Reset sequence
    dut.reset_n.value = 0
    await Timer(100, units="ns")
    dut.reset_n.value = 1
    await RisingEdge(dut.reset_n)  # Wait for reset release
    
    # Monitor any data change
    data_changes = 0
    for _ in range(50):
        await Edge(dut.output_data)
        data_changes += 1
        cocotb.log.info(f"Data changed to: {dut.output_data.value}")
    
    cocotb.log.info(f"Detected {data_changes} data changes")

@cocotb.test()  
async def handshake_test(dut):
    """Test demonstrating handshake protocol."""
    
    # Initiate request
    dut.request.value = 1
    dut.data_out.value = 0xABCD
    
    # Wait for acknowledgment
    await RisingEdge(dut.acknowledge)
    cocotb.log.info("Request acknowledged")
    
    # Complete handshake
    dut.request.value = 0
    await FallingEdge(dut.acknowledge)
    cocotb.log.info("Handshake completed")

Composite Triggers

Combine multiple triggers for complex synchronization patterns.

class Combine(*triggers):
    """
    Fire when ALL triggers have fired.

    Parameters:
    - *triggers: Variable number of triggers to combine

    Usage:
    await Combine(RisingEdge(dut.clk), Timer(100, "ns"))
    """

class First(*triggers):
    """
    Fire when FIRST trigger fires.

    Parameters:
    - *triggers: Variable number of triggers to race

    Usage:
    await First(RisingEdge(dut.done), Timer(1000, "ns"))
    """

class Join(task):
    """
    Fire when task completes.

    Parameters:
    - task: Task to wait for completion

    Usage:
    await Join(background_task)
    """

class ClockCycles(signal, num_cycles, rising=True):
    """
    Fire after specified number of clock cycles.

    Parameters:
    - signal: Clock signal to count
    - num_cycles: Number of cycles to wait
    - rising: True for rising edges, False for falling edges

    Usage:
    await ClockCycles(dut.clk, 10)
    """

Usage Examples:

@cocotb.test()
async def composite_test(dut):
    """Test demonstrating composite triggers."""
    
    # Wait for both clock edge AND timer
    await Combine(RisingEdge(dut.clk), Timer(100, units="ns"))
    cocotb.log.info("Both clock edge and timer fired")
    
    # Race between completion and timeout
    background_task = cocotb.start_soon(slow_operation(dut))
    
    result = await First(
        Join(background_task),
        Timer(500, units="ns")  # Timeout
    )
    
    if background_task.done():
        cocotb.log.info("Operation completed on time")
    else:
        cocotb.log.warning("Operation timed out")
        background_task.kill()
    
    # Wait for specific number of clock cycles
    await ClockCycles(dut.clk, 5)
    cocotb.log.info("5 clock cycles elapsed")
    
    # Multi-signal synchronization
    await Combine(
        Edge(dut.valid),
        Edge(dut.ready),
        RisingEdge(dut.clk)
    )
    cocotb.log.info("Valid, ready, and clock all changed")

async def slow_operation(dut):
    """Simulate a slow operation."""
    await Timer(300, units="ns")
    dut.operation_done.value = 1
    return "completed"

Synchronization Primitives

Event and lock primitives for inter-coroutine synchronization.

class Event():
    """
    Event synchronization primitive for coordinating coroutines.
    """
    
    def set(self, data=None):
        """
        Set the event and wake all waiting coroutines.
        
        Parameters:
        - data: Optional data to pass to waiting coroutines
        """
    
    def wait(self):
        """
        Get trigger that fires when event is set.
        
        Returns:
        Trigger that can be awaited
        
        Usage:
        await event.wait()
        """
    
    def clear(self):
        """Clear the event."""
    
    def is_set(self) -> bool:
        """
        Check if event is currently set.
        
        Returns:
        True if event is set
        """
    
    @property
    def data(self):
        """
        Get data associated with the event.
        
        Returns:
        Data passed to set() method
        """

class Lock():
    """
    Lock synchronization primitive for mutual exclusion.
    """
    
    def acquire(self):
        """
        Get trigger that fires when lock is acquired.
        
        Returns:
        Trigger that can be awaited
        
        Usage:
        await lock.acquire()
        """
    
    def release(self):
        """Release the lock."""
    
    def locked(self) -> bool:
        """
        Check if lock is currently held.
        
        Returns:
        True if lock is held
        """

Usage Examples:

@cocotb.test()
async def synchronization_test(dut):
    """Test demonstrating synchronization primitives."""
    
    # Event coordination
    completion_event = Event()
    
    # Start producer and consumer
    producer_task = cocotb.start_soon(data_producer(dut, completion_event))
    consumer_task = cocotb.start_soon(data_consumer(dut, completion_event))
    
    # Wait for both to complete
    await producer_task.join()
    await consumer_task.join()

async def data_producer(dut, completion_event):
    """Produce data and signal completion."""
    for i in range(10):
        dut.data_buffer.value = i * 10
        await Timer(50, units="ns")
        cocotb.log.info(f"Produced: {i * 10}")
    
    # Signal completion to consumer
    completion_event.set(data="production_complete")

async def data_consumer(dut, completion_event):
    """Consume data and wait for completion."""
    consumed_count = 0
    
    while True:
        # Wait for either data or completion
        trigger = await First(
            Edge(dut.data_buffer),
            completion_event.wait()
        )
        
        if completion_event.is_set():
            cocotb.log.info(f"Production complete, consumed {consumed_count} items")
            break
        else:
            consumed_count += 1
            cocotb.log.info(f"Consumed: {dut.data_buffer.value}")

@cocotb.test()
async def lock_test(dut):
    """Test demonstrating lock usage."""
    
    shared_resource_lock = Lock()
    
    # Start multiple tasks that need exclusive access
    tasks = [
        cocotb.start_soon(exclusive_operation(dut, shared_resource_lock, i))
        for i in range(3)
    ]
    
    # Wait for all to complete
    for task in tasks:
        await task.join()

async def exclusive_operation(dut, lock, operation_id):
    """Perform operation requiring exclusive access."""
    cocotb.log.info(f"Operation {operation_id} waiting for lock")
    
    await lock.acquire()
    try:
        cocotb.log.info(f"Operation {operation_id} acquired lock")
        
        # Critical section
        dut.shared_register.value = operation_id
        await Timer(100, units="ns")
        
        assert dut.shared_register.value == operation_id
        cocotb.log.info(f"Operation {operation_id} completed successfully")
        
    finally:
        lock.release()
        cocotb.log.info(f"Operation {operation_id} released lock")

Special Triggers

Utility triggers for specific scenarios.

class NullTrigger(name="", outcome=None):
    """
    Fire immediately with optional outcome.

    Parameters:
    - name: Optional name for debugging
    - outcome: Optional outcome to return

    Usage:
    await NullTrigger()
    """

Timeout Utility

Add timeout capability to any trigger.

def with_timeout(trigger, timeout_time, timeout_unit="step", round_mode=None):
    """
    Add timeout to any trigger.

    Parameters:
    - trigger: Trigger to add timeout to
    - timeout_time: Timeout duration
    - timeout_unit: Timeout units
    - round_mode: Rounding mode

    Returns:
    Trigger that times out if original doesn't fire

    Raises:
    SimTimeoutError: If timeout occurs before trigger fires

    Usage:
    await with_timeout(RisingEdge(dut.done), 1000, "ns")
    """

Usage Examples:

from cocotb.result import SimTimeoutError

@cocotb.test()
async def timeout_test(dut):
    """Test demonstrating timeout usage."""
    
    # Operation with timeout protection
    try:
        await with_timeout(RisingEdge(dut.response), 500, "ns")
        cocotb.log.info("Response received on time")
    except SimTimeoutError:
        cocotb.log.error("Response timeout - continuing with default")
        dut.response.value = 1  # Provide default response
    
    # Immediate trigger
    await NullTrigger("immediate_completion")
    
    # Trigger with specific outcome
    result_trigger = NullTrigger("result", outcome=42)
    await result_trigger

Advanced Trigger Patterns

Custom Trigger Creation

class WaitUntilValue:
    """Custom trigger that waits for signal to reach specific value."""
    
    def __init__(self, signal, value, timeout_ns=1000):
        self.signal = signal
        self.value = value
        self.timeout_ns = timeout_ns
    
    async def __await__(self):
        start_time = get_sim_time("ns")
        
        while self.signal.value != self.value:
            await Edge(self.signal)
            current_time = get_sim_time("ns")
            
            if current_time - start_time > self.timeout_ns:
                raise SimTimeoutError(f"Signal never reached {self.value}")
        
        return self.signal.value

@cocotb.test()
async def custom_trigger_test(dut):
    """Test using custom trigger."""
    
    # Use custom trigger
    await WaitUntilValue(dut.state_machine, 0x5, timeout_ns=2000)
    cocotb.log.info("State machine reached target state")

Trigger Composition Patterns

@cocotb.test()
async def composition_test(dut):
    """Test showing trigger composition patterns."""
    
    # Complex condition: clock edge AND data valid AND not busy
    await Combine(
        RisingEdge(dut.clk),
        WaitUntilValue(dut.data_valid, 1),
        WaitUntilValue(dut.busy, 0)
    )
    
    # Race multiple completion conditions
    completion = await First(
        WaitUntilValue(dut.done, 1),
        WaitUntilValue(dut.error, 1),
        Timer(5000, "ns")  # Overall timeout
    )
    
    if dut.done.value:
        cocotb.log.info("Operation completed successfully")
    elif dut.error.value:
        cocotb.log.error("Operation failed")
    else:
        cocotb.log.warning("Operation timed out")

Install with Tessl CLI

npx tessl i tessl/pypi-cocotb

docs

binary-types.md

clock-generation.md

index.md

logging-utilities.md

signal-handling.md

task-management.md

test-framework.md

triggers-timing.md

tile.json