or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

binary-types.md · clock-generation.md · index.md · logging-utilities.md · signal-handling.md · task-management.md · test-framework.md · triggers-timing.md

task-management.md · docs/

0

# Task Management

1

2

Cocotb's task management system enables concurrent coroutine execution with functions for creating, scheduling, and controlling tasks. The system supports immediate scheduling, yielding execution control, and task lifecycle management.

3

4

## Capabilities

5

6

### Immediate Task Scheduling

7

8

Schedule coroutines to run concurrently without yielding control from the calling context.

9

10

```python { .api }

11

def start_soon(coro: Union[Task, Coroutine]) -> Task:

12

"""

13

Schedule a coroutine to be run concurrently.

14

15

Note: This is not an async function, and the new task will not execute

16

until the calling task yields control.

17

18

Parameters:

19

- coro: Coroutine or Task to schedule

20

21

Returns:

22

Task object representing the scheduled coroutine

23

"""

24

```

25

26

**Usage Examples:**

27

28

```python

29

@cocotb.test()

30

async def concurrent_test(dut):

31

"""Test demonstrating concurrent task execution."""

32

33

# Schedule multiple tasks immediately

34

monitor_task = cocotb.start_soon(monitor_signals(dut))

35

stimulus_task = cocotb.start_soon(generate_stimulus(dut))

36

checker_task = cocotb.start_soon(check_responses(dut))

37

38

# Tasks start executing when this coroutine yields

39

await Timer(1000, units="ns")

40

41

# Clean up tasks

42

monitor_task.kill()

43

stimulus_task.kill()

44

checker_task.kill()

45

46

async def monitor_signals(dut):

47

"""Monitor signal changes continuously."""

48

while True:

49

await Edge(dut.data_bus)

50

cocotb.log.info(f"Data bus changed to: {dut.data_bus.value}")

51

52

async def generate_stimulus(dut):

53

"""Generate test stimulus."""

54

for i in range(10):

55

dut.input_data.value = i

56

await Timer(50, units="ns")

57

```

58

59

### Yielding Task Scheduling

60

61

Schedule coroutines and immediately yield control to allow pending tasks to execute.

62

63

```python { .api }

64

async def start(coro: Union[Task, Coroutine]) -> Task:

65

"""

66

Schedule a coroutine to be run concurrently, then yield control to allow pending tasks to execute.

67

68

The calling task will resume execution before control is returned to the simulator.

69

70

Parameters:

71

- coro: Coroutine or Task to schedule

72

73

Returns:

74

Task object representing the scheduled coroutine

75

"""

76

```

77

78

**Usage Examples:**

79

80

```python

81

@cocotb.test()

82

async def yielding_test(dut):

83

"""Test demonstrating yielding task scheduling."""

84

85

# Schedule and immediately allow execution

86

background_task = await cocotb.start(background_monitor(dut))

87

88

# Background task has opportunity to start before continuing

89

cocotb.log.info("Background task started")

90

91

# Continue with main test logic

92

for i in range(5):

93

dut.control.value = i

94

await Timer(100, units="ns")

95

96

# Clean up

97

background_task.kill()

98

99

async def background_monitor(dut):

100

"""Background monitoring task."""

101

cocotb.log.info("Background monitor started")

102

while True:

103

await RisingEdge(dut.clk)

104

if dut.error_flag.value:

105

cocotb.log.error("Error flag detected!")

106

```

107

108

### Task Creation

109

110

Create tasks without scheduling them, so that they can be scheduled later when execution control is needed.

111

112

```python { .api }

113

def create_task(coro: Union[Task, Coroutine]) -> Task:

114

"""

115

Construct a coroutine into a Task without scheduling the Task.

116

117

The Task can later be scheduled with fork, start, or start_soon.

118

119

Parameters:

120

- coro: Coroutine to convert to Task

121

122

Returns:

123

Task object that can be scheduled later

124

"""

125

```

126

127

