Controller Area Network interface module for Python providing common abstractions for CAN hardware devices and message handling utilities
—
Advanced cyclic message transmission capabilities with configurable periods, durations, message modification callbacks, and lifecycle management for automotive and industrial applications requiring regular message transmission.
Base interface for periodic message transmission with lifecycle management.
class CyclicSendTaskABC(ABC):
@abstractmethod
def stop(self) -> None:
"""Stop the cyclic transmission task."""
@property
@abstractmethod
def period(self) -> float:
"""Get the transmission period in seconds."""
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
@abstractmethod
def modify_data(self, msg: Message) -> None:
"""
Modify message data for next transmission.
Parameters:
- msg: Message to modify (modified in-place)
"""
class RestartableCyclicTaskABC(CyclicSendTaskABC):
@abstractmethod
def start(self) -> None:
"""Start or restart the cyclic transmission."""
class LimitedDurationCyclicTaskABC(CyclicSendTaskABC):
@property
@abstractmethod
def duration(self) -> Optional[float]:
"""Get the transmission duration in seconds (None for unlimited)."""Periodic transmission integrated with bus send_periodic method.
def send_periodic(self, msgs, period: float, duration=None, store_task=True,
autostart=True, modifier_callback=None):
"""
Start sending messages at a given period on this bus.
Parameters:
- msgs: Message or sequence of messages to transmit
- period: Period in seconds between each message
- duration: Duration in seconds to continue sending (None for indefinite)
- store_task: If True, attach task to bus instance for lifecycle management
- autostart: If True, start task immediately after creation
- modifier_callback: Function to modify message data before each send
Returns:
CyclicSendTaskABC: Task instance for controlling transmission
The task will be active until:
- Optional duration expires
- Bus instance goes out of scope
- Bus instance is shut down
- stop_all_periodic_tasks() is called
- Task's stop() method is called
"""
def stop_all_periodic_tasks(self, remove_tasks=True) -> None:
"""
Stop all periodic tasks started by this bus.
Parameters:
- remove_tasks: Whether to stop tracking the stopped tasks
"""Default threading-based implementation for periodic transmission.
class ThreadBasedCyclicSendTask:
def __init__(self, bus, lock, messages, period: float, duration=None,
autostart=True, modifier_callback=None):
"""
Thread-based cyclic message transmission.
Parameters:
- bus: Bus instance to send messages on
- lock: Threading lock for send synchronization
- messages: Message(s) to transmit cyclically
- period: Transmission period in seconds
- duration: Maximum duration (seconds) or None for unlimited
- autostart: Whether to start immediately
- modifier_callback: Optional message modification function
"""
def stop(self) -> None:
"""Stop the transmission thread."""
def start(self) -> None:
"""Start the transmission thread."""import can
import time
bus = can.Bus(channel='can0', interface='socketcan')
# Send heartbeat message every 100ms
heartbeat = can.Message(
arbitration_id=0x700,
data=[0x01, 0x02, 0x03, 0x04]
)
task = bus.send_periodic(heartbeat, period=0.1)
print("Sending heartbeat messages for 5 seconds...")
time.sleep(5)
task.stop()
bus.shutdown()import can
import time
bus = can.Bus(channel='can0', interface='socketcan')
# Multiple messages with different periods
messages = [
can.Message(arbitration_id=0x100, data=[0x01]), # Status
can.Message(arbitration_id=0x200, data=[0x02]), # Sensor 1
can.Message(arbitration_id=0x300, data=[0x03]), # Sensor 2
]
# Send all messages every 50ms
task1 = bus.send_periodic(messages, period=0.05)
# Send individual high-priority message every 10ms
priority_msg = can.Message(arbitration_id=0x50, data=[0xFF])
task2 = bus.send_periodic(priority_msg, period=0.01)
print("Sending multiple periodic messages...")
time.sleep(10)
# Stop specific tasks
task1.stop()
task2.stop()
bus.shutdown()import can
import time
bus = can.Bus(channel='can0', interface='socketcan')
# Send for exactly 30 seconds then stop automatically
test_message = can.Message(
arbitration_id=0x123,
data=[0x11, 0x22, 0x33, 0x44]
)
task = bus.send_periodic(
test_message,
period=0.1, # Every 100ms
duration=30.0 # For 30 seconds
)
print("Sending test messages for 30 seconds...")
# Task will automatically stop after 30 seconds
time.sleep(35) # Wait a bit longer to confirm it stopped
bus.shutdown()import can
import time
bus = can.Bus(channel='can0', interface='socketcan')
# Counter that increments in message data
counter_msg = can.Message(
arbitration_id=0x400,
data=[0x00, 0x00, 0x00, 0x00] # 32-bit counter in bytes 0-3
)
def increment_counter(msg):
"""Increment 32-bit counter in message data."""
# Convert bytes to integer, increment, convert back
counter = int.from_bytes(msg.data[:4], 'big')
counter = (counter + 1) % (2**32) # Wrap at 32-bit limit
msg.data[:4] = counter.to_bytes(4, 'big')
task = bus.send_periodic(
counter_msg,
period=0.1,
modifier_callback=increment_counter
)
print("Sending incrementing counter...")
time.sleep(10)
task.stop()
bus.shutdown()import can
import time
import math
bus = can.Bus(channel='can0', interface='socketcan')
# Simulate temperature sensor with sine wave
temp_msg = can.Message(
arbitration_id=0x510,
data=[0x00, 0x00] # 16-bit temperature value
)
start_time = time.time()
def update_temperature(msg):
"""Update temperature with sine wave simulation."""
elapsed = time.time() - start_time
# Sine wave: 20°C ± 10°C with 30-second period
temperature = 20.0 + 10.0 * math.sin(2 * math.pi * elapsed / 30.0)
# Convert to 16-bit integer (0.1°C resolution)
temp_int = int(temperature * 10)
msg.data[:2] = temp_int.to_bytes(2, 'big', signed=True)
task = bus.send_periodic(
temp_msg,
period=0.5, # Every 500ms
modifier_callback=update_temperature
)
print("Simulating temperature sensor for 60 seconds...")
time.sleep(60)
task.stop()
bus.shutdown()import can
import time
bus = can.Bus(channel='can0', interface='socketcan')
# Create multiple tasks with different lifecycles
tasks = []
# Long-running heartbeat
heartbeat = can.Message(arbitration_id=0x700, data=[0x01])
tasks.append(bus.send_periodic(heartbeat, period=1.0))
# Medium-term status updates
status = can.Message(arbitration_id=0x701, data=[0x02])
tasks.append(bus.send_periodic(status, period=0.5, duration=30.0))
# Short burst of test messages
test = can.Message(arbitration_id=0x702, data=[0x03])
tasks.append(bus.send_periodic(test, period=0.1, duration=5.0))
print("Running multiple tasks with different lifecycles...")
# Monitor tasks
for i in range(60):
time.sleep(1)
active_tasks = [t for t in tasks if hasattr(t, '_thread') and t._thread.is_alive()]
print(f"Second {i+1}: {len(active_tasks)} tasks still active")
# Stop all remaining tasks
bus.stop_all_periodic_tasks()
bus.shutdown()import can
import time
class RobustPeriodicSender:
def __init__(self, bus, msg, period):
self.bus = bus
self.msg = msg
self.period = period
self.error_count = 0
self.max_errors = 10
def error_tolerant_callback(self, msg):
"""Callback that handles its own errors."""
try:
# Simulate some processing that might fail
if self.error_count < 3: # Fail first few times
self.error_count += 1
raise ValueError("Simulated processing error")
# Normal processing
msg.data[0] = (msg.data[0] + 1) % 256
except Exception as e:
print(f"Error in callback: {e}")
if self.error_count >= self.max_errors:
print("Too many errors, stopping task")
return False # Signal to stop
return True
bus = can.Bus(channel='test', interface='virtual')
msg = can.Message(arbitration_id=0x123, data=[0x00])
sender = RobustPeriodicSender(bus, msg, 0.1)
# This would need custom implementation to handle callback errors
# Showing the concept of error-aware periodic transmission
print("Demonstrating error handling in periodic tasks...")
bus.shutdown()from abc import ABC, abstractmethod
from typing import Optional, Union, Sequence, Callable
import threading
class CyclicSendTaskABC(ABC):
"""Abstract base class for cyclic send tasks."""
@abstractmethod
def stop(self) -> None: ...
@property
@abstractmethod
def period(self) -> float: ...
class ModifiableCyclicTaskABC(CyclicSendTaskABC):
"""Cyclic task that supports message modification."""
@abstractmethod
def modify_data(self, msg: Message) -> None: ...
class ThreadBasedCyclicSendTask(CyclicSendTaskABC):
"""Default thread-based implementation."""
def __init__(self, bus, lock: threading.Lock, messages,
period: float, duration: Optional[float] = None,
autostart: bool = True,
modifier_callback: Optional[Callable[[Message], None]] = None): ...Install with Tessl CLI
npx tessl i tessl/pypi-python-can