Controller Area Network (CAN) interface module for Python, providing common abstractions for CAN hardware devices and message-handling utilities.
—
Event-driven message handling through listeners and notifiers, enabling asynchronous message processing, filtering, buffering, and routing to multiple handlers simultaneously.
Abstract base class for creating custom message handlers.
class Listener(ABC):
    """
    Abstract base class for creating custom message handlers.

    Signature reference only: every method body below is just its
    docstring, so this block documents the interface rather than
    implementing it.
    """

    @abstractmethod
    def on_message_received(self, msg: Message) -> None:
        """
        Handle received CAN message.

        Parameters:
        - msg: Received message object
        """

    def __call__(self, msg: Message) -> None:
        """Callable interface - delegates to on_message_received."""

    def on_error(self, exc: Exception) -> None:
        """
        Handle exceptions in receive thread.

        Parameters:
        - exc: Exception that caused thread to stop
        """

    def stop(self) -> None:
        """Clean up listener resources."""


# Routes messages from CAN buses to multiple listeners with thread management.
class Notifier:
    """
    Route messages from a CAN bus to multiple listeners.

    Signature reference only: method bodies are omitted and each method
    is described by its docstring.
    """

    def __init__(
        self,
        bus: Bus,
        listeners: list[Listener],
        timeout: float = 1.0,
        loop=None,
    ):
        """
        Create message notifier for routing bus messages.

        Parameters:
        - bus: CAN bus to read messages from
        - listeners: List of listeners to receive messages
        - timeout: Timeout for bus.recv() calls (seconds)
        - loop: Asyncio event loop for async listeners
        """

    def add_listener(self, listener: Listener) -> None:
        """Add a listener to receive messages."""

    def remove_listener(self, listener: Listener) -> None:
        """Remove a listener from receiving messages."""

    def stop(self, timeout: float = 5) -> None:
        """Stop the notifier and all listeners."""

    def __enter__(self):
        """Context manager entry."""

    def __exit__(self, exc_type, exc_value, traceback):
        """Context manager exit with automatic stop."""


# Buffer messages in memory for batch processing or delayed handling.
class BufferedReader(Listener):
def __init__(self, buffer_size: int = None):
"""
Buffer messages in memory queue.
Parameters:
- buffer_size: Maximum buffer size (None for unlimited)
"""
def get_message(self, timeout: float = None) -> Message | None:
"""
Get next message from buffer.
Parameters:
- timeout: Maximum time to wait for message
Returns:
Next message or None on timeout
"""
def on_message_received(self, msg: Message) -> None:
"""Add message to buffer."""
class AsyncBufferedReader(Listener):
    """
    Asynchronous counterpart of BufferedReader.

    Signature reference only: method bodies are omitted.
    """

    def __init__(self, loop=None):
        """Async version of BufferedReader."""

    async def get_message(self) -> Message:
        """Asynchronously get next message from buffer."""


# Redirect messages to other listeners or handlers.
class RedirectReader(Listener):
    """
    Listener that passes each message on to another listener.

    Signature reference only: method bodies are omitted.
    """

    def __init__(self, listener: Listener):
        """
        Redirect messages to another listener.

        Parameters:
        - listener: Target listener for message redirection
        """

    def on_message_received(self, msg: Message) -> None:
        """Forward message to target listener."""


import can
class MyListener(can.Listener):
    """Example listener that prints every received message and every error."""

    def on_message_received(self, msg):
        """Print the arbitration ID (hex) and the payload bytes of *msg*."""
        print(f"Received: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}")

    def on_error(self, exc):
        """Print the exception reported to this listener."""
        print(f"Error: {exc}")
bus = can.Bus(channel='can0', interface='socketcan')
listener = MyListener()

try:
    # Manual message handling
    for _ in range(10):
        msg = bus.recv(timeout=1.0)
        # Skip iterations where no message arrived within the timeout.
        if msg is not None:
            listener(msg)
finally:
    # Release the interface even if receiving or handling raised.
    bus.shutdown()

import can
import time
# Create multiple listeners
class CounterListener(can.Listener):
    """Count received messages and report progress every 100 messages."""

    def __init__(self):
        super().__init__()
        # Number of messages seen so far.
        self.count = 0

    def on_message_received(self, msg):
        """Increment the counter; print a progress line on each multiple of 100."""
        self.count += 1
        if self.count % 100 == 0:
            print(f"Processed {self.count} messages")
class FilterListener(can.Listener):
    """Print only the messages whose arbitration ID equals a target ID."""

    def __init__(self, target_id):
        """
        Parameters:
        - target_id: Arbitration ID to match against incoming messages
        """
        super().__init__()
        self.target_id = target_id

    def on_message_received(self, msg):
        """Print *msg* when its arbitration ID matches ``target_id``."""
        if msg.arbitration_id == self.target_id:
            print(f"Target message: {msg}")
bus = can.Bus(channel='can0', interface='socketcan')

# Keep a direct reference to the counter so the total can be read back
# without relying on its position in the list.
counter = CounterListener()
listeners = [
    counter,
    FilterListener(0x123),
    can.Printer(),  # Print all messages
    can.Logger('traffic.log'),  # Log all messages
]

