0
# Event System
1
2
Event-driven message handling through listeners and notifiers, enabling asynchronous message processing, filtering, buffering, and routing to multiple handlers simultaneously.
3
4
## Capabilities
5
6
### Base Listener Interface
7
8
Abstract base class for creating custom message handlers.
9
10
```python { .api }
11
class Listener(ABC):
    """Abstract base class for creating custom message handlers."""

    @abstractmethod
    def on_message_received(self, msg: Message) -> None:
        """
        Handle received CAN message.

        Parameters:
        - msg: Received message object
        """

    def __call__(self, msg: Message) -> None:
        """Callable interface - delegates to on_message_received."""

    def on_error(self, exc: Exception) -> None:
        """
        Handle exceptions in receive thread.

        Parameters:
        - exc: Exception that caused thread to stop
        """

    def stop(self) -> None:
        """Clean up listener resources."""
34
```
35
36
### Message Notifier
37
38
Routes messages from one or more CAN buses to multiple listeners, managing the background receive threads for you.
39
40
```python { .api }
41
class Notifier:
    """Route messages read from a CAN bus to a set of listeners."""

    def __init__(
        self,
        bus: Bus | list[Bus],
        listeners: list[MessageRecipient],
        timeout: float = 1.0,
        loop: asyncio.AbstractEventLoop | None = None,
    ):
        """
        Create message notifier for routing bus messages.

        Parameters:
        - bus: CAN bus (or list of buses) to read messages from
        - listeners: Listeners or plain callables to receive messages
        - timeout: Timeout for bus.recv() calls (seconds)
        - loop: Asyncio event loop for async listeners
        """

    def add_listener(self, listener: MessageRecipient) -> None:
        """Add a listener to receive messages."""

    def remove_listener(self, listener: MessageRecipient) -> None:
        """
        Remove a listener from receiving messages.

        Raises:
        - ValueError: If the listener was never added to this notifier
        """

    def stop(self, timeout: float = 5.0) -> None:
        """
        Stop the notifier and all listeners.

        Parameters:
        - timeout: Maximum time to wait for the receive threads to finish (seconds)
        """

    def __enter__(self) -> "Notifier":
        """Context manager entry."""

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Context manager exit with automatic stop."""
67
```
68
69
### Message Buffering
70
71
Buffer messages in memory for batch processing or delayed handling.
72
73
```python { .api }
74
class BufferedReader(Listener):
    """Listener that stores received messages in an in-memory queue."""

    # NOTE(review): confirm that the targeted python-can release accepts
    # `buffer_size`; the library reference should be checked before relying on it.
    def __init__(self, buffer_size: int | None = None):
        """
        Buffer messages in memory queue.

        Parameters:
        - buffer_size: Maximum buffer size (None for unlimited)
        """

    def get_message(self, timeout: float | None = None) -> Message | None:
        """
        Get next message from buffer.

        Parameters:
        - timeout: Maximum time to wait for message

        Returns:
        Next message or None on timeout
        """

    def on_message_received(self, msg: Message) -> None:
        """Add message to buffer."""
96
97
class AsyncBufferedReader(Listener):
    """Listener whose buffered messages are retrieved with ``await``."""

    def __init__(self, loop: asyncio.AbstractEventLoop | None = None):
        """Async version of BufferedReader."""

    async def get_message(self) -> Message:
        """Asynchronously get next message from buffer."""
103
```
104
105
### Message Redirection
106
107
Redirect messages to other listeners or handlers.
108
109
```python { .api }
110
class RedirectReader(Listener):
    """Listener that forwards every received message to a target."""

    # NOTE(review): python-can's RedirectReader appears to take a target *bus*
    # and re-send each message on it, rather than wrapping another listener --
    # confirm this signature against the library reference.
    def __init__(self, listener: Listener):
        """
        Redirect messages to another listener.

        Parameters:
        - listener: Target listener for message redirection
        """

    def on_message_received(self, msg: Message) -> None:
        """Forward message to target listener."""
121
```
122
123
## Usage Examples
124
125
### Basic Event Handling
126
127
```python
128
import can


class MyListener(can.Listener):
    """Print every received message and any reported receive error."""

    def on_message_received(self, msg):
        print(f"Received: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}")

    def on_error(self, exc):
        print(f"Error: {exc}")


listener = MyListener()

# The context manager shuts the bus down even if handling a message raises.
with can.Bus(channel='can0', interface='socketcan') as bus:
    # Manual message handling
    for _ in range(10):
        msg = bus.recv(timeout=1.0)
        # recv() yields None when nothing arrived within the timeout
        if msg is not None:
            listener(msg)
147
```
148
149
### Automatic Event Distribution
150
151
```python
152
import can
import time


# Create multiple listeners
class CounterListener(can.Listener):
    """Count received messages and report progress every 100 messages."""

    def __init__(self):
        super().__init__()
        self.count = 0

    def on_message_received(self, msg):
        self.count += 1
        if self.count % 100 == 0:
            print(f"Processed {self.count} messages")


class FilterListener(can.Listener):
    """Print only the messages whose arbitration ID equals ``target_id``."""

    def __init__(self, target_id):
        super().__init__()
        self.target_id = target_id

    def on_message_received(self, msg):
        if msg.arbitration_id == self.target_id:
            print(f"Target message: {msg}")


# Keep a named reference so the total can be read back without indexing the list
counter = CounterListener()

listeners = [
    counter,
    FilterListener(0x123),
    can.Printer(),  # Print all messages
    can.Logger('traffic.log'),  # Log all messages
]

# The context manager shuts the bus down even if something below raises.
with can.Bus(channel='can0', interface='socketcan') as bus:
    # Start automatic distribution
    notifier = can.Notifier(bus, listeners)
    try:
        # Let it run for 30 seconds
        time.sleep(30)
    finally:
        # Stop the notifier and all listeners, even if the wait is interrupted
        notifier.stop()

print(f"Total messages: {counter.count}")
193
```
194
195
### Message Buffering
196
197
```python
198
import can
import threading
import time

buffer = can.BufferedReader()


# Process messages in batches
def process_batch():
    """Collect up to 10 buffered messages and process them together."""
    batch = []
    while len(batch) < 10:
        msg = buffer.get_message(timeout=1.0)
        if msg is None:
            # Timed out: process whatever has been collected so far
            break
        batch.append(msg)

    if batch:
        print(f"Processing batch of {len(batch)} messages")
        # Process batch...


# The context manager shuts the bus down even if batch processing raises.
with can.Bus(channel='can0', interface='socketcan') as bus:
    # Start background message collection
    notifier = can.Notifier(bus, [buffer])
    try:
        # Run batch processing
        for _ in range(5):
            process_batch()
            time.sleep(1)
    finally:
        notifier.stop()
229
```
230
231
### Async Message Handling
232
233
```python
234
import can
import asyncio


async def async_message_handler():
    """Receive and print ten messages using the asyncio-based reader."""
    # NOTE: nothing in this snippet transmits on the virtual channel; another
    # bus attached to channel 'test' has to send for get_message() to return.
    with can.Bus(channel='test', interface='virtual') as bus:
        buffer = can.AsyncBufferedReader()

        # Start message collection. The running loop is passed so that the
        # notifier schedules the async listener on the event loop.
        notifier = can.Notifier(bus, [buffer], loop=asyncio.get_running_loop())

        # Process messages asynchronously
        try:
            for _ in range(10):
                msg = await buffer.get_message()
                print(f"Async received: {msg}")

                # Simulate async processing
                await asyncio.sleep(0.1)
        finally:
            notifier.stop()


# Run async handler
asyncio.run(async_message_handler())
258
```
259
260
### Custom Listener with State
261
262
```python
263
import can
import time
from collections import defaultdict


class StatisticsListener(can.Listener):
    """Collect per-ID message counts and first/last message timestamps."""

    def __init__(self):
        super().__init__()
        self.msg_counts = defaultdict(int)
        self.first_seen = {}
        self.last_seen = {}
        self.start_time = time.time()

    def on_message_received(self, msg):
        msg_id = msg.arbitration_id
        self.msg_counts[msg_id] += 1

        # Only the first message of an ID sets first_seen; every message
        # refreshes last_seen.
        self.first_seen.setdefault(msg_id, msg.timestamp)
        self.last_seen[msg_id] = msg.timestamp

    def print_statistics(self):
        """Print the runtime, the number of unique IDs and a per-ID rate."""
        print("CAN Bus Statistics:")
        print(f"Runtime: {time.time() - self.start_time:.2f} seconds")
        print(f"Unique IDs: {len(self.msg_counts)}")

        for msg_id, count in sorted(self.msg_counts.items()):
            duration = self.last_seen[msg_id] - self.first_seen[msg_id]
            # An ID seen at a single timestamp has no measurable rate. Report
            # 0.0 instead of dividing by a tiny placeholder, which would show
            # a misleading 1000 msg/s for a single message.
            rate = count / duration if duration > 0 else 0.0
            print(f"ID 0x{msg_id:X}: {count} messages, {rate:.1f} msg/s")


stats = StatisticsListener()

# The context manager shuts the bus down even if collection is interrupted.
with can.Bus(channel='can0', interface='socketcan') as bus:
    notifier = can.Notifier(bus, [stats])
    try:
        time.sleep(60)  # Collect for 1 minute
    finally:
        notifier.stop()

stats.print_statistics()
301
```
302
303
## Types
304
305
```python { .api }
306
from abc import ABC, abstractmethod
307
from typing import Union, Callable, Optional, Any, List
308
from collections.abc import Awaitable
309
import asyncio
310
311
# Message recipient types
312
# A recipient is either a Listener instance or a plain callable that takes a
# Message; the callable may return None (sync) or an awaitable (async).
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]
313
314
class Listener(ABC):
    """Abstract base class for message listeners."""

    @abstractmethod
    def on_message_received(self, msg: Message) -> None: ...

    def on_error(self, exc: Exception) -> None: ...
    def stop(self) -> None: ...
    def __call__(self, msg: Message) -> None: ...
323
```