Cocotb is a coroutine-based cosimulation library for writing VHDL and Verilog testbenches in Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cocotb's logging and utilities system provides simulation-aware logging, time manipulation functions, queue implementations, and debugging tools. The system integrates simulation time context with Python's logging framework and offers utilities for testbench development.
Logging system with simulation time context and hierarchical naming for testbench debugging.
class SimLog:
    """
    Simulator-aware logger with hierarchical naming.

    Automatically adds simulation time and hierarchical context to log
    messages. Inherits from Python's logging.Logger with simulation-specific
    enhancements.

    NOTE(review): upstream cocotb appears to expose ``SimLog`` as a factory
    function returning a standard ``logging.Logger`` rather than as a class;
    confirm against the installed cocotb version. Call sites of the form
    ``SimLog("a.b")`` read the same either way.

    Methods:
    - debug(msg, *args, **kwargs): Debug level logging
    - info(msg, *args, **kwargs): Info level logging
    - warning(msg, *args, **kwargs): Warning level logging
    - error(msg, *args, **kwargs): Error level logging
    - critical(msg, *args, **kwargs): Critical level logging
    """
class SimLogFormatter:
    """
    Custom log formatter for simulator logs.

    NOTE(review): presumably derives from ``logging.Formatter``; the base
    class is not shown in this stub, so confirm against cocotb's log module.
    """
class SimColourLogFormatter(SimLogFormatter):
    """
    Colored log formatter for enhanced readability.

    Subclass of SimLogFormatter.
    """
class SimTimeContextFilter:
    """
    Filter to add simulation time context to log records.

    NOTE(review): presumably a ``logging.Filter``; the base class and the
    name of the attribute it attaches to each record are not shown here.
    """
def default_config():
    """
    Apply default logging configuration for cocotb.

    Sets up simulation-aware logging with appropriate formatters
    and filters for testbench development.

    Takes no arguments; no return value is documented.
    """

Usage Examples:
@cocotb.test()
async def logging_test(dut):
    """Show simulation-aware logging through several named loggers."""
    # The package-wide logger needs no setup
    cocotb.log.info("Test started")

    # One logger per dotted name; the dots express the hierarchy
    main_logger, dut_logger, bus_logger = (
        SimLog(name)
        for name in ("test.main", "test.dut", "test.monitor.data_bus")
    )

    # Emit one record at each of three severities
    for emit, text in (
        (main_logger.debug, "Debug message - detailed info"),
        (main_logger.info, "Test phase 1 starting"),
        (main_logger.warning, "Potential timing issue detected"),
    ):
        emit(text)

    # Let simulation time advance
    await Timer(100, units="ns")

    # Records emitted from here on carry the advanced simulation time
    dut_logger.info(f"DUT state: {dut.state.value}")
    bus_logger.info(f"Data bus activity: {dut.data_bus.value}")

    await Timer(50, units="ns")

    # Report the DUT's error flag, if raised
    error_raised = dut.error_flag.value
    if error_raised:
        main_logger.error("Error condition detected!")

    main_logger.info("Test completed successfully")
@cocotb.test()
async def hierarchical_logging_test(dut):
"""Test demonstrating hierarchical logging structure."""
# Create logger hierarchy for complex testbench
main_log = SimLog("testbench")
cpu_log = SimLog("testbench.cpu")
memory_log = SimLog("testbench.memory")
bus_log = SimLog("testbench.interconnect.bus")
# Coordinated logging from different components
main_log.info("Starting system-level test")
cpu_log.info("CPU initialization")
await Timer(25, units="ns")
memory_log.info("Memory subsystem ready")
await Timer(25, units="ns")
bus_log.info("Bus protocol check")
await Timer(25, units="ns")
main_log.info("All subsystems initialized")
# Component-specific detailed logging
for i in range(3):
cpu_log.debug(f"Instruction fetch cycle {i}")
bus_log.debug(f"Bus transaction {i}: read request")
memory_log.debug(f"Memory read from address 0x{i*4:08X}")
await Timer(20, units="ns")
main_log.info("Test sequence completed")Functions for simulation time manipulation and conversion between different time units.
def get_sim_time(units="step"):
    """
    Get current simulation time.

    Parameters:
    - units: Time units to return ("fs", "ps", "ns", "us", "ms", "sec", "step").
      Defaults to "step".

    Returns:
    Current simulation time in specified units
    """