# Start automatic distribution
notifier = can.Notifier(bus, listeners)
try:
    # Let it run for 30 seconds
    time.sleep(30)
finally:
    # Stop everything, also when the wait is interrupted (e.g. Ctrl+C)
    notifier.stop()
    bus.shutdown()

print(f"Total messages: {counter.count}")

import can
import threading
import time
bus = can.Bus(channel='can0', interface='socketcan')
buffer = can.BufferedReader()

# Start background message collection
notifier = can.Notifier(bus, [buffer])


# Process messages in batches
def process_batch(batch_size=10, timeout=1.0):
    """
    Collect up to *batch_size* buffered messages and process them together.

    Collection stops early as soon as one ``get_message`` call returns
    nothing within *timeout* seconds.

    Parameters:
    - batch_size: Maximum number of messages per batch
    - timeout: Seconds to wait for each individual message
    """
    batch = []
    while len(batch) < batch_size:
        msg = buffer.get_message(timeout=timeout)
        if msg is None:
            # Nothing arrived in time: work with what has been collected.
            break
        batch.append(msg)

    if batch:
        print(f"Processing batch of {len(batch)} messages")
        # Process batch...


try:
    # Run batch processing
    for _ in range(5):
        process_batch()
        time.sleep(1)
finally:
    # Always stop the notifier and release the bus, even on error.
    notifier.stop()
    bus.shutdown()

import can
import asyncio
async def async_message_handler():
    """
    Receive ten messages through an AsyncBufferedReader and print each one.

    The notifier and the bus are always shut down on exit.
    """
    bus = can.Bus(channel='test', interface='virtual')
    buffer = can.AsyncBufferedReader()

    # Start message collection. The running event loop is handed to the
    # notifier so that messages reach the asyncio-based reader through the
    # loop rather than directly from the notifier's own thread.
    notifier = can.Notifier(bus, [buffer], loop=asyncio.get_running_loop())

    # NOTE(review): nothing in this example transmits on the 'test' channel;
    # presumably a sender exists elsewhere, otherwise get_message() waits
    # indefinitely.

    # Process messages asynchronously
    try:
        for _ in range(10):
            msg = await buffer.get_message()
            print(f"Async received: {msg}")
            # Simulate async processing
            await asyncio.sleep(0.1)
    finally:
        notifier.stop()
        bus.shutdown()


# Run async handler
asyncio.run(async_message_handler())

import can
import time
from collections import defaultdict
class StatisticsListener(can.Listener):
    """Collect per-arbitration-ID message counts and first/last timestamps."""

    def __init__(self):
        super().__init__()
        self.msg_counts = defaultdict(int)  # arbitration ID -> messages seen
        self.first_seen = {}  # arbitration ID -> timestamp of first message
        self.last_seen = {}  # arbitration ID -> timestamp of latest message
        self.start_time = time.time()

    def on_message_received(self, msg):
        """Update the count and the first/last timestamps for the ID of *msg*."""
        msg_id = msg.arbitration_id
        self.msg_counts[msg_id] += 1
        # setdefault only stores the timestamp the first time an ID is seen.
        self.first_seen.setdefault(msg_id, msg.timestamp)
        self.last_seen[msg_id] = msg.timestamp

    def print_statistics(self):
        """Print runtime, number of unique IDs, and count and rate per ID."""
        print("CAN Bus Statistics:")
        print(f"Runtime: {time.time() - self.start_time:.2f} seconds")
        print(f"Unique IDs: {len(self.msg_counts)}")

        for msg_id, count in sorted(self.msg_counts.items()):
            duration = self.last_seen[msg_id] - self.first_seen[msg_id]
            # N messages span N - 1 intervals. With a single message (or
            # identical timestamps) no rate can be measured, so report 0
            # instead of dividing by an artificial minimum duration.
            rate = (count - 1) / duration if duration > 0 else 0.0
            print(f"ID 0x{msg_id:X}: {count} messages, {rate:.1f} msg/s")
bus = can.Bus(channel='can0', interface='socketcan')
stats = StatisticsListener()
notifier = can.Notifier(bus, [stats])
try:
    time.sleep(60)  # Collect for 1 minute
finally:
    # Stop collecting and release the bus even if the wait is interrupted.
    notifier.stop()
    bus.shutdown()

stats.print_statistics()

from abc import ABC, abstractmethod
from typing import Union, Callable, Optional, Any, List
from collections.abc import Awaitable
import asyncio
# Message recipient types: a recipient is either a Listener instance or a
# plain callable that takes a Message and returns None or an awaitable
# resolving to None.
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]
class Listener(ABC):
    """Abstract base class for message listeners."""

    # The only abstract method: subclasses must implement it.
    @abstractmethod
    def on_message_received(self, msg: Message) -> None: ...

    # Receives an exception as `exc`.
    def on_error(self, exc: Exception) -> None: ...

    # Takes no arguments and returns nothing.
    def stop(self) -> None: ...

    # Makes a listener instance callable with a message.
    def __call__(self, msg: Message) -> None: ...


# Install with Tessl CLI:
#   npx tessl i tessl/pypi-python-can