0
# Simulation and Testing
1
2
Mock devices and signals for testing, development, and offline work without requiring actual hardware connections. These simulation tools enable development and testing of ophyd-based applications in environments where real hardware is not available.
3
4
## Capabilities
5
6
### Synthetic Signals
7
8
Simulated signals that provide realistic behavior for testing and development.
9
10
```python { .api }
11
class SynSignal(Signal):
12
"""
13
Synthetic signal with programmable behavior.
14
15
Generates synthetic data based on functions, noise models,
16
or static values for testing and simulation purposes.
17
18
Parameters:
19
- name (str): Signal name
20
- func (callable): Function to generate values func(time)
21
- **kwargs: Additional signal parameters
22
"""
23
def __init__(self, *, name, func=None, **kwargs): ...
24
25
def get(self, **kwargs):
26
"""
27
Get synthetic signal value.
28
29
Returns:
30
Generated value based on function or static value
31
"""
32
33
def put(self, value, **kwargs):
34
"""Set synthetic signal to new value."""
35
36
def set(self, value, **kwargs):
37
"""
38
Set synthetic signal asynchronously.
39
40
Returns:
41
Status: Immediately completed status
42
"""
43
44
class SynSignalRO(SynSignal):
45
"""
46
Read-only synthetic signal.
47
48
Cannot be written to, only provides synthetic readback values.
49
"""
50
def __init__(self, *, name, func=None, **kwargs): ...
51
52
class SynPeriodicSignal(SynSignal):
53
"""
54
Synthetic signal with periodic behavior.
55
56
Generates values based on periodic functions like sine waves,
57
square waves, or other repeating patterns.
58
59
Parameters:
60
- name (str): Signal name
61
- period (float): Period in seconds
62
- func (callable): Periodic function
63
"""
64
def __init__(self, *, name, period=1.0, func=None, **kwargs): ...
65
66
class EnumSignal(Signal):
67
"""
68
Signal with enumerated values for testing state machines.
69
70
Parameters:
71
- name (str): Signal name
72
- enum_strs (list): List of valid string values
73
- value: Initial value
74
"""
75
def __init__(self, *, name, enum_strs, value=0, **kwargs): ...
76
```
77
78
### Synthetic Positioners
79
80
Simulated positioners for testing motion control systems.
81
82
```python { .api }
83
class SynAxis(Device):
84
"""
85
Synthetic axis positioner with realistic motion behavior.
86
87
Simulates motor-like behavior including motion time,
88
velocity limits, and position feedback.
89
90
Parameters:
91
- name (str): Axis name
92
- delay (float): Simulated move time per unit distance
93
- precision (float): Position precision/tolerance
94
- **kwargs: Additional device parameters
95
"""
96
def __init__(self, *, name, delay=0.1, precision=0.1, **kwargs): ...
97
98
def move(self, position, **kwargs):
99
"""
100
Move to target position with simulated delay.
101
102
Parameters:
103
- position (float): Target position
104
105
Returns:
106
MoveStatus: Status with simulated move time
107
"""
108
109
def set(self, position, **kwargs):
110
"""Set position (alias for move)."""
111
112
@property
113
def position(self):
114
"""
115
Current axis position.
116
117
Returns:
118
float: Current position
119
"""
120
121
@property
122
def velocity(self):
123
"""
124
Axis velocity setting.
125
126
Returns:
127
SynSignal: Velocity signal
128
"""
129
130
class SynAxisEmptyHints(SynAxis):
131
"""Synthetic axis with empty hints for testing."""
132
def __init__(self, **kwargs): ...
133
134
class SynAxisNoHints(SynAxis):
135
"""Synthetic axis with no hints attribute."""
136
def __init__(self, **kwargs): ...
137
138
class SynAxisNoPosition(SynAxis):
139
"""Synthetic axis without position attribute for error testing."""
140
def __init__(self, **kwargs): ...
141
```
142
143
### Synthetic Detectors
144
145
Simulated detectors for testing data acquisition workflows.
146
147
```python { .api }
148
class SynGauss(Device):
149
"""
150
Synthetic Gaussian peak detector.
151
152
Generates Gaussian-shaped signals based on motor positions,
153
useful for simulating peak scanning and optimization.
154
155
Parameters:
156
- name (str): Detector name
157
- motor (Device): Motor device for position dependence
158
- motor_field (str): Motor field name to read position from
159
- center (float): Peak center position
160
- Imax (float): Peak maximum intensity
161
- sigma (float): Peak width (standard deviation)
162
- noise (str): Noise model ('poisson', 'uniform', None)
163
- noise_multiplier (float): Noise scaling factor
164
- random_state: Random state for reproducible noise
165
"""
166
def __init__(self, name, motor, motor_field, center, Imax, sigma=1,
167
noise='poisson', noise_multiplier=1, random_state=None, **kwargs): ...
168
169
def trigger(self):
170
"""
171
Trigger detector to generate Gaussian signal.
172
173
Returns:
174
Status: Trigger completion status
175
"""
176
177
def read(self):
178
"""
179
Read Gaussian detector value.
180
181
Returns:
182
dict: Reading with Gaussian value based on motor position
183
"""
184
185
class Syn2DGauss(Device):
186
"""
187
Synthetic 2D Gaussian detector.
188
189
Generates 2D Gaussian signals based on two motor positions.
190
191
Parameters:
192
- name (str): Detector name
193
- motor0 (Device): First motor (X axis)
194
- motor0_field (str): First motor field name
195
- motor1 (Device): Second motor (Y axis)
196
- motor1_field (str): Second motor field name
197
- center (tuple): Peak center (x, y)
198
- Imax (float): Peak maximum intensity
199
- sigma (float): Peak width
200
"""
201
def __init__(self, name, motor0, motor0_field, motor1, motor1_field,
202
center, Imax, sigma=1, **kwargs): ...
203
204
class ABDetector(Device):
205
"""
206
Simple A/B detector for basic testing.
207
208
Provides two correlated synthetic signals (A and B channels).
209
"""
210
def __init__(self, name, **kwargs): ...
211
212
def trigger(self):
213
"""Trigger A/B detector reading."""
214
215
def read(self):
216
"""Read A and B channel values."""
217
218
class DetWithCountTime(Device):
219
"""
220
Detector with configurable count time.
221
222
Simulates detectors that require count time configuration.
223
"""
224
def __init__(self, name, **kwargs): ...
225
226
@property
227
def count_time(self):
228
"""
229
Count time setting.
230
231
Returns:
232
SynSignal: Count time signal
233
"""
234
235
class InvariantSignal(Signal):
236
"""
237
Signal that maintains constant value for testing.
238
239
Useful for testing systems that expect unchanging reference values.
240
"""
241
def __init__(self, *, name, value=1, **kwargs): ...
242
```
243
244
### Fake EPICS Signals
245
246
Mock EPICS signals that simulate EPICS behavior without requiring EPICS infrastructure.
247
248
```python { .api }
249
class FakeEpicsSignal(Signal):
250
"""
251
Fake EPICS signal for testing without EPICS.
252
253
Simulates EpicsSignal behavior including connection status,
254
limits, precision, and units.
255
256
Parameters:
257
- read_pv (str): Simulated read PV name
258
- write_pv (str): Simulated write PV name (optional)
259
- **kwargs: Additional signal parameters
260
"""
261
def __init__(self, read_pv, write_pv=None, **kwargs): ...
262
263
def get(self, **kwargs):
264
"""Get fake PV value."""
265
266
def put(self, value, **kwargs):
267
"""Put value to fake PV."""
268
269
@property
270
def limits(self):
271
"""
272
Simulated PV limits.
273
274
Returns:
275
tuple: (low_limit, high_limit)
276
"""
277
278
@property
279
def precision(self):
280
"""
281
Simulated PV precision.
282
283
Returns:
284
int: Display precision
285
"""
286
287
@property
288
def units(self):
289
"""
290
Simulated PV units.
291
292
Returns:
293
str: Engineering units
294
"""
295
296
class FakeEpicsSignalRO(FakeEpicsSignal):
297
"""Read-only fake EPICS signal."""
298
def __init__(self, read_pv, **kwargs): ...
299
300
class FakeEpicsSignalWithRBV(FakeEpicsSignal):
301
"""
302
Fake EPICS signal with separate readback PV.
303
304
Simulates setpoint/readback PV pairs common in EPICS.
305
"""
306
def __init__(self, prefix, **kwargs): ...
307
```
308
309
### Testing Utilities
310
311
Utilities and classes specifically designed for testing ophyd applications.
312
313
```python { .api }
314
class NullStatus(StatusBase):
315
"""
316
Null status that completes immediately.
317
318
Useful for testing when you need a status object but
319
don't want any actual delay or processing.
320
"""
321
def __init__(self, **kwargs): ...
322
323
@property
324
def done(self):
325
"""Always returns True."""
326
return True
327
328
@property
329
def success(self):
330
"""Always returns True."""
331
return True
332
333
class MockFlyer(Device):
334
"""
335
Mock flyer device for testing fly scanning.
336
337
Simulates flyer interface without requiring actual
338
continuous scanning hardware.
339
"""
340
def __init__(self, name, **kwargs): ...
341
342
def kickoff(self):
343
"""
344
Start mock fly scan.
345
346
Returns:
347
NullStatus: Immediately complete status
348
"""
349
350
def complete(self):
351
"""
352
Complete mock fly scan.
353
354
Returns:
355
NullStatus: Immediately complete status
356
"""
357
358
def collect(self):
359
"""
360
Collect mock fly scan data.
361
362
Yields:
363
dict: Mock event documents
364
"""
365
366
class TrivialFlyer(Device):
367
"""
368
Trivial flyer implementation for basic testing.
369
370
Minimal flyer interface for testing flyer protocols.
371
"""
372
def __init__(self, **kwargs): ...
373
374
def kickoff(self):
375
"""Start trivial fly scan."""
376
377
def complete(self):
378
"""Complete trivial fly scan."""
379
380
def collect(self):
381
"""Collect trivial fly scan data."""
382
383
class SynSignalWithRegistry(SynSignal):
384
"""
385
Synthetic signal that registers itself for testing registry systems.
386
"""
387
def __init__(self, **kwargs): ...
388
389
class SPseudo3x3(PseudoPositioner):
390
"""
391
3x3 synthetic pseudo positioner for testing coordinate transformations.
392
393
Provides 3 pseudo axes that map to 3 real axes through
394
a 3x3 transformation matrix.
395
"""
396
def __init__(self, **kwargs): ...
397
398
def forward(self, pseudo_pos):
399
"""Transform 3 pseudo coordinates to 3 real coordinates."""
400
401
def inverse(self, real_pos):
402
"""Transform 3 real coordinates to 3 pseudo coordinates."""
403
404
class SPseudo1x3(PseudoPositioner):
405
"""
406
1x3 synthetic pseudo positioner.
407
408
Maps 1 pseudo axis to 3 real axes for testing
409
one-to-many transformations.
410
"""
411
def __init__(self, **kwargs): ...
412
413
def forward(self, pseudo_pos):
414
"""Transform 1 pseudo coordinate to 3 real coordinates."""
415
416
def inverse(self, real_pos):
417
"""Transform 3 real coordinates to 1 pseudo coordinate."""
418
```
419
420
### Additional Simulation Classes
421
422
Advanced simulation classes for specialized testing scenarios and device emulation.
423
424
```python { .api }
425
class DirectImage(Device):
426
"""
427
Synthetic area detector that directly stores image data.
428
429
Used for testing image data workflows without requiring
430
actual area detector hardware or complex image generation.
431
432
Parameters:
433
- name (str): Device name
434
- func (callable): Optional function to generate image data
435
"""
436
def __init__(self, *, name, func=None, **kwargs): ...
437
438
# Component for storing image data
439
img: Component # Direct image storage
440
441
class NewTrivialFlyer(Device):
442
"""
443
Enhanced TrivialFlyer with asset document support.
444
445
Implements the new-style API for Resource and Datum documents,
446
supporting modern asset management patterns in Bluesky.
447
"""
448
def __init__(self, **kwargs): ...
449
450
def kickoff(self):
451
"""Start flyer operation."""
452
453
def complete(self):
454
"""Complete flyer operation."""
455
456
def collect(self):
457
"""Collect flight data."""
458
459
def collect_asset_docs(self):
460
"""Collect asset documents (Resource and Datum)."""
461
462
class DetWithConf(Device):
463
"""
464
Detector with separate read and configuration attributes.
465
466
Demonstrates device configuration patterns where some
467
components are read regularly and others only for configuration.
468
"""
469
def __init__(self, **kwargs): ...
470
471
# Mixed read/config components
472
a: Component # read_attrs
473
b: Component # read_attrs
474
c: Component # configuration_attrs
475
d: Component # configuration_attrs
476
477
class FakeEpicsPathSignal(FakeEpicsSignal):
478
"""
479
Specialized fake EPICS signal for path handling.
480
481
Used in AreaDetector path management simulation,
482
supporting different path semantics and validation.
483
484
Parameters:
485
- read_pv (str): Read PV name
486
- write_pv (str): Write PV name
487
- path_semantics (str): Path validation semantics
488
"""
489
def __init__(self, read_pv, write_pv=None, *, path_semantics='posix', **kwargs): ...
490
491
class SynSignalWithRegistry(SynSignal):
492
"""
493
Synthetic signal with asset document registry support.
494
495
Generates asset documents (Resource and Datum) for testing
496
data management workflows with external file storage.
497
498
Parameters:
499
- name (str): Signal name
500
- save_path (str): Path for storing generated data files
501
- save_spec (str): File format specification
502
- save_func (callable): Function for saving data
503
"""
504
def __init__(self, *, name, save_path=None, save_spec='NSPY_SEQ', save_func=None, **kwargs): ...
505
506
def collect_asset_docs(self):
507
"""Generate Resource and Datum documents for collected data."""
508
```
509
510
### Utility Functions
511
512
Helper functions for creating and managing synthetic devices and test environments.
513
514
```python { .api }
515
def make_fake_device(cls):
516
"""
517
Create fake version of real EPICS device class.
518
519
Replaces all EpicsSignal components with FakeEpicsSignal
520
equivalents, enabling testing without EPICS infrastructure.
521
522
Parameters:
523
- cls (Device): Device class to make fake version of
524
525
Returns:
526
Device: New device class with fake signals
527
"""
528
529
def clear_fake_device(dev, *, exclude=None):
530
"""
531
Reset all signals in fake device to default values.
532
533
Useful for cleaning up test state between test runs.
534
535
Parameters:
536
- dev (Device): Fake device to clear
537
- exclude (list): Signal names to skip clearing
538
"""
539
540
def instantiate_fake_device(dev_cls, **kwargs):
541
"""
542
Create instance of fake device with sensible defaults.
543
544
Handles complex initialization requirements and provides
545
reasonable default values for testing.
546
547
Parameters:
548
- dev_cls (Device): Device class to instantiate
549
- **kwargs: Override default parameters
550
551
Returns:
552
Device: Configured fake device instance
553
"""
554
555
def hw(save_path=None):
556
"""
557
Factory function creating complete synthetic hardware suite.
558
559
Returns namespace containing motors, detectors, scalers,
560
area detectors, and other common beamline devices.
561
562
Parameters:
563
- save_path (str): Path for saving generated data
564
565
Returns:
566
SimpleNamespace: Collection of synthetic hardware devices
567
"""
568
569
def new_uid():
570
"""Generate new unique identifier string."""
571
572
def short_uid():
573
"""Generate short unique identifier (6 characters)."""
574
```
575
576
## Usage Examples
577
578
### Basic Synthetic Signal Testing
579
580
```python
581
from ophyd.sim import SynSignal, SynSignalRO
582
import time
583
import numpy as np
584
585
# Create synthetic signal with static value
586
temperature = SynSignal(name='temperature', value=25.0)
587
588
# Read and write synthetic signal
589
print(f"Initial temperature: {temperature.get()}")
590
temperature.put(30.0)
591
print(f"New temperature: {temperature.get()}")
592
593
# Create signal with time-based function
594
def sine_wave(t=None):
595
if t is None:
596
t = time.time()
597
return 10 * np.sin(2 * np.pi * t / 5.0) # 5 second period
598
599
oscillating_signal = SynSignal(name='oscillator', func=sine_wave)
600
601
# Read oscillating values
602
for i in range(5):
603
print(f"Oscillator value: {oscillating_signal.get():.3f}")
604
time.sleep(1)
605
```
606
607
### Synthetic Motor Testing
608
609
```python
610
from ophyd.sim import SynAxis
611
from ophyd.status import wait
612
import time
613
614
# Create synthetic motor
615
motor = SynAxis(name='synthetic_motor', delay=0.1) # 0.1s per unit move
616
617
print(f"Initial position: {motor.position}")
618
619
# Move motor with realistic delay
620
start_time = time.time()
621
status = motor.move(10.0)
622
623
print("Move started...")
624
wait(status) # Wait for simulated move to complete
625
end_time = time.time()
626
627
print(f"Final position: {motor.position}")
628
print(f"Move took: {end_time - start_time:.2f} seconds")
629
630
# Test position tolerance
631
motor.move(5.0)
632
wait(status)
633
print(f"Position after move to 5.0: {motor.position}")
634
```
635
636
### Gaussian Peak Detector Simulation
637
638
```python
639
from ophyd.sim import SynGauss, SynAxis
640
from ophyd.status import wait
641
import numpy as np
642
import matplotlib.pyplot as plt
643
644
# Create synthetic motor and Gaussian detector
645
motor = SynAxis(name='scan_motor')
646
detector = SynGauss(
647
'gauss_det',
648
motor, 'position', # Position dependence
649
center=5.0, # Peak at position 5.0
650
Imax=1000, # Peak intensity
651
sigma=1.0, # Peak width
652
noise='poisson' # Poisson noise
653
)
654
655
# Scan across peak
656
positions = np.linspace(0, 10, 21)
657
intensities = []
658
659
for pos in positions:
660
# Move motor
661
status = motor.move(pos)
662
wait(status)
663
664
# Trigger detector
665
det_status = detector.trigger()
666
wait(det_status)
667
668
# Read intensity
669
reading = detector.read()
670
intensity = reading['gauss_det']['value']
671
intensities.append(intensity)
672
673
print(f"Position: {pos:4.1f}, Intensity: {intensity:6.1f}")
674
675
# Plot results
676
plt.figure(figsize=(10, 6))
677
plt.plot(positions, intensities, 'bo-')
678
plt.xlabel('Motor Position')
679
plt.ylabel('Detector Intensity')
680
plt.title('Synthetic Gaussian Peak Scan')
681
plt.grid(True)
682
plt.show()
683
```
684
685
### Fake EPICS Signal Testing
686
687
```python
688
from ophyd.sim import FakeEpicsSignal, FakeEpicsSignalWithRBV
689
690
# Create fake EPICS signals for testing
691
temp_sensor = FakeEpicsSignal('XF:28IDC:TEMP:01', name='temperature')
692
pressure_ctrl = FakeEpicsSignalWithRBV('XF:28IDC:PRES:', name='pressure')
693
694
# Test EPICS-like behavior without EPICS
695
print(f"Temperature: {temp_sensor.get()}")
696
print(f"Temperature limits: {temp_sensor.limits}")
697
print(f"Temperature units: {temp_sensor.units}")
698
699
# Test setpoint/readback behavior
700
pressure_ctrl.put(1.5) # Set pressure
701
setpoint = pressure_ctrl.get()
702
readback = pressure_ctrl.user_readback.get()
703
704
print(f"Pressure setpoint: {setpoint}")
705
print(f"Pressure readback: {readback}")
706
```
707
708
### Mock Data Acquisition System
709
710
```python
711
from ophyd.sim import (SynAxis, SynGauss, ABDetector,
712
DetWithCountTime, NullStatus)
713
from ophyd.status import wait
714
715
class MockBeamline:
716
"""Mock beamline for testing data acquisition."""
717
718
def __init__(self):
719
# Create synthetic devices
720
self.motor_x = SynAxis(name='motor_x')
721
self.motor_y = SynAxis(name='motor_y')
722
723
self.detector_main = SynGauss(
724
'main_det', self.motor_x, 'position',
725
center=0, Imax=10000, sigma=2.0
726
)
727
728
self.detector_monitor = ABDetector('monitor')
729
self.detector_timed = DetWithCountTime('timed_det')
730
731
def scan_1d(self, motor, positions, detector):
732
"""Perform 1D scan."""
733
results = []
734
735
for pos in positions:
736
# Move motor
737
status = motor.move(pos)
738
wait(status)
739
740
# Trigger detector
741
det_status = detector.trigger()
742
wait(det_status)
743
744
# Read data
745
reading = detector.read()
746
results.append({
747
'position': pos,
748
'reading': reading
749
})
750
751
return results
752
753
def scan_2d(self, motor_x, positions_x, motor_y, positions_y, detector):
754
"""Perform 2D grid scan."""
755
results = []
756
757
for x_pos in positions_x:
758
for y_pos in positions_y:
759
# Move both motors
760
x_status = motor_x.move(x_pos)
761
y_status = motor_y.move(y_pos)
762
wait([x_status, y_status])
763
764
# Trigger detector
765
det_status = detector.trigger()
766
wait(det_status)
767
768
# Read data
769
reading = detector.read()
770
results.append({
771
'x_position': x_pos,
772
'y_position': y_pos,
773
'reading': reading
774
})
775
776
return results
777
778
# Use mock beamline
779
beamline = MockBeamline()
780
781
# Perform 1D scan
782
positions = [-5, -2, -1, 0, 1, 2, 5]
783
scan_results = beamline.scan_1d(
784
beamline.motor_x,
785
positions,
786
beamline.detector_main
787
)
788
789
print("1D Scan Results:")
790
for result in scan_results:
791
pos = result['position']
792
intensity = result['reading']['main_det']['value']
793
print(f"Position: {pos:4.1f}, Intensity: {intensity:8.1f}")
794
```
795
796
### Testing Device Staging
797
798
```python
799
from ophyd.sim import SynAxis, DetWithCountTime
800
from ophyd import Device, Component
801
802
class MockExperimentDevice(Device):
803
"""Mock device for testing staging protocols."""
804
805
motor = Component(SynAxis, '')
806
detector = Component(DetWithCountTime, '')
807
808
def stage(self):
809
"""Custom staging behavior."""
810
print("Staging experiment device...")
811
812
# Configure detector count time
813
self.detector.count_time.put(0.1)
814
815
# Call parent staging
816
staged = super().stage()
817
818
print("Device staged successfully")
819
return staged
820
821
def unstage(self):
822
"""Custom unstaging behavior."""
823
print("Unstaging experiment device...")
824
825
# Call parent unstaging
826
unstaged = super().unstage()
827
828
print("Device unstaged successfully")
829
return unstaged
830
831
# Test staging behavior
832
device = MockExperimentDevice(name='experiment')
833
834
# Stage device
835
staged_components = device.stage()
836
print(f"Staged components: {staged_components}")
837
838
# Use device (simulated)
839
print("Using device for data acquisition...")
840
841
# Unstage device
842
unstaged_components = device.unstage()
843
print(f"Unstaged components: {unstaged_components}")
844
```