**Usage Examples:**

128

129

```python

130

@cocotb.test()

131

async def task_creation_test(dut):

132

"""Test demonstrating task creation and delayed scheduling."""

133

134

# Create tasks without scheduling

135

read_task = cocotb.create_task(read_sequence(dut, 0x100, 10))

136

write_task = cocotb.create_task(write_sequence(dut, 0x200, [1, 2, 3, 4]))

137

138

# Schedule tasks when needed

139

cocotb.start_soon(read_task)

140

await Timer(50, units="ns") # Delay between operations

141

142

cocotb.start_soon(write_task)

143

144

# Wait for completion

145

await read_task.join()

146

await write_task.join()

147

148

async def read_sequence(dut, base_addr, count):

149

"""Perform a sequence of read operations."""

150

results = []

151

for i in range(count):

152

dut.addr.value = base_addr + i

153

dut.read_enable.value = 1

154

await RisingEdge(dut.clk)

155

156

dut.read_enable.value = 0

157

await RisingEdge(dut.clk)

158

159

results.append(dut.data_out.value)

160

cocotb.log.info(f"Read {base_addr + i}: {dut.data_out.value}")

161

162

return results

163

```

164

165

### Deprecated Fork Function

166

167

Legacy function for task scheduling, replaced by start_soon and start.

168

169

```python { .api }

170

def fork(coro: Union[Task, Coroutine]) -> Task:

171

"""

172

Schedule a coroutine to be run concurrently.

173

174

DEPRECATED: This function has been deprecated in favor of start_soon and start.

175

In most cases you can simply substitute cocotb.fork with cocotb.start_soon.

176

177

Parameters:

178

- coro: Coroutine or Task to schedule

179

180

Returns:

181

Task object representing the scheduled coroutine

182

"""

183

```

184

185

**Migration Examples:**

186

187

```python

188

# OLD: Using deprecated fork

189

@cocotb.test()

190

async def old_style_test(dut):

191

task = cocotb.fork(background_process(dut)) # DEPRECATED

192

await Timer(100, units="ns")

193

task.kill()

194

195

# NEW: Using start_soon

196

@cocotb.test()

197

async def new_style_test(dut):

198

task = cocotb.start_soon(background_process(dut)) # PREFERRED

199

await Timer(100, units="ns")

200

task.kill()

201

202

# NEW: Using start for immediate execution

203

@cocotb.test()

204

async def immediate_test(dut):

205

task = await cocotb.start(background_process(dut)) # PREFERRED

206

await Timer(100, units="ns")

207

task.kill()

208

```

209

210

## Task Class

211

212

### Task Lifecycle Management

213

214

Tasks provide methods for monitoring and controlling coroutine execution.

215

216

```python { .api }

217

class Task:

218

"""

219

Represents a concurrently executing coroutine.

220

"""

221

222

def result(self):

223

"""

224

Get the result of the task.

225

226

Returns:

227

The return value of the coroutine

228

229

Raises:

230

Exception if task completed with error

231

"""

232

233

def exception(self):

234

"""

235

Get the exception that caused the task to fail.

236

237

Returns:

238

Exception object or None if task succeeded

239

"""

240

241

def done(self) -> bool:

242

"""

243

Check if the task is complete.

244

245

Returns:

246

True if task has finished (success or failure)

247

"""

248

249

def cancel(self):

250

"""

251

Cancel the task if it hasn't started executing.

252

253

Returns:

254

True if task was cancelled, False if already running

255

"""

256

257

def cancelled(self) -> bool:

258

"""

259

Check if the task was cancelled.

260

261

Returns:

262

True if task was cancelled before execution

263

"""

264

265

def kill(self):

266

"""

267

Terminate the task immediately.

268

269

Unlike cancel(), this stops a running task.

270

"""

271

272

async def join(self):

273

"""

274

Wait for the task to complete.

275

276

Returns:

277

The return value of the coroutine

278

279

Raises:

280

Exception if task failed

281

"""

282

283

def has_started(self) -> bool:

284

"""

285

Check if the task has started executing.

286

287

Returns:

288

True if task execution has begun

289

"""

290

```