def get_time_from_sim_steps(steps, units):
    """
    Convert simulation steps to time units.

    Parameters:
    - steps: Number of simulation steps
    - units: Target time units (required; there is no default)

    Returns:
    Time value in specified units
    """
def get_sim_steps(time, units, round_mode=None):
    """
    Convert time to simulation steps.

    Parameters:
    - time: Time value
    - units: Time units of input
    - round_mode: Rounding mode for conversion. Defaults to None.
      NOTE(review): the accepted mode names and the behaviour when the time
      is not a whole number of steps are not listed here; check the cocotb
      reference before relying on a particular mode.

    Returns:
    Number of simulation steps
    """

Usage Examples:
@cocotb.test()
async def time_utilities_test(dut):
    """Exercise the simulation-time query and conversion helpers."""
    # Snapshot the starting time in three different units
    start_time_ns, start_time_ps, start_time_steps = (
        get_sim_time(unit) for unit in ("ns", "ps", "step")
    )
    cocotb.log.info(f"Test start: {start_time_ns} ns, {start_time_ps} ps, {start_time_steps} steps")

    # Advance the simulation
    await Timer(150, units="ns")

    # Elapsed time is the difference of two readings in the same unit
    elapsed_ns = get_sim_time("ns") - start_time_ns
    cocotb.log.info(f"Elapsed time: {elapsed_ns} ns")

    # Round trip: time -> steps -> time in another unit
    steps_in_100ns = get_sim_steps(100, "ns")
    time_from_steps = get_time_from_sim_steps(steps_in_100ns, "ps")
    cocotb.log.info(f"100ns = {steps_in_100ns} steps = {time_from_steps} ps")

    # Time a short burst of stimulus
    operation_start = get_sim_time("ps")
    for value in range(10):
        dut.data.value = value
        await Timer(5, units="ns")
    operation_duration = get_sim_time("ps") - operation_start

    cocotb.log.info(f"Operation took {operation_duration} ps ({operation_duration/1000} ns)")
@cocotb.test()
async def timing_analysis_test(dut):
"""Test demonstrating timing analysis with utilities."""
# Measure setup and hold times
timestamps = []
# Clock edge timing
for cycle in range(5):
await RisingEdge(dut.clk)
edge_time = get_sim_time("ps")
timestamps.append(edge_time)
# Setup time check
dut.data_in.value = cycle
setup_time = get_sim_time("ps")
cocotb.log.info(f"Cycle {cycle}: Clock edge at {edge_time} ps, data setup at {setup_time} ps")
# Check setup timing
setup_margin = edge_time - setup_time # Should be negative (data before clock)
if setup_margin > -1000: # Less than 1ns setup
cocotb.log.warning(f"Setup time violation: {setup_margin} ps")
# Calculate clock period statistics
periods = []
for i in range(1, len(timestamps)):
period = timestamps[i] - timestamps[i-1]
periods.append(period)
avg_period = sum(periods) / len(periods)
frequency_mhz = 1e6 / avg_period # Convert ps to MHz
cocotb.log.info(f"Average clock period: {avg_period} ps ({frequency_mhz:.2f} MHz)")Async queue implementations for producer-consumer patterns and inter-coroutine communication.
class Queue:
    """
    Generic async queue implementation.

    Generic over the item type ``T``.

    Parameters:
    - maxsize: Maximum queue size (0 for unlimited)

    Methods:
    - put(item): Put item into queue (async)
    - get(): Get item from queue (async)
    - put_nowait(item): Put item without waiting
    - get_nowait(): Get item without waiting
    - qsize(): Get current queue size
    - empty(): Check if queue is empty
    - full(): Check if queue is full
    """

    def __init__(self, maxsize=0):
        """
        Create a queue.

        Parameters:
        - maxsize: Maximum queue size (0 for unlimited)
        """

    async def put(self, item):
        """
        Put item into queue.

        Parameters:
        - item: Item to add to queue

        Blocks if queue is full until space available.
        """

    async def get(self):
        """
        Get item from queue.

        Returns:
        Item from queue

        Blocks if queue is empty until item available.
        """

    def put_nowait(self, item):
        """
        Put item without waiting.

        Parameters:
        - item: Item to add

        Raises:
        QueueFull: If queue is at capacity
        """

    def get_nowait(self):
        """
        Get item without waiting.

        Returns:
        Item from queue

        Raises:
        QueueEmpty: If queue is empty
        """

    def qsize(self) -> int:
        """Get current number of items in queue."""

    @property
    def maxsize(self) -> int:
        """Get maximum number of items allowed in queue."""

    def empty(self) -> bool:
        """Check if queue is empty."""

    def full(self) -> bool:
        """Check if queue is full."""
