or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bit-timing.mdbus-operations.mdcli-tools.mdevent-system.mdfile-io.mdhardware-interfaces.mdindex.mdmessage-handling.mdperiodic-transmission.md

periodic-transmission.mddocs/

0

# Periodic Message Transmission

1

2

Advanced cyclic message transmission capabilities with configurable periods, durations, message modification callbacks, and lifecycle management for automotive and industrial applications requiring regular message transmission.

3

4

## Capabilities

5

6

### Cyclic Send Tasks

7

8

Base interface for periodic message transmission with lifecycle management.

9

10

```python { .api }

11

class CyclicSendTaskABC(ABC):

12

@abstractmethod

13

def stop(self) -> None:

14

"""Stop the cyclic transmission task."""

15

16

@property

17

@abstractmethod

18

def period(self) -> float:

19

"""Get the transmission period in seconds."""

20

21

class ModifiableCyclicTaskABC(CyclicSendTaskABC):

22

@abstractmethod

23

def modify_data(self, msg: Message) -> None:

24

"""

25

Modify message data for next transmission.

26

27

Parameters:

28

- msg: Message to modify (modified in-place)

29

"""

30

31

class RestartableCyclicTaskABC(CyclicSendTaskABC):

32

@abstractmethod

33

def start(self) -> None:

34

"""Start or restart the cyclic transmission."""

35

36

class LimitedDurationCyclicTaskABC(CyclicSendTaskABC):

37

@property

38

@abstractmethod

39

def duration(self) -> Optional[float]:

40

"""Get the transmission duration in seconds (None for unlimited)."""

41

```

42

43

### Bus Integration

44

45

Periodic transmission integrated with bus send_periodic method.

46

47

```python { .api }

48

def send_periodic(self, msgs, period: float, duration=None, store_task=True,

49

autostart=True, modifier_callback=None):

50

"""

51

Start sending messages at a given period on this bus.

52

53

Parameters:

54

- msgs: Message or sequence of messages to transmit

55

- period: Period in seconds between each message

56

- duration: Duration in seconds to continue sending (None for indefinite)

57

- store_task: If True, attach task to bus instance for lifecycle management

58

- autostart: If True, start task immediately after creation

59

- modifier_callback: Function to modify message data before each send

60

61

Returns:

62

CyclicSendTaskABC: Task instance for controlling transmission

63

64

The task will be active until:

65

- Optional duration expires

66

- Bus instance goes out of scope

67

- Bus instance is shut down

68

- stop_all_periodic_tasks() is called

69

- Task's stop() method is called

70

"""

71

72

def stop_all_periodic_tasks(self, remove_tasks=True) -> None:

73

"""

74

Stop all periodic tasks started by this bus.

75

76

Parameters:

77

- remove_tasks: Whether to stop tracking the stopped tasks

78

"""

79

```

80

81

### Thread-Based Implementation

82

83

Default threading-based implementation for periodic transmission.

84

85

```python { .api }

86

class ThreadBasedCyclicSendTask:

87

def __init__(self, bus, lock, messages, period: float, duration=None,

88

autostart=True, modifier_callback=None):

89

"""

90

Thread-based cyclic message transmission.

91

92

Parameters:

93

- bus: Bus instance to send messages on

94

- lock: Threading lock for send synchronization

95

- messages: Message(s) to transmit cyclically

96

- period: Transmission period in seconds

97

- duration: Maximum duration (seconds) or None for unlimited

98

- autostart: Whether to start immediately

99

- modifier_callback: Optional message modification function

100

"""

101

102

def stop(self) -> None:

103

"""Stop the transmission thread."""

104

105

def start(self) -> None:

106

"""Start the transmission thread."""

107

```

108

109

## Usage Examples

110

111

### Basic Periodic Transmission

112

113

