or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

area-detectors.mdcore-framework.mdepics-integration.mdflyers-continuous-scanning.mdindex.mdmotors-positioners.mdsimulation-testing.mdspecialized-devices.md
tile.json

tessl/pypi-ophyd

Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/ophyd@1.11.x

To install, run

npx @tessl/cli install tessl/pypi-ophyd@1.11.0

index.mddocs/

Ophyd

Ophyd is a comprehensive Python library that provides a high-level hardware abstraction layer for experiment orchestration and data acquisition systems, with particular emphasis on EPICS (Experimental Physics and Industrial Control System) control systems. It enables scientists and engineers to interact with diverse hardware devices through a unified interface featuring standardized methods like trigger(), read(), and set(), while abstracting away control system specifics.

Package Information

  • Package Name: ophyd
  • Language: Python
  • Installation: pip install ophyd
  • Documentation: https://blueskyproject.io/ophyd
  • Version: 1.11.0

Core Imports

import ophyd

Common imports for device creation and control:

from ophyd import Device, Component, EpicsSignal, EpicsMotor
from ophyd import Signal, SignalRO, DerivedSignal, EpicsSignalNoValidation
from ophyd import ALL_COMPONENTS, Kind
from ophyd import setup_ophyd, set_handler
from ophyd.status import StatusBase, wait

For area detector functionality:

from ophyd.areadetector import AreaDetector, SimDetector
from ophyd.areadetector.plugins import HDF5Plugin, ImagePlugin

Basic Usage

from ophyd import Device, Component, EpicsSignal, EpicsMotor
from ophyd.status import wait

# Define a custom device using Components
class MyDevice(Device):
    temperature = Component(EpicsSignal, 'TEMP:PV')
    pressure = Component(EpicsSignal, 'PRES:PV')
    
# Create device instance
device = MyDevice('MY:DEVICE:', name='my_device')

# Connect to hardware
device.wait_for_connection()

# Read all device values
reading = device.read()
print(reading)

# Use a positioner (motor)
motor = EpicsMotor('XF:28IDC:MOTOR1', name='motor1')
motor.wait_for_connection()

# Move motor and wait for completion
status = motor.set(10.0)  # Move to position 10.0
wait(status)  # Wait for move to complete

# Read motor position
current_pos = motor.position
print(f"Current position: {current_pos}")

Architecture

Ophyd's design is built around several key concepts:

  • Device: High-level objects that group related hardware components and provide coordinated operations
  • Signal: Individual readable/writable values representing hardware channels or computed values
  • Component: Building blocks that define the structure of devices and their relationships
  • Status: Objects that track the progress of asynchronous operations like moves or acquisitions
  • Control Layer: Abstraction over EPICS backends (pyepics, caproto, dummy) for different deployment needs

This architecture enables the creation of reusable, composable device definitions that can be shared across facilities and experiments while maintaining compatibility with the broader Bluesky ecosystem for automated data collection.

Capabilities

Core Framework

The foundational classes and interfaces that form the backbone of ophyd's device abstraction system, including device composition, signal management, and status tracking.

class Device:
    def __init__(self, prefix='', *, name, kind=None, read_attrs=None, configuration_attrs=None, parent=None, **kwargs): ...
    def read(self): ...
    def describe(self): ...
    def configure(self, d): ...
    def read_configuration(self): ...
    def describe_configuration(self): ...
    def trigger(self): ...
    def stage(self): ...
    def unstage(self): ...

class Signal:
    def __init__(self, *, name, value=0, timestamp=None, parent=None, kind='normal', tolerance=None, rtolerance=None, **kwargs): ...
    def get(self, **kwargs): ...
    def put(self, value, **kwargs): ...
    def set(self, value, **kwargs): ...
    def read(self): ...
    def describe(self): ...

class Component:
    def __init__(self, cls, suffix='', *, lazy=False, trigger_value=None, add_prefix=None, doc=None, kind='normal', **kwargs): ...

class StatusBase:
    def __init__(self, *, timeout=None, settle_time=None): ...
    @property
    def done(self): ...
    @property
    def success(self): ...
    def wait(self, timeout=None): ...

def wait(status, timeout=None, poll_period=0.05): ...

Core Framework

EPICS Integration

Hardware control through EPICS process variables, providing the primary interface to laboratory instruments and control systems in scientific facilities.

class EpicsSignal(Signal):
    def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...

