CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-ophyd

Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.

Pending
Overview
Eval results
Files

docs/simulation-testing.md

Simulation and Testing

Mock devices and signals for testing, development, and offline work without requiring actual hardware connections. These simulation tools enable development and testing of ophyd-based applications in environments where real hardware is not available.

Capabilities

Synthetic Signals

Simulated signals that provide realistic behavior for testing and development.

class SynSignal(Signal):
    """
    Synthetic signal with programmable behavior.
    
    Generates synthetic data based on functions, noise models,
    or static values for testing and simulation purposes.
    
    Parameters:
    - name (str): Signal name
    - func (callable): Function that produces the signal value.
      NOTE(review): the usage example on this page passes a function that
      must be callable with no arguments, and ophyd documents the expected
      signature as ``f() -> value`` (evaluated when the signal is
      triggered) rather than ``func(time)`` - confirm against the
      installed ophyd version.
    - **kwargs: Additional signal parameters
    """
    def __init__(self, *, name, func=None, **kwargs): ...
    
    def get(self, **kwargs):
        """
        Get synthetic signal value.
        
        Returns:
        Generated value based on function or static value
        """
    
    def put(self, value, **kwargs):
        """Set synthetic signal to new value."""
    
    def set(self, value, **kwargs):
        """
        Set synthetic signal asynchronously.
        
        Returns:
        Status: Immediately completed status
        """

class SynSignalRO(SynSignal):
    """
    Read-only synthetic signal.
    
    Cannot be written to, only provides synthetic readback values.
    """
    def __init__(self, *, name, func=None, **kwargs): ...

class SynPeriodicSignal(SynSignal):
    """
    Synthetic signal with periodic behavior.
    
    Generates values based on periodic functions like sine waves,
    square waves, or other repeating patterns.
    
    Parameters:
    - name (str): Signal name
    - period (float): Period in seconds
    - func (callable): Periodic function
    """
    def __init__(self, *, name, period=1.0, func=None, **kwargs): ...

class EnumSignal(Signal):
    """
    Signal with enumerated values for testing state machines.
    
    Parameters:
    - name (str): Signal name
    - enum_strs (list): List of valid string values
    - value: Initial value
    """
    def __init__(self, *, name, enum_strs, value=0, **kwargs): ...

Synthetic Positioners

Simulated positioners for testing motion control systems.

class SynAxis(Device):
    """
    Synthetic axis positioner with realistic motion behavior.
    
    Simulates motor-like behavior including motion time,
    velocity limits, and position feedback.
    
    Parameters:
    - name (str): Axis name
    - delay (float): Simulated move time per unit distance
    - precision (float): Position precision/tolerance
    - **kwargs: Additional device parameters
    """
    def __init__(self, *, name, delay=0.1, precision=0.1, **kwargs): ...
    
    def move(self, position, **kwargs):
        """
        Move to target position with simulated delay.
        
        Parameters:
        - position (float): Target position
        
        Returns:
        MoveStatus: Status with simulated move time
        """
    
    def set(self, position, **kwargs):
        """Set position (alias for move)."""
    
    @property
    def position(self):
        """
        Current axis position.
        
        Returns:
        float: Current position
        """
    
    @property
    def velocity(self):
        """
        Axis velocity setting.
        
        Returns:
        SynSignal: Velocity signal
        """

class SynAxisEmptyHints(SynAxis):
    """Synthetic axis with empty hints for testing."""
    def __init__(self, **kwargs): ...

class SynAxisNoHints(SynAxis):
    """Synthetic axis with no hints attribute."""
    def __init__(self, **kwargs): ...

class SynAxisNoPosition(SynAxis):
    """Synthetic axis without position attribute for error testing."""
    def __init__(self, **kwargs): ...

Synthetic Detectors

Simulated detectors for testing data acquisition workflows.

