CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-can

Controller Area Network interface module for Python providing common abstractions for CAN hardware devices and message handling utilities

Pending
Overview
Eval results
Files

event-system.mddocs/

Event System

Event-driven message handling through listeners and notifiers, enabling asynchronous message processing, filtering, buffering, and routing to multiple handlers simultaneously.

Capabilities

Base Listener Interface

Abstract base class for creating custom message handlers.

class Listener(ABC):
    @abstractmethod
    def on_message_received(self, msg: Message) -> None:
        """
        Handle received CAN message.
        
        Parameters:
        - msg: Received message object
        """
    
    def __call__(self, msg: Message) -> None:
        """Callable interface - delegates to on_message_received."""
    
    def on_error(self, exc: Exception) -> None:
        """
        Handle exceptions in receive thread.
        
        Parameters:
        - exc: Exception that caused thread to stop
        """
    
    def stop(self) -> None:
        """Clean up listener resources."""

Message Notifier

Routes messages from CAN buses to multiple listeners with thread management.

class Notifier:
    def __init__(self, bus: Bus, listeners: list[Listener], timeout=1.0, loop=None):
        """
        Create message notifier for routing bus messages.
        
        Parameters:
        - bus: CAN bus to read messages from
        - listeners: List of listeners to receive messages
        - timeout: Timeout for bus.recv() calls (seconds)
        - loop: Asyncio event loop for async listeners
        """
    
    def add_listener(self, listener: Listener) -> None:
        """Add a listener to receive messages."""
    
    def remove_listener(self, listener: Listener) -> None:
        """Remove a listener from receiving messages."""
    
    def stop(self, timeout=5) -> None:
        """Stop the notifier and all listeners."""
    
    def __enter__(self):
        """Context manager entry."""
    
    def __exit__(self, exc_type, exc_value, traceback):
        """Context manager exit with automatic stop."""

Message Buffering

Buffer messages in memory for batch processing or delayed handling.

class BufferedReader(Listener):
    def __init__(self, buffer_size: int = None):
        """
        Buffer messages in memory queue.
        
        Parameters:
        - buffer_size: Maximum buffer size (None for unlimited)
        """
    
    def get_message(self, timeout: float = None) -> Message | None:
        """
        Get next message from buffer.
        
        Parameters:
        - timeout: Maximum time to wait for message
        
        Returns:
        Next message or None on timeout
        """
    
    def on_message_received(self, msg: Message) -> None:
        """Add message to buffer."""

class AsyncBufferedReader(Listener):
    def __init__(self, loop=None):
        """Async version of BufferedReader."""
    
    async def get_message(self) -> Message:
        """Asynchronously get next message from buffer."""

Message Redirection

Redirect messages to other listeners or handlers.

class RedirectReader(Listener):
    def __init__(self, listener: Listener):
        """
        Redirect messages to another listener.
        
        Parameters:
        - listener: Target listener for message redirection
        """
    
    def on_message_received(self, msg: Message) -> None:
        """Forward message to target listener."""

Usage Examples

Basic Event Handling

import can

class MyListener(can.Listener):
    def on_message_received(self, msg):
        print(f"Received: ID=0x{msg.arbitration_id:X}, Data={list(msg.data)}")
    
    def on_error(self, exc):
        print(f"Error: {exc}")

bus = can.Bus(channel='can0', interface='socketcan')
listener = MyListener()

# Manual message handling
for _ in range(10):
    msg = bus.recv(timeout=1.0)
    if msg:
        listener(msg)

bus.shutdown()

Automatic Event Distribution

import can
import time

# Create multiple listeners
class CounterListener(can.Listener):
    def __init__(self):
        self.count = 0
    
    def on_message_received(self, msg):
        self.count += 1
        if self.count % 100 == 0:
            print(f"Processed {self.count} messages")

class FilterListener(can.Listener):
    def __init__(self, target_id):
        self.target_id = target_id
    
    def on_message_received(self, msg):
        if msg.arbitration_id == self.target_id:
            print(f"Target message: {msg}")