class EpicsSignalRO(Signal):
    def __init__(self, read_pv, *, pv_kw=None, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...

def set_cl(control_layer=None, *, pv_telemetry=False): ...
def get_cl(): ...

EPICS Integration

Motors and Positioners

Positioning devices including EPICS motors, software positioners, and pseudo positioners for coordinate transformations and multi-axis control.

class EpicsMotor(Device):
    def __init__(self, prefix='', *, name, settle_time=0.0, timeout=None, **kwargs): ...
    def move(self, position, **kwargs): ...
    def home(self, direction, **kwargs): ...
    def stop(self, **kwargs): ...
    @property
    def position(self): ...

class PseudoPositioner(Device):
    def __init__(self, prefix='', *, configuration_attrs=None, read_attrs=None, **kwargs): ...
    def forward(self, pseudo_pos): ...
    def inverse(self, real_pos): ...

class SoftPositioner(Device):
    def __init__(self, *, name=None, **kwargs): ...
    def move(self, position, **kwargs): ...

Motors and Positioners

Area Detectors

Comprehensive support for 2D area detectors including cameras, plugins for data processing, file I/O, and triggering strategies for coordinated data acquisition.

class AreaDetector(Device):
    def __init__(self, prefix, *, name, **kwargs): ...
    def stage(self): ...
    def unstage(self): ...
    def trigger(self): ...

class DetectorBase(Device):
    def __init__(self, prefix, *, read_attrs=None, configuration_attrs=None, **kwargs): ...

# Camera classes for specific detector types
class SimDetectorCam(CamBase): ...
class AndorCam(CamBase): ...
class PerkinElmerCam(CamBase): ...

# Plugin classes for data processing
class HDF5Plugin(FilePlugin): ...
class TIFFPlugin(FilePlugin): ...
class ImagePlugin(PluginBase): ...
class StatsPlugin(PluginBase): ...

Area Detectors

Specialized Devices

Pre-built device classes for common laboratory instruments including scalers, multi-channel analyzers, and electrometers.

class EpicsScaler(Device):
    def __init__(self, prefix, *, name, **kwargs): ...
    def stage(self): ...
    def trigger(self): ...

class EpicsMCA(Device):
    def __init__(self, prefix, *, name, **kwargs): ...
    def erase(self): ...
    def start(self): ...
    def stop(self): ...

class QuadEM(Device):
    def __init__(self, prefix, *, name, **kwargs): ...
    def stage(self): ...
    def trigger(self): ...

Specialized Devices

Simulation and Testing

Mock devices and signals for testing, development, and offline work without requiring actual hardware connections.

class SynSignal(Signal):
    def __init__(self, *, name, func=None, **kwargs): ...

class SynAxis(Device):
    def __init__(self, *, name, **kwargs): ...
    def move(self, position, **kwargs): ...

class SynGauss(Device):
    def __init__(self, name, motor, motor_field, center, Imax, sigma=1, noise='poisson', noise_multiplier=1, random_state=None, **kwargs): ...

class FakeEpicsSignal(Signal):
    def __init__(self, read_pv, write_pv=None, **kwargs): ...

Simulation and Testing

Flyers and Continuous Scanning

Interfaces for continuous scanning and fly scanning where data is collected while motors are in motion.

class FlyerInterface:
    def kickoff(self): ...
    def complete(self): ...
    def collect(self): ...
    def describe_collect(self): ...

class MonitorFlyerMixin:
    def kickoff(self): ...
    def complete(self): ...
    def collect(self): ...

Flyers and Continuous Scanning

Units and Conversions

Support for unit-aware signals and unit conversions.

class UnitConversionDerivedSignal(DerivedSignal):
    def __init__(self, derived_from, original_units, derived_units, **kwargs): ...

Callbacks and Event Publishing

Event publishing for integration with data acquisition systems.

class UidPublish:
    def __init__(self, RE, stream_name='primary'): ...

class LastUidPublish:  
    def __init__(self, RE): ...

Utility Functions

Device management and configuration utilities for connection handling, logging, and instance management.

# Connection management
def wait_for_lazy_connection(): ...
def do_not_wait_for_lazy_connection(): ...

# Instance management  
def register_instances_in_weakset(device_class): ...
def register_instances_keyed_on_name(device_class): ...
def select_version(name, version): ...

# Setup and configuration
def setup_ophyd(logger=None): ...
def set_handler(handler=None): ...

# Component utilities
def kind_context(kind): ...
ALL_COMPONENTS: frozenset  # Set of all component classes

Error Handling

Ophyd defines several exception types for different error conditions:

class OpException(Exception): ...
class ReadOnlyError(OpException): ...
class LimitError(OpException): ...
class DisconnectedError(OpException): ...
class StatusTimeoutError(Exception): ...
class WaitTimeoutError(Exception): ...

Common error handling patterns:

from ophyd.utils.errors import DisconnectedError, StatusTimeoutError

try:
    status = device.set(new_value)
    wait(status, timeout=10.0)
except DisconnectedError:
    print("Device is not connected")
except StatusTimeoutError:
    print("Operation timed out")

Types

from enum import IntFlag

class Kind(IntFlag):
    """Flags for indicating the kind of reading."""
    omitted = 0b000
    normal = 0b001  
    config = 0b010
    hinted = 0b101

class Staged(Enum):
    """Enum for device staging states."""
    no = 'no'
    yes = 'yes' 
    partially = 'partially'

# Type aliases
Reading = Dict[str, Dict[str, Any]]  # {'signal_name': {'value': val, 'timestamp': ts}}
Configuration = Dict[str, Dict[str, Any]]  # Same structure as Reading
Description = Dict[str, Dict[str, Any]]  # {'signal_name': {'dtype': 'number', 'shape': []}}