class SynGauss(Device):
    """
    Synthetic Gaussian peak detector.
    
    Generates Gaussian-shaped signals based on motor positions,
    useful for simulating peak scanning and optimization.
    
    Parameters:
    - name (str): Detector name
    - motor (Device): Motor device for position dependence  
    - motor_field (str): Motor field name to read position from
    - center (float): Peak center position
    - Imax (float): Peak maximum intensity
    - sigma (float): Peak width (standard deviation)
    - noise (str): Noise model ('poisson', 'uniform', None)
    - noise_multiplier (float): Noise scaling factor
    - random_state: Random state for reproducible noise
    """
    def __init__(self, name, motor, motor_field, center, Imax, sigma=1, 
                 noise='poisson', noise_multiplier=1, random_state=None, **kwargs): ...
    
    def trigger(self):
        """
        Trigger detector to generate Gaussian signal.
        
        Returns:
        Status: Trigger completion status
        """
    
    def read(self):
        """
        Read Gaussian detector value.
        
        Returns:
        dict: Reading with Gaussian value based on motor position
        """

class Syn2DGauss(Device):
    """
    Synthetic 2D Gaussian detector.
    
    Generates 2D Gaussian signals based on two motor positions.
    
    Parameters:
    - name (str): Detector name
    - motor0 (Device): First motor (X axis)
    - motor0_field (str): First motor field name
    - motor1 (Device): Second motor (Y axis)  
    - motor1_field (str): Second motor field name
    - center (tuple): Peak center (x, y)
    - Imax (float): Peak maximum intensity
    - sigma (float): Peak width
    """
    def __init__(self, name, motor0, motor0_field, motor1, motor1_field,
                 center, Imax, sigma=1, **kwargs): ...

class ABDetector(Device):
    """
    Simple A/B detector for basic testing.
    
    Provides two correlated synthetic signals (A and B channels).
    """
    def __init__(self, name, **kwargs): ...
    
    def trigger(self):
        """Trigger A/B detector reading."""
    
    def read(self):
        """Read A and B channel values."""

class DetWithCountTime(Device):
    """
    Detector with configurable count time.
    
    Simulates detectors that require count time configuration.
    """
    def __init__(self, name, **kwargs): ...
    
    @property
    def count_time(self):
        """
        Count time setting.
        
        Returns:
        SynSignal: Count time signal
        """

class InvariantSignal(Signal):
    """
    Signal that maintains constant value for testing.
    
    Useful for testing systems that expect unchanging reference values.
    """
    def __init__(self, *, name, value=1, **kwargs): ...

Fake EPICS Signals

Mock EPICS signals that simulate EPICS behavior without requiring EPICS infrastructure.

class FakeEpicsSignal(Signal):
    """
    Fake EPICS signal for testing without EPICS.
    
    Simulates EpicsSignal behavior including connection status,
    limits, precision, and units.
    
    Parameters:
    - read_pv (str): Simulated read PV name
    - write_pv (str): Simulated write PV name (optional)
    - **kwargs: Additional signal parameters
    """
    def __init__(self, read_pv, write_pv=None, **kwargs): ...
    
    def get(self, **kwargs):
        """Get fake PV value."""
    
    def put(self, value, **kwargs):
        """Put value to fake PV."""
    
    @property
    def limits(self):
        """
        Simulated PV limits.
        
        Returns:
        tuple: (low_limit, high_limit)
        """
    
    @property
    def precision(self):
        """
        Simulated PV precision.
        
        Returns:
        int: Display precision
        """
    
    @property
    def units(self):
        """
        Simulated PV units.
        
        Returns:
        str: Engineering units
        """

class FakeEpicsSignalRO(FakeEpicsSignal):
    """Read-only fake EPICS signal."""
    def __init__(self, read_pv, **kwargs): ...

class FakeEpicsSignalWithRBV(FakeEpicsSignal):
    """
    Fake EPICS signal with separate readback PV.
    
    Simulates setpoint/readback PV pairs common in EPICS.
    """
    def __init__(self, prefix, **kwargs): ...

Testing Utilities

Utilities and classes specifically designed for testing ophyd applications.

