CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-can

Controller Area Network interface module for Python providing common abstractions for CAN hardware devices and message handling utilities

Pending
Overview
Eval results
Files

periodic-transmission.mddocs/

Periodic Message Transmission

Advanced cyclic message transmission capabilities with configurable periods, durations, message modification callbacks, and lifecycle management for automotive and industrial applications requiring regular message transmission.

Capabilities

Cyclic Send Tasks

Base interface for periodic message transmission with lifecycle management.

class CyclicSendTaskABC(ABC):
    @abstractmethod
    def stop(self) -> None:
        """Stop the cyclic transmission task."""
    
    @property
    @abstractmethod
    def period(self) -> float:
        """Get the transmission period in seconds."""

class ModifiableCyclicTaskABC(CyclicSendTaskABC):
    @abstractmethod  
    def modify_data(self, msg: Message) -> None:
        """
        Modify message data for next transmission.
        
        Parameters:
        - msg: Message to modify (modified in-place)
        """

class RestartableCyclicTaskABC(CyclicSendTaskABC):
    @abstractmethod
    def start(self) -> None:
        """Start or restart the cyclic transmission."""

class LimitedDurationCyclicTaskABC(CyclicSendTaskABC):
    @property
    @abstractmethod
    def duration(self) -> Optional[float]:
        """Get the transmission duration in seconds (None for unlimited)."""

Bus Integration

Periodic transmission integrated with bus send_periodic method.

def send_periodic(self, msgs, period: float, duration=None, store_task=True,
                 autostart=True, modifier_callback=None):
    """
    Start sending messages at a given period on this bus.
    
    Parameters:
    - msgs: Message or sequence of messages to transmit
    - period: Period in seconds between each message
    - duration: Duration in seconds to continue sending (None for indefinite)
    - store_task: If True, attach task to bus instance for lifecycle management
    - autostart: If True, start task immediately after creation
    - modifier_callback: Function to modify message data before each send
    
    Returns:
    CyclicSendTaskABC: Task instance for controlling transmission
    
    The task will be active until:
    - Optional duration expires
    - Bus instance goes out of scope
    - Bus instance is shut down
    - stop_all_periodic_tasks() is called
    - Task's stop() method is called
    """

def stop_all_periodic_tasks(self, remove_tasks=True) -> None:
    """
    Stop all periodic tasks started by this bus.
    
    Parameters:
    - remove_tasks: Whether to stop tracking the stopped tasks
    """

Thread-Based Implementation

Default threading-based implementation for periodic transmission.

class ThreadBasedCyclicSendTask:
    def __init__(self, bus, lock, messages, period: float, duration=None,
                 autostart=True, modifier_callback=None):
        """
        Thread-based cyclic message transmission.
        
        Parameters:
        - bus: Bus instance to send messages on
        - lock: Threading lock for send synchronization
        - messages: Message(s) to transmit cyclically
        - period: Transmission period in seconds
        - duration: Maximum duration (seconds) or None for unlimited
        - autostart: Whether to start immediately
        - modifier_callback: Optional message modification function
        """
    
    def stop(self) -> None:
        """Stop the transmission thread."""
    
    def start(self) -> None:
        """Start the transmission thread."""

Usage Examples

Basic Periodic Transmission

import can
import time

bus = can.Bus(channel='can0', interface='socketcan')

# Send heartbeat message every 100ms
heartbeat = can.Message(
    arbitration_id=0x700,
    data=[0x01, 0x02, 0x03, 0x04]
)

task = bus.send_periodic(heartbeat, period=0.1)

print("Sending heartbeat messages for 5 seconds...")
time.sleep(5)

task.stop()
bus.shutdown()

Multiple Periodic Messages

import can
import time

bus = can.Bus(channel='can0', interface='socketcan')

# Multiple messages with different periods
messages = [
    can.Message(arbitration_id=0x100, data=[0x01]),  # Status
    can.Message(arbitration_id=0x200, data=[0x02]),  # Sensor 1
    can.Message(arbitration_id=0x300, data=[0x03]),  # Sensor 2
]

# Send all messages every 50ms
task1 = bus.send_periodic(messages, period=0.05)

# Send individual high-priority message every 10ms
priority_msg = can.Message(arbitration_id=0x50, data=[0xFF])
task2 = bus.send_periodic(priority_msg, period=0.01)

print("Sending multiple periodic messages...")
time.sleep(10)

# Stop specific tasks
task1.stop()
task2.stop()
bus.shutdown()

Limited Duration Transmission

import can
import time

bus = can.Bus(channel='can0', interface='socketcan')

# Send for exactly 30 seconds then stop automatically
test_message = can.Message(
    arbitration_id=0x123,
    data=[0x11, 0x22, 0x33, 0x44]
)

task = bus.send_periodic(
    test_message, 
    period=0.1,          # Every 100ms
    duration=30.0        # For 30 seconds
)

print("Sending test messages for 30 seconds...")
# Task will automatically stop after 30 seconds
time.sleep(35)  # Wait a bit longer to confirm it stopped

bus.shutdown()

Dynamic Message Modification

import can
import time

