CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-ophyd

Bluesky hardware abstraction library with EPICS control system integration for scientific instrument automation.

Pending
Overview
Eval results
Files

flyers-continuous-scanning.mddocs/

Flyers and Continuous Scanning

Interfaces and implementations for continuous scanning and fly scanning where data is collected while motors are in motion or during continuous acquisitions. The flyer interface enables coordinated data collection from multiple devices during uninterrupted operations.

Capabilities

Flyer Interface

The base protocol for all flyer devices, defining the standard workflow for continuous data collection operations.

class FlyerInterface:
    """
    Interface for flyer devices that collect data continuously during motion.
    
    The flyer protocol involves four key operations:
    1. kickoff() - Start the continuous operation
    2. complete() - Wait for or signal completion  
    3. collect() - Retrieve collected data as events
    4. describe_collect() - Provide metadata about collected data
    """
    def kickoff(self):
        """
        Start the flyer operation.
        
        Returns:
        StatusBase: Status indicating when flying has started
        """
    
    def complete(self):
        """
        Wait for flying to complete or signal completion.
        
        Can be either a query (returns immediately if done) or 
        a command (initiates completion sequence).
        
        Returns:
        StatusBase: Status tracking completion
        """
    
    def collect(self):
        """
        Retrieve collected data as generator of proto-events.
        
        Yields:
        dict: Events with keys {'time', 'timestamps', 'data'}
            - time (float): UNIX timestamp for event
            - timestamps (dict): Per-signal timestamps  
            - data (dict): Per-signal data values
        """
    
    def describe_collect(self):
        """
        Describe the format of data returned by collect().
        
        Returns:
        dict: Nested dictionary with stream names as keys,
              signal descriptions as values
        """
    
    def collect_tables(self):
        """
        Alternative data collection format (proposed).
        
        Returns:
        Iterable: Data organized as tables rather than events
        """

Area Detector Time Series Collection

Flyer implementation for area detector time series data collection, enabling continuous acquisition of waveform data with timestamps.

class AreaDetectorTimeseriesCollector(Device):
    """
    Collects time series data from area detectors during fly scans.
    
    This flyer collects waveform data and timestamps from area detector
    time series plugins, supporting pause/resume functionality.
    
    Parameters:
    - prefix (str): EPICS PV prefix for time series records
    - stream_name (str): Name for the data stream (default: 'primary')
    """
    def __init__(self, prefix, *, stream_name='primary', **kwargs): ...
    
    # Component signals
    control: EpicsSignal     # TSControl - acquisition control
    num_points: EpicsSignal  # TSNumPoints - number of points to collect
    cur_point: EpicsSignalRO # TSCurrentPoint - current point counter
    waveform: EpicsSignalRO  # TSTotal - collected waveform data
    waveform_ts: EpicsSignalRO # TSTimestamp - waveform timestamps
    
    def kickoff(self):
        """Start time series collection."""
    
    def complete(self):
        """Wait for collection to finish."""
    
    def collect(self):
        """Yield time series data as events."""
    
    def describe_collect(self):
        """Describe time series data format."""
    
    def pause(self):
        """Pause data collection."""
    
    def resume(self):
        """Resume data collection."""

Waveform Collection

General-purpose waveform collector for devices that provide time-stamped waveform data.

class WaveformCollector(Device):
    """
    Collects waveform data with timestamps during fly scans.
    
    Used with timestamp devices and other waveform-generating hardware.
    Supports configurable data interpretation (time vs indices).
    
    Parameters:
    - prefix (str): EPICS PV prefix for waveform records
    - stream_name (str): Name for the data stream
    """
    def __init__(self, prefix, *, stream_name='primary', **kwargs): ...
    
    # Component signals
    select: EpicsSignal           # Sw-Sel - waveform selection
    reset: EpicsSignal            # Rst-Sel - reset control
    waveform_count: EpicsSignalRO # Val:TimeN-I - point count
    waveform: EpicsSignalRO       # Val:Time-Wfrm - waveform data
    waveform_nord: EpicsSignalRO  # Val:Time-Wfrm.NORD - number of points
    data_is_time: Signal          # Configure data interpretation
    
    def kickoff(self):
        """Start waveform collection."""
    
    def complete(self):
        """Wait for waveform collection to finish."""
    
    def collect(self):
        """Yield waveform data as events."""
    
    def describe_collect(self):
        """Describe waveform data format."""

Monitor-Based Flying

Mixin class that implements flyer functionality by monitoring device attributes during continuous operations.

class MonitorFlyerMixin:
    """
    Flyer mixin that monitors specified attributes during flight.
    
    This mixin enables any device to act as a flyer by subscribing to
    attribute changes and collecting the monitored data.
    
    Parameters:
    - monitor_attrs (list): Signal attribute names to monitor during flight
    - stream_names (dict): Mapping of attributes to stream names
    - pivot (bool): Data organization mode:
        - False: Single event with arrays of all values
        - True: Separate events for each value/timestamp pair
    """
    def __init__(self, *, monitor_attrs=None, stream_names=None, pivot=False, **kwargs): ...
    
    def kickoff(self):
        """
        Start monitoring specified attributes.
        
        Subscribes to attribute changes and begins collecting data.
        
        Returns:
        StatusBase: Completed status (monitoring starts immediately)
        """
    
    def complete(self):
        """
        Signal that monitoring should stop.
        
        Returns:
        StatusBase: Status tracking monitor completion
        """
    
    def collect(self):
        """
        Yield collected monitor data as events.
        
        Data organization depends on pivot setting:
        - pivot=False: Single event with all collected data arrays
        - pivot=True: One event per monitored value/timestamp pair
        
        Yields:
        dict: Events containing monitored attribute data
        """
    
    def describe_collect(self):
        """
        Describe monitored data format.
        
        Returns:
        dict: Description of monitored signals by stream name
        """
    
    def pause(self):
        """Pause monitoring (unsubscribe from attributes)."""
    
    def resume(self):
        """Resume monitoring (resubscribe to attributes)."""

