or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-types.md · clock-generation.md · index.md · logging-utilities.md · signal-handling.md · task-management.md · test-framework.md · triggers-timing.md

docs/triggers-timing.md

0

# Triggers and Timing

1

2

Cocotb's trigger system provides comprehensive time-based and event-based synchronization with the HDL simulator. Triggers enable coroutines to wait for specific simulation events, time delays, signal changes, and custom conditions.

3

4

## Capabilities

5

6

### Time-Based Triggers

7

8

Control how simulation time advances, with precise delays and awareness of simulator scheduling phases.

9

10

```python { .api }

11

class Timer(time, units="step", round_mode=None):

12

"""

13

Fire after specified simulation time duration.

14

15

Parameters:

16

- time: Time value (int, float, or decimal.Decimal)

17

- units: Time units ("fs", "ps", "ns", "us", "ms", "sec", "step")

18

- round_mode: Rounding mode for time conversion

19

20

Usage:

21

await Timer(100, units="ns")

22

"""

23

24

class ReadOnly():

25

"""

26

Fire during read-only phase of current timestep.

27

28

Useful for reading signal values after all combinatorial updates.

29

30

Usage:

31

await ReadOnly()

32

"""

33

34

class ReadWrite():

35

"""

36

Fire during read-write phase of current timestep.

37

38

Allows signal modifications within the current timestep.

39

40

Usage:

41

await ReadWrite()

42

"""

43

44

class NextTimeStep():

45

"""

46

Fire at the next simulation timestep.

47

48

Minimal time advancement to next simulator event.

49

50

Usage:

51

await NextTimeStep()

52

"""

53

```

54

55

**Usage Examples:**

56

57

```python

58

@cocotb.test()

59

async def timing_test(dut):

60

"""Test demonstrating time-based triggers."""

61

62

# Basic time delays

63

await Timer(100, units="ns")

64

dut.signal_a.value = 1

65

66

await Timer(50, units="ns")

67

dut.signal_b.value = 0

68

69

# Simulator phase control

70

dut.combo_input.value = 5

71

await ReadOnly() # Wait for combinatorial settling

72

result = dut.combo_output.value # Safe to read

73

74

# Minimal time step

75

await NextTimeStep()

76

77

# Precise timing with decimal

78

from decimal import Decimal

79

await Timer(Decimal("33.333"), units="ns")

80

81

@cocotb.test()

82

async def phase_test(dut):

83

"""Test demonstrating simulator phase awareness."""

84

85

# Set input and wait for combinatorial update

86

dut.addr.value = 0x100

87

await ReadOnly()

88

89

# Read the result after combinatorial settling

90

decoded_value = dut.chip_select.value

91

cocotb.log.info(f"Address 0x100 decoded to: {decoded_value}")

92

93

# Move to read-write phase for modifications

94

await ReadWrite()

95

dut.write_enable.value = 1

96

```

97

98

### Signal Edge Triggers

99

100

Synchronize with HDL signal transitions for event-driven testbench behavior.

101

102

```python { .api }

103

class RisingEdge(signal):

104

"""

105

Fire on signal rising edge (0/Z to 1/X transition).

106

107

Parameters:

108

- signal: HDL signal to monitor

109

110

Usage:

111

await RisingEdge(dut.clk)

112

"""

113

114

class FallingEdge(signal):

115

"""

116

Fire on signal falling edge (1/X to 0/Z transition).

117

118

Parameters:

119

- signal: HDL signal to monitor

120

121

Usage:

122

await FallingEdge(dut.reset)

123

"""

124

125

class Edge(signal):

126

"""

127

Fire on any signal change (any value change).

128

129

Parameters:

130

- signal: HDL signal to monitor

131

132

Usage:

133

await Edge(dut.data_bus)

134

"""

135

```

136

137

**Usage Examples:**

138

139

```python

140

@cocotb.test()

141

async def edge_test(dut):

142

"""Test demonstrating signal edge triggers."""

143

144

# Clock-synchronous operations

145

for i in range(10):

146

await RisingEdge(dut.clk)

147

dut.data_in.value = i

148

cocotb.log.info(f"Cycle {i}: Applied data {i}")

149

150

# Reset sequence

151

dut.reset_n.value = 0

152

await Timer(100, units="ns")

153

dut.reset_n.value = 1

154

await RisingEdge(dut.reset_n) # Wait for reset release

155

156

# Monitor any data change

157

data_changes = 0

158

for _ in range(50):

159

await Edge(dut.output_data)

160

data_changes += 1

161

cocotb.log.info(f"Data changed to: {dut.output_data.value}")

162

163

cocotb.log.info(f"Detected {data_changes} data changes")

164

165

@cocotb.test()

166

async def handshake_test(dut):

167

"""Test demonstrating handshake protocol."""

168

169

# Initiate request

170

dut.request.value = 1

171

dut.data_out.value = 0xABCD

172

173

# Wait for acknowledgment

174

await RisingEdge(dut.acknowledge)

175

cocotb.log.info("Request acknowledged")

176

177

# Complete handshake

178

dut.request.value = 0

179

await FallingEdge(dut.acknowledge)

180

cocotb.log.info("Handshake completed")

181

```