```python

114

import can

115

import time

116

117

bus = can.Bus(channel='can0', interface='socketcan')

118

119

# Send heartbeat message every 100ms

120

heartbeat = can.Message(

121

arbitration_id=0x700,

122

data=[0x01, 0x02, 0x03, 0x04]

123

)

124

125

task = bus.send_periodic(heartbeat, period=0.1)

126

127

print("Sending heartbeat messages for 5 seconds...")

128

time.sleep(5)

129

130

task.stop()

131

bus.shutdown()

132

```

133

134

### Multiple Periodic Messages

135

136

```python

137

import can

138

import time

139

140

bus = can.Bus(channel='can0', interface='socketcan')

141

142

# Multiple messages with different periods

143

messages = [

144

can.Message(arbitration_id=0x100, data=[0x01]), # Status

145

can.Message(arbitration_id=0x200, data=[0x02]), # Sensor 1

146

can.Message(arbitration_id=0x300, data=[0x03]), # Sensor 2

147

]

148

149

# Send all messages every 50ms

150

task1 = bus.send_periodic(messages, period=0.05)

151

152

# Send individual high-priority message every 10ms

153

priority_msg = can.Message(arbitration_id=0x50, data=[0xFF])

154

task2 = bus.send_periodic(priority_msg, period=0.01)

155

156

print("Sending multiple periodic messages...")

157

time.sleep(10)

158

159

# Stop specific tasks

160

task1.stop()

161

task2.stop()

162

bus.shutdown()

163

```

164

165

### Limited Duration Transmission

166

167

```python

168

import can

169

import time

170

171

bus = can.Bus(channel='can0', interface='socketcan')

172

173

# Send for exactly 30 seconds then stop automatically

174

test_message = can.Message(

175

arbitration_id=0x123,

176

data=[0x11, 0x22, 0x33, 0x44]

177

)

178

179

task = bus.send_periodic(

180

test_message,

181

period=0.1, # Every 100ms

182

duration=30.0 # For 30 seconds

183

)

184

185

print("Sending test messages for 30 seconds...")

186

# Task will automatically stop after 30 seconds

187

time.sleep(35) # Wait a bit longer to confirm it stopped

188

189

bus.shutdown()

190

```

191

192

### Dynamic Message Modification

193

194

```python

195

import can

196

import time

197

198

bus = can.Bus(channel='can0', interface='socketcan')

199

200

# Counter that increments in message data

201

counter_msg = can.Message(

202

arbitration_id=0x400,

203

data=[0x00, 0x00, 0x00, 0x00] # 32-bit counter in bytes 0-3

204

)

205

206

def increment_counter(msg):

207

"""Increment 32-bit counter in message data."""

208

# Convert bytes to integer, increment, convert back

209

counter = int.from_bytes(msg.data[:4], 'big')

210

counter = (counter + 1) % (2**32) # Wrap at 32-bit limit

211

msg.data[:4] = counter.to_bytes(4, 'big')

212

213

task = bus.send_periodic(

214

counter_msg,

215

period=0.1,

216

modifier_callback=increment_counter

217

)

218

219

print("Sending incrementing counter...")

220

time.sleep(10)

221

222

task.stop()

223

bus.shutdown()

224

```

225

226

### Temperature Sensor Simulation

227

228

```python

229

import can

230

import time

231

import math

232

233

bus = can.Bus(channel='can0', interface='socketcan')

234

235

# Simulate temperature sensor with sine wave

236

temp_msg = can.Message(

237

arbitration_id=0x510,

238

data=[0x00, 0x00] # 16-bit temperature value

239

)

240

241

start_time = time.time()

242

243

def update_temperature(msg):

244

"""Update temperature with sine wave simulation."""

245

elapsed = time.time() - start_time

246

# Sine wave: 20°C ± 10°C with 30-second period

247

temperature = 20.0 + 10.0 * math.sin(2 * math.pi * elapsed / 30.0)

248

249

# Convert to 16-bit integer (0.1°C resolution)

250

temp_int = int(temperature * 10)

251

msg.data[:2] = temp_int.to_bytes(2, 'big', signed=True)

252

253

task = bus.send_periodic(

254

temp_msg,

255

period=0.5, # Every 500ms

256

modifier_callback=update_temperature

257

)

258

259

print("Simulating temperature sensor for 60 seconds...")

260

time.sleep(60)

261

262

task.stop()

263

bus.shutdown()

264

```

