Cocotb is a coroutine-based cosimulation library for writing VHDL and Verilog testbenches in Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cocotb's trigger system provides comprehensive time-based and event-based synchronization with the HDL simulator. Triggers enable coroutines to wait for specific simulation events, time delays, signal changes, and custom conditions.
Control simulation time progression with precise timing control and simulator phase awareness.
class Timer(time, units="step", round_mode=None):
"""
Fire after specified simulation time duration.
Parameters:
- time: Time value (int, float, or decimal.Decimal)
- units: Time units ("fs", "ps", "ns", "us", "ms", "sec", "step")
- round_mode: Rounding mode for time conversion
Usage:
await Timer(100, units="ns")
"""
class ReadOnly():
"""
Fire during read-only phase of current timestep.
Useful for reading signal values after all combinatorial updates.
Usage:
await ReadOnly()
"""
class ReadWrite():
"""
Fire during read-write phase of current timestep.
Allows signal modifications within the current timestep.
Usage:
await ReadWrite()
"""
class NextTimeStep():
"""
Fire at the next simulation timestep.
Minimal time advancement to next simulator event.
Usage:
await NextTimeStep()
"""
Usage Examples:
@cocotb.test()
async def timing_test(dut):
    """Test demonstrating time-based triggers.

    Drives two signals after fixed delays, samples a combinational output
    during the read-only phase, then advances simulation time again.
    """
    # Imported locally so the example stays self-contained.
    from decimal import Decimal

    # Basic time delays
    await Timer(100, units="ns")
    dut.signal_a.value = 1

    await Timer(50, units="ns")
    dut.signal_b.value = 0

    # Simulator phase control
    dut.combo_input.value = 5
    await ReadOnly()  # Wait for combinatorial settling
    result = dut.combo_output.value  # Safe to read
    cocotb.log.info(f"combo_output settled to: {result}")

    # Minimal time step
    await NextTimeStep()

    # Precise timing with decimal: a Decimal literal avoids handing the
    # delay over as a binary float.
    await Timer(Decimal("33.333"), units="ns")
@cocotb.test()
async def phase_test(dut):
"""Test demonstrating simulator phase awareness."""
# Set input and wait for combinatorial update
dut.addr.value = 0x100
await ReadOnly()
# Read the result after combinatorial settling
decoded_value = dut.chip_select.value
cocotb.log.info(f"Address 0x100 decoded to: {decoded_value}")
# Move to read-write phase for modifications
await ReadWrite()
dut.write_enable.value = 1Synchronize with HDL signal transitions for event-driven testbench behavior.
class RisingEdge(signal):
"""
Fire on signal rising edge (0/Z to 1/X transition).
Parameters:
- signal: HDL signal to monitor
Usage:
await RisingEdge(dut.clk)
"""
class FallingEdge(signal):
"""
Fire on signal falling edge (1/X to 0/Z transition).
Parameters:
- signal: HDL signal to monitor
Usage:
await FallingEdge(dut.reset)
"""
class Edge(signal):
"""
Fire on any signal change (any value change).
Parameters:
- signal: HDL signal to monitor
Usage:
await Edge(dut.data_bus)
"""
Usage Examples:
@cocotb.test()
async def edge_test(dut):
    """Test demonstrating signal edge triggers.

    Applies data on ten rising clock edges, runs a reset sequence, then
    counts fifty value changes on ``dut.output_data``.
    """
    # Clock-synchronous operations
    for i in range(10):
        await RisingEdge(dut.clk)
        dut.data_in.value = i
        cocotb.log.info(f"Cycle {i}: Applied data {i}")

    # Reset sequence
    dut.reset_n.value = 0
    await Timer(100, units="ns")
    dut.reset_n.value = 1
    await RisingEdge(dut.reset_n)  # Wait for reset release

    # Monitor any data change
    data_changes = 0
    for _ in range(50):
        await Edge(dut.output_data)
        data_changes += 1
        cocotb.log.info(f"Data changed to: {dut.output_data.value}")
    # Logged once, after the monitoring loop has finished.
    cocotb.log.info(f"Detected {data_changes} data changes")
