or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

bit-timing.mdbus-operations.mdcli-tools.mdevent-system.mdfile-io.mdhardware-interfaces.mdindex.mdmessage-handling.mdperiodic-transmission.md

event-system.mddocs/

0

# Event System

1

2

Event-driven message handling through listeners and notifiers, enabling asynchronous message processing, filtering, buffering, and routing to multiple handlers simultaneously.

3

4

## Capabilities

5

6

### Base Listener Interface

7

8

Abstract base class for creating custom message handlers.

9

10

```python { .api }

11

class Listener(ABC):

12

@abstractmethod

13

def on_message_received(self, msg: Message) -> None:

14

"""

15

Handle received CAN message.

16

17

Parameters:

18

- msg: Received message object

19

"""

20

21

def __call__(self, msg: Message) -> None:

22

"""Callable interface - delegates to on_message_received."""

23

24

def on_error(self, exc: Exception) -> None:

25

"""

26

Handle exceptions in receive thread.

27

28

Parameters:

29

- exc: Exception that caused thread to stop

30

"""

31

32

def stop(self) -> None:

33

"""Clean up listener resources."""

34

```

35

36

### Message Notifier

37

38

Routes messages from CAN buses to multiple listeners with thread management.

39

40

```python { .api }

41

class Notifier:

42

def __init__(self, bus: Bus, listeners: list[Listener], timeout=1.0, loop=None):

43

"""

44

Create message notifier for routing bus messages.

45

46

Parameters:

47

- bus: CAN bus to read messages from

48

- listeners: List of listeners to receive messages

49

- timeout: Timeout for bus.recv() calls (seconds)

50

- loop: Asyncio event loop for async listeners

51

"""

52

53

def add_listener(self, listener: Listener) -> None:

54

"""Add a listener to receive messages."""

55

56

def remove_listener(self, listener: Listener) -> None:

57

"""Remove a listener from receiving messages."""

58

59

def stop(self, timeout=5) -> None:

60

"""Stop the notifier and all listeners."""

61

62

def __enter__(self):

63

"""Context manager entry."""

64

65

def __exit__(self, exc_type, exc_value, traceback):

66

"""Context manager exit with automatic stop."""

67

```

68

69

### Message Buffering

70

71

Buffer messages in memory for batch processing or delayed handling.

72

73

```python { .api }

74

class BufferedReader(Listener):

75

def __init__(self, buffer_size: int = None):

76

"""

77

Buffer messages in memory queue.

78

79

Parameters:

80

- buffer_size: Maximum buffer size (None for unlimited)

81

"""

82

83

def get_message(self, timeout: float = None) -> Message | None:

84

"""

85

Get next message from buffer.

86

87

Parameters:

88

- timeout: Maximum time to wait for message

89

90

Returns:

91

Next message or None on timeout

92

"""

93

94

def on_message_received(self, msg: Message) -> None:

95

"""Add message to buffer."""

96

97

class AsyncBufferedReader(Listener):

98

def __init__(self, loop=None):

99

"""Async version of BufferedReader."""

100

101

async def get_message(self) -> Message:

102

"""Asynchronously get next message from buffer."""

103

```

104

105

### Message Redirection

106

107

Redirect messages to other listeners or handlers.

108

109

```python { .api }

110

class RedirectReader(Listener):

111

def __init__(self, listener: Listener):

112

"""

113

Redirect messages to another listener.

114

115

Parameters:

116

- listener: Target listener for message redirection

117

"""

118

119

def on_message_received(self, msg: Message) -> None:

120

"""Forward message to target listener."""

121

```

122

123

## Usage Examples

124

125

### Basic Event Handling

126

127

```python

128

import can

129

130

class MyListener(can.Listener):

131

def on_message_received(self, msg):

132

print(f"Received: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}")

133

134

def on_error(self, exc):

135

print(f"Error: {exc}")

136

137

bus = can.Bus(channel='can0', interface='socketcan')

138

listener = MyListener()

139

140

# Manual message handling

141

for _ in range(10):

142

msg = bus.recv(timeout=1.0)

143

if msg:

144

listener(msg)

145

146

bus.shutdown()

147

```

148

149

### Automatic Event Distribution

150

151

