Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.
—
Hardware control through EPICS (Experimental Physics and Industrial Control System) process variables, providing the primary interface to laboratory instruments and control systems in scientific facilities. Ophyd abstracts the underlying EPICS client libraries (pyepics, caproto) and provides a unified interface for accessing EPICS PVs.
Signal classes that connect to EPICS process variables for reading and writing hardware values.
class EpicsSignal(Signal):
"""
Signal connected to EPICS process variables.
Parameters:
- read_pv (str): PV name for reading values
- write_pv (str): PV name for writing (defaults to read_pv)
- pv_kw (dict): Additional PV connection parameters
- put_complete (bool): Whether to wait for put completion
- string (bool): Whether to treat PV as string type
- limits (bool): Whether to respect PV limits
- auto_monitor (bool): Whether to automatically monitor changes
- name (str): Signal name
"""
def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
def get(self, *, as_string=None, timeout=None, **kwargs):
"""
Get current PV value.
Parameters:
- as_string (bool): Return value as string
- timeout (float): Read timeout
Returns:
Current PV value
"""
def put(self, value, *, force=False, timeout=None, use_complete=None, **kwargs):
"""
Put value to PV synchronously.
Parameters:
- value: Value to write
- force (bool): Force write even if same value
- timeout (float): Write timeout
- use_complete (bool): Wait for put completion
"""
def set(self, value, *, timeout=None, settle_time=None, **kwargs):
"""
Set PV value asynchronously.
Parameters:
- value: Target value
- timeout (float): Operation timeout
- settle_time (float): Additional settling time
Returns:
StatusBase: Status tracking set completion
"""
@property
def limits(self):
"""
PV operating limits.
Returns:
tuple: (low_limit, high_limit)
"""
@property
def precision(self):
"""
PV display precision.
Returns:
int: Number of decimal places
"""
@property
def units(self):
"""
PV engineering units.
Returns:
str: Units string
"""
class EpicsSignalRO(EpicsSignal):
"""
Read-only EPICS signal.
Parameters:
- read_pv (str): PV name for reading
- pv_kw (dict): PV connection parameters
- string (bool): Treat as string PV
- limits (bool): Respect PV limits
- auto_monitor (bool): Auto-monitor changes
- name (str): Signal name
"""
def __init__(self, read_pv, *, pv_kw=None, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
class EpicsSignalNoValidation(EpicsSignal):
"""
EPICS signal that does not verify values on set operations.
This signal bypasses readback verification and returns completed status
immediately after issuing a put command. Useful for cases where readback
verification is not needed or not reliable.
Parameters:
- read_pv (str): PV name for reading values
- write_pv (str): PV name for writing (defaults to read_pv)
- put_complete (bool): Whether to wait for put completion
- string (bool): Treat PV as string type
- limits (bool): Respect PV limits
- name (str): Signal name
- write_timeout (float): Timeout for put completion
- connection_timeout (float): Connection timeout
"""
def __init__(self, read_pv, write_pv=None, *, put_complete=False, string=False, limits=False, name=None, **kwargs): ...
class EpicsSignalWithRBV(EpicsSignal):
"""
EPICS signal with separate readback PV using AreaDetector convention.
This signal automatically constructs read and write PVs from a base prefix:
- Read PV: prefix + "_RBV" (readback value)
- Write PV: prefix (setpoint)
Commonly used with AreaDetector devices where this naming pattern is standard.
Parameters:
- prefix (str): Base PV name (write PV), read PV will be prefix + "_RBV"
"""
def __init__(self, prefix, **kwargs): ...Functions to configure and manage the underlying EPICS client library backend.
def set_cl(control_layer=None, *, pv_telemetry=False):
"""
Set the control layer (EPICS client backend).
Parameters:
- control_layer (str): Backend to use ('pyepics', 'caproto', 'dummy', or 'any')
- pv_telemetry (bool): Enable PV access telemetry collection
Raises:
ImportError: If specified backend is not available
ValueError: If control_layer is unknown
"""
def get_cl():
"""
Get the current control layer.
Returns:
SimpleNamespace: Current control layer with methods:
- get_pv(pvname): Get PV object
- caput(pvname, value): Put value to PV
- caget(pvname): Get value from PV
- setup(): Setup backend
- thread_class: Thread class for backend
- name: Backend name
Raises:
RuntimeError: If control layer not set
"""Utility functions for working with EPICS PV names and data types.
def validate_pv_name(pv_name):
"""
Validate EPICS PV name format.
Parameters:
- pv_name (str): PV name to validate
Returns:
bool: True if valid PV name
"""
def split_record_field(pv_name):
"""
Split PV name into record and field parts.
Parameters:
- pv_name (str): Full PV name
Returns:
tuple: (record_name, field_name)
"""
def strip_field(pv_name):
"""
Remove field from PV name, returning just record name.
Parameters:
- pv_name (str): PV name potentially with field
Returns:
str: Record name without field
"""
def record_field(record, field):
"""
Combine record name and field into full PV name.
Parameters:
- record (str): Record name
- field (str): Field name
Returns:
str: Combined PV name
"""
def set_and_wait(signal, value, timeout=10):
"""
Set EPICS signal value and wait for completion.
Parameters:
- signal (EpicsSignal): Signal to set
- value: Target value
- timeout (float): Maximum wait time
Returns:
StatusBase: Completion status
"""
def data_type(pv):
"""
Get EPICS data type of PV.
Parameters:
- pv: PV object or name
Returns:
str: EPICS data type name
"""
def data_shape(pv):
"""
Get shape of PV data.
Parameters:
- pv: PV object or name
Returns:
tuple: Data shape dimensions
"""
def raise_if_disconnected(func):
"""
Decorator to check PV connection before operation.
Parameters:
- func (callable): Function to decorate
Returns:
callable: Decorated function that checks connection
Raises:
DisconnectedError: If PV is not connected
"""
def waveform_to_string(value, encoding='utf-8'):
"""
Convert EPICS waveform to string.
Parameters:
- value (array): Waveform data
- encoding (str): Character encoding
Returns:
str: Converted string
"""from ophyd import EpicsSignal, EpicsSignalRO
from ophyd.status import wait
# Create read-write EPICS signal
temperature = EpicsSignal('XF:28IDC:TMP:01', name='temperature')
temperature.wait_for_connection()
# Read current value
current_temp = temperature.get()
print(f"Current temperature: {current_temp} {temperature.units}")
# Set new value asynchronously
status = temperature.set(25.0)
wait(status) # Wait for completion
# Create read-only signal
pressure = EpicsSignalRO('XF:28IDC:PRES:01', name='pressure')
pressure.wait_for_connection()
current_pressure = pressure.get()
print(f"Current pressure: {current_pressure}")from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
class TemperatureController(Device):
"""EPICS-based temperature controller."""
setpoint = Component(EpicsSignal, 'SP')
readback = Component(EpicsSignalRO, 'RBV')
status = Component(EpicsSignalRO, 'STAT')
def set_temperature(self, temp):
"""Set target temperature."""
return self.setpoint.set(temp)
@property
def temperature(self):
"""Current temperature reading."""
return self.readback.get()
# Create device instance
temp_ctrl = TemperatureController('XF:28IDC:TMP:', name='temp_controller')
temp_ctrl.wait_for_connection()
# Use device
status = temp_ctrl.set_temperature(22.5)
wait(status)
reading = temp_ctrl.read()
print(f"Temperature: {reading['temp_controller_readback']['value']}")from ophyd import set_cl, get_cl
# Set control layer to caproto
set_cl('caproto')
# Get current control layer info
cl = get_cl()
print(f"Using control layer: {cl.name}")
# Access low-level control layer functions
cl.caput('XF:28IDC:TMP:01', 25.0) # Direct PV access
value = cl.caget('XF:28IDC:TMP:01')
print(f"Direct PV read: {value}")
# Enable telemetry tracking
set_cl('pyepics', pv_telemetry=True)
cl = get_cl()
# After using signals, check PV access statistics
if hasattr(cl.get_pv, 'counter'):
print("Most accessed PVs:")
for pv_name, count in cl.get_pv.counter.most_common(10):
print(f" {pv_name}: {count} accesses")from ophyd import EpicsSignal
# String PV for device name
device_name = EpicsSignal('XF:28IDC:DEV:NAME', string=True, name='device_name')
device_name.wait_for_connection()
# Read string value
name = device_name.get()
print(f"Device name: {name}")
# Set string value
device_name.put("Sample_Detector_A")
# Waveform PV that contains string data
message_pv = EpicsSignal('XF:28IDC:MSG:01', name='message')
message_pv.wait_for_connection()
# Read waveform as string
from ophyd.utils.epics_pvs import waveform_to_string
raw_data = message_pv.get()
message = waveform_to_string(raw_data)
print(f"Message: {message}")from ophyd import EpicsSignal
from ophyd.utils.errors import DisconnectedError
import time
signal = EpicsSignal('XF:28IDC:NOEXIST:01', name='nonexistent')
try:
# This will timeout if PV doesn't exist
signal.wait_for_connection(timeout=2.0)
value = signal.get()
except Exception as e:
print(f"Connection failed: {e}")
# Check connection status
if signal.connected:
print("Signal is connected")
else:
print("Signal is not connected")
# Use raise_if_disconnected decorator
from ophyd.utils.epics_pvs import raise_if_disconnected
@raise_if_disconnected
def read_signal(sig):
return sig.get()
try:
value = read_signal(signal)
except DisconnectedError:
print("Cannot read from disconnected signal")Install with Tessl CLI
npx tessl i tessl/pypi-ophyd