class NullStatus(StatusBase):
    """
    Null status that completes immediately.
    
    Useful for testing when you need a status object but
    don't want any actual delay or processing.
    """
    def __init__(self, **kwargs): ...
    
    @property
    def done(self):
        """Always returns True."""
        return True
    
    @property
    def success(self):
        """Always returns True."""
        return True

class MockFlyer(Device):
    """
    Mock flyer device for testing fly scanning.
    
    Simulates flyer interface without requiring actual
    continuous scanning hardware.
    """
    def __init__(self, name, **kwargs): ...
    
    def kickoff(self):
        """
        Start mock fly scan.
        
        Returns:
        NullStatus: Immediately complete status
        """
    
    def complete(self):
        """
        Complete mock fly scan.
        
        Returns:
        NullStatus: Immediately complete status
        """
    
    def collect(self):
        """
        Collect mock fly scan data.
        
        Yields:
        dict: Mock event documents
        """

class TrivialFlyer(Device):
    """
    Trivial flyer implementation for basic testing.
    
    Minimal flyer interface for testing flyer protocols.
    """
    def __init__(self, **kwargs): ...
    
    def kickoff(self):
        """Start trivial fly scan."""
    
    def complete(self): 
        """Complete trivial fly scan."""
    
    def collect(self):
        """Collect trivial fly scan data."""

class SynSignalWithRegistry(SynSignal):
    """
    Synthetic signal that registers itself for testing registry systems.

    NOTE(review): a class with this same name is listed again under
    "Additional Simulation Classes", where the constructor parameters
    (save_path, save_spec, save_func) and collect_asset_docs() are
    spelled out. The two listings appear to describe the same class -
    confirm and consolidate.
    """
    def __init__(self, **kwargs): ...

class SPseudo3x3(PseudoPositioner):
    """
    3x3 synthetic pseudo positioner for testing coordinate transformations.
    
    Provides 3 pseudo axes that map to 3 real axes through
    a 3x3 transformation matrix.
    """
    def __init__(self, **kwargs): ...
    
    def forward(self, pseudo_pos):
        """Transform 3 pseudo coordinates to 3 real coordinates."""
    
    def inverse(self, real_pos):
        """Transform 3 real coordinates to 3 pseudo coordinates."""

class SPseudo1x3(PseudoPositioner):
    """
    1x3 synthetic pseudo positioner.
    
    Maps 1 pseudo axis to 3 real axes for testing
    one-to-many transformations.
    """
    def __init__(self, **kwargs): ...
    
    def forward(self, pseudo_pos):
        """Transform 1 pseudo coordinate to 3 real coordinates."""
    
    def inverse(self, real_pos):
        """Transform 3 real coordinates to 1 pseudo coordinate."""

Additional Simulation Classes

Advanced simulation classes for specialized testing scenarios and device emulation.

class DirectImage(Device):
    """
    Synthetic area detector that directly stores image data.
    
    Used for testing image data workflows without requiring
    actual area detector hardware or complex image generation.
    
    Parameters:
    - name (str): Device name
    - func (callable): Optional function to generate image data
    """
    def __init__(self, *, name, func=None, **kwargs): ...
    
    # Component for storing image data
    img: Component  # Direct image storage

class NewTrivialFlyer(Device):
    """
    Enhanced TrivialFlyer with asset document support.
    
    Implements the new-style API for Resource and Datum documents,
    supporting modern asset management patterns in Bluesky.
    """
    def __init__(self, **kwargs): ...
    
    def kickoff(self):
        """Start flyer operation."""
    
    def complete(self):
        """Complete flyer operation."""
    
    def collect(self):
        """Collect flight data."""
    
    def collect_asset_docs(self):
        """Collect asset documents (Resource and Datum)."""

class DetWithConf(Device):
    """
    Detector with separate read and configuration attributes.
    
    Demonstrates device configuration patterns where some
    components are read regularly and others only for configuration.
    """
    def __init__(self, **kwargs): ...
    
    # Mixed read/config components
    a: Component  # read_attrs
    b: Component  # read_attrs  
    c: Component  # configuration_attrs
    d: Component  # configuration_attrs

