Cocotb is a coroutine-based cosimulation library for writing VHDL and Verilog testbenches in Python.
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Cocotb's task management system enables concurrent coroutine execution with functions for creating, scheduling, and controlling tasks. The system supports immediate scheduling, yielding execution control, and task lifecycle management.
Schedule coroutines to run concurrently without yielding control from the calling context.
def start_soon(coro: Union[Task, Coroutine]) -> Task:
"""
Schedule a coroutine to be run concurrently.
Note: This is not an async function, and the new task will not execute
until the calling task yields control.
Parameters:
- coro: Coroutine or Task to schedule
Returns:
Task object representing the scheduled coroutine
"""Usage Examples:
@cocotb.test()
async def concurrent_test(dut):
"""Test demonstrating concurrent task execution."""
# Schedule multiple tasks immediately
monitor_task = cocotb.start_soon(monitor_signals(dut))
stimulus_task = cocotb.start_soon(generate_stimulus(dut))
checker_task = cocotb.start_soon(check_responses(dut))
# Tasks start executing when this coroutine yields
await Timer(1000, units="ns")
# Clean up tasks
monitor_task.kill()
stimulus_task.kill()
checker_task.kill()
async def monitor_signals(dut):
"""Monitor signal changes continuously."""
while True:
await Edge(dut.data_bus)
cocotb.log.info(f"Data bus changed to: {dut.data_bus.value}")
async def generate_stimulus(dut):
"""Generate test stimulus."""
for i in range(10):
dut.input_data.value = i
await Timer(50, units="ns")Schedule coroutines and immediately yield control to allow pending tasks to execute.
async def start(coro: Union[Task, Coroutine]) -> Task:
"""
Schedule a coroutine to be run concurrently, then yield control to allow pending tasks to execute.
The calling task will resume execution before control is returned to the simulator.
Parameters:
- coro: Coroutine or Task to schedule
Returns:
Task object representing the scheduled coroutine
"""Usage Examples:
@cocotb.test()
async def yielding_test(dut):
"""Test demonstrating yielding task scheduling."""
# Schedule and immediately allow execution
background_task = await cocotb.start(background_monitor(dut))
# Background task has opportunity to start before continuing
cocotb.log.info("Background task started")
# Continue with main test logic
for i in range(5):
dut.control.value = i
await Timer(100, units="ns")
# Clean up
background_task.kill()
async def background_monitor(dut):
"""Background monitoring task."""
cocotb.log.info("Background monitor started")
while True:
await RisingEdge(dut.clk)
if dut.error_flag.value:
cocotb.log.error("Error flag detected!")Create tasks without scheduling them for later execution control.
def create_task(coro: Union[Task, Coroutine]) -> Task:
"""
Construct a coroutine into a Task without scheduling the Task.
The Task can later be scheduled with fork, start, or start_soon.
Parameters:
- coro: Coroutine to convert to Task
Returns:
Task object that can be scheduled later
"""Usage Examples:
@cocotb.test()
async def task_creation_test(dut):
"""Test demonstrating task creation and delayed scheduling."""
# Create tasks without scheduling
read_task = cocotb.create_task(read_sequence(dut, 0x100, 10))
write_task = cocotb.create_task(write_sequence(dut, 0x200, [1, 2, 3, 4]))
# Schedule tasks when needed
cocotb.start_soon(read_task)
await Timer(50, units="ns") # Delay between operations
cocotb.start_soon(write_task)
# Wait for completion
await read_task.join()
await write_task.join()
async def read_sequence(dut, base_addr, count):
"""Perform a sequence of read operations."""
results = []
for i in range(count):
dut.addr.value = base_addr + i
dut.read_enable.value = 1
await RisingEdge(dut.clk)
dut.read_enable.value = 0
await RisingEdge(dut.clk)
results.append(dut.data_out.value)
cocotb.log.info(f"Read {base_addr + i}: {dut.data_out.value}")
return resultsLegacy function for task scheduling, replaced by start_soon and start.
def fork(coro: Union[Task, Coroutine]) -> Task:
"""
Schedule a coroutine to be run concurrently.
DEPRECATED: This function has been deprecated in favor of start_soon and start.
In most cases you can simply substitute cocotb.fork with cocotb.start_soon.
Parameters:
- coro: Coroutine or Task to schedule
Returns:
Task object representing the scheduled coroutine
"""Migration Examples:
# OLD: Using deprecated fork
@cocotb.test()
async def old_style_test(dut):
task = cocotb.fork(background_process(dut)) # DEPRECATED
await Timer(100, units="ns")
task.kill()
# NEW: Using start_soon
@cocotb.test()
async def new_style_test(dut):
task = cocotb.start_soon(background_process(dut)) # PREFERRED
await Timer(100, units="ns")
task.kill()
# NEW: Using start for immediate execution
@cocotb.test()
async def immediate_test(dut):
task = await cocotb.start(background_process(dut)) # PREFERRED
await Timer(100, units="ns")
task.kill()Tasks provide methods for monitoring and controlling coroutine execution.
class Task:
"""
Represents a concurrent executing coroutine.
"""
def result(self):
"""
Get the result of the task.
Returns:
The return value of the coroutine
Raises:
Exception if task completed with error
"""
def exception(self):
"""
Get the exception that caused the task to fail.
Returns:
Exception object or None if task succeeded
"""
def done(self) -> bool:
"""
Check if the task is complete.
Returns:
True if task has finished (success or failure)
"""
def cancel(self):
"""
Cancel the task if it hasn't started executing.
Returns:
True if task was cancelled, False if already running
"""
def cancelled(self) -> bool:
"""
Check if the task was cancelled.
Returns:
True if task was cancelled before execution
"""
def kill(self):
"""
Terminate the task immediately.
Unlike cancel(), this stops a running task.
"""
async def join(self):
"""
Wait for the task to complete.
Returns:
The return value of the coroutine
Raises:
Exception if task failed
"""
def has_started(self) -> bool:
"""
Check if the task has started executing.
Returns:
True if task execution has begun
"""Usage Examples:
@cocotb.test()
async def task_control_test(dut):
"""Test demonstrating task lifecycle management."""
# Create and start a long-running task
long_task = cocotb.start_soon(long_running_process(dut))
# Check task status
assert not long_task.done()
assert long_task.has_started()
# Wait with timeout
try:
await with_timeout(long_task.join(), 500, "ns")
result = long_task.result()
cocotb.log.info(f"Task completed with result: {result}")
except SimTimeoutError:
cocotb.log.warning("Task timed out, terminating")
long_task.kill()
assert long_task.done()
@cocotb.test()
async def task_error_handling_test(dut):
"""Test demonstrating task error handling."""
# Start task that might fail
risky_task = cocotb.start_soon(risky_operation(dut))
# Wait for completion and handle errors
try:
await risky_task.join()
cocotb.log.info("Risky operation succeeded")
except Exception as e:
cocotb.log.error(f"Task failed with: {e}")
# Check error details
if risky_task.exception():
cocotb.log.error(f"Exception was: {risky_task.exception()}")
async def long_running_process(dut):
"""Simulate a long-running process."""
for i in range(100):
dut.counter.value = i
await Timer(10, units="ns")
return "completed"
async def risky_operation(dut):
"""Operation that might fail."""
await Timer(50, units="ns")
if dut.error_inject.value:
raise ValueError("Injected error occurred")
return "success"@cocotb.test()
async def synchronized_test(dut):
"""Test showing task synchronization patterns."""
# Start multiple tasks
tasks = [
cocotb.start_soon(worker_task(dut, i))
for i in range(4)
]
# Wait for all to complete
results = []
for task in tasks:
result = await task.join()
results.append(result)
cocotb.log.info(f"All tasks completed: {results}")
async def worker_task(dut, worker_id):
"""Individual worker task."""
await Timer(worker_id * 10, units="ns") # Staggered start
# Do work
for i in range(5):
dut._id(f"worker_{worker_id}_output", extended=False).value = i
await Timer(20, units="ns")
return f"worker_{worker_id}_done"def create_monitor_task(dut, signal_name, callback):
"""Factory function for creating monitor tasks."""
async def monitor_coroutine():
signal = dut._id(signal_name, extended=False)
while True:
await Edge(signal)
callback(signal.value)
return cocotb.create_task(monitor_coroutine())
@cocotb.test()
async def factory_test(dut):
"""Test using task factory pattern."""
def log_change(value):
cocotb.log.info(f"Signal changed to: {value}")
# Create monitors using factory
data_monitor = create_monitor_task(dut, "data_bus", log_change)
addr_monitor = create_monitor_task(dut, "addr_bus", log_change)
# Start monitors
cocotb.start_soon(data_monitor)
cocotb.start_soon(addr_monitor)
# Run test
await Timer(1000, units="ns")
# Clean up
data_monitor.kill()
addr_monitor.kill()Install with Tessl CLI
npx tessl i tessl/pypi-cocotb