Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.
—
Mock devices and signals for testing, development, and offline work without requiring actual hardware connections. These simulation tools enable development and testing of ophyd-based applications in environments where real hardware is not available.
Simulated signals that provide realistic behavior for testing and development.
class SynSignal(Signal):
"""
Synthetic signal with programmable behavior.
Generates synthetic data based on functions, noise models,
or static values for testing and simulation purposes.
Parameters:
- name (str): Signal name
- func (callable): Function to generate values func(time)
- **kwargs: Additional signal parameters
"""
def __init__(self, *, name, func=None, **kwargs): ...
def get(self, **kwargs):
"""
Get synthetic signal value.
Returns:
Generated value based on function or static value
"""
def put(self, value, **kwargs):
"""Set synthetic signal to new value."""
def set(self, value, **kwargs):
"""
Set synthetic signal asynchronously.
Returns:
Status: Immediately completed status
"""
class SynSignalRO(SynSignal):
"""
Read-only synthetic signal.
Cannot be written to, only provides synthetic readback values.
"""
def __init__(self, *, name, func=None, **kwargs): ...
class SynPeriodicSignal(SynSignal):
"""
Synthetic signal with periodic behavior.
Generates values based on periodic functions like sine waves,
square waves, or other repeating patterns.
Parameters:
- name (str): Signal name
- period (float): Period in seconds
- func (callable): Periodic function
"""
def __init__(self, *, name, period=1.0, func=None, **kwargs): ...
class EnumSignal(Signal):
"""
Signal with enumerated values for testing state machines.
Parameters:
- name (str): Signal name
- enum_strs (list): List of valid string values
- value: Initial value
"""
def __init__(self, *, name, enum_strs, value=0, **kwargs): ...Simulated positioners for testing motion control systems.
class SynAxis(Device):
"""
Synthetic axis positioner with realistic motion behavior.
Simulates motor-like behavior including motion time,
velocity limits, and position feedback.
Parameters:
- name (str): Axis name
- delay (float): Simulated move time per unit distance
- precision (float): Position precision/tolerance
- **kwargs: Additional device parameters
"""
def __init__(self, *, name, delay=0.1, precision=0.1, **kwargs): ...
def move(self, position, **kwargs):
"""
Move to target position with simulated delay.
Parameters:
- position (float): Target position
Returns:
MoveStatus: Status with simulated move time
"""
def set(self, position, **kwargs):
"""Set position (alias for move)."""
@property
def position(self):
"""
Current axis position.
Returns:
float: Current position
"""
@property
def velocity(self):
"""
Axis velocity setting.
Returns:
SynSignal: Velocity signal
"""
class SynAxisEmptyHints(SynAxis):
"""Synthetic axis with empty hints for testing."""
def __init__(self, **kwargs): ...
class SynAxisNoHints(SynAxis):
"""Synthetic axis with no hints attribute."""
def __init__(self, **kwargs): ...
class SynAxisNoPosition(SynAxis):
"""Synthetic axis without position attribute for error testing."""
def __init__(self, **kwargs): ...Simulated detectors for testing data acquisition workflows.
class SynGauss(Device):
"""
Synthetic Gaussian peak detector.
Generates Gaussian-shaped signals based on motor positions,
useful for simulating peak scanning and optimization.
Parameters:
- name (str): Detector name
- motor (Device): Motor device for position dependence
- motor_field (str): Motor field name to read position from
- center (float): Peak center position
- Imax (float): Peak maximum intensity
- sigma (float): Peak width (standard deviation)
- noise (str): Noise model ('poisson', 'uniform', None)
- noise_multiplier (float): Noise scaling factor
- random_state: Random state for reproducible noise
"""
def __init__(self, name, motor, motor_field, center, Imax, sigma=1,
noise='poisson', noise_multiplier=1, random_state=None, **kwargs): ...
def trigger(self):
"""
Trigger detector to generate Gaussian signal.
Returns:
Status: Trigger completion status
"""
def read(self):
"""
Read Gaussian detector value.
Returns:
dict: Reading with Gaussian value based on motor position
"""
class Syn2DGauss(Device):
"""
Synthetic 2D Gaussian detector.
Generates 2D Gaussian signals based on two motor positions.
Parameters:
- name (str): Detector name
- motor0 (Device): First motor (X axis)
- motor0_field (str): First motor field name
- motor1 (Device): Second motor (Y axis)
- motor1_field (str): Second motor field name
- center (tuple): Peak center (x, y)
- Imax (float): Peak maximum intensity
- sigma (float): Peak width
"""
def __init__(self, name, motor0, motor0_field, motor1, motor1_field,
center, Imax, sigma=1, **kwargs): ...
class ABDetector(Device):
"""
Simple A/B detector for basic testing.
Provides two correlated synthetic signals (A and B channels).
"""
def __init__(self, name, **kwargs): ...
def trigger(self):
"""Trigger A/B detector reading."""
def read(self):
"""Read A and B channel values."""
class DetWithCountTime(Device):
"""
Detector with configurable count time.
Simulates detectors that require count time configuration.
"""
def __init__(self, name, **kwargs): ...
@property
def count_time(self):
"""
Count time setting.
Returns:
SynSignal: Count time signal
"""
class InvariantSignal(Signal):
"""
Signal that maintains constant value for testing.
Useful for testing systems that expect unchanging reference values.
"""
def __init__(self, *, name, value=1, **kwargs): ...Mock EPICS signals that simulate EPICS behavior without requiring EPICS infrastructure.
class FakeEpicsSignal(Signal):
"""
Fake EPICS signal for testing without EPICS.
Simulates EpicsSignal behavior including connection status,
limits, precision, and units.
Parameters:
- read_pv (str): Simulated read PV name
- write_pv (str): Simulated write PV name (optional)
- **kwargs: Additional signal parameters
"""
def __init__(self, read_pv, write_pv=None, **kwargs): ...
def get(self, **kwargs):
"""Get fake PV value."""
def put(self, value, **kwargs):
"""Put value to fake PV."""
@property
def limits(self):
"""
Simulated PV limits.
Returns:
tuple: (low_limit, high_limit)
"""
@property
def precision(self):
"""
Simulated PV precision.
Returns:
int: Display precision
"""
@property
def units(self):
"""
Simulated PV units.
Returns:
str: Engineering units
"""
class FakeEpicsSignalRO(FakeEpicsSignal):
"""Read-only fake EPICS signal."""
def __init__(self, read_pv, **kwargs): ...
class FakeEpicsSignalWithRBV(FakeEpicsSignal):
"""
Fake EPICS signal with separate readback PV.
Simulates setpoint/readback PV pairs common in EPICS.
"""
def __init__(self, prefix, **kwargs): ...Utilities and classes specifically designed for testing ophyd applications.
class NullStatus(StatusBase):
"""
Null status that completes immediately.
Useful for testing when you need a status object but
don't want any actual delay or processing.
"""
def __init__(self, **kwargs): ...
@property
def done(self):
"""Always returns True."""
return True
@property
def success(self):
"""Always returns True."""
return True
class MockFlyer(Device):
"""
Mock flyer device for testing fly scanning.
Simulates flyer interface without requiring actual
continuous scanning hardware.
"""
def __init__(self, name, **kwargs): ...
def kickoff(self):
"""
Start mock fly scan.
Returns:
NullStatus: Immediately complete status
"""
def complete(self):
"""
Complete mock fly scan.
Returns:
NullStatus: Immediately complete status
"""
def collect(self):
"""
Collect mock fly scan data.
Yields:
dict: Mock event documents
"""
class TrivialFlyer(Device):
"""
Trivial flyer implementation for basic testing.
Minimal flyer interface for testing flyer protocols.
"""
def __init__(self, **kwargs): ...
def kickoff(self):
"""Start trivial fly scan."""
def complete(self):
"""Complete trivial fly scan."""
def collect(self):
"""Collect trivial fly scan data."""
class SynSignalWithRegistry(SynSignal):
"""
Synthetic signal that registers itself for testing registry systems.
"""
def __init__(self, **kwargs): ...
class SPseudo3x3(PseudoPositioner):
"""
3x3 synthetic pseudo positioner for testing coordinate transformations.
Provides 3 pseudo axes that map to 3 real axes through
a 3x3 transformation matrix.
"""
def __init__(self, **kwargs): ...
def forward(self, pseudo_pos):
"""Transform 3 pseudo coordinates to 3 real coordinates."""
def inverse(self, real_pos):
"""Transform 3 real coordinates to 3 pseudo coordinates."""
class SPseudo1x3(PseudoPositioner):
"""
1x3 synthetic pseudo positioner.
Maps 1 pseudo axis to 3 real axes for testing
one-to-many transformations.
"""
def __init__(self, **kwargs): ...
def forward(self, pseudo_pos):
"""Transform 1 pseudo coordinate to 3 real coordinates."""
def inverse(self, real_pos):
"""Transform 3 real coordinates to 1 pseudo coordinate."""Advanced simulation classes for specialized testing scenarios and device emulation.
class DirectImage(Device):
"""
Synthetic area detector that directly stores image data.
Used for testing image data workflows without requiring
actual area detector hardware or complex image generation.
Parameters:
- name (str): Device name
- func (callable): Optional function to generate image data
"""
def __init__(self, *, name, func=None, **kwargs): ...
# Component for storing image data
img: Component # Direct image storage
class NewTrivialFlyer(Device):
"""
Enhanced TrivialFlyer with asset document support.
Implements the new-style API for Resource and Datum documents,
supporting modern asset management patterns in Bluesky.
"""
def __init__(self, **kwargs): ...
def kickoff(self):
"""Start flyer operation."""
def complete(self):
"""Complete flyer operation."""
def collect(self):
"""Collect flight data."""
def collect_asset_docs(self):
"""Collect asset documents (Resource and Datum)."""
class DetWithConf(Device):
"""
Detector with separate read and configuration attributes.
Demonstrates device configuration patterns where some
components are read regularly and others only for configuration.
"""
def __init__(self, **kwargs): ...
# Mixed read/config components
a: Component # read_attrs
b: Component # read_attrs
c: Component # configuration_attrs
d: Component # configuration_attrs
class FakeEpicsPathSignal(FakeEpicsSignal):
"""
Specialized fake EPICS signal for path handling.
Used in AreaDetector path management simulation,
supporting different path semantics and validation.
Parameters:
- read_pv (str): Read PV name
- write_pv (str): Write PV name
- path_semantics (str): Path validation semantics
"""
def __init__(self, read_pv, write_pv=None, *, path_semantics='posix', **kwargs): ...
class SynSignalWithRegistry(SynSignal):
"""
Synthetic signal with asset document registry support.
Generates asset documents (Resource and Datum) for testing
data management workflows with external file storage.
Parameters:
- name (str): Signal name
- save_path (str): Path for storing generated data files
- save_spec (str): File format specification
- save_func (callable): Function for saving data
"""
def __init__(self, *, name, save_path=None, save_spec='NSPY_SEQ', save_func=None, **kwargs): ...
def collect_asset_docs(self):
"""Generate Resource and Datum documents for collected data."""Helper functions for creating and managing synthetic devices and test environments.
def make_fake_device(cls):
"""
Create fake version of real EPICS device class.
Replaces all EpicsSignal components with FakeEpicsSignal
equivalents, enabling testing without EPICS infrastructure.
Parameters:
- cls (Device): Device class to make fake version of
Returns:
Device: New device class with fake signals
"""
def clear_fake_device(dev, *, exclude=None):
"""
Reset all signals in fake device to default values.
Useful for cleaning up test state between test runs.
Parameters:
- dev (Device): Fake device to clear
- exclude (list): Signal names to skip clearing
"""
def instantiate_fake_device(dev_cls, **kwargs):
"""
Create instance of fake device with sensible defaults.
Handles complex initialization requirements and provides
reasonable default values for testing.
Parameters:
- dev_cls (Device): Device class to instantiate
- **kwargs: Override default parameters
Returns:
Device: Configured fake device instance
"""
def hw(save_path=None):
"""
Factory function creating complete synthetic hardware suite.
Returns namespace containing motors, detectors, scalers,
area detectors, and other common beamline devices.
Parameters:
- save_path (str): Path for saving generated data
Returns:
SimpleNamespace: Collection of synthetic hardware devices
"""
def new_uid():
"""Generate new unique identifier string."""
def short_uid():
"""Generate short unique identifier (6 characters)."""from ophyd.sim import SynSignal, SynSignalRO
import time
import numpy as np
# Create synthetic signal with static value
temperature = SynSignal(name='temperature', value=25.0)
# Read and write synthetic signal
print(f"Initial temperature: {temperature.get()}")
temperature.put(30.0)
print(f"New temperature: {temperature.get()}")
# Create signal with time-based function
def sine_wave(t=None):
if t is None:
t = time.time()
return 10 * np.sin(2 * np.pi * t / 5.0) # 5 second period
oscillating_signal = SynSignal(name='oscillator', func=sine_wave)
# Read oscillating values
for i in range(5):
print(f"Oscillator value: {oscillating_signal.get():.3f}")
time.sleep(1)from ophyd.sim import SynAxis
from ophyd.status import wait
import time
# Create synthetic motor
motor = SynAxis(name='synthetic_motor', delay=0.1) # 0.1s per unit move
print(f"Initial position: {motor.position}")
# Move motor with realistic delay
start_time = time.time()
status = motor.move(10.0)
print("Move started...")
wait(status) # Wait for simulated move to complete
end_time = time.time()
print(f"Final position: {motor.position}")
print(f"Move took: {end_time - start_time:.2f} seconds")
# Test position tolerance
motor.move(5.0)
wait(status)
print(f"Position after move to 5.0: {motor.position}")from ophyd.sim import SynGauss, SynAxis
from ophyd.status import wait
import numpy as np
import matplotlib.pyplot as plt
# Create synthetic motor and Gaussian detector
motor = SynAxis(name='scan_motor')
detector = SynGauss(
'gauss_det',
motor, 'position', # Position dependence
center=5.0, # Peak at position 5.0
Imax=1000, # Peak intensity
sigma=1.0, # Peak width
noise='poisson' # Poisson noise
)
# Scan across peak
positions = np.linspace(0, 10, 21)
intensities = []
for pos in positions:
# Move motor
status = motor.move(pos)
wait(status)
# Trigger detector
det_status = detector.trigger()
wait(det_status)
# Read intensity
reading = detector.read()
intensity = reading['gauss_det']['value']
intensities.append(intensity)
print(f"Position: {pos:4.1f}, Intensity: {intensity:6.1f}")
# Plot results
plt.figure(figsize=(10, 6))
plt.plot(positions, intensities, 'bo-')
plt.xlabel('Motor Position')
plt.ylabel('Detector Intensity')
plt.title('Synthetic Gaussian Peak Scan')
plt.grid(True)
plt.show()from ophyd.sim import FakeEpicsSignal, FakeEpicsSignalWithRBV
# Create fake EPICS signals for testing
temp_sensor = FakeEpicsSignal('XF:28IDC:TEMP:01', name='temperature')
pressure_ctrl = FakeEpicsSignalWithRBV('XF:28IDC:PRES:', name='pressure')
# Test EPICS-like behavior without EPICS
print(f"Temperature: {temp_sensor.get()}")
print(f"Temperature limits: {temp_sensor.limits}")
print(f"Temperature units: {temp_sensor.units}")
# Test setpoint/readback behavior
pressure_ctrl.put(1.5) # Set pressure
setpoint = pressure_ctrl.get()
readback = pressure_ctrl.user_readback.get()
print(f"Pressure setpoint: {setpoint}")
print(f"Pressure readback: {readback}")from ophyd.sim import (SynAxis, SynGauss, ABDetector,
DetWithCountTime, NullStatus)
from ophyd.status import wait
class MockBeamline:
"""Mock beamline for testing data acquisition."""
def __init__(self):
# Create synthetic devices
self.motor_x = SynAxis(name='motor_x')
self.motor_y = SynAxis(name='motor_y')
self.detector_main = SynGauss(
'main_det', self.motor_x, 'position',
center=0, Imax=10000, sigma=2.0
)
self.detector_monitor = ABDetector('monitor')
self.detector_timed = DetWithCountTime('timed_det')
def scan_1d(self, motor, positions, detector):
"""Perform 1D scan."""
results = []
for pos in positions:
# Move motor
status = motor.move(pos)
wait(status)
# Trigger detector
det_status = detector.trigger()
wait(det_status)
# Read data
reading = detector.read()
results.append({
'position': pos,
'reading': reading
})
return results
def scan_2d(self, motor_x, positions_x, motor_y, positions_y, detector):
"""Perform 2D grid scan."""
results = []
for x_pos in positions_x:
for y_pos in positions_y:
# Move both motors
x_status = motor_x.move(x_pos)
y_status = motor_y.move(y_pos)
wait([x_status, y_status])
# Trigger detector
det_status = detector.trigger()
wait(det_status)
# Read data
reading = detector.read()
results.append({
'x_position': x_pos,
'y_position': y_pos,
'reading': reading
})
return results
# Use mock beamline
beamline = MockBeamline()
# Perform 1D scan
positions = [-5, -2, -1, 0, 1, 2, 5]
scan_results = beamline.scan_1d(
beamline.motor_x,
positions,
beamline.detector_main
)
print("1D Scan Results:")
for result in scan_results:
pos = result['position']
intensity = result['reading']['main_det']['value']
print(f"Position: {pos:4.1f}, Intensity: {intensity:8.1f}")from ophyd.sim import SynAxis, DetWithCountTime
from ophyd import Device, Component
class MockExperimentDevice(Device):
"""Mock device for testing staging protocols."""
motor = Component(SynAxis, '')
detector = Component(DetWithCountTime, '')
def stage(self):
"""Custom staging behavior."""
print("Staging experiment device...")
# Configure detector count time
self.detector.count_time.put(0.1)
# Call parent staging
staged = super().stage()
print("Device staged successfully")
return staged
def unstage(self):
"""Custom unstaging behavior."""
print("Unstaging experiment device...")
# Call parent unstaging
unstaged = super().unstage()
print("Device unstaged successfully")
return unstaged
# Test staging behavior
device = MockExperimentDevice(name='experiment')
# Stage device
staged_components = device.stage()
print(f"Staged components: {staged_components}")
# Use device (simulated)
print("Using device for data acquisition...")
# Unstage device
unstaged_components = device.unstage()
print(f"Unstaged components: {unstaged_components}")Install with Tessl CLI
npx tessl i tessl/pypi-ophyd