class FakeEpicsPathSignal(FakeEpicsSignal):
    """
    Specialized fake EPICS signal for path handling.
    
    Used in AreaDetector path management simulation,
    supporting different path semantics and validation.
    
    Parameters:
    - read_pv (str): Read PV name 
    - write_pv (str): Write PV name
    - path_semantics (str): Path validation semantics
    """
    def __init__(self, read_pv, write_pv=None, *, path_semantics='posix', **kwargs): ...

class SynSignalWithRegistry(SynSignal):
    """
    Synthetic signal with asset document registry support.
    
    Generates asset documents (Resource and Datum) for testing
    data management workflows with external file storage.
    
    Parameters:
    - name (str): Signal name
    - save_path (str): Path for storing generated data files
    - save_spec (str): File format specification recorded for the saved
      data; the default 'NPY_SEQ' denotes a sequence of NumPy .npy files
    - save_func (callable): Function for saving data
    - **kwargs: Additional signal parameters
    """
    def __init__(self, *, name, save_path=None, save_spec='NPY_SEQ', save_func=None, **kwargs): ...
    
    def collect_asset_docs(self):
        """Generate Resource and Datum documents for collected data."""

Utility Functions

Helper functions for creating and managing synthetic devices and test environments.

def make_fake_device(cls):
    """
    Create fake version of real EPICS device class.
    
    Replaces all EpicsSignal components with FakeEpicsSignal
    equivalents, enabling testing without EPICS infrastructure.
    
    Parameters:
    - cls (Device): Device class to make fake version of
    
    Returns:
    Device: New device class with fake signals
    """

def clear_fake_device(dev, *, exclude=None):
    """
    Reset all signals in fake device to default values.
    
    Useful for cleaning up test state between test runs.
    
    Parameters:
    - dev (Device): Fake device to clear
    - exclude (list): Signal names to skip clearing
    """

def instantiate_fake_device(dev_cls, **kwargs):
    """
    Create instance of fake device with sensible defaults.
    
    Handles complex initialization requirements and provides
    reasonable default values for testing.
    
    Parameters:
    - dev_cls (Device): Device class to instantiate
    - **kwargs: Override default parameters
    
    Returns:
    Device: Configured fake device instance
    """

def hw(save_path=None):
    """
    Factory function creating complete synthetic hardware suite.
    
    Returns namespace containing motors, detectors, scalers,
    area detectors, and other common beamline devices.
    
    Parameters:
    - save_path (str): Path for saving generated data
    
    Returns:
    SimpleNamespace: Collection of synthetic hardware devices
    """

def new_uid():
    """Generate new unique identifier string."""

def short_uid():
    """Generate short unique identifier (6 characters)."""

Usage Examples

Basic Synthetic Signal Testing

from ophyd.sim import SynSignal, SynSignalRO
import time
import numpy as np

# Create a synthetic signal and give it a static value.
# SynSignal derives its initial value from ``func`` and forwards it to the
# base Signal as ``value=``, so passing ``value=`` here would collide;
# write the starting value with put() instead.
temperature = SynSignal(name='temperature')
temperature.put(25.0)

# Read and write synthetic signal
print(f"Initial temperature: {temperature.get()}")
temperature.put(30.0)
print(f"New temperature: {temperature.get()}")

# Create signal with time-based function
def sine_wave(t=None):
    """Return a sine wave of amplitude 10 and period 5 s at time ``t``.

    ``t`` defaults to the current wall-clock time so the function can be
    called with no arguments, which is how SynSignal invokes it.
    """
    if t is None:
        t = time.time()
    return 10 * np.sin(2 * np.pi * t / 5.0)  # 5 second period

oscillating_signal = SynSignal(name='oscillator', func=sine_wave)

# Read oscillating values
for _ in range(5):
    # ``func`` is only re-evaluated when the signal is triggered; without
    # trigger() every get() would return the same cached value.
    oscillating_signal.trigger()
    print(f"Oscillator value: {oscillating_signal.get():.3f}")
    time.sleep(1)

Synthetic Motor Testing

