CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-ophyd

Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.

Pending
Overview
Eval results
Files

docs/epics-integration.md

EPICS Integration

This module provides hardware control through EPICS (Experimental Physics and Industrial Control System) process variables (PVs), the primary interface to laboratory instruments and control systems in scientific facilities. Ophyd abstracts the underlying EPICS client libraries (pyepics, caproto) and provides a unified interface for accessing EPICS PVs.

Capabilities

EPICS Signals

Signal classes that connect to EPICS process variables for reading and writing hardware values.

class EpicsSignal(Signal):
    """
    Read/write signal connected to EPICS process variables (PVs).

    One PV can be used for both reading and writing, or a separate write PV
    can be supplied.

    Parameters:
    - read_pv (str): PV name for reading values
    - write_pv (str, optional): PV name for writing (defaults to read_pv when None)
    - pv_kw (dict, optional): Additional PV connection parameters (default None)
    - put_complete (bool): Whether to wait for put completion (default False)
    - string (bool): Whether to treat PV as string type (default False)
    - limits (bool): Whether to respect PV limits (default False)
    - auto_monitor (bool, optional): Whether to automatically monitor changes (default None)
    - name (str, optional): Signal name (default None)
    - **kwargs: Additional keyword arguments; not described in this reference

    Note: the ``limits`` constructor flag (a bool) shares its name with the
    ``limits`` property below (a tuple of limit values).

    NOTE(review): keyword names such as ``pv_kw`` may differ between ophyd
    releases -- confirm against the installed version.
    """
    def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
    
    def get(self, *, as_string=None, timeout=None, **kwargs):
        """
        Get current PV value.

        All arguments are keyword-only.

        Parameters:
        - as_string (bool, optional): Return value as string (default None)
        - timeout (float, optional): Read timeout (default None)
        - **kwargs: Additional keyword arguments; not described in this reference

        Returns:
        Current PV value
        """
    
    def put(self, value, *, force=False, timeout=None, use_complete=None, **kwargs):
        """
        Put value to PV synchronously.

        Parameters:
        - value: Value to write
        - force (bool): Force write even if same value (default False)
        - timeout (float, optional): Write timeout (default None)
        - use_complete (bool, optional): Wait for put completion (default None)
        - **kwargs: Additional keyword arguments; not described in this reference

        NOTE(review): upstream ophyd appears to describe ``force`` as skipping
        the Python-side value checks (e.g. limits) rather than forcing a
        same-value write -- confirm.
        """
    
    def set(self, value, *, timeout=None, settle_time=None, **kwargs):
        """
        Set PV value asynchronously.

        Unlike ``put``, this returns a status object that the caller can wait
        on instead of blocking.

        Parameters:
        - value: Target value
        - timeout (float, optional): Operation timeout (default None)
        - settle_time (float, optional): Additional settling time (default None)
        - **kwargs: Additional keyword arguments; not described in this reference

        Returns:
        StatusBase: Status tracking set completion
        """
    
    @property
    def limits(self):
        """
        PV operating limits.

        Not to be confused with the ``limits`` constructor flag, which is a
        bool.

        Returns:
        tuple: (low_limit, high_limit)
        """
    
    @property
    def precision(self):
        """
        PV display precision.
        
        Returns:
        int: Number of decimal places
        """
    
    @property
    def units(self):
        """
        PV engineering units.

        Returns:
        str: Units string

        NOTE(review): confirm that the installed ophyd version exposes
        ``units`` as a property on the signal.
        """

