0
# Task Management
1
2
Cocotb's task management system enables concurrent coroutine execution with functions for creating, scheduling, and controlling tasks. The system supports immediate scheduling, yielding execution control, and task lifecycle management.
3
4
## Capabilities
5
6
### Immediate Task Scheduling
7
8
Schedule coroutines to run concurrently without yielding control from the calling context.
9
10
```python { .api }
11
def start_soon(coro: Union[Task, Coroutine]) -> Task:
12
"""
13
Schedule a coroutine to be run concurrently.
14
15
Note: This is not an async function, and the new task will not execute
16
until the calling task yields control.
17
18
Parameters:
19
- coro: Coroutine or Task to schedule
20
21
Returns:
22
Task object representing the scheduled coroutine
23
"""
24
```
25
26
**Usage Examples:**
27
28
```python
29
@cocotb.test()
30
async def concurrent_test(dut):
31
"""Test demonstrating concurrent task execution."""
32
33
# Schedule multiple tasks immediately
34
monitor_task = cocotb.start_soon(monitor_signals(dut))
35
stimulus_task = cocotb.start_soon(generate_stimulus(dut))
36
checker_task = cocotb.start_soon(check_responses(dut))
37
38
# Tasks start executing when this coroutine yields
39
await Timer(1000, units="ns")
40
41
# Clean up tasks
42
monitor_task.kill()
43
stimulus_task.kill()
44
checker_task.kill()
45
46
async def monitor_signals(dut):
47
"""Monitor signal changes continuously."""
48
while True:
49
await Edge(dut.data_bus)
50
cocotb.log.info(f"Data bus changed to: {dut.data_bus.value}")
51
52
async def generate_stimulus(dut):
53
"""Generate test stimulus."""
54
for i in range(10):
55
dut.input_data.value = i
56
await Timer(50, units="ns")
57
```
58
59
### Yielding Task Scheduling
60
61
Schedule coroutines and immediately yield control to allow pending tasks to execute.
62
63
```python { .api }
64
async def start(coro: Union[Task, Coroutine]) -> Task:
65
"""
66
Schedule a coroutine to be run concurrently, then yield control to allow pending tasks to execute.
67
68
The calling task will resume execution before control is returned to the simulator.
69
70
Parameters:
71
- coro: Coroutine or Task to schedule
72
73
Returns:
74
Task object representing the scheduled coroutine
75
"""
76
```
77
78
**Usage Examples:**
79
80
```python
81
@cocotb.test()
82
async def yielding_test(dut):
83
"""Test demonstrating yielding task scheduling."""
84
85
# Schedule and immediately allow execution
86
background_task = await cocotb.start(background_monitor(dut))
87
88
# Background task has opportunity to start before continuing
89
cocotb.log.info("Background task started")
90
91
# Continue with main test logic
92
for i in range(5):
93
dut.control.value = i
94
await Timer(100, units="ns")
95
96
# Clean up
97
background_task.kill()
98
99
async def background_monitor(dut):
100
"""Background monitoring task."""
101
cocotb.log.info("Background monitor started")
102
while True:
103
await RisingEdge(dut.clk)
104
if dut.error_flag.value:
105
cocotb.log.error("Error flag detected!")
106
```
107
108
### Task Creation
109
110
Create tasks without scheduling them for later execution control.
111
112
```python { .api }
113
def create_task(coro: Union[Task, Coroutine]) -> Task:
114
"""
115
Construct a coroutine into a Task without scheduling the Task.
116
117
The Task can later be scheduled with fork, start, or start_soon.
118
119
Parameters:
120
- coro: Coroutine to convert to Task
121
122
Returns:
123
Task object that can be scheduled later
124
"""
125
```
126
127
**Usage Examples:**
128
129
```python
130
@cocotb.test()
131
async def task_creation_test(dut):
132
"""Test demonstrating task creation and delayed scheduling."""
133
134
# Create tasks without scheduling
135
read_task = cocotb.create_task(read_sequence(dut, 0x100, 10))
136
write_task = cocotb.create_task(write_sequence(dut, 0x200, [1, 2, 3, 4]))
137
138
# Schedule tasks when needed
139
cocotb.start_soon(read_task)
140
await Timer(50, units="ns") # Delay between operations
141
142
cocotb.start_soon(write_task)
143
144
# Wait for completion
145
await read_task.join()
146
await write_task.join()
147
148
async def read_sequence(dut, base_addr, count):
149
"""Perform a sequence of read operations."""
150
results = []
151
for i in range(count):
152
dut.addr.value = base_addr + i
153
dut.read_enable.value = 1
154
await RisingEdge(dut.clk)
155
156
dut.read_enable.value = 0
157
await RisingEdge(dut.clk)
158
159
results.append(dut.data_out.value)
160
cocotb.log.info(f"Read {base_addr + i}: {dut.data_out.value}")
161
162
return results
163
```
164
165
### Deprecated Fork Function
166
167
Legacy function for task scheduling, replaced by start_soon and start.
168
169
```python { .api }
170
def fork(coro: Union[Task, Coroutine]) -> Task:
171
"""
172
Schedule a coroutine to be run concurrently.
173
174
DEPRECATED: This function has been deprecated in favor of start_soon and start.
175
In most cases you can simply substitute cocotb.fork with cocotb.start_soon.
176
177
Parameters:
178
- coro: Coroutine or Task to schedule
179
180
Returns:
181
Task object representing the scheduled coroutine
182
"""
183
```
184
185
**Migration Examples:**
186
187
```python
188
# OLD: Using deprecated fork
189
@cocotb.test()
190
async def old_style_test(dut):
191
task = cocotb.fork(background_process(dut)) # DEPRECATED
192
await Timer(100, units="ns")
193
task.kill()
194
195
# NEW: Using start_soon
196
@cocotb.test()
197
async def new_style_test(dut):
198
task = cocotb.start_soon(background_process(dut)) # PREFERRED
199
await Timer(100, units="ns")
200
task.kill()
201
202
# NEW: Using start for immediate execution
203
@cocotb.test()
204
async def immediate_test(dut):
205
task = await cocotb.start(background_process(dut)) # PREFERRED
206
await Timer(100, units="ns")
207
task.kill()
208
```
209
210
## Task Class
211
212
### Task Lifecycle Management
213
214
Tasks provide methods for monitoring and controlling coroutine execution.
215
216
```python { .api }
217
class Task:
218
"""
219
Represents a concurrent executing coroutine.
220
"""
221
222
def result(self):
223
"""
224
Get the result of the task.
225
226
Returns:
227
The return value of the coroutine
228
229
Raises:
230
Exception if task completed with error
231
"""
232
233
def exception(self):
234
"""
235
Get the exception that caused the task to fail.
236
237
Returns:
238
Exception object or None if task succeeded
239
"""
240
241
def done(self) -> bool:
242
"""
243
Check if the task is complete.
244
245
Returns:
246
True if task has finished (success or failure)
247
"""
248
249
def cancel(self):
250
"""
251
Cancel the task if it hasn't started executing.
252
253
Returns:
254
True if task was cancelled, False if already running
255
"""
256
257
def cancelled(self) -> bool:
258
"""
259
Check if the task was cancelled.
260
261
Returns:
262
True if task was cancelled before execution
263
"""
264
265
def kill(self):
266
"""
267
Terminate the task immediately.
268
269
Unlike cancel(), this stops a running task.
270
"""
271
272
async def join(self):
273
"""
274
Wait for the task to complete.
275
276
Returns:
277
The return value of the coroutine
278
279
Raises:
280
Exception if task failed
281
"""
282
283
def has_started(self) -> bool:
284
"""
285
Check if the task has started executing.
286
287
Returns:
288
True if task execution has begun
289
"""
290
```
291
292
**Usage Examples:**
293
294
```python
295
@cocotb.test()
296
async def task_control_test(dut):
297
"""Test demonstrating task lifecycle management."""
298
299
# Create and start a long-running task
300
long_task = cocotb.start_soon(long_running_process(dut))
301
302
# Check task status
303
assert not long_task.done()
304
assert long_task.has_started()
305
306
# Wait with timeout
307
try:
308
await with_timeout(long_task.join(), 500, "ns")
309
result = long_task.result()
310
cocotb.log.info(f"Task completed with result: {result}")
311
except SimTimeoutError:
312
cocotb.log.warning("Task timed out, terminating")
313
long_task.kill()
314
assert long_task.done()
315
316
@cocotb.test()
317
async def task_error_handling_test(dut):
318
"""Test demonstrating task error handling."""
319
320
# Start task that might fail
321
risky_task = cocotb.start_soon(risky_operation(dut))
322
323
# Wait for completion and handle errors
324
try:
325
await risky_task.join()
326
cocotb.log.info("Risky operation succeeded")
327
except Exception as e:
328
cocotb.log.error(f"Task failed with: {e}")
329
330
# Check error details
331
if risky_task.exception():
332
cocotb.log.error(f"Exception was: {risky_task.exception()}")
333
334
async def long_running_process(dut):
335
"""Simulate a long-running process."""
336
for i in range(100):
337
dut.counter.value = i
338
await Timer(10, units="ns")
339
return "completed"
340
341
async def risky_operation(dut):
342
"""Operation that might fail."""
343
await Timer(50, units="ns")
344
if dut.error_inject.value:
345
raise ValueError("Injected error occurred")
346
return "success"
347
```
348
349
## Advanced Task Patterns
350
351
### Task Synchronization
352
353
```python
354
@cocotb.test()
355
async def synchronized_test(dut):
356
"""Test showing task synchronization patterns."""
357
358
# Start multiple tasks
359
tasks = [
360
cocotb.start_soon(worker_task(dut, i))
361
for i in range(4)
362
]
363
364
# Wait for all to complete
365
results = []
366
for task in tasks:
367
result = await task.join()
368
results.append(result)
369
370
cocotb.log.info(f"All tasks completed: {results}")
371
372
async def worker_task(dut, worker_id):
373
"""Individual worker task."""
374
await Timer(worker_id * 10, units="ns") # Staggered start
375
376
# Do work
377
for i in range(5):
378
dut._id(f"worker_{worker_id}_output", extended=False).value = i
379
await Timer(20, units="ns")
380
381
return f"worker_{worker_id}_done"
382
```
383
384
## Task Utilities
385
386
### Task Factory Pattern
387
388
```python
389
def create_monitor_task(dut, signal_name, callback):
390
"""Factory function for creating monitor tasks."""
391
392
async def monitor_coroutine():
393
signal = dut._id(signal_name, extended=False)
394
while True:
395
await Edge(signal)
396
callback(signal.value)
397
398
return cocotb.create_task(monitor_coroutine())
399
400
@cocotb.test()
401
async def factory_test(dut):
402
"""Test using task factory pattern."""
403
404
def log_change(value):
405
cocotb.log.info(f"Signal changed to: {value}")
406
407
# Create monitors using factory
408
data_monitor = create_monitor_task(dut, "data_bus", log_change)
409
addr_monitor = create_monitor_task(dut, "addr_bus", log_change)
410
411
# Start monitors
412
cocotb.start_soon(data_monitor)
413
cocotb.start_soon(addr_monitor)
414
415
# Run test
416
await Timer(1000, units="ns")
417
418
# Clean up
419
data_monitor.kill()
420
addr_monitor.kill()
421
```