from ophyd.sim import SynAxis
from ophyd.status import wait
import time

# Create synthetic motor
motor = SynAxis(name='synthetic_motor', delay=0.1)  # simulated move delay

print(f"Initial position: {motor.position}")

# Move motor with realistic delay.
# set() is the movement entry point and returns a status object.
start_time = time.time()
status = motor.set(10.0)

print("Move started...")
wait(status)  # Wait for simulated move to complete
end_time = time.time()

print(f"Final position: {motor.position}")
print(f"Move took: {end_time - start_time:.2f} seconds")

# Test position tolerance
# Keep the status of THIS move: waiting on the previous (already finished)
# status would return immediately, before the motor has arrived.
status = motor.set(5.0)
wait(status)
print(f"Position after move to 5.0: {motor.position}")

Gaussian Peak Detector Simulation

from ophyd.sim import SynGauss, SynAxis
from ophyd.status import wait
import numpy as np
import matplotlib.pyplot as plt

# Create synthetic motor and Gaussian detector
motor = SynAxis(name='scan_motor')
detector = SynGauss(
    'gauss_det',
    # motor_field is the key looked up in motor.read(); the readback of a
    # SynAxis is reported under the motor's own name.
    motor, 'scan_motor',
    center=5.0,         # Peak at position 5.0
    Imax=1000,          # Peak intensity
    sigma=1.0,          # Peak width
    noise='poisson'     # Poisson noise
)

# Scan across peak
positions = np.linspace(0, 10, 21)
intensities = []

for pos in positions:
    # Move motor and block until the simulated move has finished
    status = motor.set(pos)
    wait(status)

    # Trigger detector so it recomputes its value at the new position
    det_status = detector.trigger()
    wait(det_status)

    # Read intensity
    reading = detector.read()
    intensity = reading['gauss_det']['value']
    intensities.append(intensity)

    print(f"Position: {pos:4.1f}, Intensity: {intensity:6.1f}")

# Plot results
plt.figure(figsize=(10, 6))
plt.plot(positions, intensities, 'bo-')
plt.xlabel('Motor Position')
plt.ylabel('Detector Intensity')
plt.title('Synthetic Gaussian Peak Scan')
plt.grid(True)
plt.show()

Fake EPICS Signal Testing

from ophyd.sim import FakeEpicsSignal, FakeEpicsSignalWithRBV

# Create fake EPICS signals for testing
temp_sensor = FakeEpicsSignal('XF:28IDC:TEMP:01', name='temperature')
pressure_ctrl = FakeEpicsSignalWithRBV('XF:28IDC:PRES:', name='pressure')

# Test EPICS-like behavior without EPICS
print(f"Temperature: {temp_sensor.get()}")
print(f"Temperature limits: {temp_sensor.limits}")
print(f"Temperature units: {temp_sensor.units}")

# Test setpoint/readback behavior
requested_pressure = 1.5
pressure_ctrl.put(requested_pressure)  # Set pressure

# Read the value back through the signal itself. ``user_readback`` is a
# component of motor devices, not an attribute of a fake EPICS signal.
readback = pressure_ctrl.get()

print(f"Pressure setpoint: {requested_pressure}")
print(f"Pressure readback: {readback}")

Mock Data Acquisition System

from ophyd.sim import (SynAxis, SynGauss, ABDetector, 
                       DetWithCountTime, NullStatus)
from ophyd.status import wait