class PriorityQueue(Queue):
    """
    Priority queue implementation (smallest item first).

    Items should be comparable or tuples of (priority, item).

    Parameters:
    - maxsize: Maximum queue size (0 for unlimited), as for Queue
    """
class LifoQueue(Queue):
    """
    LIFO queue (stack) implementation.

    Last item put is first item retrieved.

    Parameters:
    - maxsize: Maximum queue size (0 for unlimited), as for Queue
    """
class QueueFull(Exception):
    """
    Raised when queue is full and put_nowait() is called.

    The awaitable put() is documented to block in that situation instead.
    """
class QueueEmpty(Exception):
    """
    Raised when queue is empty and get_nowait() is called.

    The awaitable get() is documented to block in that situation instead.
    """

Usage Examples:
@cocotb.test()
async def queue_basic_test(dut):
    """Run a producer and a slower consumer against one bounded queue."""
    # Capacity of five, so the producer can run ahead only so far
    data_queue = Queue(maxsize=5)

    async def producer():
        """Enqueue ten items, one every 20 ns."""
        for i in range(10):
            await data_queue.put(f"data_{i}")
            cocotb.log.info(f"Produced: data_{i} (queue size: {data_queue.qsize()})")
            await Timer(20, units="ns")

    async def consumer():
        """Dequeue ten items, one every 30 ns, and return them in order."""
        received = []
        for _ in range(10):
            item = await data_queue.get()
            received.append(item)
            cocotb.log.info(f"Consumed: {item} (queue size: {data_queue.qsize()})")
            await Timer(30, units="ns")  # deliberately slower than the producer
        return received

    # Schedule both sides so they run concurrently
    producing = cocotb.start_soon(producer())
    consuming = cocotb.start_soon(consumer())

    # Wait for both; the consumer's result is the list of received items
    await producing.join()
    consumed_items = await consuming.join()

    cocotb.log.info(f"Test completed. Consumed {len(consumed_items)} items")
@cocotb.test()
async def queue_types_test(dut):
    """Compare retrieval order across the FIFO, LIFO and priority queues."""

    async def round_trip(queue, items):
        """Put every item, then drain the queue and return the retrieval order."""
        for entry in items:
            await queue.put(entry)
        drained = []
        while not queue.empty():
            drained.append(await queue.get())
        return drained

    # One queue of each kind: FIFO (the default), LIFO (stack), priority
    fifo_queue = Queue()
    lifo_queue = LifoQueue()
    priority_queue = PriorityQueue()

    # Input data
    test_items = ["first", "second", "third"]
    priorities = [(3, "low"), (1, "high"), (2, "medium")]

    # Fill and drain each queue in turn
    fifo_result = await round_trip(fifo_queue, test_items)
    lifo_result = await round_trip(lifo_queue, test_items)
    priority_result = await round_trip(priority_queue, priorities)

    # Report the observed orders
    cocotb.log.info(f"FIFO order: {fifo_result}")
    cocotb.log.info(f"LIFO order: {lifo_result}")
    cocotb.log.info(f"Priority order: {priority_result}")
