Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.
npx @tessl/cli install tessl/pypi-ophyd@1.11.0

Ophyd is a comprehensive Python library that provides a high-level hardware abstraction layer for experiment orchestration and data acquisition systems, with particular emphasis on EPICS (Experimental Physics and Industrial Control System) control systems. It enables scientists and engineers to interact with diverse hardware devices through a unified interface featuring standardized methods like trigger(), read(), and set(), while abstracting away control system specifics.
pip install ophyd

import ophyd

Common imports for device creation and control:
from ophyd import Device, Component, EpicsSignal, EpicsMotor
from ophyd import Signal, SignalRO, DerivedSignal, EpicsSignalNoValidation
from ophyd import ALL_COMPONENTS, Kind
from ophyd import setup_ophyd, set_handler
from ophyd.status import StatusBase, wait

For area detector functionality:
from ophyd.areadetector import AreaDetector, SimDetector
from ophyd.areadetector.plugins import HDF5Plugin, ImagePlugin

from ophyd import Device, Component, EpicsSignal, EpicsMotor
from ophyd.status import wait
# Define a custom device using Components
class MyDevice(Device):
    """Example device grouping a temperature and a pressure EPICS signal."""

    temperature = Component(EpicsSignal, 'TEMP:PV')
    pressure = Component(EpicsSignal, 'PRES:PV')
# Create device instance
device = MyDevice('MY:DEVICE:', name='my_device')
# Connect to hardware
device.wait_for_connection()
# Read all device values
reading = device.read()
print(reading)
# Use a positioner (motor)
motor = EpicsMotor('XF:28IDC:MOTOR1', name='motor1')
motor.wait_for_connection()
# Move motor and wait for completion
status = motor.set(10.0) # Move to position 10.0
wait(status) # Wait for move to complete
# Read motor position
current_pos = motor.position
print(f"Current position: {current_pos}")

Ophyd's design is built around several key concepts:
This architecture enables the creation of reusable, composable device definitions that can be shared across facilities and experiments while maintaining compatibility with the broader Bluesky ecosystem for automated data collection.
The foundational classes and interfaces that form the backbone of ophyd's device abstraction system, including device composition, signal management, and status tracking.
class Device:
    """Signature outline of the ``Device`` class; method bodies are elided."""

    def __init__(self, prefix='', *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): ...
    def read(self): ...
    def describe(self): ...
    def configure(self, d): ...
    def read_configuration(self): ...
    def describe_configuration(self): ...
    def trigger(self): ...
    def stage(self): ...
    def unstage(self): ...
class Signal:
    """Signature outline of the ``Signal`` class; method bodies are elided."""

    def __init__(self, *, name, value=0, timestamp=None, parent=None, kind='normal', tolerance=None, rtolerance=None, **kwargs): ...
    def get(self, **kwargs): ...
    def put(self, value, **kwargs): ...
    def set(self, value, **kwargs): ...
    def read(self): ...
    def describe(self): ...
class Component:
    """Signature outline of the ``Component`` class; the body is elided."""

    def __init__(self, cls, suffix='', *, lazy=False, trigger_value=None, add_prefix=None, doc=None, kind='normal', **kwargs): ...
class StatusBase:
    """Signature outline of the ``StatusBase`` class; bodies are elided."""

    def __init__(self, *, timeout=None, settle_time=None): ...

    @property
    def done(self): ...

    @property
    def success(self): ...

    def wait(self, timeout=None): ...
def wait(status, timeout=None, poll_period=0.05): ...

Hardware control through EPICS process variables, providing the primary interface to laboratory instruments and control systems in scientific facilities.
class EpicsSignal(Signal):
    """Signature outline of the ``EpicsSignal`` class; the body is elided."""

    def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
