0
# Triggers and Timing
1
2
Cocotb's trigger system provides comprehensive time-based and event-based synchronization with the HDL simulator. Triggers enable coroutines to wait for specific simulation events, time delays, signal changes, and custom conditions.
3
4
## Capabilities
5
6
### Time-Based Triggers
7
8
Advance simulation time by precise delays and synchronize with the simulator's scheduling phases.
9
10
```python { .api }
11
class Timer(time, units="step", round_mode=None):
12
"""
13
Fire after specified simulation time duration.
14
15
Parameters:
16
- time: Time value (int, float, or decimal.Decimal)
17
- units: Time units ("fs", "ps", "ns", "us", "ms", "sec", "step")
18
- round_mode: Rounding mode for time conversion
19
20
Usage:
21
await Timer(100, units="ns")
22
"""
23
24
class ReadOnly():
25
"""
26
Fire during read-only phase of current timestep.
27
28
Useful for reading signal values after all combinatorial updates.
29
30
Usage:
31
await ReadOnly()
32
"""
33
34
class ReadWrite():
35
"""
36
Fire during read-write phase of current timestep.
37
38
Allows signal modifications within the current timestep.
39
40
Usage:
41
await ReadWrite()
42
"""
43
44
class NextTimeStep():
45
"""
46
Fire at the next simulation timestep.
47
48
Minimal time advancement to next simulator event.
49
50
Usage:
51
await NextTimeStep()
52
"""
53
```
54
55
**Usage Examples:**
56
57
```python
58
@cocotb.test()
async def timing_test(dut):
    """Exercise Timer delays, the read-only phase, and NextTimeStep."""
    from decimal import Decimal

    # Fixed delays between stimulus changes
    await Timer(100, units="ns")
    dut.signal_a.value = 1

    await Timer(50, units="ns")
    dut.signal_b.value = 0

    # Drive a combinatorial input, then sample the output once the
    # current timestep has settled
    dut.combo_input.value = 5
    await ReadOnly()
    settled_output = dut.combo_output.value  # Safe to read here

    # Step to the next simulator timestep
    await NextTimeStep()

    # Fractional delay given as an exact Decimal value
    await Timer(Decimal("33.333"), units="ns")
80
81
@cocotb.test()
async def phase_test(dut):
    """Test demonstrating simulator phase awareness.

    Drives an address, samples the decoded output in the read-only phase,
    then advances time so that a follow-up write can be made legally.
    """
    from cocotb.triggers import Timer

    # Set input and wait for combinatorial update
    dut.addr.value = 0x100
    await ReadOnly()

    # Read the result after combinatorial settling
    decoded_value = dut.chip_select.value
    cocotb.log.info(f"Address 0x100 decoded to: {decoded_value}")

    # Once ReadOnly has fired, no more writes can be scheduled in this
    # timestep and ReadWrite must not be awaited from the read-only phase.
    # Advance simulation time first, then return to a writable phase.
    await Timer(1, units="ns")

    # Move to read-write phase for modifications
    await ReadWrite()
    dut.write_enable.value = 1
96
```
97
98
### Signal Edge Triggers
99
100
Synchronize with HDL signal transitions for event-driven testbench behavior.
101
102
```python { .api }
103
class RisingEdge(signal):
104
"""
105
Fire on signal rising edge (transition from 0 to 1).
106
107
Parameters:
108
- signal: HDL signal to monitor
109
110
Usage:
111
await RisingEdge(dut.clk)
112
"""
113
114
class FallingEdge(signal):
115
"""
116
Fire on signal falling edge (transition from 1 to 0).
117
118
Parameters:
119
- signal: HDL signal to monitor
120
121
Usage:
122
await FallingEdge(dut.reset)
123
"""
124
125
class Edge(signal):
126
"""
127
Fire on any signal change (any value change).
128
129
Parameters:
130
- signal: HDL signal to monitor
131
132
Usage:
133
await Edge(dut.data_bus)
134
"""
135
```
136
137
**Usage Examples:**
138
139
```python
140
@cocotb.test()
async def edge_test(dut):
    """Drive data on clock edges, release reset, then count output changes."""

    # One stimulus value per rising clock edge
    for i in range(10):
        await RisingEdge(dut.clk)
        dut.data_in.value = i
        cocotb.log.info(f"Cycle {i}: Applied data {i}")

    # Hold reset low for 100 ns, release it, and wait for the resulting edge
    dut.reset_n.value = 0
    await Timer(100, units="ns")
    dut.reset_n.value = 1
    await RisingEdge(dut.reset_n)

    # Log each of the next 50 value changes on the output bus
    data_changes = 0
    while data_changes < 50:
        await Edge(dut.output_data)
        data_changes += 1
        cocotb.log.info(f"Data changed to: {dut.output_data.value}")

    cocotb.log.info(f"Detected {data_changes} data changes")
164
165
@cocotb.test()
async def handshake_test(dut):
    """Run one request/acknowledge handshake against the DUT."""

    # Raise the request together with its payload
    dut.request.value = 1
    dut.data_out.value = 0xABCD

    # Block until the DUT raises acknowledge
    ack = dut.acknowledge
    await RisingEdge(ack)
    cocotb.log.info("Request acknowledged")

    # Drop the request and wait for acknowledge to be withdrawn
    dut.request.value = 0
    await FallingEdge(ack)
    cocotb.log.info("Handshake completed")
181
```
182
183
### Composite Triggers
184
185
Combine multiple triggers for complex synchronization patterns.
186
187
```python { .api }
188
class Combine(*triggers):
189
"""
190
Fire when ALL triggers have fired.
191
192
Parameters:
193
- *triggers: Variable number of triggers to combine
194
195
Usage:
196
await Combine(RisingEdge(dut.clk), Timer(100, "ns"))
197
"""
198
199
class First(*triggers):
200
"""
201
Fire when FIRST trigger fires.
202
203
Parameters:
204
- *triggers: Variable number of triggers to race
205
206
Usage:
207
await First(RisingEdge(dut.done), Timer(1000, "ns"))
208
"""
209
210
class Join(task):
211
"""
212
Fire when task completes.
213
214
Parameters:
215
- task: Task to wait for completion
216
217
Usage:
218
await Join(background_task)
219
"""
220
221
class ClockCycles(signal, num_cycles, rising=True):
222
"""
223
Fire after specified number of clock cycles.
224
225
Parameters:
226
- signal: Clock signal to count
227
- num_cycles: Number of cycles to wait
228
- rising: True for rising edges, False for falling edges
229
230
Usage:
231
await ClockCycles(dut.clk, 10)
232
"""
233
```
234
235
**Usage Examples:**
236
237
```python
238
@cocotb.test()
async def composite_test(dut):
    """Show Combine, First, Join and ClockCycles working together."""

    # Combine resumes only after every listed trigger has fired
    clock_and_delay = Combine(RisingEdge(dut.clk), Timer(100, units="ns"))
    await clock_and_delay
    cocotb.log.info("Both clock edge and timer fired")

    # Race the background task against a 500 ns timeout
    background_task = cocotb.start_soon(slow_operation(dut))
    timeout = Timer(500, units="ns")
    await First(Join(background_task), timeout)

    if not background_task.done():
        cocotb.log.warning("Operation timed out")
        background_task.kill()
    else:
        cocotb.log.info("Operation completed on time")

    # Let five clock cycles pass
    await ClockCycles(dut.clk, 5)
    cocotb.log.info("5 clock cycles elapsed")

    # Wait for a change on valid, a change on ready, and a rising clock edge
    await Combine(Edge(dut.valid), Edge(dut.ready), RisingEdge(dut.clk))
    cocotb.log.info("Valid, ready, and clock all changed")
271
272
async def slow_operation(dut):
    """Wait 300 ns, flag completion on the DUT, and return "completed"."""
    delay = Timer(300, units="ns")
    await delay
    dut.operation_done.value = 1
    return "completed"
277
```
278
279
### Synchronization Primitives
280
281
Event and lock primitives for inter-coroutine synchronization.
282
283
```python { .api }
284
class Event():
285
"""
286
Event synchronization primitive for coordinating coroutines.
287
"""
288
289
def set(self, data=None):
290
"""
291
Set the event and wake all waiting coroutines.
292
293
Parameters:
294
- data: Optional data to pass to waiting coroutines
295
"""
296
297
def wait(self):
298
"""
299
Get trigger that fires when event is set.
300
301
Returns:
302
Trigger that can be awaited
303
304
Usage:
305
await event.wait()
306
"""
307
308
def clear(self):
309
"""Clear the event."""
310
311
def is_set(self) -> bool:
312
"""
313
Check if event is currently set.
314
315
Returns:
316
True if event is set
317
"""
318
319
@property
320
def data(self):
321
"""
322
Get data associated with the event.
323
324
Returns:
325
Data passed to set() method
326
"""
327
328
class Lock():
329
"""
330
Lock synchronization primitive for mutual exclusion.
331
"""
332
333
def acquire(self):
334
"""
335
Get trigger that fires when lock is acquired.
336
337
Returns:
338
Trigger that can be awaited
339
340
Usage:
341
await lock.acquire()
342
"""
343
344
def release(self):
345
"""Release the lock."""
346
347
def locked(self) -> bool:
348
"""
349
Check if lock is currently held.
350
351
Returns:
352
True if lock is held
353
"""
354
```
355
356
**Usage Examples:**
357
358
```python
359
@cocotb.test()
async def synchronization_test(dut):
    """Coordinate a producer task and a consumer task through one Event."""
    completion_event = Event()

    # Launch the producer first, then the consumer
    workers = [
        cocotb.start_soon(worker(dut, completion_event))
        for worker in (data_producer, data_consumer)
    ]

    # Block until each task has finished, in launch order
    for task in workers:
        await task.join()
373
374
async def data_producer(dut, completion_event):
    """Write ten values to the data buffer, then set the completion event."""
    i = 0
    while i < 10:
        dut.data_buffer.value = i * 10
        await Timer(50, units="ns")
        cocotb.log.info(f"Produced: {i * 10}")
        i += 1

    # Tell the consumer that no more data will follow
    completion_event.set(data="production_complete")
383
384
async def data_consumer(dut, completion_event):
    """Count data-buffer changes until the completion event is set."""
    consumed_count = 0

    while True:
        # Resume on whichever comes first: new data or the completion event
        await First(Edge(dut.data_buffer), completion_event.wait())

        if not completion_event.is_set():
            consumed_count += 1
            cocotb.log.info(f"Consumed: {dut.data_buffer.value}")
            continue

        cocotb.log.info(f"Production complete, consumed {consumed_count} items")
        break
401
402
@cocotb.test()
async def lock_test(dut):
    """Run three tasks that contend for one Lock and wait for all of them."""
    shared_resource_lock = Lock()

    # Start the contending tasks one after another
    tasks = []
    for i in range(3):
        contender = exclusive_operation(dut, shared_resource_lock, i)
        tasks.append(cocotb.start_soon(contender))

    # Join them in start order
    for task in tasks:
        await task.join()
417
418
async def exclusive_operation(dut, lock, operation_id):
    """Write ``operation_id`` to the shared register while holding ``lock``."""
    log = cocotb.log
    log.info(f"Operation {operation_id} waiting for lock")

    await lock.acquire()
    try:
        log.info(f"Operation {operation_id} acquired lock")

        # Critical section: the register must still hold our id after 100 ns
        dut.shared_register.value = operation_id
        await Timer(100, units="ns")

        assert dut.shared_register.value == operation_id
        log.info(f"Operation {operation_id} completed successfully")
    finally:
        # Hand the lock back even if the check above failed
        lock.release()
        log.info(f"Operation {operation_id} released lock")
436
```
437
438
### Special Triggers
439
440
Utility triggers for specific scenarios.
441
442
```python { .api }
443
class NullTrigger(name="", outcome=None):
444
"""
445
Fire immediately with optional outcome.
446
447
Parameters:
448
- name: Optional name for debugging
449
- outcome: Optional outcome to return
450
451
Usage:
452
await NullTrigger()
453
"""
454
```
455
456
### Timeout Utility
457
458
Add timeout capability to any trigger.
459
460
```python { .api }
461
def with_timeout(trigger, timeout_time, timeout_unit="step", round_mode=None):
462
"""
463
Add timeout to any trigger.
464
465
Parameters:
466
- trigger: Trigger to add timeout to
467
- timeout_time: Timeout duration
468
- timeout_unit: Timeout units
469
- round_mode: Rounding mode
470
471
Returns:
472
Result of the awaited trigger, if it fires before the timeout
473
474
Raises:
475
SimTimeoutError: If timeout occurs before trigger fires
476
477
Usage:
478
await with_timeout(RisingEdge(dut.done), 1000, "ns")
479
"""
480
```
481
482
**Usage Examples:**
483
484
```python
485
from cocotb.result import SimTimeoutError
486
487
@cocotb.test()
async def timeout_test(dut):
    """Show with_timeout() error handling and NullTrigger usage."""

    # Give the response 500 ns to arrive; fall back to a default otherwise
    try:
        await with_timeout(RisingEdge(dut.response), 500, "ns")
    except SimTimeoutError:
        cocotb.log.error("Response timeout - continuing with default")
        dut.response.value = 1  # Provide default response
    else:
        cocotb.log.info("Response received on time")

    # A NullTrigger fires immediately
    await NullTrigger("immediate_completion")

    # A NullTrigger can also carry an outcome
    result_trigger = NullTrigger("result", outcome=42)
    await result_trigger
505
```
506
507
## Advanced Trigger Patterns
508
509
### Custom Trigger Creation
510
511
```python
512
class WaitUntilValue:
    """Custom awaitable that waits for a signal to reach a specific value.

    Parameters:
    - signal: Handle whose ``value`` is compared against ``value``
    - value: Target value to wait for
    - timeout_ns: Maximum simulation time to wait, in nanoseconds

    Awaiting an instance returns the signal's value once it matches.

    Raises:
    SimTimeoutError: If ``timeout_ns`` elapses before the value is reached

    Usage:
    await WaitUntilValue(dut.state_machine, 0x5, timeout_ns=2000)
    """

    def __init__(self, signal, value, timeout_ns=1000):
        self.signal = signal
        self.value = value
        self.timeout_ns = timeout_ns

    def __await__(self):
        # __await__ must be a plain method that returns an iterator. Declaring
        # it ``async def`` makes ``await WaitUntilValue(...)`` raise TypeError.
        return self._wait().__await__()

    async def _wait(self):
        """Run the edge-polling loop under a simulation-time timeout."""
        from cocotb.triggers import with_timeout

        try:
            # with_timeout() races the loop against a timer, so the timeout
            # also fires when the signal never changes again.
            return await with_timeout(self._poll(), self.timeout_ns, "ns")
        except SimTimeoutError:
            raise SimTimeoutError(f"Signal never reached {self.value}") from None

    async def _poll(self):
        """Return the signal value once it equals the target value."""
        while self.signal.value != self.value:
            await Edge(self.signal)

        return self.signal.value
531
532
@cocotb.test()
async def custom_trigger_test(dut):
    """Wait for the state machine to reach state 0x5 using WaitUntilValue."""
    target_reached = WaitUntilValue(dut.state_machine, 0x5, timeout_ns=2000)
    await target_reached
    cocotb.log.info("State machine reached target state")
539
```
540
541
### Trigger Composition Patterns
542
543
```python
544
async def _wait_for_value(signal, value):
    """Return once ``signal`` reads as ``value`` (at once if it already does)."""
    from cocotb.triggers import Edge

    while signal.value != value:
        await Edge(signal)


@cocotb.test()
async def composition_test(dut):
    """Test showing trigger composition patterns.

    Combine() and First() take triggers or tasks, so each value-wait
    coroutine is started as a task before it is composed.
    """

    # Wait until a clock edge has occurred, data_valid has read 1 and busy
    # has read 0. Combine resumes once all three have fired; they do not
    # have to fire at the same time.
    await Combine(
        RisingEdge(dut.clk),
        cocotb.start_soon(_wait_for_value(dut.data_valid, 1)),
        cocotb.start_soon(_wait_for_value(dut.busy, 0)),
    )

    # Race multiple completion conditions
    done_task = cocotb.start_soon(_wait_for_value(dut.done, 1))
    error_task = cocotb.start_soon(_wait_for_value(dut.error, 1))
    await First(
        done_task,
        error_task,
        Timer(5000, "ns"),  # Overall timeout
    )

    # Stop the waiters that lost the race so they do not keep running
    for task in (done_task, error_task):
        if not task.done():
            task.kill()

    if dut.done.value:
        cocotb.log.info("Operation completed successfully")
    elif dut.error.value:
        cocotb.log.error("Operation failed")
    else:
        cocotb.log.warning("Operation timed out")
568
```