Controller Area Network interface module for Python providing common abstractions for CAN hardware devices and message handling utilities
—
Fundamental CAN bus operations that provide the foundation for all CAN communication. The Bus class serves as the central abstraction for connecting to CAN hardware, sending and receiving messages, and managing bus lifecycle.
Create and configure CAN bus connections with automatic interface detection and configuration loading from default locations.
def Bus(channel=None, interface=None, config_context=None, ignore_config=False, **kwargs):
"""
Create a new bus instance with configuration loading.
Parameters:
- channel: Channel identification (backend dependent)
- interface: Interface name (see VALID_INTERFACES)
- config_context: Extra context for config sources
- ignore_config: If True, only use provided arguments
- **kwargs: Backend-specific configuration parameters
Returns:
BusABC: Configured bus instance
Raises:
- CanInterfaceNotImplementedError: Interface not available
- CanInitializationError: Bus initialization failed
- ValueError: Invalid parameters
"""
Send CAN messages to the bus with optional timeout and error handling.
def send(self, msg: Message, timeout=None) -> None:
"""
Transmit a message to the CAN bus.
Parameters:
- msg: Message object to transmit
- timeout: Maximum time to wait for transmission (seconds)
None blocks indefinitely, 0 for non-blocking
Raises:
- CanOperationError: Transmission failed
- CanTimeoutError: Timeout exceeded
"""
Receive CAN messages from the bus with filtering and timeout support.
def recv(self, timeout=None) -> Message | None:
"""
Block waiting for a message from the bus.
Parameters:
- timeout: Maximum time to wait for message (seconds)
None waits indefinitely
Returns:
Message object or None on timeout
Raises:
- CanOperationError: Reception failed
"""
Start sending messages at regular intervals with advanced configuration options.
def send_periodic(self, msgs, period: float, duration=None, store_task=True,
                  autostart=True, modifier_callback=None):
    """
    Start sending messages at a given period.

    Parameters:
    - msgs: Message or sequence of messages to transmit
    - period: Period in seconds between each message
    - duration: Duration to continue sending (None for indefinite)
    - store_task: If True, attach task to bus instance
    - autostart: If True, start task immediately
    - modifier_callback: Function to modify message data before sending

    Returns:
    CyclicSendTaskABC: Task instance for controlling transmission
    """
def stop_all_periodic_tasks(self, remove_tasks=True) -> None:
"""
Stop all periodic message transmission tasks.
Parameters:
- remove_tasks: Whether to remove tasks from tracking
"""
Configure hardware and software message filtering to receive only relevant messages.
def set_filters(self, filters=None) -> None:
    """
    Apply filtering to all messages received by this bus.

    Parameters:
    - filters: List of filter dictionaries or None to receive all messages
      Each filter: {"can_id": int, "can_mask": int, "extended": bool}
      Filter matches when: (received_id & can_mask) == (can_id & can_mask)
    """
@property
def filters(self) -> CanFilters | None:
"""Get or set the current message filters."""
Monitor and control bus state and protocol information.
@property
def state(self) -> BusState:
    """Return the current state of the hardware (ACTIVE/PASSIVE/ERROR)."""

@state.setter
def state(self, new_state: BusState) -> None:
    """Set the new state of the hardware."""
@property
def protocol(self) -> CanProtocol:
"""Return the CAN protocol used by this bus instance."""
Proper cleanup and resource management with context manager support.
def shutdown(self) -> None:
    """
    Carry out interface-specific cleanup for shutting down the bus.

    Can be safely called multiple times.
    """
def flush_tx_buffer(self) -> None:
    """Discard every message queued in the output buffer(s)."""
def __enter__(self):
    """Context manager entry."""
def __exit__(self, exc_type, exc_value, traceback):
"""Context manager exit with automatic shutdown."""
Use the bus as an iterator to continuously receive messages.
def __iter__(self):
"""
Allow iteration over messages as they are received.
Yields:
Message objects as they arrive
Example:
for msg in bus:
print(msg)
"""
import can
# Create bus with automatic interface detection
bus = can.Bus(channel='can0')

# Send a message
msg = can.Message(arbitration_id=0x123, data=[1, 2, 3, 4])
bus.send(msg)

# Receive with timeout
received = bus.recv(timeout=5.0)
if received:
    print(f"Received: {received}")

# Use as iterator
for msg in bus:
    print(f"Got message: {msg}")
    if some_condition:
        break

bus.shutdown()

import can
with can.Bus(channel='can0', interface='socketcan') as bus:
    # Bus automatically cleaned up on exit
    bus.send(can.Message(arbitration_id=0x456, data=[0x11, 0x22]))

    # Receive multiple messages
    for _ in range(10):
        msg = bus.recv(timeout=1.0)
        if msg:
            process_message(msg)

import can
bus = can.Bus(channel='can0')

# Filter for specific message IDs
filters = [
    {"can_id": 0x123, "can_mask": 0x7FF, "extended": False},
    {"can_id": 0x456, "can_mask": 0x7FF, "extended": False},
]
bus.set_filters(filters)

# Only messages with ID 0x123 or 0x456 will be received
msg = bus.recv()

import can
bus = can.Bus(channel='can0')

# Send heartbeat message every 100ms
heartbeat = can.Message(arbitration_id=0x700, data=[0x00])
task = bus.send_periodic(heartbeat, period=0.1)

# Modify message data over time
def update_counter(msg):
    msg.data[0] = (msg.data[0] + 1) % 256

task_with_counter = bus.send_periodic(
    heartbeat,
    period=0.1,
    modifier_callback=update_counter
)

# Stop after 5 seconds
import time
time.sleep(5)
task.stop()
bus.shutdown()

from abc import ABC, abstractmethod
from typing import Optional, Union, Sequence, Callable
from collections.abc import Iterator
class BusABC(ABC):
"""Abstract base class for all CAN bus implementations."""
channel_info: str # Description of underlying bus/channel
RECV_LOGGING_LEVEL: int # Log level for received messages
@abstractmethod
def __init__(self, channel, can_filters=None, **kwargs): ...
@abstractmethod
def send(self, msg: Message, timeout=None) -> None: ...
def recv(self, timeout=None) -> Message | None: ...
def send_periodic(self, msgs, period: float, **kwargs): ...
def set_filters(self, filters=None) -> None: ...
def shutdown(self) -> None: ...
Install with Tessl CLI
npx tessl i tessl/pypi-python-can