182

183

### Composite Triggers

184

185

Combine multiple triggers for complex synchronization patterns.

186

187

```python { .api }

188

class Combine(*triggers):

189

"""

190

Fire when ALL triggers have fired.

191

192

Parameters:

193

- *triggers: Variable number of triggers to combine

194

195

Usage:

196

await Combine(RisingEdge(dut.clk), Timer(100, "ns"))

197

"""

198

199

class First(*triggers):

200

"""

201

Fire when FIRST trigger fires.

202

203

Parameters:

204

- *triggers: Variable number of triggers to race

205

206

Usage:

207

await First(RisingEdge(dut.done), Timer(1000, "ns"))

208

"""

209

210

class Join(task):

211

"""

212

Fire when task completes.

213

214

Parameters:

215

- task: Task to wait for completion

216

217

Usage:

218

await Join(background_task)

219

"""

220

221

class ClockCycles(signal, num_cycles, rising=True):

222

"""

223

Fire after specified number of clock cycles.

224

225

Parameters:

226

- signal: Clock signal to count

227

- num_cycles: Number of cycles to wait

228

- rising: True for rising edges, False for falling edges

229

230

Usage:

231

await ClockCycles(dut.clk, 10)

232

"""

233

```

234

235

**Usage Examples:**

236

237

```python

238

@cocotb.test()

239

async def composite_test(dut):

240

"""Test demonstrating composite triggers."""

241

242

# Wait for both clock edge AND timer

243

await Combine(RisingEdge(dut.clk), Timer(100, units="ns"))

244

cocotb.log.info("Both clock edge and timer fired")

245

246

# Race between completion and timeout

247

background_task = cocotb.start_soon(slow_operation(dut))

248

249

result = await First(

250

Join(background_task),

251

Timer(500, units="ns") # Timeout

252

)

253

254

if background_task.done():

255

cocotb.log.info("Operation completed on time")

256

else:

257

cocotb.log.warning("Operation timed out")

258

background_task.kill()

259

260

# Wait for specific number of clock cycles

261

await ClockCycles(dut.clk, 5)

262

cocotb.log.info("5 clock cycles elapsed")

263

264

# Multi-signal synchronization

265

await Combine(

266

Edge(dut.valid),

267

Edge(dut.ready),

268

RisingEdge(dut.clk)

269

)

270

cocotb.log.info("Valid, ready, and clock all changed")

271

272

async def slow_operation(dut):

273

"""Simulate a slow operation."""

274

await Timer(300, units="ns")

275

dut.operation_done.value = 1

276

return "completed"

277

```

278

279

### Synchronization Primitives

280

281

Event and lock primitives for inter-coroutine synchronization.

282

283

```python { .api }

284

class Event():

285

"""

286

Event synchronization primitive for coordinating coroutines.

287

"""

288

289

def set(self, data=None):

290

"""

291

Set the event and wake all waiting coroutines.

292

293

Parameters:

294

- data: Optional data to pass to waiting coroutines

295

"""

296

297

def wait(self):

298

"""

299

Get trigger that fires when event is set.

300

301

Returns:

302

Trigger that can be awaited

303

304

Usage:

305

await event.wait()

306

"""

307

308

def clear(self):

309

"""Clear the event."""

310

311

def is_set(self) -> bool:

312

"""

313

Check if event is currently set.

314

315

Returns:

316

True if event is set

317

"""

318

319

@property

320

def data(self):

321

"""

322

Get data associated with the event.

323

324

Returns:

325

Data passed to set() method

326

"""

327

328

class Lock():

329

"""

330

Lock synchronization primitive for mutual exclusion.

331

"""

332

333

def acquire(self):

334

"""

335

Get trigger that fires when lock is acquired.

336

337

Returns:

338

Trigger that can be awaited

339

340

Usage:

341

await lock.acquire()

342

"""

343

344

def release(self):

345

"""Release the lock."""

346

347

def locked(self) -> bool:

348

"""

349

Check if lock is currently held.

350

351

Returns:

352

True if lock is held

353

"""

354

```

355

356

**Usage Examples:**

357

358

