or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

area-detectors.md core-framework.md epics-integration.md flyers-continuous-scanning.md index.md motors-positioners.md simulation-testing.md specialized-devices.md

epics-integration.md docs/

0

# EPICS Integration

1

2

Ophyd controls hardware through EPICS (Experimental Physics and Industrial Control System) process variables (PVs), which provide the primary interface to laboratory instruments and control systems in scientific facilities. Ophyd abstracts the underlying EPICS client libraries (pyepics, caproto) and provides a unified interface for accessing EPICS PVs.

3

4

## Capabilities

5

6

### EPICS Signals

7

8

Signal classes that connect to EPICS process variables for reading and writing hardware values.

9

10

```python { .api }

11

class EpicsSignal(Signal):

12

"""

13

Signal connected to EPICS process variables.

14

15

Parameters:

16

- read_pv (str): PV name for reading values

17

- write_pv (str): PV name for writing (defaults to read_pv)

18

- pv_kw (dict): Additional PV connection parameters

19

- put_complete (bool): Whether to wait for put completion

20

- string (bool): Whether to treat PV as string type

21

- limits (bool): Whether to respect PV limits

22

- auto_monitor (bool): Whether to automatically monitor changes

23

- name (str): Signal name

24

"""

25

def __init__(self, read_pv, write_pv=None, *, pv_kw=None, put_complete=False, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...

26

27

def get(self, *, as_string=None, timeout=None, **kwargs):

28

"""

29

Get current PV value.

30

31

Parameters:

32

- as_string (bool): Return value as string

33

- timeout (float): Read timeout

34

35

Returns:

36

Current PV value

37

"""

38

39

def put(self, value, *, force=False, timeout=None, use_complete=None, **kwargs):

40

"""

41

Put value to PV synchronously.

42

43

Parameters:

44

- value: Value to write

45

- force (bool): Force write even if same value

46

- timeout (float): Write timeout

47

- use_complete (bool): Wait for put completion

48

"""

49

50

def set(self, value, *, timeout=None, settle_time=None, **kwargs):

51

"""

52

Set PV value asynchronously.

53

54

Parameters:

55

- value: Target value

56

- timeout (float): Operation timeout

57

- settle_time (float): Additional settling time

58

59

Returns:

60

StatusBase: Status tracking set completion

61

"""

62

63

@property

64

def limits(self):

65

"""

66

PV operating limits.

67

68

Returns:

69

tuple: (low_limit, high_limit)

70

"""

71

72

@property

73

def precision(self):

74

"""

75

PV display precision.

76

77

Returns:

78

int: Number of decimal places

79

"""

80

81

@property

82

def units(self):

83

"""

84

PV engineering units.

85

86

Returns:

87

str: Units string

88

"""

89

90

class EpicsSignalRO(EpicsSignal):

91

"""

92

Read-only EPICS signal.

93

94

Parameters:

95

- read_pv (str): PV name for reading

96

- pv_kw (dict): PV connection parameters

97

- string (bool): Treat as string PV

98

- limits (bool): Respect PV limits

99

- auto_monitor (bool): Auto-monitor changes

100

- name (str): Signal name

101

"""

102

def __init__(self, read_pv, *, pv_kw=None, string=False, limits=False, auto_monitor=None, name=None, **kwargs): ...

103

104

class EpicsSignalNoValidation(EpicsSignal):

105

"""

106

EPICS signal that does not verify values on set operations.

107

108

This signal bypasses readback verification and returns a completed status

109

immediately after issuing a put command. Useful for cases where readback

110

verification is not needed or not reliable.

111

112

Parameters:

113

- read_pv (str): PV name for reading values

114

- write_pv (str): PV name for writing (defaults to read_pv)

115

- put_complete (bool): Whether to wait for put completion

116

- string (bool): Treat PV as string type

117

- limits (bool): Respect PV limits

118

- name (str): Signal name

119

- write_timeout (float): Timeout for put completion

120

- connection_timeout (float): Connection timeout

121

"""

122

def __init__(self, read_pv, write_pv=None, *, put_complete=False, string=False, limits=False, name=None, **kwargs): ...

123

124

class EpicsSignalWithRBV(EpicsSignal):

125

"""

126

EPICS signal with a separate readback PV, using the AreaDetector convention.

127

128

This signal automatically constructs read and write PVs from a base prefix:

129

- Read PV: prefix + "_RBV" (readback value)

130

- Write PV: prefix (setpoint)

131

132

Commonly used with AreaDetector devices where this naming pattern is standard.

133

134

Parameters:

135

- prefix (str): Base PV name (write PV), read PV will be prefix + "_RBV"

136

"""

137

def __init__(self, prefix, **kwargs): ...

138

```

