0
# EPICS Integration
1
2
This module provides hardware control through EPICS (Experimental Physics and Industrial Control System) process variables (PVs), the primary interface to laboratory instruments and control systems in scientific facilities. Ophyd abstracts the underlying EPICS client libraries (pyepics, caproto) behind a unified interface for accessing PVs.
3
4
## Capabilities
5
6
### EPICS Signals
7
8
Signal classes that connect to EPICS process variables for reading and writing hardware values.
9
10
```python { .api }
11
class EpicsSignal(Signal):
12
"""
13
Signal connected to EPICS process variables.
14
15
Parameters:
16
- read_pv (str): PV name for reading values
17
- write_pv (str): PV name for writing (defaults to read_pv)
18
- pv_kw (dict): Additional PV connection parameters
19
- put_complete (bool): Whether to wait for put completion
20
- string (bool): Whether to treat PV as string type
21
- limits (bool): Whether to respect PV limits
22
- auto_monitor (bool): Whether to automatically monitor changes
23
- name (str): Signal name
24
"""
25
def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
26
27
def get(self, *, as_string=None, timeout=None, **kwargs):
28
"""
29
Get current PV value.
30
31
Parameters:
32
- as_string (bool): Return value as string
33
- timeout (float): Read timeout
34
35
Returns:
36
Current PV value
37
"""
38
39
def put(self, value, *, force=False, timeout=None, use_complete=None, **kwargs):
40
"""
41
Put value to PV synchronously.
42
43
Parameters:
44
- value: Value to write
45
- force (bool): Force write even if same value
46
- timeout (float): Write timeout
47
- use_complete (bool): Wait for put completion
48
"""
49
50
def set(self, value, *, timeout=None, settle_time=None, **kwargs):
51
"""
52
Set PV value asynchronously.
53
54
Parameters:
55
- value: Target value
56
- timeout (float): Operation timeout
57
- settle_time (float): Additional settling time
58
59
Returns:
60
StatusBase: Status tracking set completion
61
"""
62
63
@property
64
def limits(self):
65
"""
66
PV operating limits.
67
68
Returns:
69
tuple: (low_limit, high_limit)
70
"""
71
72
@property
73
def precision(self):
74
"""
75
PV display precision.
76
77
Returns:
78
int: Number of decimal places
79
"""
80
81
@property
82
def units(self):
83
"""
84
PV engineering units.
85
86
Returns:
87
str: Units string
88
"""
89
90
class EpicsSignalRO(EpicsSignal):
91
"""
92
Read-only EPICS signal.
93
94
Parameters:
95
- read_pv (str): PV name for reading
96
- pv_kw (dict): PV connection parameters
97
- string (bool): Treat as string PV
98
- limits (bool): Respect PV limits
99
- auto_monitor (bool): Auto-monitor changes
100
- name (str): Signal name
101
"""
102
def __init__(self, read_pv, *, pv_kw=None, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...
103
104
class EpicsSignalNoValidation(EpicsSignal):
105
"""
106
EPICS signal that does not verify values on set operations.
107
108
This signal bypasses readback verification and returns completed status
109
immediately after issuing a put command. Useful for cases where readback
110
verification is not needed or not reliable.
111
112
Parameters:
113
- read_pv (str): PV name for reading values
114
- write_pv (str): PV name for writing (defaults to read_pv)
115
- put_complete (bool): Whether to wait for put completion
116
- string (bool): Treat PV as string type
117
- limits (bool): Respect PV limits
118
- name (str): Signal name
119
- write_timeout (float): Timeout for put completion
120
- connection_timeout (float): Connection timeout
121
"""
122
def __init__(self, read_pv, write_pv=None, *, put_complete=False, string=False, limits=False, name=None, **kwargs): ...
123
124
class EpicsSignalWithRBV(EpicsSignal):
125
"""
126
EPICS signal with separate readback PV using AreaDetector convention.
127
128
This signal automatically constructs read and write PVs from a base prefix:
129
- Read PV: prefix + "_RBV" (readback value)
130
- Write PV: prefix (setpoint)
131
132
Commonly used with AreaDetector devices where this naming pattern is standard.
133
134
Parameters:
135
- prefix (str): Base PV name, used as the write PV; the read PV is prefix + "_RBV"
136
"""
137
def __init__(self, prefix, **kwargs): ...
138
```
139
140
### Control Layer Management
141
142
Functions to configure and manage the underlying EPICS client library backend.
143
144
```python { .api }
145
def set_cl(control_layer=None, *, pv_telemetry=False):
146
"""
147
Set the control layer (EPICS client backend).
148
149
Parameters:
150
- control_layer (str): Backend to use ('pyepics', 'caproto', 'dummy', or 'any')
151
- pv_telemetry (bool): Enable PV access telemetry collection
152
153
Raises:
154
ImportError: If specified backend is not available
155
ValueError: If control_layer is unknown
156
"""
157
158
def get_cl():
159
"""
160
Get the current control layer.
161
162
Returns:
163
SimpleNamespace: Current control layer with attributes:
164
- get_pv(pvname): Get PV object
165
- caput(pvname, value): Put value to PV
166
- caget(pvname): Get value from PV
167
- setup(): Setup backend
168
- thread_class: Thread class for backend
169
- name: Backend name
170
171
Raises:
172
RuntimeError: If control layer not set
173
"""
174
```
175
176
### EPICS Utilities
177
178
Utility functions for working with EPICS PV names and data types.
179
180
```python { .api }
181
def validate_pv_name(pv_name):
182
"""
183
Validate EPICS PV name format.
184
185
Parameters:
186
- pv_name (str): PV name to validate
187
188
Returns:
189
bool: True if valid PV name
190
"""
191
192
def split_record_field(pv_name):
193
"""
194
Split PV name into record and field parts.
195
196
Parameters:
197
- pv_name (str): Full PV name
198
199
Returns:
200
tuple: (record_name, field_name)
201
"""
202
203
def strip_field(pv_name):
204
"""
205
Remove field from PV name, returning just record name.
206
207
Parameters:
208
- pv_name (str): PV name potentially with field
209
210
Returns:
211
str: Record name without field
212
"""
213
214
def record_field(record, field):
215
"""
216
Combine record name and field into full PV name.
217
218
Parameters:
219
- record (str): Record name
220
- field (str): Field name
221
222
Returns:
223
str: Combined PV name
224
"""
225
226
def set_and_wait(signal, value, timeout=10):
227
"""
228
Set EPICS signal value and wait for completion.
229
230
Parameters:
231
- signal (EpicsSignal): Signal to set
232
- value: Target value
233
- timeout (float): Maximum wait time
234
235
Returns:
236
StatusBase: Completion status
237
"""
238
239
def data_type(pv):
240
"""
241
Get EPICS data type of PV.
242
243
Parameters:
244
- pv: PV object or name
245
246
Returns:
247
str: EPICS data type name
248
"""
249
250
def data_shape(pv):
251
"""
252
Get shape of PV data.
253
254
Parameters:
255
- pv: PV object or name
256
257
Returns:
258
tuple: Data shape dimensions
259
"""
260
261
def raise_if_disconnected(func):
262
"""
263
Decorator to check PV connection before operation.
264
265
Parameters:
266
- func (callable): Function to decorate
267
268
Returns:
269
callable: Decorated function that checks connection
270
271
Raises:
272
DisconnectedError: If PV is not connected
273
"""
274
275
def waveform_to_string(value, encoding='utf-8'):
276
"""
277
Convert EPICS waveform to string.
278
279
Parameters:
280
- value (array): Waveform data
281
- encoding (str): Character encoding
282
283
Returns:
284
str: Converted string
285
"""
286
```
287
288
## Usage Examples
289
290
### Basic EPICS Signal Usage
291
292
```python
293
from ophyd import EpicsSignal, EpicsSignalRO
294
from ophyd.status import wait
295
296
# Create read-write EPICS signal
297
temperature = EpicsSignal('XF:28IDC:TMP:01', name='temperature')
298
temperature.wait_for_connection()
299
300
# Read current value
301
current_temp = temperature.get()
302
print(f"Current temperature: {current_temp} {temperature.units}")
303
304
# Set new value asynchronously
305
status = temperature.set(25.0)
306
wait(status) # Wait for completion
307
308
# Create read-only signal
309
pressure = EpicsSignalRO('XF:28IDC:PRES:01', name='pressure')
310
pressure.wait_for_connection()
311
312
current_pressure = pressure.get()
313
print(f"Current pressure: {current_pressure}")
314
```
315
316
### Device with EPICS Components
317
318
```python
319
from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
from ophyd.status import wait
320
321
class TemperatureController(Device):
322
"""EPICS-based temperature controller."""
323
setpoint = Component(EpicsSignal, 'SP')
324
readback = Component(EpicsSignalRO, 'RBV')
325
status = Component(EpicsSignalRO, 'STAT')
326
327
def set_temperature(self, temp):
328
"""Set target temperature."""
329
return self.setpoint.set(temp)
330
331
@property
332
def temperature(self):
333
"""Current temperature reading."""
334
return self.readback.get()
335
336
# Create device instance
337
temp_ctrl = TemperatureController('XF:28IDC:TMP:', name='temp_controller')
338
temp_ctrl.wait_for_connection()
339
340
# Use device
341
status = temp_ctrl.set_temperature(22.5)
342
wait(status)
343
344
reading = temp_ctrl.read()
345
print(f"Temperature: {reading['temp_controller_readback']['value']}")
346
```
347
348
### Control Layer Configuration
349
350
```python
351
from ophyd import set_cl, get_cl
352
353
# Set control layer to caproto
354
set_cl('caproto')
355
356
# Get current control layer info
357
cl = get_cl()
358
print(f"Using control layer: {cl.name}")
359
360
# Access low-level control layer functions
361
cl.caput('XF:28IDC:TMP:01', 25.0) # Direct PV access
362
value = cl.caget('XF:28IDC:TMP:01')
363
print(f"Direct PV read: {value}")
364
365
# Enable telemetry tracking
366
set_cl('pyepics', pv_telemetry=True)
367
cl = get_cl()
368
369
# After using signals, check PV access statistics
370
if hasattr(cl.get_pv, 'counter'):
371
print("Most accessed PVs:")
372
for pv_name, count in cl.get_pv.counter.most_common(10):
373
print(f" {pv_name}: {count} accesses")
374
```
375
376
### Working with String PVs
377
378
```python
379
from ophyd import EpicsSignal
380
381
# String PV for device name
382
device_name = EpicsSignal('XF:28IDC:DEV:NAME', string=True, name='device_name')
383
device_name.wait_for_connection()
384
385
# Read string value
386
name = device_name.get()
387
print(f"Device name: {name}")
388
389
# Set string value
390
device_name.put("Sample_Detector_A")
391
392
# Waveform PV that contains string data
393
message_pv = EpicsSignal('XF:28IDC:MSG:01', name='message')
394
message_pv.wait_for_connection()
395
396
# Read waveform as string
397
from ophyd.utils.epics_pvs import waveform_to_string
398
raw_data = message_pv.get()
399
message = waveform_to_string(raw_data)
400
print(f"Message: {message}")
401
```
402
403
### Error Handling with EPICS
404
405
```python
406
from ophyd import EpicsSignal
407
from ophyd.utils.errors import DisconnectedError
408
import time
409
410
signal = EpicsSignal('XF:28IDC:NOEXIST:01', name='nonexistent')
411
412
try:
413
# This will time out if the PV doesn't exist
414
signal.wait_for_connection(timeout=2.0)
415
value = signal.get()
416
except Exception as e:
417
print(f"Connection failed: {e}")
418
419
# Check connection status
420
if signal.connected:
421
print("Signal is connected")
422
else:
423
print("Signal is not connected")
424
425
# Use raise_if_disconnected decorator
426
from ophyd.utils.epics_pvs import raise_if_disconnected
427
428
@raise_if_disconnected
429
def read_signal(sig):
430
return sig.get()
431
432
try:
433
value = read_signal(signal)
434
except DisconnectedError:
435
print("Cannot read from disconnected signal")
436
```