```python

359

@cocotb.test()

360

async def synchronization_test(dut):

361

"""Test demonstrating synchronization primitives."""

362

363

# Event coordination

364

completion_event = Event()

365

366

# Start producer and consumer

367

producer_task = cocotb.start_soon(data_producer(dut, completion_event))

368

consumer_task = cocotb.start_soon(data_consumer(dut, completion_event))

369

370

# Wait for both to complete

371

await producer_task.join()

372

await consumer_task.join()

373

374

async def data_producer(dut, completion_event):

375

"""Produce data and signal completion."""

376

for i in range(10):

377

dut.data_buffer.value = i * 10

378

await Timer(50, units="ns")

379

cocotb.log.info(f"Produced: {i * 10}")

380

381

# Signal completion to consumer

382

completion_event.set(data="production_complete")

383

384

async def data_consumer(dut, completion_event):

385

"""Consume data and wait for completion."""

386

consumed_count = 0

387

388

while True:

389

# Wait for either data or completion

390

trigger = await First(

391

Edge(dut.data_buffer),

392

completion_event.wait()

393

)

394

395

if completion_event.is_set():

396

cocotb.log.info(f"Production complete, consumed {consumed_count} items")

397

break

398

else:

399

consumed_count += 1

400

cocotb.log.info(f"Consumed: {dut.data_buffer.value}")

401

402

@cocotb.test()

403

async def lock_test(dut):

404

"""Test demonstrating lock usage."""

405

406

shared_resource_lock = Lock()

407

408

# Start multiple tasks that need exclusive access

409

tasks = [

410

cocotb.start_soon(exclusive_operation(dut, shared_resource_lock, i))

411

for i in range(3)

412

]

413

414

# Wait for all to complete

415

for task in tasks:

416

await task.join()

417

418

async def exclusive_operation(dut, lock, operation_id):

419

"""Perform operation requiring exclusive access."""

420

cocotb.log.info(f"Operation {operation_id} waiting for lock")

421

422

await lock.acquire()

423

try:

424

cocotb.log.info(f"Operation {operation_id} acquired lock")

425

426

# Critical section

427

dut.shared_register.value = operation_id

428

await Timer(100, units="ns")

429

430

assert dut.shared_register.value == operation_id

431

cocotb.log.info(f"Operation {operation_id} completed successfully")

432

433

finally:

434

lock.release()

435

cocotb.log.info(f"Operation {operation_id} released lock")

436

```

437

438

### Special Triggers

439

440

Utility triggers for specific scenarios.

441

442

```python { .api }

443

class NullTrigger(name="", outcome=None):

444

"""

445

Fire immediately with optional outcome.

446

447

Parameters:

448

- name: Optional name for debugging

449

- outcome: Optional outcome to return

450

451

Usage:

452

await NullTrigger()

453

"""

454

```

455

456

### Timeout Utility

457

458

Add timeout capability to any trigger.

459

460

```python { .api }

461

def with_timeout(trigger, timeout_time, timeout_unit="step", round_mode=None):

462

"""

463

Add timeout to any trigger.

464

465

Parameters:

466

- trigger: Trigger to add timeout to

467

- timeout_time: Timeout duration

468

- timeout_unit: Timeout units

469

- round_mode: Rounding mode

470

471

Returns:

472

Trigger that times out if original doesn't fire

473

474

Raises:

475

SimTimeoutError: If timeout occurs before trigger fires

476

477

Usage:

478

await with_timeout(RisingEdge(dut.done), 1000, "ns")

479

"""

480

```

481

482

**Usage Examples:**

483

484

```python

485

from cocotb.result import SimTimeoutError

486

487

@cocotb.test()

488

async def timeout_test(dut):

489

"""Test demonstrating timeout usage."""

490

491

# Operation with timeout protection

492

try:

493

await with_timeout(RisingEdge(dut.response), 500, "ns")

494

cocotb.log.info("Response received on time")

495

except SimTimeoutError:

496

cocotb.log.error("Response timeout - continuing with default")

497

dut.response.value = 1 # Provide default response

498

499

# Immediate trigger

500

await NullTrigger("immediate_completion")

501

502

# Trigger with specific outcome

503

result_trigger = NullTrigger("result", outcome=42)

504

await result_trigger

505

```

506

507

## Advanced Trigger Patterns

508

509

### Custom Trigger Creation

510

511

```python

512

class WaitUntilValue:

513

"""Custom trigger that waits for signal to reach specific value."""

514

515

def __init__(self, signal, value, timeout_ns=1000):

516

self.signal = signal

517

self.value = value

518

self.timeout_ns = timeout_ns

519

520

async def __await__(self):

521

start_time = get_sim_time("ns")

522

523

while self.signal.value != self.value:

524

await Edge(self.signal)

525

current_time = get_sim_time("ns")

526

527

if current_time - start_time > self.timeout_ns:

528

raise SimTimeoutError(f"Signal never reached {self.value}")

529

530

return self.signal.value

531

532

@cocotb.test()

533

async def custom_trigger_test(dut):

534

"""Test using custom trigger."""

535

536

# Use custom trigger

537

await WaitUntilValue(dut.state_machine, 0x5, timeout_ns=2000)

538

cocotb.log.info("State machine reached target state")

539

```

540

541

### Trigger Composition Patterns

542

543

```python

544

@cocotb.test()

545

async def composition_test(dut):

546

"""Test showing trigger composition patterns."""

547

548

# Complex condition: clock edge AND data valid AND not busy

549

await Combine(

550

RisingEdge(dut.clk),

551

WaitUntilValue(dut.data_valid, 1),

552

WaitUntilValue(dut.busy, 0)

553

)

554

555

# Race multiple completion conditions

556

completion = await First(

557

WaitUntilValue(dut.done, 1),

558

WaitUntilValue(dut.error, 1),

559

Timer(5000, "ns") # Overall timeout

560

)

561

562

if dut.done.value:

563

cocotb.log.info("Operation completed successfully")

564

elif dut.error.value:

565

cocotb.log.error("Operation failed")

566

else:

567

cocotb.log.warning("Operation timed out")

568

```