class EpicsSignalRO(Signal):
    """Signature outline of the ``EpicsSignalRO`` class; the body is elided."""

    def __init__(self, read_pv, *, pv_kw=None, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
def set_cl(control_layer=None, *, pv_telemetry=False): ...
def get_cl(): ...

Positioning devices including EPICS motors, software positioners, and pseudo positioners for coordinate transformations and multi-axis control.
class EpicsMotor(Device):
    """Signature outline of the ``EpicsMotor`` class; bodies are elided."""

    def __init__(self, prefix='', *, name, settle_time=0.0, timeout=None, **kwargs): ...
    def move(self, position, **kwargs): ...
    def home(self, direction, **kwargs): ...
    def stop(self, **kwargs): ...

    @property
    def position(self): ...
class PseudoPositioner(Device):
    """Signature outline of the ``PseudoPositioner`` class; bodies are elided."""

    def __init__(self, prefix='', *, configuration_attrs=None, read_attrs=None, **kwargs): ...
    def forward(self, pseudo_pos): ...
    def inverse(self, real_pos): ...
class SoftPositioner(Device):
def __init__(self, *, name=None, **kwargs): ...
def move(self, position, **kwargs): ...

Comprehensive support for 2D area detectors including cameras, plugins for data processing, file I/O, and triggering strategies for coordinated data acquisition.
class AreaDetector(Device):
    """Signature outline of the ``AreaDetector`` class; bodies are elided."""

    def __init__(self, prefix, *, name, **kwargs): ...
    def stage(self): ...
    def unstage(self): ...
    def trigger(self): ...
class DetectorBase(Device):
    """Signature outline of the ``DetectorBase`` class; the body is elided."""

    def __init__(self, prefix, *, read_attrs=None, configuration_attrs=None, **kwargs): ...
# Camera classes for specific detector types
class SimDetectorCam(CamBase): ...
class AndorCam(CamBase): ...
class PerkinElmerCam(CamBase): ...
# Plugin classes for data processing
class HDF5Plugin(FilePlugin): ...
class TIFFPlugin(FilePlugin): ...
class ImagePlugin(PluginBase): ...
class StatsPlugin(PluginBase): ...

Pre-built device classes for common laboratory instruments including scalers, multi-channel analyzers, and electrometers.
class EpicsScaler(Device):
    """Signature outline of the ``EpicsScaler`` class; bodies are elided."""

    def __init__(self, prefix, *, name, **kwargs): ...
    def stage(self): ...
    def trigger(self): ...
class EpicsMCA(Device):
    """Signature outline of the ``EpicsMCA`` class; bodies are elided."""

    def __init__(self, prefix, *, name, **kwargs): ...
    def erase(self): ...
    def start(self): ...
    def stop(self): ...
class QuadEM(Device):
def __init__(self, prefix, *, name, **kwargs): ...
def stage(self): ...
def trigger(self): ...

Mock devices and signals for testing, development, and offline work without requiring actual hardware connections.
class SynSignal(Signal):
    """Signature outline of the ``SynSignal`` class; the body is elided."""

    def __init__(self, *, name, func=None, **kwargs): ...
class SynAxis(Device):
    """Signature outline of the ``SynAxis`` class; bodies are elided."""

    def __init__(self, *, name, **kwargs): ...
    def move(self, position, **kwargs): ...
class SynGauss(Device):
    """Signature outline of the ``SynGauss`` class; the body is elided."""

    def __init__(self, name, motor, motor_field, center, Imax, sigma=1, noise='poisson', noise_multiplier=1, random_state=None, **kwargs): ...
class FakeEpicsSignal(Signal):
def __init__(self, read_pv, write_pv=None, **kwargs): ...

Interfaces for continuous scanning and fly scanning where data is collected while motors are in motion.
class FlyerInterface:
    """Signature outline of the ``FlyerInterface`` class; bodies are elided."""

    def kickoff(self): ...
    def complete(self): ...
    def collect(self): ...
    def describe_collect(self): ...
class MonitorFlyerMixin:
def kickoff(self): ...
def complete(self): ...
def collect(self): ...

Flyers and Continuous Scanning
Support for unit-aware signals and unit conversions.
class UnitConversionDerivedSignal(DerivedSignal):
def __init__(self, derived_from, original_units, derived_units, **kwargs): ...

Event publishing for integration with data acquisition systems.
class UidPublish:
    """Signature outline of the ``UidPublish`` class; the body is elided."""

    def __init__(self, RE, stream_name='primary'): ...
class LastUidPublish:
def __init__(self, RE): ...

Device management and configuration utilities for connection handling, logging, and instance management.
# Connection management
def wait_for_lazy_connection(): ...
def do_not_wait_for_lazy_connection(): ...
# Instance management
def register_instances_in_weakset(device_class): ...
def register_instances_keyed_on_name(device_class): ...
def select_version(name, version): ...
# Setup and configuration
def setup_ophyd(logger=None): ...
def set_handler(handler=None): ...
# Component utilities
def kind_context(kind): ...
ALL_COMPONENTS: frozenset # Set of all component classes

Ophyd defines several exception types for different error conditions:
class OpException(Exception): ...
class ReadOnlyError(OpException): ...
class LimitError(OpException): ...
class DisconnectedError(OpException): ...
class StatusTimeoutError(Exception): ...
class WaitTimeoutError(Exception): ...

Common error handling patterns:
from ophyd.utils.errors import DisconnectedError, StatusTimeoutError
try:
status = device.set(new_value)
wait(status, timeout=10.0)
except DisconnectedError:
print("Device is not connected")
except StatusTimeoutError:
print("Operation timed out")

from enum import IntFlag
class Kind(IntFlag):
    """Flags for indicating the kind of reading."""

    omitted = 0b000
    normal = 0b001
    config = 0b010
    # 0b101 also sets the ``normal`` bit, so ``hinted & normal`` is truthy.
    hinted = 0b101
from enum import Enum


class Staged(Enum):
    """Enum for device staging states."""

    no = 'no'
    yes = 'yes'
    partially = 'partially'
from typing import Any, Dict

# Type aliases: all three share the same nested-mapping shape and differ only
# in which per-signal keys they are expected to carry.
Reading = Dict[str, Dict[str, Any]] # {'signal_name': {'value': val, 'timestamp': ts}}
Configuration = Dict[str, Dict[str, Any]] # Same structure as Reading
Description = Dict[str, Dict[str, Any]] # {'signal_name': {'dtype': 'number', 'shape': []}}