class MockBeamline:
    """Mock beamline for testing data acquisition.

    Bundles two synthetic motors and three synthetic detectors and offers
    simple step-scan helpers that move, trigger and read them.
    """

    def __init__(self):
        """Create the synthetic motors and detectors."""
        # Create synthetic devices
        self.motor_x = SynAxis(name='motor_x')
        self.motor_y = SynAxis(name='motor_y')

        # motor_field must be a key of motor.read(); a SynAxis reports its
        # readback under the motor's own name.
        self.detector_main = SynGauss(
            'main_det', self.motor_x, 'motor_x',
            center=0, Imax=10000, sigma=2.0
        )

        # Device subclasses take ``name`` as a required keyword argument;
        # a lone positional argument would be interpreted as the prefix.
        self.detector_monitor = ABDetector(name='monitor')
        self.detector_timed = DetWithCountTime(name='timed_det')

    def scan_1d(self, motor, positions, detector):
        """Perform 1D scan.

        Parameters:
        - motor: Settable device to step through ``positions``
        - positions (iterable): Target positions, visited in order
        - detector: Device that is triggered and read at each position

        Returns:
        list: One dict per point with keys 'position' and 'reading'
        """
        results = []

        for pos in positions:
            # Move motor and wait for the move to finish
            status = motor.set(pos)
            wait(status)

            # Trigger detector
            det_status = detector.trigger()
            wait(det_status)

            # Read data
            reading = detector.read()
            results.append({
                'position': pos,
                'reading': reading
            })

        return results

    def scan_2d(self, motor_x, positions_x, motor_y, positions_y, detector):
        """Perform 2D grid scan.

        Visits every (x, y) combination with x as the slow axis.

        Returns:
        list: One dict per point with keys 'x_position', 'y_position'
        and 'reading'
        """
        results = []

        for x_pos in positions_x:
            for y_pos in positions_y:
                # Start both moves so they run concurrently, then wait on
                # each status in turn; wait() accepts a single status
                # object, not a list.
                x_status = motor_x.set(x_pos)
                y_status = motor_y.set(y_pos)
                wait(x_status)
                wait(y_status)

                # Trigger detector
                det_status = detector.trigger()
                wait(det_status)

                # Read data
                reading = detector.read()
                results.append({
                    'x_position': x_pos,
                    'y_position': y_pos,
                    'reading': reading
                })

        return results

# Build the mock beamline and scan motor_x across the main detector
beamline = MockBeamline()
positions = [-5, -2, -1, 0, 1, 2, 5]
scan_results = beamline.scan_1d(beamline.motor_x, positions, beamline.detector_main)

# Report one line per scan point
print("1D Scan Results:")
for point in scan_results:
    where = point['position']
    counts = point['reading']['main_det']['value']
    print(f"Position: {where:4.1f}, Intensity: {counts:8.1f}")

Testing Device Staging

from ophyd.sim import SynAxis, DetWithCountTime
from ophyd import Device, Component

class MockExperimentDevice(Device):
    """Mock device for testing staging protocols.

    Combines a synthetic motor and a synthetic detector and wraps the
    inherited stage()/unstage() calls with progress messages.
    """

    # No suffix is given: a suffix is forwarded to the component class as a
    # positional argument, which SynAxis (keyword-only constructor) rejects.
    motor = Component(SynAxis)
    detector = Component(DetWithCountTime)

    def stage(self):
        """Custom staging behavior.

        Sets the detector count time, then delegates to Device.stage().

        Returns:
        Whatever the parent stage() returns (the staged devices).
        """
        print("Staging experiment device...")

        # Configure detector count time
        self.detector.count_time.put(0.1)

        # Call parent staging
        staged = super().stage()

        print("Device staged successfully")
        return staged

    def unstage(self):
        """Custom unstaging behavior.

        Returns:
        Whatever the parent unstage() returns (the unstaged devices).
        """
        print("Unstaging experiment device...")

        # Call parent unstaging
        unstaged = super().unstage()

        print("Device unstaged successfully")
        return unstaged

# Exercise the stage / use / unstage life cycle of the mock device
device = MockExperimentDevice(name='experiment')

# Stage, and show what stage() reported back
staged_components = device.stage()
print(f"Staged components: {staged_components}")

# Stand-in for the actual acquisition step
print("Using device for data acquisition...")

# Unstage, and show what unstage() reported back
unstaged_components = device.unstage()
print(f"Unstaged components: {unstaged_components}")

Install with Tessl CLI

npx tessl i tessl/pypi-ophyd

docs

area-detectors.md

core-framework.md

epics-integration.md

flyers-continuous-scanning.md

index.md

motors-positioners.md

simulation-testing.md

specialized-devices.md

tile.json