0
# Periodic Message Transmission
1
2
Advanced cyclic message transmission capabilities with configurable periods, durations, message modification callbacks, and lifecycle management for automotive and industrial applications requiring regular message transmission.
3
4
## Capabilities
5
6
### Cyclic Send Tasks
7
8
Base interface for periodic message transmission with lifecycle management.
9
10
```python { .api }
11
class CyclicSendTaskABC(ABC):
12
@abstractmethod
13
def stop(self) -> None:
14
"""Stop the cyclic transmission task."""
15
16
@property
17
@abstractmethod
18
def period(self) -> float:
19
"""Get the transmission period in seconds."""
20
21
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
22
@abstractmethod
23
def modify_data(self, msg: Message) -> None:
24
"""
25
Modify message data for next transmission.
26
27
Parameters:
28
- msg: Message to modify (modified in-place)
29
"""
30
31
class RestartableCyclicTaskABC(CyclicSendTaskABC):
32
@abstractmethod
33
def start(self) -> None:
34
"""Start or restart the cyclic transmission."""
35
36
class LimitedDurationCyclicTaskABC(CyclicSendTaskABC):
37
@property
38
@abstractmethod
39
def duration(self) -> Optional[float]:
40
"""Get the transmission duration in seconds (None for unlimited)."""
41
```
42
43
### Bus Integration
44
45
Periodic transmission integrated with bus send_periodic method.
46
47
```python { .api }
48
def send_periodic(self, msgs, period: float, duration=None, store_task=True,
49
autostart=True, modifier_callback=None):
50
"""
51
Start sending messages at a given period on this bus.
52
53
Parameters:
54
- msgs: Message or sequence of messages to transmit
55
- period: Period in seconds between each message
56
- duration: Duration in seconds to continue sending (None for indefinite)
57
- store_task: If True, attach task to bus instance for lifecycle management
58
- autostart: If True, start task immediately after creation
59
- modifier_callback: Function to modify message data before each send
60
61
Returns:
62
CyclicSendTaskABC: Task instance for controlling transmission
63
64
The task will be active until:
65
- Optional duration expires
66
- Bus instance goes out of scope
67
- Bus instance is shut down
68
- stop_all_periodic_tasks() is called
69
- Task's stop() method is called
70
"""
71
72
def stop_all_periodic_tasks(self, remove_tasks=True) -> None:
73
"""
74
Stop all periodic tasks started by this bus.
75
76
Parameters:
77
- remove_tasks: Whether to stop tracking the stopped tasks
78
"""
79
```
80
81
### Thread-Based Implementation
82
83
Default threading-based implementation for periodic transmission.
84
85
```python { .api }
86
class ThreadBasedCyclicSendTask:
87
def __init__(self, bus, lock, messages, period: float, duration=None,
88
autostart=True, modifier_callback=None):
89
"""
90
Thread-based cyclic message transmission.
91
92
Parameters:
93
- bus: Bus instance to send messages on
94
- lock: Threading lock for send synchronization
95
- messages: Message(s) to transmit cyclically
96
- period: Transmission period in seconds
97
- duration: Maximum duration (seconds) or None for unlimited
98
- autostart: Whether to start immediately
99
- modifier_callback: Optional message modification function
100
"""
101
102
def stop(self) -> None:
103
"""Stop the transmission thread."""
104
105
def start(self) -> None:
106
"""Start the transmission thread."""
107
```
108
109
## Usage Examples
110
111
### Basic Periodic Transmission
112
113
```python
114
import can
115
import time
116
117
bus = can.Bus(channel='can0', interface='socketcan')
118
119
# Send heartbeat message every 100ms
120
heartbeat = can.Message(
121
arbitration_id=0x700,
122
data=[0x01, 0x02, 0x03, 0x04]
123
)
124
125
task = bus.send_periodic(heartbeat, period=0.1)
126
127
print("Sending heartbeat messages for 5 seconds...")
128
time.sleep(5)
129
130
task.stop()
131
bus.shutdown()
132
```
133
134
### Multiple Periodic Messages
135
136
```python
137
import can
138
import time
139
140
bus = can.Bus(channel='can0', interface='socketcan')
141
142
# Multiple messages with different periods
143
messages = [
144
can.Message(arbitration_id=0x100, data=[0x01]), # Status
145
can.Message(arbitration_id=0x200, data=[0x02]), # Sensor 1
146
can.Message(arbitration_id=0x300, data=[0x03]), # Sensor 2
147
]
148
149
# Send all messages every 50ms
150
task1 = bus.send_periodic(messages, period=0.05)
151
152
# Send individual high-priority message every 10ms
153
priority_msg = can.Message(arbitration_id=0x50, data=[0xFF])
154
task2 = bus.send_periodic(priority_msg, period=0.01)
155
156
print("Sending multiple periodic messages...")
157
time.sleep(10)
158
159
# Stop specific tasks
160
task1.stop()
161
task2.stop()
162
bus.shutdown()
163
```
164
165
### Limited Duration Transmission
166
167
```python
168
import can
169
import time
170
171
bus = can.Bus(channel='can0', interface='socketcan')
172
173
# Send for exactly 30 seconds then stop automatically
174
test_message = can.Message(
175
arbitration_id=0x123,
176
data=[0x11, 0x22, 0x33, 0x44]
177
)
178
179
task = bus.send_periodic(
180
test_message,
181
period=0.1, # Every 100ms
182
duration=30.0 # For 30 seconds
183
)
184
185
print("Sending test messages for 30 seconds...")
186
# Task will automatically stop after 30 seconds
187
time.sleep(35) # Wait a bit longer to confirm it stopped
188
189
bus.shutdown()
190
```
191
192
### Dynamic Message Modification
193
194
```python
195
import can
196
import time
197
198
bus = can.Bus(channel='can0', interface='socketcan')
199
200
# Counter that increments in message data
201
counter_msg = can.Message(
202
arbitration_id=0x400,
203
data=[0x00, 0x00, 0x00, 0x00] # 32-bit counter in bytes 0-3
204
)
205
206
def increment_counter(msg):
207
"""Increment 32-bit counter in message data."""
208
# Convert bytes to integer, increment, convert back
209
counter = int.from_bytes(msg.data[:4], 'big')
210
counter = (counter + 1) % (2**32) # Wrap at 32-bit limit
211
msg.data[:4] = counter.to_bytes(4, 'big')
212
213
task = bus.send_periodic(
214
counter_msg,
215
period=0.1,
216
modifier_callback=increment_counter
217
)
218
219
print("Sending incrementing counter...")
220
time.sleep(10)
221
222
task.stop()
223
bus.shutdown()
224
```
225
226
### Temperature Sensor Simulation
227
228
```python
229
import can
230
import time
231
import math
232
233
bus = can.Bus(channel='can0', interface='socketcan')
234
235
# Simulate temperature sensor with sine wave
236
temp_msg = can.Message(
237
arbitration_id=0x510,
238
data=[0x00, 0x00] # 16-bit temperature value
239
)
240
241
start_time = time.time()
242
243
def update_temperature(msg):
244
"""Update temperature with sine wave simulation."""
245
elapsed = time.time() - start_time
246
# Sine wave: 20°C ± 10°C with 30-second period
247
temperature = 20.0 + 10.0 * math.sin(2 * math.pi * elapsed / 30.0)
248
249
# Convert to 16-bit integer (0.1°C resolution)
250
temp_int = int(temperature * 10)
251
msg.data[:2] = temp_int.to_bytes(2, 'big', signed=True)
252
253
task = bus.send_periodic(
254
temp_msg,
255
period=0.5, # Every 500ms
256
modifier_callback=update_temperature
257
)
258
259
print("Simulating temperature sensor for 60 seconds...")
260
time.sleep(60)
261
262
task.stop()
263
bus.shutdown()
264
```
265
266
### Task Management
267
268
```python
269
import can
270
import time
271
272
bus = can.Bus(channel='can0', interface='socketcan')
273
274
# Create multiple tasks with different lifecycles
275
tasks = []
276
277
# Long-running heartbeat
278
heartbeat = can.Message(arbitration_id=0x700, data=[0x01])
279
tasks.append(bus.send_periodic(heartbeat, period=1.0))
280
281
# Medium-term status updates
282
status = can.Message(arbitration_id=0x701, data=[0x02])
283
tasks.append(bus.send_periodic(status, period=0.5, duration=30.0))
284
285
# Short burst of test messages
286
test = can.Message(arbitration_id=0x702, data=[0x03])
287
tasks.append(bus.send_periodic(test, period=0.1, duration=5.0))
288
289
print("Running multiple tasks with different lifecycles...")
290
291
# Monitor tasks
292
for i in range(60):
293
time.sleep(1)
294
active_tasks = [t for t in tasks if hasattr(t, '_thread') and t._thread.is_alive()]
295
print(f"Second {i+1}: {len(active_tasks)} tasks still active")
296
297
# Stop all remaining tasks
298
bus.stop_all_periodic_tasks()
299
bus.shutdown()
300
```
301
302
### Error Handling in Periodic Tasks
303
304
```python
305
import can
306
import time
307
308
class RobustPeriodicSender:
309
def __init__(self, bus, msg, period):
310
self.bus = bus
311
self.msg = msg
312
self.period = period
313
self.error_count = 0
314
self.max_errors = 10
315
316
def error_tolerant_callback(self, msg):
317
"""Callback that handles its own errors."""
318
try:
319
# Simulate some processing that might fail
320
if self.error_count < 3: # Fail first few times
321
self.error_count += 1
322
raise ValueError("Simulated processing error")
323
324
# Normal processing
325
msg.data[0] = (msg.data[0] + 1) % 256
326
327
except Exception as e:
328
print(f"Error in callback: {e}")
329
if self.error_count >= self.max_errors:
330
print("Too many errors, stopping task")
331
return False # Signal to stop
332
return True
333
334
bus = can.Bus(channel='test', interface='virtual')
335
336
msg = can.Message(arbitration_id=0x123, data=[0x00])
337
sender = RobustPeriodicSender(bus, msg, 0.1)
338
339
# This would need custom implementation to handle callback errors
340
# Showing the concept of error-aware periodic transmission
341
print("Demonstrating error handling in periodic tasks...")
342
343
bus.shutdown()
344
```
345
346
## Types
347
348
```python { .api }
349
from abc import ABC, abstractmethod
350
from typing import Optional, Union, Sequence, Callable
351
import threading
352
353
class CyclicSendTaskABC(ABC):
354
"""Abstract base class for cyclic send tasks."""
355
356
@abstractmethod
357
def stop(self) -> None: ...
358
359
@property
360
@abstractmethod
361
def period(self) -> float: ...
362
363
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
364
"""Cyclic task that supports message modification."""
365
366
@abstractmethod
367
def modify_data(self, msg: Message) -> None: ...
368
369
class ThreadBasedCyclicSendTask(CyclicSendTaskABC):
370
"""Default thread-based implementation."""
371
372
def __init__(self, bus, lock: threading.Lock, messages,
373
period: float, duration: Optional[float] = None,
374
autostart: bool = True,
375
modifier_callback: Optional[Callable[[Message], None]] = None): ...
376
```