bus = can.Bus(channel='can0', interface='socketcan')

# Counter that increments in message data
counter_msg = can.Message(
    arbitration_id=0x400,
    data=[0x00, 0x00, 0x00, 0x00]  # 32-bit counter in bytes 0-3
)

def increment_counter(msg):
    """Increment 32-bit counter in message data."""
    # Convert bytes to integer, increment, convert back
    counter = int.from_bytes(msg.data[:4], 'big')
    counter = (counter + 1) % (2**32)  # Wrap at 32-bit limit
    msg.data[:4] = counter.to_bytes(4, 'big')

task = bus.send_periodic(
    counter_msg,
    period=0.1,
    modifier_callback=increment_counter
)

print("Sending incrementing counter...")
time.sleep(10)

task.stop()
bus.shutdown()

Temperature Sensor Simulation

import can
import time
import math

bus = can.Bus(channel='can0', interface='socketcan')

# Simulate temperature sensor with sine wave
temp_msg = can.Message(
    arbitration_id=0x510,
    data=[0x00, 0x00]  # 16-bit temperature value
)

start_time = time.time()

def update_temperature(msg):
    """Update temperature with sine wave simulation."""
    elapsed = time.time() - start_time
    # Sine wave: 20°C ± 10°C with 30-second period
    temperature = 20.0 + 10.0 * math.sin(2 * math.pi * elapsed / 30.0)
    
    # Convert to 16-bit integer (0.1°C resolution)
    temp_int = int(temperature * 10)
    msg.data[:2] = temp_int.to_bytes(2, 'big', signed=True)

task = bus.send_periodic(
    temp_msg,
    period=0.5,  # Every 500ms
    modifier_callback=update_temperature
)

print("Simulating temperature sensor for 60 seconds...")
time.sleep(60)

task.stop()
bus.shutdown()

Task Management

import can
import time

bus = can.Bus(channel='can0', interface='socketcan')

# Create multiple tasks with different lifecycles
tasks = []

# Long-running heartbeat
heartbeat = can.Message(arbitration_id=0x700, data=[0x01])
tasks.append(bus.send_periodic(heartbeat, period=1.0))

# Medium-term status updates
status = can.Message(arbitration_id=0x701, data=[0x02])  
tasks.append(bus.send_periodic(status, period=0.5, duration=30.0))

# Short burst of test messages
test = can.Message(arbitration_id=0x702, data=[0x03])
tasks.append(bus.send_periodic(test, period=0.1, duration=5.0))

print("Running multiple tasks with different lifecycles...")

# Monitor tasks
for i in range(60):
    time.sleep(1)
    active_tasks = [t for t in tasks if hasattr(t, '_thread') and t._thread.is_alive()]
    print(f"Second {i+1}: {len(active_tasks)} tasks still active")

# Stop all remaining tasks
bus.stop_all_periodic_tasks()
bus.shutdown()

Error Handling in Periodic Tasks

import can
import time

class RobustPeriodicSender:
    def __init__(self, bus, msg, period):
        self.bus = bus
        self.msg = msg
        self.period = period
        self.error_count = 0
        self.max_errors = 10
        
    def error_tolerant_callback(self, msg):
        """Callback that handles its own errors."""
        try:
            # Simulate some processing that might fail
            if self.error_count < 3:  # Fail first few times
                self.error_count += 1
                raise ValueError("Simulated processing error")
            
            # Normal processing
            msg.data[0] = (msg.data[0] + 1) % 256
            
        except Exception as e:
            print(f"Error in callback: {e}")
            if self.error_count >= self.max_errors:
                print("Too many errors, stopping task")
                return False  # Signal to stop
        return True

bus = can.Bus(channel='test', interface='virtual')

msg = can.Message(arbitration_id=0x123, data=[0x00])
sender = RobustPeriodicSender(bus, msg, 0.1)

# This would need custom implementation to handle callback errors
# Showing the concept of error-aware periodic transmission
print("Demonstrating error handling in periodic tasks...")

bus.shutdown()

Types

from abc import ABC, abstractmethod
from typing import Optional, Union, Sequence, Callable
import threading

class CyclicSendTaskABC(ABC):
    """Abstract base class for cyclic send tasks."""
    
    @abstractmethod
    def stop(self) -> None: ...
    
    @property
    @abstractmethod  
    def period(self) -> float: ...

class ModifiableCyclicTaskABC(CyclicSendTaskABC):
    """Cyclic task that supports message modification."""
    
    @abstractmethod
    def modify_data(self, msg: Message) -> None: ...

class ThreadBasedCyclicSendTask(CyclicSendTaskABC):
    """Default thread-based implementation."""
    
    def __init__(self, bus, lock: threading.Lock, messages, 
                 period: float, duration: Optional[float] = None,
                 autostart: bool = True, 
                 modifier_callback: Optional[Callable[[Message], None]] = None): ...

Install with Tessl CLI

npx tessl i tessl/pypi-python-can

docs

bit-timing.md

bus-operations.md

cli-tools.md

event-system.md

file-io.md

hardware-interfaces.md

index.md

message-handling.md

periodic-transmission.md

tile.json