@cocotb.test()
async def queue_flow_control_test(dut):
"""Test demonstrating queue flow control."""
# Small queue to test flow control
flow_queue = Queue(maxsize=2)
# Fast producer that will block
async def fast_producer():
for i in range(6):
cocotb.log.info(f"Attempting to put item_{i}")
await flow_queue.put(f"item_{i}")
cocotb.log.info(f"Successfully put item_{i}")
await Timer(10, units="ns")
# Slow consumer
async def slow_consumer():
consumed = []
for _ in range(6):
await Timer(50, units="ns") # Slow processing
item = await flow_queue.get()
consumed.append(item)
cocotb.log.info(f"Consumed: {item}")
return consumed
# Run with flow control
producer_task = cocotb.start_soon(fast_producer())
consumer_task = cocotb.start_soon(slow_consumer())
await consumer_task.join()
await producer_task.join()
cocotb.log.info("Flow control test completed")Tools for debugging coroutines, handling tracebacks, and development support.
def want_color_output() -> bool:
    """
    Check if colored terminal output is wanted.

    Takes no arguments.
    NOTE(review): what the decision is based on (terminal detection,
    environment variables, ...) is not shown in this stub.

    Returns:
    True if colored output should be used
    """
def remove_traceback_frames(tb_or_exc, frame_names):
    """
    Strip specific frames from traceback for cleaner error reporting.

    Parameters:
    - tb_or_exc: Traceback or exception object
    - frame_names: List of frame names to remove

    Returns:
    Cleaned traceback
    NOTE(review): whether an exception input yields an exception or a bare
    traceback, and whether frame order matters, is not stated here.
    """
def walk_coro_stack(coro):
    """
    Walk coroutine call stack for debugging.

    Parameters:
    - coro: Coroutine to examine

    Returns:
    Iterator over stack frames
    NOTE(review): the exact item type yielded is not shown in this stub.
    """
def extract_coro_stack(coro, limit=None):
    """
    Extract coroutine call stack as list.

    Parameters:
    - coro: Coroutine to examine
    - limit: Maximum number of frames. Defaults to None.
      NOTE(review): None presumably means "no limit"; confirm.

    Returns:
    List of stack frames
    """

Usage Examples:
@cocotb.test()
async def debugging_test(dut):
    """
    Test demonstrating debugging utilities.

    Reports whether colored output is wanted, starts
    ``nested_coroutine_example`` as a background task, and after 50 ns logs
    up to five frames of that task's coroutine stack before waiting for the
    task to finish.
    """
    # Check terminal capabilities
    if want_color_output():
        cocotb.log.info("Color output is available")
    else:
        cocotb.log.info("Color output not available")

    # Start a coroutine for stack inspection
    debug_task = cocotb.start_soon(nested_coroutine_example(dut))

    # Allow some execution
    await Timer(50, units="ns")

    # Extract coroutine stack
    if not debug_task.done():
        # extract_coro_stack() expects the coroutine object itself, which the
        # task keeps in its private ``_coro`` attribute. Being private, the
        # attribute may change between cocotb releases.
        stack = extract_coro_stack(debug_task._coro, limit=5)
        cocotb.log.info(f"Coroutine stack has {len(stack)} frames")
        for i, frame in enumerate(stack):
            cocotb.log.info(f" Frame {i}: {frame}")

    # Wait for completion
    await debug_task.join()
async def nested_coroutine_example(dut):
    """Await through three nested coroutines so there is a stack to inspect."""

    async def level_3():
        # Innermost frame: suspend here so the whole chain stays on the stack
        await Timer(100, units="ns")

    async def level_2():
        await level_3()

    async def level_1():
        await level_2()

    outcome = await level_1()
    return outcome