class EpicsSignalRO(EpicsSignal):
    """
    Read-only EPICS signal.

    The constructor takes no ``write_pv`` and no ``put_complete`` argument.

    Parameters:
    - read_pv (str): PV name for reading
    - pv_kw (dict, optional): PV connection parameters (default None)
    - string (bool): Treat as string PV (default False)
    - limits (bool): Respect PV limits (default False)
    - auto_monitor (bool, optional): Auto-monitor changes (default None)
    - name (str, optional): Signal name (default None)
    - **kwargs: Additional keyword arguments; not described in this reference

    NOTE(review): as declared here the class inherits ``put()`` and ``set()``
    from EpicsSignal, which is at odds with "read-only". Upstream ophyd
    appears to derive this class from EpicsSignalBase instead -- confirm.
    """
    def __init__(self, read_pv, *, pv_kw=None, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...

class EpicsSignalNoValidation(EpicsSignal):
    """
    EPICS signal that does not verify values on set operations.
    
    This signal bypasses readback verification and returns completed status
    immediately after issuing a put command. Useful for cases where readback
    verification is not needed or not reliable.
    
    Parameters:
    - read_pv (str): PV name for reading values
    - write_pv (str, optional): PV name for writing (defaults to read_pv when None)
    - put_complete (bool): Whether to wait for put completion (default False)
    - string (bool): Treat PV as string type (default False)
    - limits (bool): Respect PV limits (default False)
    - name (str, optional): Signal name (default None)
    - write_timeout (float): Timeout for put completion
    - connection_timeout (float): Connection timeout

    ``write_timeout`` and ``connection_timeout`` are not named in the
    signature below; presumably they are accepted through ``**kwargs`` --
    confirm.
    """
    def __init__(self, read_pv, write_pv=None, *, put_complete=False, string=False, limits=False, name=None, **kwargs): ...

class EpicsSignalWithRBV(EpicsSignal):
    """
    EPICS signal with separate readback PV using AreaDetector convention.
    
    This signal automatically constructs read and write PVs from a base prefix:
    - Read PV: prefix + "_RBV" (readback value)  
    - Write PV: prefix (setpoint)

    For example, a prefix of "DET:AcquireTime" reads from
    "DET:AcquireTime_RBV" and writes to "DET:AcquireTime".
    
    Commonly used with AreaDetector devices where this naming pattern is standard.
    
    Parameters:
    - prefix (str): Base PV name (write PV), read PV will be prefix + "_RBV"
    - **kwargs: Additional keyword arguments; presumably forwarded to
      EpicsSignal (e.g. name) -- confirm
    """
    def __init__(self, prefix, **kwargs): ...

Control Layer Management

Functions to configure and manage the underlying EPICS client library backend.

def set_cl(control_layer=None, *, pv_telemetry=False):
    """
    Set the control layer (EPICS client backend).

    Parameters:
    - control_layer (str, optional): Backend to use ('pyepics', 'caproto', 'dummy', or 'any');
      default None
    - pv_telemetry (bool): Enable PV access telemetry collection (keyword-only, default False)

    Raises:
    ImportError: If specified backend is not available
    ValueError: If control_layer is unknown

    NOTE(review): the behavior for control_layer=None is not stated here.
    Upstream ophyd appears to fall back to the OPHYD_CONTROL_LAYER
    environment variable and then to 'any' -- confirm.
    """

def get_cl():
    """
    Get the current control layer.

    Takes no arguments.

    Returns:
    SimpleNamespace: Current control layer with attributes:
        - get_pv(pvname): Get PV object
        - caput(pvname, value): Put value to PV
        - caget(pvname): Get value from PV
        - setup(): Setup backend
        - thread_class: Thread class for backend
        - name: Backend name

    Raises:
    RuntimeError: If control layer not set
    """

EPICS Utilities

Utility functions for working with EPICS PV names and data types.

def validate_pv_name(pv_name):
    """
    Validate EPICS PV name format.

    Parameters:
    - pv_name (str): PV name to validate

    Returns:
    bool: True if valid PV name

    NOTE(review): upstream ophyd appears to return nothing and instead raise
    BadPVName for an invalid name (more than one '.'); the parameter may also
    be named ``pv`` there. Confirm before relying on the boolean result.
    """

def split_record_field(pv_name):
    """
    Split PV name into record and field parts.

    Parameters:
    - pv_name (str): Full PV name

    Returns:
    tuple: (record_name, field_name)

    NOTE(review): presumably the split is on the '.' separating record from
    field, e.g. "REC.VAL" -> ("REC", "VAL"). The result for a name with no
    field is not stated here -- confirm.
    """

def strip_field(pv_name):
    """
    Remove field from PV name, returning just record name.

    Counterpart of ``record_field``, which joins a record name and a field.

    Parameters:
    - pv_name (str): PV name potentially with field

    Returns:
    str: Record name without field
    """

def record_field(record, field):
    """
    Combine record name and field into full PV name.

    Counterpart of ``split_record_field``, which separates the two parts.

    Parameters:
    - record (str): Record name
    - field (str): Field name

    Returns:
    str: Combined PV name

    NOTE(review): upstream ophyd appears to upper-case the field and strip
    any existing field from ``record`` before joining -- confirm.
    """

def set_and_wait(signal, value, timeout=10):
    """
    Set EPICS signal value and wait for completion.

    Parameters:
    - signal (EpicsSignal): Signal to set
    - value: Target value
    - timeout (float): Maximum wait time (default 10)

    Returns:
    StatusBase: Completion status

    NOTE(review): upstream ophyd appears to return None, raise TimeoutError
    on expiry, accept extra poll_time/rtol/atol arguments, and deprecate this
    helper in favor of ``signal.set(value).wait()`` -- confirm.
    """

def data_type(pv):
    """
    Get EPICS data type of PV.

    Parameters:
    - pv: PV object or name

    Returns:
    str: EPICS data type name

    NOTE(review): upstream ophyd appears to take a *value* (not a PV) and
    return a JSON-style type name such as 'string', 'number', 'integer' or
    'array' -- confirm.
    """

def data_shape(pv):
    """
    Get shape of PV data.

    Parameters:
    - pv: PV object or name

    Returns:
    tuple: Data shape dimensions

    NOTE(review): upstream ophyd appears to take a *value* (not a PV) and
    return a list of dimensions, empty for scalars -- confirm.
    """

def raise_if_disconnected(func):
    """
    Decorator to check PV connection before operation.

    Parameters:
    - func (callable): Function to decorate

    Returns:
    callable: Decorated function that checks connection

    Raises:
    DisconnectedError: If PV is not connected

    NOTE(review): the usage example on this page decorates a plain function
    whose first argument is the signal; presumably the wrapper checks the
    connection state of that first positional argument -- confirm.
    """

def waveform_to_string(value, encoding='utf-8'):
    """
    Convert EPICS waveform to string.

    Parameters:
    - value (array): Waveform data
    - encoding (str): Character encoding (default 'utf-8')

    Returns:
    str: Converted string

    NOTE(review): upstream ophyd's signature appears to be
    (value, type_=None, delim='') with no ``encoding`` keyword -- confirm
    before passing ``encoding``.
    """

Usage Examples

Basic EPICS Signal Usage

from ophyd import EpicsSignal, EpicsSignalRO
from ophyd.status import wait

# Create read-write EPICS signal (no write_pv is given, so the same PV is
# used for writing)
temperature = EpicsSignal('XF:28IDC:TMP:01', name='temperature')
# Make sure the PV is connected before reading or writing
temperature.wait_for_connection()

# Read current value
current_temp = temperature.get()
print(f"Current temperature: {current_temp} {temperature.units}")

# Set new value asynchronously; set() returns a status object immediately
status = temperature.set(25.0)
wait(status)  # Wait for completion

# Create read-only signal
pressure = EpicsSignalRO('XF:28IDC:PRES:01', name='pressure')
pressure.wait_for_connection()

# Read-only signals are read the same way as read-write ones
current_pressure = pressure.get()
print(f"Current pressure: {current_pressure}")

Device with EPICS Components

from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
from ophyd.status import wait  # needed for wait(status) below

class TemperatureController(Device):
    """EPICS-based temperature controller.

    Each component's suffix is appended to the device prefix given at
    construction time, e.g. prefix 'XF:28IDC:TMP:' + 'SP'.
    """
    setpoint = Component(EpicsSignal, 'SP')      # writable target temperature
    readback = Component(EpicsSignalRO, 'RBV')   # measured temperature
    status = Component(EpicsSignalRO, 'STAT')    # controller status
    
    def set_temperature(self, temp):
        """Set target temperature.

        Parameters:
        - temp: Target temperature to write to the setpoint signal

        Returns:
        Status object returned by ``setpoint.set``; pass it to ``wait`` to
        block until the set completes.
        """
        return self.setpoint.set(temp)
    
    @property
    def temperature(self):
        """Current temperature reading, taken from the readback signal."""
        return self.readback.get()

# Create device instance
temp_ctrl = TemperatureController('XF:28IDC:TMP:', name='temp_controller')
temp_ctrl.wait_for_connection()

# Use device
status = temp_ctrl.set_temperature(22.5)
wait(status)

# read() keys are "<device name>_<component name>"
reading = temp_ctrl.read()
print(f"Temperature: {reading['temp_controller_readback']['value']}")

Control Layer Configuration

from ophyd import set_cl, get_cl

# Set control layer to caproto
set_cl('caproto')

# Get current control layer info
cl = get_cl()
print(f"Using control layer: {cl.name}")

# Access low-level control layer functions; these address PVs by name
# without creating a signal object
cl.caput('XF:28IDC:TMP:01', 25.0)  # Direct PV access
value = cl.caget('XF:28IDC:TMP:01')
print(f"Direct PV read: {value}")

# Enable telemetry tracking (this also switches the backend to pyepics)
set_cl('pyepics', pv_telemetry=True)
# Re-fetch the control layer after changing it
cl = get_cl()

# After using signals, check PV access statistics.
# The hasattr guard covers the case where get_pv carries no counter
# (presumably when telemetry is not enabled -- confirm).
if hasattr(cl.get_pv, 'counter'):
    print("Most accessed PVs:")
    for pv_name, count in cl.get_pv.counter.most_common(10):
        print(f"  {pv_name}: {count} accesses")

Working with String PVs

from ophyd import EpicsSignal

# String PV for device name; string=True asks for the PV to be treated as a
# string type
device_name = EpicsSignal('XF:28IDC:DEV:NAME', string=True, name='device_name')
device_name.wait_for_connection()

# Read string value
name = device_name.get()
print(f"Device name: {name}")

# Set string value (put() is synchronous)
device_name.put("Sample_Detector_A")

# Waveform PV that contains string data (created without string=True, so
# get() returns the raw waveform)
message_pv = EpicsSignal('XF:28IDC:MSG:01', name='message')
message_pv.wait_for_connection()

# Read waveform as string: fetch the raw data, then convert it explicitly
from ophyd.utils.epics_pvs import waveform_to_string
raw_data = message_pv.get()
message = waveform_to_string(raw_data)
print(f"Message: {message}")

Error Handling with EPICS

from ophyd import EpicsSignal
from ophyd.utils.errors import DisconnectedError
import time

signal = EpicsSignal('XF:28IDC:NOEXIST:01', name='nonexistent')

try:
    # This will timeout if PV doesn't exist
    signal.wait_for_connection(timeout=2.0)
    value = signal.get()
except (TimeoutError, DisconnectedError) as e:
    # Catch only connection-related failures so that unrelated bugs are not
    # misreported as "Connection failed"
    print(f"Connection failed: {e}")

# Check connection status without raising
if signal.connected:
    print("Signal is connected")
else:
    print("Signal is not connected")

# Use raise_if_disconnected decorator
from ophyd.utils.epics_pvs import raise_if_disconnected

@raise_if_disconnected
def read_signal(sig):
    """Read ``sig``; the decorator raises DisconnectedError if it is not connected."""
    return sig.get()

try:
    value = read_signal(signal)
except DisconnectedError:
    print("Cannot read from disconnected signal")

Install with Tessl CLI

npx tessl i tessl/pypi-ophyd

docs

area-detectors.md

core-framework.md

epics-integration.md

flyers-continuous-scanning.md

index.md

motors-positioners.md

simulation-testing.md

specialized-devices.md

tile.json