```python

152

import can

153

import time

154

155

# Create multiple listeners

156

class CounterListener(can.Listener):

157

def __init__(self):

158

self.count = 0

159

160

def on_message_received(self, msg):

161

self.count += 1

162

if self.count % 100 == 0:

163

print(f"Processed {self.count} messages")

164

165

class FilterListener(can.Listener):

166

def __init__(self, target_id):

167

self.target_id = target_id

168

169

def on_message_received(self, msg):

170

if msg.arbitration_id == self.target_id:

171

print(f"Target message: {msg}")

172

173

bus = can.Bus(channel='can0', interface='socketcan')

174

175

listeners = [

176

CounterListener(),

177

FilterListener(0x123),

178

can.Printer(), # Print all messages

179

can.Logger('traffic.log') # Log all messages

180

]

181

182

# Start automatic distribution

183

notifier = can.Notifier(bus, listeners)

184

185

# Let it run for 30 seconds

186

time.sleep(30)

187

188

# Stop everything

189

notifier.stop()

190

bus.shutdown()

191

192

print(f"Total messages: {listeners[0].count}")

193

```

194

195

### Message Buffering

196

197

```python

198

import can

199

import threading

200

import time

201

202

bus = can.Bus(channel='can0', interface='socketcan')

203

buffer = can.BufferedReader()

204

205

# Start background message collection

206

notifier = can.Notifier(bus, [buffer])

207

208

# Process messages in batches

209

def process_batch():

210

batch = []

211

while len(batch) < 10:

212

msg = buffer.get_message(timeout=1.0)

213

if msg:

214

batch.append(msg)

215

else:

216

break

217

218

if batch:

219

print(f"Processing batch of {len(batch)} messages")

220

# Process batch...

221

222

# Run batch processing

223

for _ in range(5):

224

process_batch()

225

time.sleep(1)

226

227

notifier.stop()

228

bus.shutdown()

229

```

230

231

### Async Message Handling

232

233

```python

234

import can

235

import asyncio

236

237

async def async_message_handler():

238

bus = can.Bus(channel='test', interface='virtual')

239

buffer = can.AsyncBufferedReader()

240

241

# Start message collection

242

notifier = can.Notifier(bus, [buffer])

243

244

# Process messages asynchronously

245

try:

246

for _ in range(10):

247

msg = await buffer.get_message()

248

print(f"Async received: {msg}")

249

250

# Simulate async processing

251

await asyncio.sleep(0.1)

252

finally:

253

notifier.stop()

254

bus.shutdown()

255

256

# Run async handler

257

asyncio.run(async_message_handler())

258

```

259

260

### Custom Listener with State

261

262

```python

263

import can

264

import time

265

from collections import defaultdict

266

267

class StatisticsListener(can.Listener):

268

def __init__(self):

269

self.msg_counts = defaultdict(int)

270

self.first_seen = {}

271

self.last_seen = {}

272

self.start_time = time.time()

273

274

def on_message_received(self, msg):

275

msg_id = msg.arbitration_id

276

self.msg_counts[msg_id] += 1

277

278

if msg_id not in self.first_seen:

279

self.first_seen[msg_id] = msg.timestamp

280

self.last_seen[msg_id] = msg.timestamp

281

282

def print_statistics(self):

283

print("CAN Bus Statistics:")

284

print(f"Runtime: {time.time() - self.start_time:.2f} seconds")

285

print(f"Unique IDs: {len(self.msg_counts)}")

286

287

for msg_id, count in sorted(self.msg_counts.items()):

288

duration = self.last_seen[msg_id] - self.first_seen[msg_id]

289

rate = count / max(duration, 0.001) # Avoid division by zero

290

print(f"ID 0x{msg_id:X}: {count} messages, {rate:.1f} msg/s")

291

292

bus = can.Bus(channel='can0', interface='socketcan')

293

stats = StatisticsListener()

294

295

notifier = can.Notifier(bus, [stats])

296

time.sleep(60) # Collect for 1 minute

297

notifier.stop()

298

299

stats.print_statistics()

300

bus.shutdown()

301

```

302

303

## Types

304

305

```python { .api }

306

from abc import ABC, abstractmethod

307

from typing import Union, Callable, Optional, Any, List

308

from collections.abc import Awaitable

309

import asyncio

310

311

# Message recipient types

312

MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]

313

314

class Listener(ABC):

315

"""Abstract base class for message listeners."""

316

317

@abstractmethod

318

def on_message_received(self, msg: Message) -> None: ...

319

320

def on_error(self, exc: Exception) -> None: ...

321

def stop(self) -> None: ...

322

def __call__(self, msg: Message) -> None: ...

323

```