265

266

### Task Management

267

268

```python

269

import can

270

import time

271

272

bus = can.Bus(channel='can0', interface='socketcan')

273

274

# Create multiple tasks with different lifecycles

275

tasks = []

276

277

# Long-running heartbeat

278

heartbeat = can.Message(arbitration_id=0x700, data=[0x01])

279

tasks.append(bus.send_periodic(heartbeat, period=1.0))

280

281

# Medium-term status updates

282

status = can.Message(arbitration_id=0x701, data=[0x02])

283

tasks.append(bus.send_periodic(status, period=0.5, duration=30.0))

284

285

# Short burst of test messages

286

test = can.Message(arbitration_id=0x702, data=[0x03])

287

tasks.append(bus.send_periodic(test, period=0.1, duration=5.0))

288

289

print("Running multiple tasks with different lifecycles...")

290

291

# Monitor tasks

292

for i in range(60):

293

time.sleep(1)

294

active_tasks = [t for t in tasks if hasattr(t, '_thread') and t._thread.is_alive()]

295

print(f"Second {i+1}: {len(active_tasks)} tasks still active")

296

297

# Stop all remaining tasks

298

bus.stop_all_periodic_tasks()

299

bus.shutdown()

300

```

301

302

### Error Handling in Periodic Tasks

303

304

```python

305

import can

306

import time

307

308

class RobustPeriodicSender:

309

def __init__(self, bus, msg, period):

310

self.bus = bus

311

self.msg = msg

312

self.period = period

313

self.error_count = 0

314

self.max_errors = 10

315

316

def error_tolerant_callback(self, msg):

317

"""Callback that handles its own errors."""

318

try:

319

# Simulate some processing that might fail

320

if self.error_count < 3: # Fail first few times

321

self.error_count += 1

322

raise ValueError("Simulated processing error")

323

324

# Normal processing

325

msg.data[0] = (msg.data[0] + 1) % 256

326

327

except Exception as e:

328

print(f"Error in callback: {e}")

329

if self.error_count >= self.max_errors:

330

print("Too many errors, stopping task")

331

return False # Signal to stop

332

return True

333

334

bus = can.Bus(channel='test', interface='virtual')

335

336

msg = can.Message(arbitration_id=0x123, data=[0x00])

337

sender = RobustPeriodicSender(bus, msg, 0.1)

338

339

# This would need custom implementation to handle callback errors

340

# Showing the concept of error-aware periodic transmission

341

print("Demonstrating error handling in periodic tasks...")

342

343

bus.shutdown()

344

```

345

346

## Types

347

348

```python { .api }

349

from abc import ABC, abstractmethod

350

from typing import Optional, Union, Sequence, Callable

351

import threading

352

353

class CyclicSendTaskABC(ABC):

354

"""Abstract base class for cyclic send tasks."""

355

356

@abstractmethod

357

def stop(self) -> None: ...

358

359

@property

360

@abstractmethod

361

def period(self) -> float: ...

362

363

class ModifiableCyclicTaskABC(CyclicSendTaskABC):

364

"""Cyclic task that supports message modification."""

365

366

@abstractmethod

367

def modify_data(self, msg: Message) -> None: ...

368

369

class ThreadBasedCyclicSendTask(CyclicSendTaskABC):

370

"""Default thread-based implementation."""

371

372

def __init__(self, bus, lock: threading.Lock, messages,

373

period: float, duration: Optional[float] = None,

374

autostart: bool = True,

375

modifier_callback: Optional[Callable[[Message], None]] = None): ...

376

```