Simulation and Testing Flyers

Mock flyer implementations for testing and development without requiring hardware.

class TrivialFlyer(Device):
    """
    Simple flyer that returns empty data for testing.
    
    Complies with flyer API but generates no actual data.
    Used for testing flyscan protocols and timing.
    """
    def __init__(self, **kwargs): ...
    
    def kickoff(self):
        """Return completed status immediately."""
    
    def complete(self):
        """Return completed status immediately."""
    
    def collect(self):
        """Yield 100 empty events for testing."""

class MockFlyer(Device):
    """
    Comprehensive mock flyer for testing flyscan APIs.
    
    Simulates realistic flyer behavior with threaded execution,
    motor movement, and detector data collection.
    
    Parameters:
    - detector (Device): Detector device to collect from
    - motor (Device): Motor device to move during scan
    - start (float): Starting motor position
    - stop (float): Ending motor position  
    - num_points (int): Number of data points to collect
    """
    def __init__(self, detector, motor, *, start=0, stop=1, num_points=10, **kwargs): ...
    
    def kickoff(self):
        """Start threaded motor movement and data collection."""
    
    def complete(self):
        """Wait for scan completion."""
    
    def collect(self):
        """Yield collected motor and detector data."""

Usage Examples

Basic Flyer Operation

from ophyd import Device
from ophyd.flyers import MonitorFlyerMixin

# Create a flyer device that monitors motor position
class MotorFlyer(MonitorFlyerMixin, Device):
    def __init__(self, motor, **kwargs):
        self.motor = motor
        super().__init__(
            monitor_attrs=['motor.position'],
            stream_names={'motor.position': 'primary'},
            **kwargs
        )

# Use the flyer
motor_flyer = MotorFlyer(my_motor, name='motor_flyer')

# Standard flyer workflow
kickoff_status = motor_flyer.kickoff()  # Start monitoring
# ... motor moves or other operations happen ...
complete_status = motor_flyer.complete()  # Signal completion
wait(complete_status)  # Wait for completion

# Collect the data
data_events = list(motor_flyer.collect())
data_description = motor_flyer.describe_collect()

Area Detector Time Series

from ophyd.flyers import AreaDetectorTimeseriesCollector

# Create time series collector
ts_collector = AreaDetectorTimeseriesCollector(
    'XF:DET:TS:', 
    name='ts_collector',
    stream_name='detector_timeseries'
)

# Configure collection
ts_collector.num_points.set(1000)  # Collect 1000 points

# Execute flyer protocol
kickoff_status = ts_collector.kickoff()
wait(kickoff_status)  # Wait for acquisition to start

complete_status = ts_collector.complete()  
wait(complete_status)  # Wait for acquisition to finish

# Retrieve time series data
events = list(ts_collector.collect())
for event in events:
    print(f"Time: {event['time']}")
    print(f"Data: {event['data']}")

Monitor-Based Multi-Device Flying

# Monitor multiple devices during a scan
class MultiDeviceFlyer(MonitorFlyerMixin, Device):
    def __init__(self, motor, detector, **kwargs):
        self.motor = motor
        self.detector = detector
        super().__init__(
            monitor_attrs=['motor.position', 'detector.value'],
            stream_names={
                'motor.position': 'primary',
                'detector.value': 'detector_stream'
            },
            pivot=True,  # Separate events for each reading
            **kwargs
        )

flyer = MultiDeviceFlyer(my_motor, my_detector, name='multi_flyer')

# Use with bluesky plans
from bluesky.plans import fly
from bluesky import RunEngine

RE = RunEngine()
RE(fly([flyer]))  # Execute fly plan with the flyer

Integration with Bluesky

Flyers integrate seamlessly with Bluesky's experiment orchestration:

  • Run Engine Compatibility: All flyers work with Bluesky's RunEngine
  • Event Model: Data follows Bluesky's event model for analysis pipeline integration
  • Stream Organization: Multiple data streams allow flexible data organization
  • Status Protocol: Uses ophyd's StatusBase for proper asynchronous operation tracking
  • Pause/Resume: Supports experiment pause/resume functionality where implemented

Flyers enable sophisticated data collection scenarios including fly scans, continuous monitoring, and coordinated multi-device acquisition during uninterrupted motion or operations.

Install with Tessl CLI

npx tessl i tessl/pypi-ophyd

docs

area-detectors.md

core-framework.md

epics-integration.md

flyers-continuous-scanning.md

index.md

motors-positioners.md

simulation-testing.md

specialized-devices.md

tile.json