Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.
—
Positioning devices including EPICS motors, software positioners, and pseudo positioners for coordinate transformations and multi-axis control. These classes provide standardized interfaces for moving hardware to specific positions and tracking movement status.
Interface to EPICS motor records providing comprehensive motor control functionality.
class EpicsMotor(Device):
"""
EPICS motor record interface with comprehensive positioning capabilities.
Parameters:
- prefix (str): EPICS motor record PV prefix
- name (str): Motor name for identification
- settle_time (float): Time to wait after moves complete
- timeout (float): Default timeout for operations
"""
def __init__(self, prefix='', *, name, settle_time=0.0, timeout=None, **kwargs): ...
def move(self, position, *, wait=False, timeout=None, **kwargs):
"""
Move motor to absolute position.
Parameters:
- position (float): Target position in user units
- wait (bool): Whether to wait for move completion
- timeout (float): Move timeout
Returns:
MoveStatus: Status object tracking move progress
"""
def set(self, position, **kwargs):
"""
Set motor position (alias for move).
Parameters:
- position (float): Target position
Returns:
MoveStatus: Move status object
"""
def home(self, direction, *, wait=False, timeout=None):
"""
Home motor in specified direction.
Parameters:
- direction (str or HomeEnum): 'forward', 'reverse', or HomeEnum value
- wait (bool): Wait for homing completion
- timeout (float): Homing timeout
Returns:
StatusBase: Homing status
"""
def stop(self, *, success=False):
"""
Stop motor motion.
Parameters:
- success (bool): Whether stop should be considered successful
"""
@property
def position(self):
"""
Current motor position.
Returns:
float: Current position in user units
"""
@property
def user_readback(self):
"""
User readback position.
Returns:
EpicsSignalRO: Readback position signal
"""
@property
def user_setpoint(self):
"""
User setpoint position.
Returns:
EpicsSignal: Setpoint position signal
"""
@property
def limits(self):
"""
Motor soft limits.
Returns:
tuple: (low_limit, high_limit)
"""
@property
def moving(self):
"""
Whether motor is currently moving.
Returns:
bool: True if motor is in motion
"""
@property
def settle_time(self):
"""
Time to wait after moves complete.
Returns:
float: Settle time in seconds
"""
class MotorBundle(Device):
"""
Collection of motors that can be moved together.
"""
def __init__(self, *args, **kwargs): ...
def move(self, position_dict, **kwargs):
"""
Move multiple motors simultaneously.
Parameters:
- position_dict (dict): Mapping of motor names to positions
Returns:
AndStatus: Combined status for all motor moves
"""
# Motor-related enums
class HomeEnum(IntEnum):
"""Enumeration for motor homing directions."""
forward = 1
reverse = 0Abstract base classes for positioning devices.
class PositionerBase(Device):
"""
Base class for all positioner devices.
"""
def __init__(self, *, name=None, **kwargs): ...
def move(self, position, **kwargs):
"""
Move to target position.
Parameters:
- position: Target position
Returns:
StatusBase: Move status
"""
def set(self, position, **kwargs):
"""
Set position (alias for move).
Parameters:
- position: Target position
Returns:
StatusBase: Move status
"""
@property
def position(self):
"""
Current position.
Returns:
Current position value
"""
def stop(self):
"""Stop positioner motion."""
class SoftPositioner(PositionerBase):
"""
Software-based positioner without hardware backing.
Parameters:
- name (str): Positioner name
- init_pos (float): Initial position value
"""
def __init__(self, *, name=None, init_pos=0, **kwargs): ...
def move(self, position, **kwargs):
"""
Move to position (instantaneous for software positioner).
Parameters:
- position (float): Target position
Returns:
Status: Completed status
"""
@property
def position(self):
"""
Current software position.
Returns:
float: Current position
"""Positioners that transform between different coordinate systems.
class PseudoPositioner(Device):
"""
Multi-axis pseudo positioner with coordinate transformations.
Transforms between pseudo (user) coordinates and real (motor) coordinates
using forward() and inverse() transformation methods.
"""
def __init__(self, prefix='', *, configuration_attrs=None, read_attrs=None, **kwargs): ...
def move(self, position, **kwargs):
"""
Move to pseudo position.
Parameters:
- position: Target pseudo position (single value or dict/tuple for multi-axis)
Returns:
StatusBase: Combined move status for all real motors
"""
def set(self, position, **kwargs):
"""
Set pseudo position.
Parameters:
- position: Target pseudo position
Returns:
StatusBase: Move status
"""
def forward(self, pseudo_pos):
"""
Transform pseudo coordinates to real coordinates.
Parameters:
- pseudo_pos: Pseudo position(s)
Returns:
Real position(s) for underlying motors
Note: Must be implemented by subclasses
"""
raise NotImplementedError()
def inverse(self, real_pos):
"""
Transform real coordinates to pseudo coordinates.
Parameters:
- real_pos: Real motor position(s)
Returns:
Pseudo position(s)
Note: Must be implemented by subclasses
"""
raise NotImplementedError()
@property
def position(self):
"""
Current pseudo position.
Returns:
Current pseudo coordinates
"""
@property
def real_position(self):
"""
Current real motor positions.
Returns:
tuple: Current real motor coordinates
"""
class PseudoSingle(PseudoPositioner):
"""
Single-axis pseudo positioner.
Simplified pseudo positioner for single pseudo axis transformations.
"""
def __init__(self, prefix='', *, name=None, **kwargs): ...
@property
def position(self):
"""
Current pseudo position.
Returns:
float: Single pseudo position value
"""Positioners controlled through individual EPICS process variables.
class PVPositioner(Device):
"""
Positioner controlled via separate setpoint and readback PVs.
Parameters:
- prefix (str): PV prefix
- name (str): Positioner name
- limits (tuple): Software limits (low, high)
- egu (str): Engineering units
"""
def __init__(self, prefix='', *, limits=None, name=None, egu='', **kwargs): ...
def move(self, position, **kwargs):
"""
Move to target position.
Parameters:
- position (float): Target position
Returns:
MoveStatus: Move completion status
"""
def stop(self):
"""Stop positioner movement."""
@property
def position(self):
"""
Current position from readback PV.
Returns:
float: Current position
"""
class PVPositionerPC(PVPositioner):
"""
PV positioner with position compare functionality.
Uses EPICS position compare feature for precise positioning.
"""
def __init__(self, prefix='', **kwargs): ...
class PVPositionerIsClose(PVPositioner):
"""
PV positioner that uses proximity checking for move completion.
Parameters:
- atol (float): Absolute tolerance for position comparison
- rtol (float): Relative tolerance for position comparison
"""
def __init__(self, prefix='', *, atol=1e-6, rtol=1e-6, **kwargs): ...
class PVPositionerDone(PVPositioner):
"""
PV positioner that uses a separate "done" PV for move completion.
"""
def __init__(self, prefix='', **kwargs): ...from ophyd import EpicsMotor
from ophyd.status import wait
# Create motor instance
motor_x = EpicsMotor('XF:28IDC:MOT:01', name='motor_x')
motor_x.wait_for_connection()
# Check current position
print(f"Current position: {motor_x.position}")
print(f"Limits: {motor_x.limits}")
print(f"Moving: {motor_x.moving}")
# Move to absolute position
status = motor_x.move(10.5)
wait(status) # Wait for move to complete
# Move relative to current position
current_pos = motor_x.position
status = motor_x.move(current_pos + 2.0)
wait(status)
# Home motor
status = motor_x.home('forward')
wait(status)
# Stop motor if needed
if motor_x.moving:
motor_x.stop()from ophyd import EpicsMotor, MotorBundle
# Create individual motors
motor_x = EpicsMotor('XF:28IDC:MOT:01', name='motor_x')
motor_y = EpicsMotor('XF:28IDC:MOT:02', name='motor_y')
motor_z = EpicsMotor('XF:28IDC:MOT:03', name='motor_z')
# Create motor bundle
xyz_stage = MotorBundle(motor_x, motor_y, motor_z, name='xyz_stage')
xyz_stage.wait_for_connection()
# Move all motors simultaneously
target_positions = {
'motor_x': 5.0,
'motor_y': -2.5,
'motor_z': 10.0
}
status = xyz_stage.move(target_positions)
wait(status) # All motors will move simultaneously
print("All motors have reached target positions")from ophyd import SoftPositioner
# Create software positioner
virtual_motor = SoftPositioner(name='virtual_motor', init_pos=0.0)
# Move instantly (no real hardware)
status = virtual_motor.move(25.0)
print(f"Status done: {status.done}") # True immediately
print(f"Position: {virtual_motor.position}") # 25.0
# Useful for testing or simulationfrom ophyd import PseudoPositioner, EpicsMotor, Component
import numpy as np
class TwoThetaStage(PseudoPositioner):
"""Pseudo positioner for 2-theta diffractometer geometry."""
# Real motors
theta = Component(EpicsMotor, 'THETA')
two_theta = Component(EpicsMotor, 'TWO_THETA')
# Pseudo axes
h = Cpt(PseudoSingle, '')
k = Cpt(PseudoSingle, '')
def forward(self, pseudo_pos):
"""Convert (h,k) to (theta, two_theta)."""
h, k = pseudo_pos
# Simple example: assume direct relationship
theta_angle = np.arctan2(k, h) * 180 / np.pi
two_theta_angle = 2 * theta_angle
return (theta_angle, two_theta_angle)
def inverse(self, real_pos):
"""Convert (theta, two_theta) to (h,k)."""
theta_angle, two_theta_angle = real_pos
# Convert back to reciprocal space coordinates
theta_rad = theta_angle * np.pi / 180
h = np.cos(theta_rad)
k = np.sin(theta_rad)
return (h, k)
# Use pseudo positioner
diffractometer = TwoThetaStage('XF:28IDC:DIFF:', name='diffractometer')
diffractometer.wait_for_connection()
# Move in reciprocal space coordinates
status = diffractometer.move((1.0, 0.5)) # Move to h=1.0, k=0.5
wait(status)
print(f"Pseudo position (h,k): {diffractometer.position}")
print(f"Real positions (theta, 2theta): {diffractometer.real_position}")from ophyd import PVPositioner, Component, EpicsSignal, EpicsSignalRO
class SlitPositioner(PVPositioner):
"""Custom slit positioner with separate setpoint and readback."""
# Define required PVs
setpoint = Component(EpicsSignal, 'SP')
readback = Component(EpicsSignalRO, 'RBV')
actuate = Component(EpicsSignal, 'GO')
# Optional: done signal
done = Component(EpicsSignalRO, 'DONE')
# Create slit positioner
slit_gap = SlitPositioner('XF:28IDC:SLIT:GAP:', name='slit_gap', limits=(0, 20))
slit_gap.wait_for_connection()
# Move slit gap
status = slit_gap.move(5.5)
wait(status)
print(f"Slit gap: {slit_gap.position} mm")from ophyd import EpicsMotor
motor = EpicsMotor('XF:28IDC:MOT:01', name='motor')
motor.wait_for_connection()
# Subscribe to position changes
def position_changed(value=None, **kwargs):
print(f"Motor position changed to: {value}")
motor.user_readback.subscribe(position_changed)
# Move motor - will trigger callbacks
status = motor.move(15.0)
# Monitor move progress
def move_progress(fraction=None, **kwargs):
print(f"Move progress: {fraction * 100:.1f}%")
status.add_callback(move_progress)
wait(status)
# Clear subscription
motor.user_readback.clear_sub(position_changed)Install with Tessl CLI
npx tessl i tessl/pypi-ophyd