139

140

### Control Layer Management

141

142

Functions to configure and manage the underlying EPICS client library backend.

143

144

```python { .api }

145

def set_cl(control_layer=None, *, pv_telemetry=False):

146

"""

147

Set the control layer (EPICS client backend).

148

149

Parameters:

150

- control_layer (str): Backend to use ('pyepics', 'caproto', 'dummy', or 'any')

151

- pv_telemetry (bool): Enable PV access telemetry collection

152

153

Raises:

154

ImportError: If specified backend is not available

155

ValueError: If control_layer is unknown

156

"""

157

158

def get_cl():

159

"""

160

Get the current control layer.

161

162

Returns:

163

SimpleNamespace: Current control layer with methods:

164

- get_pv(pvname): Get PV object

165

- caput(pvname, value): Put value to PV

166

- caget(pvname): Get value from PV

167

- setup(): Setup backend

168

- thread_class: Thread class for backend

169

- name: Backend name

170

171

Raises:

172

RuntimeError: If control layer not set

173

"""

174

```

175

176

### EPICS Utilities

177

178

Utility functions for working with EPICS PV names and data types.

179

180

```python { .api }

181

def validate_pv_name(pv_name):

182

"""

183

Validate EPICS PV name format.

184

185

Parameters:

186

- pv_name (str): PV name to validate

187

188

Returns:

189

bool: True if valid PV name

190

"""

191

192

def split_record_field(pv_name):

193

"""

194

Split PV name into record and field parts.

195

196

Parameters:

197

- pv_name (str): Full PV name

198

199

Returns:

200

tuple: (record_name, field_name)

201

"""

202

203

def strip_field(pv_name):

204

"""

205

Remove field from PV name, returning just record name.

206

207

Parameters:

208

- pv_name (str): PV name potentially with field

209

210

Returns:

211

str: Record name without field

212

"""

213

214

def record_field(record, field):

215

"""

216

Combine record name and field into full PV name.

217

218

Parameters:

219

- record (str): Record name

220

- field (str): Field name

221

222

Returns:

223

str: Combined PV name

224

"""

225

226

def set_and_wait(signal, value, timeout=10):

227

"""

228

Set EPICS signal value and wait for completion.

229

230

Parameters:

231

- signal (EpicsSignal): Signal to set

232

- value: Target value

233

- timeout (float): Maximum wait time

234

235

Returns:

236

StatusBase: Completion status

237

"""

238

239

def data_type(pv):

240

"""

241

Get EPICS data type of PV.

242

243

Parameters:

244

- pv: PV object or name

245

246

Returns:

247

str: EPICS data type name

248

"""

249

250

def data_shape(pv):

251

"""

252

Get shape of PV data.

253

254

Parameters:

255

- pv: PV object or name

256

257

Returns:

258

tuple: Data shape dimensions

259

"""

260

261

def raise_if_disconnected(func):

262

"""

263

Decorator to check PV connection before operation.

264

265

Parameters:

266

- func (callable): Function to decorate

267

268

Returns:

269

callable: Decorated function that checks connection

270

271

Raises:

272

DisconnectedError: If PV is not connected

273

"""

274

275

def waveform_to_string(value, encoding='utf-8'):

276

"""

277

Convert EPICS waveform to string.

278

279

Parameters:

280

- value (array): Waveform data

281

- encoding (str): Character encoding

282

283

Returns:

284

str: Converted string

285

"""

286

```

287

288

## Usage Examples

289

290

### Basic EPICS Signal Usage

291

292