@cocotb.test()
async def error_handling_test(dut):
"""Test demonstrating error handling utilities."""
async def failing_coroutine():
await Timer(25, units="ns")
raise ValueError("Intentional test error")
# Catch and analyze error
try:
await failing_coroutine()
except Exception as e:
cocotb.log.info(f"Caught exception: {e}")
# Get clean traceback
import traceback
tb = traceback.format_exc()
cocotb.log.info("Full traceback:")
for line in tb.split('\n'):
if line.strip():
cocotb.log.info(f" {line}")Legacy utility functions maintained for backwards compatibility.
def hexdump(x):
    """
    DEPRECATED: Hexdump of buffer.

    Kept for backwards compatibility only; avoid in new code.

    Parameters:
    - x: Buffer to dump

    Returns:
    Hexadecimal representation string
    """
def hexdiffs(x, y):
    """
    DEPRECATED: Show hex differences between buffers.

    Kept for backwards compatibility only; avoid in new code.

    Parameters:
    - x: First buffer
    - y: Second buffer

    Returns:
    String showing differences
    """
def pack(ctypes_obj):
    """
    DEPRECATED: Convert ctypes object to string.

    Kept for backwards compatibility only; avoid in new code.

    Parameters:
    - ctypes_obj: Ctypes object to convert

    Returns:
    String representation
    """
def unpack(ctypes_obj, string, bytes=None):
    """
    DEPRECATED: Unpack string to ctypes object.

    Kept for backwards compatibility only; avoid in new code.

    Parameters:
    - ctypes_obj: Target ctypes object
    - string: String to unpack
    - bytes: Optional byte specification. Defaults to None. The parameter
      name shadows the ``bytes`` builtin; it is part of the public
      signature and so is left as is.

    Returns:
    Unpacked ctypes object
    """

Helper classes for advanced patterns and singleton implementations.
class ParametrizedSingleton(type):
    """
    Metaclass for parametrized singleton pattern.

    Creates singleton instances based on constructor parameters.

    NOTE(review): upstream cocotb appears to require classes using this
    metaclass to define ``__singleton_key__`` (mapping the constructor
    arguments to a cache key); confirm against the installed version.
    """
class lazy_property:
    """
    Property decorator for lazy evaluation.

    Property value is computed only once and cached.
    NOTE(review): presumably cached per instance; where the cached value is
    stored is not shown in this stub.
    """

Usage Examples:
class ConfigManager(metaclass=ParametrizedSingleton):
    """
    Example using parametrized singleton.

    Constructing the class twice with the same ``config_name`` is meant to
    yield the same object; a different name yields a different object.
    """

    @classmethod
    def __singleton_key__(cls, config_name):
        """
        Return the key under which the instance for these arguments is cached.

        Parameters:
        - config_name: Name passed to the constructor

        Returns:
        The name itself, so each distinct name maps to one shared instance
        """
        return config_name

    def __init__(self, config_name):
        """
        Set up a manager with an empty settings dict.

        Parameters:
        - config_name: Name identifying this configuration
        """
        self.config_name = config_name
        self.settings = {}
        cocotb.log.info(f"Created config manager for: {config_name}")
class TestBench:
    """Testbench wrapper whose memory map is built lazily on first access."""

    def __init__(self, dut):
        self.dut = dut

    @lazy_property
    def memory_map(self):
        """Build the memory map; stands in for an expensive computation."""
        cocotb.log.info("Computing memory map...")
        layout = {"base": 0x1000, "size": 0x1000}
        return layout
@cocotb.test()
async def utility_classes_test(dut):
"""Test demonstrating utility classes."""
# Singleton pattern test
config1 = ConfigManager("test_config")
config2 = ConfigManager("test_config") # Same instance
config3 = ConfigManager("other_config") # Different instance
assert config1 is config2 # Same singleton
assert config1 is not config3 # Different parameter
# Lazy property test
tb = TestBench(dut)
# First access computes value
mem_map1 = tb.memory_map
# Second access uses cached value
mem_map2 = tb.memory_map
assert mem_map1 is mem_map2 # Same cached object
cocotb.log.info(f"Memory map: {mem_map1}")Install with Tessl CLI
npx tessl i tessl/pypi-cocotb