@cocotb.test()
async def handshake_test(dut):
"""Test demonstrating handshake protocol."""
# Initiate request
dut.request.value = 1
dut.data_out.value = 0xABCD
# Wait for acknowledgment
await RisingEdge(dut.acknowledge)
cocotb.log.info("Request acknowledged")
# Complete handshake
dut.request.value = 0
await FallingEdge(dut.acknowledge)
cocotb.log.info("Handshake completed")Combine multiple triggers for complex synchronization patterns.
class Combine(*triggers):
"""
Fire when ALL triggers have fired.
Parameters:
- *triggers: Variable number of triggers to combine
Usage:
await Combine(RisingEdge(dut.clk), Timer(100, "ns"))
"""
class First(*triggers):
"""
Fire when FIRST trigger fires.
Parameters:
- *triggers: Variable number of triggers to race
Usage:
await First(RisingEdge(dut.done), Timer(1000, "ns"))
"""
class Join(task):
"""
Fire when task completes.
Parameters:
- task: Task to wait for completion
Usage:
await Join(background_task)
"""
class ClockCycles(signal, num_cycles, rising=True):
"""
Fire after specified number of clock cycles.
Parameters:
- signal: Clock signal to count
- num_cycles: Number of cycles to wait
- rising: True for rising edges, False for falling edges
Usage:
await ClockCycles(dut.clk, 10)
"""
Usage Examples:
@cocotb.test()
async def composite_test(dut):
    """Test demonstrating composite triggers.

    Shows ``Combine`` (wait for all), ``First`` used as a timeout race
    against a background task, and ``ClockCycles``.
    """
    # Wait for both clock edge AND timer
    await Combine(RisingEdge(dut.clk), Timer(100, units="ns"))
    cocotb.log.info("Both clock edge and timer fired")

    # Race between completion and timeout
    background_task = cocotb.start_soon(slow_operation(dut))
    # The winner of the race is not needed; the task state is checked below.
    await First(
        Join(background_task),
        Timer(500, units="ns"),  # Timeout
    )

    if background_task.done():
        cocotb.log.info("Operation completed on time")
    else:
        cocotb.log.warning("Operation timed out")
        # Stop the task so it does not keep running after the timeout.
        background_task.kill()

    # Wait for specific number of clock cycles
    await ClockCycles(dut.clk, 5)
    cocotb.log.info("5 clock cycles elapsed")

    # Multi-signal synchronization
    await Combine(
        Edge(dut.valid),
        Edge(dut.ready),
        RisingEdge(dut.clk),
    )
    cocotb.log.info("Valid, ready, and clock all changed")
async def slow_operation(dut):
"""Simulate a slow operation."""
await Timer(300, units="ns")
dut.operation_done.value = 1
return "completed"Event and lock primitives for inter-coroutine synchronization.
class Event():
"""
Event synchronization primitive for coordinating coroutines.
"""
def set(self, data=None):
"""
Set the event and wake all waiting coroutines.
Parameters:
- data: Optional data to pass to waiting coroutines
"""
def wait(self):
"""
Get trigger that fires when event is set.
Returns:
Trigger that can be awaited
Usage:
await event.wait()
"""
def clear(self):
"""Clear the event."""
def is_set(self) -> bool:
"""
Check if event is currently set.
Returns:
True if event is set
"""
@property
def data(self):
"""
Get data associated with the event.
Returns:
Data passed to set() method
"""
class Lock():
"""
Lock synchronization primitive for mutual exclusion.
"""
def acquire(self):
"""
Get trigger that fires when lock is acquired.
Returns:
Trigger that can be awaited
Usage:
await lock.acquire()
"""
def release(self):
"""Release the lock."""
def locked(self) -> bool:
"""
Check if lock is currently held.
Returns:
True if lock is held
"""
Usage Examples:
@cocotb.test()
async def synchronization_test(dut):
    """Test demonstrating synchronization primitives.

    Starts a producer and a consumer that share one ``Event`` and waits
    for both tasks to finish.
    """
    # Event coordination
    completion_event = Event()

    # Start producer and consumer
    producer_task = cocotb.start_soon(data_producer(dut, completion_event))
    consumer_task = cocotb.start_soon(data_consumer(dut, completion_event))

    # Wait for both to complete
    await producer_task.join()
    await consumer_task.join()
async def data_producer(dut, completion_event):
    """Produce data and signal completion.

    Parameters:
    - dut: Design handle whose ``data_buffer`` signal is driven
    - completion_event: Event set once all ten values have been produced
    """
    for i in range(10):
        dut.data_buffer.value = i * 10
        await Timer(50, units="ns")
        cocotb.log.info(f"Produced: {i * 10}")

    # Signal completion to consumer (once, after the loop)
    completion_event.set(data="production_complete")
async def data_consumer(dut, completion_event):
    """Consume data and wait for completion.

    Parameters:
    - dut: Design handle whose ``data_buffer`` signal is observed
    - completion_event: Event that ends the consumer loop once it is set
    """
    consumed_count = 0
    while True:
        # Wait for either data or completion; which trigger fired is not
        # needed because the event state is checked directly below.
        await First(
            Edge(dut.data_buffer),
            completion_event.wait(),
        )

        if completion_event.is_set():
            cocotb.log.info(f"Production complete, consumed {consumed_count} items")
            break

        consumed_count += 1
        cocotb.log.info(f"Consumed: {dut.data_buffer.value}")