291

292

**Usage Examples:**

293

294

```python

295

@cocotb.test()

296

async def task_control_test(dut):

297

"""Test demonstrating task lifecycle management."""

298

299

# Create and start a long-running task

300

long_task = cocotb.start_soon(long_running_process(dut))

301

302

# Check task status

303

assert not long_task.done()

304

assert long_task.has_started()

305

306

# Wait with timeout

307

try:

308

await with_timeout(long_task.join(), 500, "ns")

309

result = long_task.result()

310

cocotb.log.info(f"Task completed with result: {result}")

311

except SimTimeoutError:

312

cocotb.log.warning("Task timed out, terminating")

313

long_task.kill()

314

assert long_task.done()

315

316

@cocotb.test()

317

async def task_error_handling_test(dut):

318

"""Test demonstrating task error handling."""

319

320

# Start task that might fail

321

risky_task = cocotb.start_soon(risky_operation(dut))

322

323

# Wait for completion and handle errors

324

try:

325

await risky_task.join()

326

cocotb.log.info("Risky operation succeeded")

327

except Exception as e:

328

cocotb.log.error(f"Task failed with: {e}")

329

330

# Check error details

331

if risky_task.exception():

332

cocotb.log.error(f"Exception was: {risky_task.exception()}")

333

334

async def long_running_process(dut):

335

"""Simulate a long-running process."""

336

for i in range(100):

337

dut.counter.value = i

338

await Timer(10, units="ns")

339

return "completed"

340

341

async def risky_operation(dut):

342

"""Operation that might fail."""

343

await Timer(50, units="ns")

344

if dut.error_inject.value:

345

raise ValueError("Injected error occurred")

346

return "success"

347

```

348

349

## Advanced Task Patterns

350

351

### Task Synchronization

352

353

```python

354

@cocotb.test()

355

async def synchronized_test(dut):

356

"""Test showing task synchronization patterns."""

357

358

# Start multiple tasks

359

tasks = [

360

cocotb.start_soon(worker_task(dut, i))

361

for i in range(4)

362

]

363

364

# Wait for all to complete

365

results = []

366

for task in tasks:

367

result = await task.join()

368

results.append(result)

369

370

cocotb.log.info(f"All tasks completed: {results}")

371

372

async def worker_task(dut, worker_id):

373

"""Individual worker task."""

374

await Timer(worker_id * 10, units="ns") # Staggered start

375

376

# Do work

377

for i in range(5):

378

dut._id(f"worker_{worker_id}_output", extended=False).value = i

379

await Timer(20, units="ns")

380

381

return f"worker_{worker_id}_done"

382

```

383

384

## Task Utilities

385

386

### Task Factory Pattern

387

388

```python

389

def create_monitor_task(dut, signal_name, callback):

390

"""Factory function for creating monitor tasks."""

391

392

async def monitor_coroutine():

393

signal = dut._id(signal_name, extended=False)

394

while True:

395

await Edge(signal)

396

callback(signal.value)

397

398

return cocotb.create_task(monitor_coroutine())

399

400

@cocotb.test()

401

async def factory_test(dut):

402

"""Test using task factory pattern."""

403

404

def log_change(value):

405

cocotb.log.info(f"Signal changed to: {value}")

406

407

# Create monitors using factory

408

data_monitor = create_monitor_task(dut, "data_bus", log_change)

409

addr_monitor = create_monitor_task(dut, "addr_bus", log_change)

410

411

# Start monitors

412

cocotb.start_soon(data_monitor)

413

cocotb.start_soon(addr_monitor)

414

415

# Run test

416

await Timer(1000, units="ns")

417

418

# Clean up

419

data_monitor.kill()

420

addr_monitor.kill()

421

```