bus = can.Bus(channel='can0', interface='socketcan')

listeners = [
    CounterListener(),
    FilterListener(0x123),
    can.Printer(),  # Print all messages
    can.Logger('traffic.log')  # Log all messages
]

# Start automatic distribution
notifier = can.Notifier(bus, listeners)

# Let it run for 30 seconds
time.sleep(30)

# Stop everything
notifier.stop()
bus.shutdown()

print(f"Total messages: {listeners[0].count}")

Message Buffering

import can
import threading
import time

bus = can.Bus(channel='can0', interface='socketcan')
buffer = can.BufferedReader()

# Start background message collection
notifier = can.Notifier(bus, [buffer])

# Process messages in batches
def process_batch():
    batch = []
    while len(batch) < 10:
        msg = buffer.get_message(timeout=1.0)
        if msg:
            batch.append(msg)
        else:
            break
    
    if batch:
        print(f"Processing batch of {len(batch)} messages")
        # Process batch...

# Run batch processing
for _ in range(5):
    process_batch()
    time.sleep(1)

notifier.stop()
bus.shutdown()

Async Message Handling

import can
import asyncio

async def async_message_handler():
    bus = can.Bus(channel='test', interface='virtual')
    buffer = can.AsyncBufferedReader()
    
    # Start message collection
    notifier = can.Notifier(bus, [buffer])
    
    # Process messages asynchronously
    try:
        for _ in range(10):
            msg = await buffer.get_message()
            print(f"Async received: {msg}")
            
            # Simulate async processing
            await asyncio.sleep(0.1)
    finally:
        notifier.stop()
        bus.shutdown()

# Run async handler
asyncio.run(async_message_handler())

Custom Listener with State

import can
import time
from collections import defaultdict

class StatisticsListener(can.Listener):
    def __init__(self):
        self.msg_counts = defaultdict(int)
        self.first_seen = {}
        self.last_seen = {}
        self.start_time = time.time()
    
    def on_message_received(self, msg):
        msg_id = msg.arbitration_id
        self.msg_counts[msg_id] += 1
        
        if msg_id not in self.first_seen:
            self.first_seen[msg_id] = msg.timestamp
        self.last_seen[msg_id] = msg.timestamp
    
    def print_statistics(self):
        print("CAN Bus Statistics:")
        print(f"Runtime: {time.time() - self.start_time:.2f} seconds")
        print(f"Unique IDs: {len(self.msg_counts)}")
        
        for msg_id, count in sorted(self.msg_counts.items()):
            duration = self.last_seen[msg_id] - self.first_seen[msg_id]
            rate = count / max(duration, 0.001)  # Avoid division by zero
            print(f"ID 0x{msg_id:X}: {count} messages, {rate:.1f} msg/s")

bus = can.Bus(channel='can0', interface='socketcan')
stats = StatisticsListener()

notifier = can.Notifier(bus, [stats])
time.sleep(60)  # Collect for 1 minute
notifier.stop()

stats.print_statistics()
bus.shutdown()

Types

from abc import ABC, abstractmethod
from typing import Union, Callable, Optional, Any, List
from collections.abc import Awaitable
import asyncio

# Message recipient types
MessageRecipient = Union[Listener, Callable[[Message], Union[Awaitable[None], None]]]

class Listener(ABC):
    """Abstract base class for message listeners."""
    
    @abstractmethod
    def on_message_received(self, msg: Message) -> None: ...
    
    def on_error(self, exc: Exception) -> None: ...
    def stop(self) -> None: ...
    def __call__(self, msg: Message) -> None: ...

Install with Tessl CLI

npx tessl i tessl/pypi-python-can

docs

bit-timing.md

bus-operations.md

cli-tools.md

event-system.md

file-io.md

hardware-interfaces.md

index.md

message-handling.md

periodic-transmission.md

tile.json