```python

293

from ophyd import EpicsSignal, EpicsSignalRO

294

from ophyd.status import wait

295

296

# Create read-write EPICS signal

297

temperature = EpicsSignal('XF:28IDC:TMP:01', name='temperature')

298

temperature.wait_for_connection()

299

300

# Read current value

301

current_temp = temperature.get()

302

print(f"Current temperature: {current_temp} {temperature.units}")

303

304

# Set new value asynchronously

305

status = temperature.set(25.0)

306

wait(status) # Wait for completion

307

308

# Create read-only signal

309

pressure = EpicsSignalRO('XF:28IDC:PRES:01', name='pressure')

310

pressure.wait_for_connection()

311

312

current_pressure = pressure.get()

313

print(f"Current pressure: {current_pressure}")

314

```

315

316

### Device with EPICS Components

317

318

```python

319

from ophyd import Device, Component, EpicsSignal, EpicsSignalRO
from ophyd.status import wait

320

321

class TemperatureController(Device):

322

"""EPICS-based temperature controller."""

323

setpoint = Component(EpicsSignal, 'SP')

324

readback = Component(EpicsSignalRO, 'RBV')

325

status = Component(EpicsSignalRO, 'STAT')

326

327

def set_temperature(self, temp):

328

"""Set target temperature."""

329

return self.setpoint.set(temp)

330

331

@property

332

def temperature(self):

333

"""Current temperature reading."""

334

return self.readback.get()

335

336

# Create device instance

337

temp_ctrl = TemperatureController('XF:28IDC:TMP:', name='temp_controller')

338

temp_ctrl.wait_for_connection()

339

340

# Use device

341

status = temp_ctrl.set_temperature(22.5)

342

wait(status)

343

344

reading = temp_ctrl.read()

345

print(f"Temperature: {reading['temp_controller_readback']['value']}")

346

```

347

348

### Control Layer Configuration

349

350

```python

351

from ophyd import set_cl, get_cl

352

353

# Set control layer to caproto

354

set_cl('caproto')

355

356

# Get current control layer info

357

cl = get_cl()

358

print(f"Using control layer: {cl.name}")

359

360

# Access low-level control layer functions

361

cl.caput('XF:28IDC:TMP:01', 25.0) # Direct PV access

362

value = cl.caget('XF:28IDC:TMP:01')

363

print(f"Direct PV read: {value}")

364

365

# Enable telemetry tracking

366

set_cl('pyepics', pv_telemetry=True)

367

cl = get_cl()

368

369

# After using signals, check PV access statistics

370

if hasattr(cl.get_pv, 'counter'):

371

print("Most accessed PVs:")

372

for pv_name, count in cl.get_pv.counter.most_common(10):

373

print(f" {pv_name}: {count} accesses")

374

```

375

376

### Working with String PVs

377

378

```python

379

from ophyd import EpicsSignal

380

381

# String PV for device name

382

device_name = EpicsSignal('XF:28IDC:DEV:NAME', string=True, name='device_name')

383

device_name.wait_for_connection()

384

385

# Read string value

386

name = device_name.get()

387

print(f"Device name: {name}")

388

389

# Set string value

390

device_name.put("Sample_Detector_A")

391

392

# Waveform PV that contains string data

393

message_pv = EpicsSignal('XF:28IDC:MSG:01', name='message')

394

message_pv.wait_for_connection()

395

396

# Read waveform as string

397

from ophyd.utils.epics_pvs import waveform_to_string

398

raw_data = message_pv.get()

399

message = waveform_to_string(raw_data)

400

print(f"Message: {message}")

401

```

402

403

### Error Handling with EPICS

404

405

```python

406

from ophyd import EpicsSignal

407

from ophyd.utils.errors import DisconnectedError

408

import time

409

410

signal = EpicsSignal('XF:28IDC:NOEXIST:01', name='nonexistent')

411

412

try:

413

# This will time out if the PV doesn't exist

414

signal.wait_for_connection(timeout=2.0)

415

value = signal.get()

416

except Exception as e:

417

print(f"Connection failed: {e}")

418

419

# Check connection status

420

if signal.connected:

421

print("Signal is connected")

422

else:

423

print("Signal is not connected")

424

425

# Use raise_if_disconnected decorator

426

from ophyd.utils.epics_pvs import raise_if_disconnected

427

428

@raise_if_disconnected

429

def read_signal(sig):

430

return sig.get()

431

432

try:

433

value = read_signal(signal)

434

except DisconnectedError:

435

print("Cannot read from disconnected signal")

436

```