@cocotb.test()
async def lock_test(dut):
    """Test demonstrating lock usage.

    Starts three tasks that contend for one ``Lock`` and waits for all of
    them to finish.
    """
    shared_resource_lock = Lock()

    # Start multiple tasks that need exclusive access
    tasks = [
        cocotb.start_soon(exclusive_operation(dut, shared_resource_lock, i))
        for i in range(3)
    ]

    # Wait for all to complete
    for task in tasks:
        await task.join()
async def exclusive_operation(dut, lock, operation_id):
"""Perform operation requiring exclusive access."""
cocotb.log.info(f"Operation {operation_id} waiting for lock")
await lock.acquire()
try:
cocotb.log.info(f"Operation {operation_id} acquired lock")
# Critical section
dut.shared_register.value = operation_id
await Timer(100, units="ns")
assert dut.shared_register.value == operation_id
cocotb.log.info(f"Operation {operation_id} completed successfully")
finally:
lock.release()
cocotb.log.info(f"Operation {operation_id} released lock")Utility triggers for specific scenarios.
class NullTrigger(name="", outcome=None):
"""
Fire immediately with optional outcome.
Parameters:
- name: Optional name for debugging
- outcome: Optional outcome to return
Usage:
await NullTrigger()
"""
Add timeout capability to any trigger.
def with_timeout(trigger, timeout_time, timeout_unit="step", round_mode=None):
"""
Add timeout to any trigger.
Parameters:
- trigger: Trigger to add timeout to
- timeout_time: Timeout duration
- timeout_unit: Timeout units
- round_mode: Rounding mode
Returns:
Trigger that times out if original doesn't fire
Raises:
SimTimeoutError: If timeout occurs before trigger fires
Usage:
await with_timeout(RisingEdge(dut.done), 1000, "ns")
"""
Usage Examples:
from cocotb.result import SimTimeoutError
@cocotb.test()
async def timeout_test(dut):
    """Test demonstrating timeout usage.

    Guards a wait on ``dut.response`` with ``with_timeout`` and then shows
    ``NullTrigger`` with and without an outcome.
    """
    # Operation with timeout protection
    try:
        await with_timeout(RisingEdge(dut.response), 500, "ns")
        cocotb.log.info("Response received on time")
    except SimTimeoutError:
        cocotb.log.error("Response timeout - continuing with default")
        dut.response.value = 1  # Provide default response

    # Immediate trigger
    await NullTrigger("immediate_completion")

    # Trigger with specific outcome
    # NOTE(review): confirm which object type `outcome` expects in the
    # installed cocotb version.
    result_trigger = NullTrigger("result", outcome=42)
    await result_trigger


from cocotb.triggers import Waitable


class WaitUntilValue(Waitable):
    """Custom trigger that waits for signal to reach specific value.

    Built on ``cocotb.triggers.Waitable``, which turns the ``_wait``
    coroutine into a valid ``__await__``. Declaring ``__await__`` itself as
    ``async def`` would make it return a coroutine instead of an iterator,
    and ``await`` rejects that with a TypeError.

    Parameters:
    - signal: HDL signal to monitor
    - value: Value the signal has to reach
    - timeout_ns: Maximum simulation time to wait, in nanoseconds

    Raises:
    SimTimeoutError: If more than ``timeout_ns`` has elapsed when the
    signal changes without reaching ``value``

    Usage:
    await WaitUntilValue(dut.state_machine, 0x5, timeout_ns=2000)
    """

    def __init__(self, signal, value, timeout_ns=1000):
        self.signal = signal
        self.value = value
        self.timeout_ns = timeout_ns

    async def _wait(self):
        """Wait until the signal equals the target and return its value."""
        start_time = get_sim_time("ns")
        while self.signal.value != self.value:
            await Edge(self.signal)
            # The deadline is only evaluated when the signal changes; a
            # signal that never toggles is not caught by this check.
            current_time = get_sim_time("ns")
            if current_time - start_time > self.timeout_ns:
                raise SimTimeoutError(f"Signal never reached {self.value}")
        return self.signal.value
@cocotb.test()
async def custom_trigger_test(dut):
"""Test using custom trigger."""
# Use custom trigger
await WaitUntilValue(dut.state_machine, 0x5, timeout_ns=2000)
cocotb.log.info("State machine reached target state")@cocotb.test()
async def composition_test(dut):
"""Test showing trigger composition patterns."""
# Complex condition: clock edge AND data valid AND not busy
await Combine(
RisingEdge(dut.clk),
WaitUntilValue(dut.data_valid, 1),
WaitUntilValue(dut.busy, 0)
)
# Race multiple completion conditions
completion = await First(
WaitUntilValue(dut.done, 1),
WaitUntilValue(dut.error, 1),
Timer(5000, "ns") # Overall timeout
)
if dut.done.value:
cocotb.log.info("Operation completed successfully")
elif dut.error.value:
cocotb.log.error("Operation failed")
else:
cocotb.log.warning("Operation timed out")Install with Tessl CLI
npx tessl i tessl/pypi-cocotb