Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.
—
Interfaces and implementations for continuous scanning and fly scanning where data is collected while motors are in motion or during continuous acquisitions. The flyer interface enables coordinated data collection from multiple devices during uninterrupted operations.
The base protocol for all flyer devices, defining the standard workflow for continuous data collection operations.
class FlyerInterface:
"""
Interface for flyer devices that collect data continuously during motion.
The flyer protocol involves four key operations:
1. kickoff() - Start the continuous operation
2. complete() - Wait for or signal completion
3. collect() - Retrieve collected data as events
4. describe_collect() - Provide metadata about collected data
"""
def kickoff(self):
"""
Start the flyer operation.
Returns:
StatusBase: Status indicating when flying has started
"""
def complete(self):
"""
Wait for flying to complete or signal completion.
Can be either a query (returns immediately if done) or
a command (initiates completion sequence).
Returns:
StatusBase: Status tracking completion
"""
def collect(self):
"""
Retrieve collected data as generator of proto-events.
Yields:
dict: Events with keys {'time', 'timestamps', 'data'}
- time (float): UNIX timestamp for event
- timestamps (dict): Per-signal timestamps
- data (dict): Per-signal data values
"""
def describe_collect(self):
"""
Describe the format of data returned by collect().
Returns:
dict: Nested dictionary with stream names as keys,
signal descriptions as values
"""
def collect_tables(self):
"""
Alternative data collection format (proposed).
Returns:
Iterable: Data organized as tables rather than events
"""Flyer implementation for area detector time series data collection, enabling continuous acquisition of waveform data with timestamps.
class AreaDetectorTimeseriesCollector(Device):
"""
Collects time series data from area detectors during fly scans.
This flyer collects waveform data and timestamps from area detector
time series plugins, supporting pause/resume functionality.
Parameters:
- prefix (str): EPICS PV prefix for time series records
- stream_name (str): Name for the data stream (default: 'primary')
"""
def __init__(self, prefix, *, stream_name='primary', **kwargs): ...
# Component signals
control: EpicsSignal # TSControl - acquisition control
num_points: EpicsSignal # TSNumPoints - number of points to collect
cur_point: EpicsSignalRO # TSCurrentPoint - current point counter
waveform: EpicsSignalRO # TSTotal - collected waveform data
waveform_ts: EpicsSignalRO # TSTimestamp - waveform timestamps
def kickoff(self):
"""Start time series collection."""
def complete(self):
"""Wait for collection to finish."""
def collect(self):
"""Yield time series data as events."""
def describe_collect(self):
"""Describe time series data format."""
def pause(self):
"""Pause data collection."""
def resume(self):
"""Resume data collection."""General-purpose waveform collector for devices that provide time-stamped waveform data.
class WaveformCollector(Device):
"""
Collects waveform data with timestamps during fly scans.
Used with timestamp devices and other waveform-generating hardware.
Supports configurable data interpretation (time vs indices).
Parameters:
- prefix (str): EPICS PV prefix for waveform records
- stream_name (str): Name for the data stream
"""
def __init__(self, prefix, *, stream_name='primary', **kwargs): ...
# Component signals
select: EpicsSignal # Sw-Sel - waveform selection
reset: EpicsSignal # Rst-Sel - reset control
waveform_count: EpicsSignalRO # Val:TimeN-I - point count
waveform: EpicsSignalRO # Val:Time-Wfrm - waveform data
waveform_nord: EpicsSignalRO # Val:Time-Wfrm.NORD - number of points
data_is_time: Signal # Configure data interpretation
def kickoff(self):
"""Start waveform collection."""
def complete(self):
"""Wait for waveform collection to finish."""
def collect(self):
"""Yield waveform data as events."""
def describe_collect(self):
"""Describe waveform data format."""Mixin class that implements flyer functionality by monitoring device attributes during continuous operations.
class MonitorFlyerMixin:
"""
Flyer mixin that monitors specified attributes during flight.
This mixin enables any device to act as a flyer by subscribing to
attribute changes and collecting the monitored data.
Parameters:
- monitor_attrs (list): Signal attribute names to monitor during flight
- stream_names (dict): Mapping of attributes to stream names
- pivot (bool): Data organization mode:
- False: Single event with arrays of all values
- True: Separate events for each value/timestamp pair
"""
def __init__(self, *, monitor_attrs=None, stream_names=None, pivot=False, **kwargs): ...
def kickoff(self):
"""
Start monitoring specified attributes.
Subscribes to attribute changes and begins collecting data.
Returns:
StatusBase: Completed status (monitoring starts immediately)
"""
def complete(self):
"""
Signal that monitoring should stop.
Returns:
StatusBase: Status tracking monitor completion
"""
def collect(self):
"""
Yield collected monitor data as events.
Data organization depends on pivot setting:
- pivot=False: Single event with all collected data arrays
- pivot=True: One event per monitored value/timestamp pair
Yields:
dict: Events containing monitored attribute data
"""
def describe_collect(self):
"""
Describe monitored data format.
Returns:
dict: Description of monitored signals by stream name
"""
def pause(self):
"""Pause monitoring (unsubscribe from attributes)."""
def resume(self):
"""Resume monitoring (resubscribe to attributes)."""Mock flyer implementations for testing and development without requiring hardware.
class TrivialFlyer(Device):
"""
Simple flyer that returns empty data for testing.
Complies with flyer API but generates no actual data.
Used for testing flyscan protocols and timing.
"""
def __init__(self, **kwargs): ...
def kickoff(self):
"""Return completed status immediately."""
def complete(self):
"""Return completed status immediately."""
def collect(self):
"""Yield 100 empty events for testing."""
class MockFlyer(Device):
"""
Comprehensive mock flyer for testing flyscan APIs.
Simulates realistic flyer behavior with threaded execution,
motor movement, and detector data collection.
Parameters:
- detector (Device): Detector device to collect from
- motor (Device): Motor device to move during scan
- start (float): Starting motor position
- stop (float): Ending motor position
- num_points (int): Number of data points to collect
"""
def __init__(self, detector, motor, *, start=0, stop=1, num_points=10, **kwargs): ...
def kickoff(self):
"""Start threaded motor movement and data collection."""
def complete(self):
"""Wait for scan completion."""
def collect(self):
"""Yield collected motor and detector data."""from ophyd import Device
from ophyd.flyers import MonitorFlyerMixin
# Create a flyer device that monitors motor position
class MotorFlyer(MonitorFlyerMixin, Device):
def __init__(self, motor, **kwargs):
self.motor = motor
super().__init__(
monitor_attrs=['motor.position'],
stream_names={'motor.position': 'primary'},
**kwargs
)
# Use the flyer
motor_flyer = MotorFlyer(my_motor, name='motor_flyer')
# Standard flyer workflow
kickoff_status = motor_flyer.kickoff() # Start monitoring
# ... motor moves or other operations happen ...
complete_status = motor_flyer.complete() # Signal completion
wait(complete_status) # Wait for completion
# Collect the data
data_events = list(motor_flyer.collect())
data_description = motor_flyer.describe_collect()from ophyd.flyers import AreaDetectorTimeseriesCollector
# Create time series collector
ts_collector = AreaDetectorTimeseriesCollector(
'XF:DET:TS:',
name='ts_collector',
stream_name='detector_timeseries'
)
# Configure collection
ts_collector.num_points.set(1000) # Collect 1000 points
# Execute flyer protocol
kickoff_status = ts_collector.kickoff()
wait(kickoff_status) # Wait for acquisition to start
complete_status = ts_collector.complete()
wait(complete_status) # Wait for acquisition to finish
# Retrieve time series data
events = list(ts_collector.collect())
for event in events:
print(f"Time: {event['time']}")
print(f"Data: {event['data']}")# Monitor multiple devices during a scan
class MultiDeviceFlyer(MonitorFlyerMixin, Device):
def __init__(self, motor, detector, **kwargs):
self.motor = motor
self.detector = detector
super().__init__(
monitor_attrs=['motor.position', 'detector.value'],
stream_names={
'motor.position': 'primary',
'detector.value': 'detector_stream'
},
pivot=True, # Separate events for each reading
**kwargs
)
flyer = MultiDeviceFlyer(my_motor, my_detector, name='multi_flyer')
# Use with bluesky plans
from bluesky.plans import fly
from bluesky import RunEngine
RE = RunEngine()
RE(fly([flyer])) # Execute fly plan with the flyerFlyers integrate seamlessly with Bluesky's experiment orchestration:
Flyers enable sophisticated data collection scenarios including fly scans, continuous monitoring, and coordinated multi-device acquisition during uninterrupted motion or operations.
Install with Tessl CLI
npx tessl i tessl/pypi-ophyd