CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-can

Controller Area Network interface module for Python providing common abstractions for CAN hardware devices and message handling utilities

Pending
Overview
Eval results
Files

bus-operations.mddocs/

Core Bus Operations

Fundamental CAN bus operations that provide the foundation for all CAN communication. The Bus class serves as the central abstraction for connecting to CAN hardware, sending and receiving messages, and managing bus lifecycle.

Capabilities

Bus Creation and Connection

Create and configure CAN bus connections with automatic interface detection and configuration loading from default locations.

def Bus(channel=None, interface=None, config_context=None, ignore_config=False, **kwargs):
    """
    Create a new bus instance with configuration loading.
    
    Parameters:
    - channel: Channel identification (backend dependent)
    - interface: Interface name (see VALID_INTERFACES)
    - config_context: Extra context for config sources
    - ignore_config: If True, only use provided arguments
    - **kwargs: Backend-specific configuration parameters
    
    Returns:
    BusABC: Configured bus instance
    
    Raises:
    - CanInterfaceNotImplementedError: Interface not available
    - CanInitializationError: Bus initialization failed
    - ValueError: Invalid parameters
    """

Message Transmission

Send CAN messages to the bus with optional timeout and error handling.

def send(self, msg: Message, timeout=None) -> None:
    """
    Transmit a message to the CAN bus.
    
    Parameters:
    - msg: Message object to transmit
    - timeout: Maximum time to wait for transmission (seconds)
             None blocks indefinitely, 0 for non-blocking
    
    Raises:
    - CanOperationError: Transmission failed
    - CanTimeoutError: Timeout exceeded
    """

Message Reception

Receive CAN messages from the bus with filtering and timeout support.

def recv(self, timeout=None) -> Message | None:
    """
    Block waiting for a message from the bus.
    
    Parameters:
    - timeout: Maximum time to wait for message (seconds)
              None waits indefinitely
    
    Returns:
    Message object or None on timeout
    
    Raises:
    - CanOperationError: Reception failed
    """

Periodic Message Transmission

Start sending messages at regular intervals with advanced configuration options.

def send_periodic(self, msgs, period: float, duration=None, store_task=True, 
                 autostart=True, modifier_callback=None):
    """
    Start sending messages at a given period.
    
    Parameters:
    - msgs: Message or sequence of messages to transmit
    - period: Period in seconds between each message
    - duration: Duration to continue sending (None for indefinite)
    - store_task: If True, attach task to bus instance
    - autostart: If True, start task immediately
    - modifier_callback: Function to modify message data before sending
    
    Returns:
    CyclicSendTaskABC: Task instance for controlling transmission
    """

def stop_all_periodic_tasks(self, remove_tasks=True) -> None:
    """
    Stop all periodic message transmission tasks.
    
    Parameters:
    - remove_tasks: Whether to remove tasks from tracking
    """

Message Filtering

Configure hardware and software message filtering to receive only relevant messages.

def set_filters(self, filters=None) -> None:
    """
    Apply filtering to all messages received by this bus.
    
    Parameters:
    - filters: List of filter dictionaries or None to receive all messages
              Each filter: {"can_id": int, "can_mask": int, "extended": bool}
              Filter matches when: (received_id & can_mask) == (can_id & can_mask)
    """

@property
def filters(self) -> CanFilters | None:
    """Get or set the current message filters."""

Bus State Management

Monitor and control bus state and protocol information.

@property
def state(self) -> BusState:
    """Return the current state of the hardware (ACTIVE/PASSIVE/ERROR)."""

@state.setter
def state(self, new_state: BusState) -> None:
    """Set the new state of the hardware."""

@property
def protocol(self) -> CanProtocol:
    """Return the CAN protocol used by this bus instance."""

Resource Management

Proper cleanup and resource management with context manager support.

def shutdown(self) -> None:
    """
    Carry out interface-specific cleanup for shutting down the bus.
    Can be safely called multiple times.
    """

def flush_tx_buffer(self) -> None:
    """Discard every message queued in the output buffer(s)."""

def __enter__(self): 
    """Context manager entry."""

def __exit__(self, exc_type, exc_value, traceback):
    """Context manager exit with automatic shutdown."""

Iterator Interface

Use the bus as an iterator to continuously receive messages.

def __iter__(self):
    """
    Allow iteration over messages as they are received.
    
    Yields:
    Message objects as they arrive
    
    Example:
    for msg in bus:
        print(msg)
    """

Usage Examples

Basic Bus Operations

import can

# Create bus with automatic interface detection
bus = can.Bus(channel='can0')

# Send a message
msg = can.Message(arbitration_id=0x123, data=[1, 2, 3, 4])
bus.send(msg)

# Receive with timeout
received = bus.recv(timeout=5.0)
if received:
    print(f"Received: {received}")

# Use as iterator
for msg in bus:
    print(f"Got message: {msg}")
    if some_condition:
        break

bus.shutdown()

Context Manager Usage

import can

with can.Bus(channel='can0', interface='socketcan') as bus:
    # Bus automatically cleaned up on exit
    bus.send(can.Message(arbitration_id=0x456, data=[0x11, 0x22]))
    
    # Receive multiple messages
    for _ in range(10):
        msg = bus.recv(timeout=1.0)
        if msg:
            process_message(msg)

Message Filtering

import can

bus = can.Bus(channel='can0')

# Filter for specific message IDs
filters = [
    {"can_id": 0x123, "can_mask": 0x7FF, "extended": False},
    {"can_id": 0x456, "can_mask": 0x7FF, "extended": False},
]
bus.set_filters(filters)

# Only messages with ID 0x123 or 0x456 will be received
msg = bus.recv()

Periodic Transmission

import can

bus = can.Bus(channel='can0')

# Send heartbeat message every 100ms
heartbeat = can.Message(arbitration_id=0x700, data=[0x00])
task = bus.send_periodic(heartbeat, period=0.1)

# Modify message data over time
def update_counter(msg):
    msg.data[0] = (msg.data[0] + 1) % 256

task_with_counter = bus.send_periodic(
    heartbeat, 
    period=0.1, 
    modifier_callback=update_counter
)

# Stop after 5 seconds
import time
time.sleep(5)
task.stop()
bus.shutdown()

Types

from abc import ABC, abstractmethod
from typing import Optional, Union, Sequence, Callable
from collections.abc import Iterator

class BusABC(ABC):
    """Abstract base class for all CAN bus implementations."""
    
    channel_info: str  # Description of underlying bus/channel
    RECV_LOGGING_LEVEL: int  # Log level for received messages
    
    @abstractmethod
    def __init__(self, channel, can_filters=None, **kwargs): ...
    
    @abstractmethod
    def send(self, msg: Message, timeout=None) -> None: ...
    
    def recv(self, timeout=None) -> Message | None: ...
    def send_periodic(self, msgs, period: float, **kwargs): ...
    def set_filters(self, filters=None) -> None: ...
    def shutdown(self) -> None: ...

Install with Tessl CLI

npx tessl i tessl/pypi-python-can

docs

bit-timing.md

bus-operations.md

cli-tools.md

event-system.md

file-io.md

hardware-interfaces.md

index.md

message-handling.md

periodic-transmission.md

tile.json