or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-messaging.md core-operations.md device-discovery.md index.md tag-discovery.md time-operations.md

docs/advanced-messaging.md

0

# Advanced Messaging

1

2

Low-level CIP (Common Industrial Protocol) messaging capabilities for custom communication, diagnostic operations, and advanced PLC interactions beyond standard tag operations. These functions enable direct protocol-level communication and custom service implementation.

3

4

## Capabilities

5

6

### Custom CIP Message Sending

7

8

Send custom CIP messages directly to the PLC for advanced operations not covered by standard tag read/write functions.

9

10

```python { .api }

11

def Message(cip_service, cip_class, cip_instance, cip_attribute=None, data=b''):

12

"""

13

Send custom CIP message to the PLC.

14

15

Args:

16

cip_service (int): CIP service code (e.g., 0x0E=Get_Attribute_Single, 0x03=Get_Attribute_List, 0x01=Get_Attributes_All)

17

cip_class (int): CIP class code identifying the object class

18

cip_instance (int): CIP instance number within the class

19

cip_attribute (int or list, optional): CIP attribute number(s) to access

20

data (bytes): Message data payload for services that require data

21

22

Returns:

23

Response: Object containing raw response from PLC

24

- Value: Raw response data (bytes)

25

- Status: "Success" or error description

26

"""

27

```

28

29

**Usage Examples:**

30

31

```python

32

import struct

from pylogix import PLC

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Get Identity Object (Class 0x01, Instance 0x01)
    # Service 0x0E = Get_Attribute_Single, Attribute 0x01 = Vendor ID
    # (service 0x01 is Get_Attributes_All and takes no attribute number)
    response = comm.Message(0x0E, 0x01, 0x01, 0x01)
    if response.Status == 'Success':
        # Response.Value is the raw reply packet; the attribute data
        # starts at byte 44.
        vendor_id = struct.unpack_from('<H', response.Value, 44)[0]  # Extract vendor ID
        print(f"Vendor ID: {vendor_id}")

    # Get multiple attributes at once
    # Service 0x03 = Get_Attribute_List
    response = comm.Message(0x03, 0x01, 0x01, [0x01, 0x02, 0x03])  # Vendor, Device Type, Product Code
    if response.Status == 'Success':
        print(f"Multiple attributes retrieved: {len(response.Value)} bytes")

    # Send custom data to a service
    # Service 0x10 = Set_Attribute_Single, Class 0x04 = Assembly.
    # NOTE(review): no cip_attribute is passed here; a real
    # Set_Attribute_Single normally names the attribute to write - confirm
    # against the target device before using this call.
    custom_data = b'\x00\x01\x02\x03'  # Example data
    response = comm.Message(0x10, 0x04, 0x01, data=custom_data)

55

```

56

57

### CIP Message Receiving

58

59

Listen for incoming CIP messages, typically used for unsolicited data or event notifications from the PLC.

60

61

```python { .api }

62

def ReceiveMessage(ip_address, callback):

63

"""

64

Listen for incoming CIP messages from the PLC.

65

66

Args:

67

ip_address (str): IP address to listen on

68

callback (function): Callback function to handle received messages

69

Signature: callback(Response) where Response contains

70

TagName, Value, and Status

71

72

Returns:

73

Response: Listener setup status

74

"""

75

```

76

77

**Usage Examples:**

78

79

```python

80

def message_handler(response):
    """Handle incoming CIP messages.

    Args:
        response: Object exposing ``Status``, ``TagName`` and ``Value``
            attributes, as passed to the ReceiveMessage callback.
    """
    # Imported here so the snippet is self-contained; the original used
    # datetime without importing it.
    from datetime import datetime

    if response.Status != 'Success':
        print(f"Message receive error: {response.Status}")
        return

    print(f"Received message for tag: {response.TagName}")
    print(f"Value: {response.Value}")
    print(f"Timestamp: {datetime.now()}")

88

89

import time

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Start listening for messages
    response = comm.ReceiveMessage('192.168.1.100', message_handler)
    if response.Status == 'Success':
        print("Message listener started")
        # Keep the program running to receive messages until Ctrl+C
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping message listener")
    else:
        print(f"Failed to start listener: {response.Status}")

105

```

106

107

### Common CIP Services

108

109

Understanding standard CIP services for effective custom messaging.

110

111

**Standard CIP Services:**

112

113

```python

114

# Common CIP service codes

115

# Service code -> service name, following the CIP common-services table.
CIP_SERVICES = {
    0x01: 'Get_Attributes_All',       # Read every attribute of an instance
    0x02: 'Set_Attributes_All',       # Write every attribute of an instance
    0x03: 'Get_Attribute_List',       # Read multiple attributes
    0x04: 'Set_Attribute_List',       # Write multiple attributes
    0x05: 'Reset',                    # Reset service
    0x06: 'Start',                    # Start service
    0x07: 'Stop',                     # Stop service
    0x08: 'Create',                   # Create instance
    0x09: 'Delete',                   # Delete instance
    0x0A: 'Multiple_Service_Packet',  # Multiple services in one message
    0x0E: 'Get_Attribute_Single',     # Read single attribute
    0x10: 'Set_Attribute_Single',     # Write single attribute
    0x4C: 'Read_Tag',                 # Tag read service (PyLogix internal)
    0x4D: 'Write_Tag',                # Tag write service (PyLogix internal)
    0x52: 'Read_Tag_Fragmented',      # Fragmented read (PyLogix internal)
    0x53: 'Write_Tag_Fragmented',     # Fragmented write (PyLogix internal)
}

133

134

# Example: Get all attributes of Identity Object
# Service 0x01 is Get_Attributes_All (0x0E is Get_Attribute_Single).
response = comm.Message(0x01, 0x01, 0x01)  # Get_Attributes_All from Identity Object

136

```

137

138

### Common CIP Classes

139

140

Understanding CIP object classes for targeting specific PLC functionality.

141

142

**Standard CIP Classes:**

143

144

```python

145

# Common CIP class codes

146

# Class code -> object name for the classes used on this page.
CIP_CLASSES = {
    0x01: 'Identity',            # Device identity information
    0x02: 'Message Router',      # Message routing
    0x04: 'Assembly',            # I/O assembly objects
    0x05: 'Connection',          # Connection objects
    0x06: 'Connection Manager',  # Connection management
    0x0F: 'Parameter',           # Parameter objects
    0x37: 'File',                # File objects
    0x6B: 'Symbol',              # Tag/symbol objects (Logix-specific)
    0x6C: 'Template',            # UDT template objects (Logix-specific)
    0x8B: 'Wall Clock Time',     # Controller clock (Logix-specific)
}

158

159

# Examples of accessing different classes

160

def get_device_identity(comm):
    """Get complete device identity information.

    Args:
        comm: Object exposing ``Message(service, class, instance, ...)``.

    Returns:
        Whatever ``comm.Message`` returns for the request.
    """
    # Service 0x01 = Get_Attributes_All on Identity (class 0x01, instance 0x01).
    return comm.Message(0x01, 0x01, 0x01)

164

165

def get_connection_info(comm):
    """Get connection manager information.

    Args:
        comm: Object exposing ``Message(service, class, instance, attribute)``.

    Returns:
        Whatever ``comm.Message`` returns for the request.
    """
    # Service 0x0E = Get_Attribute_Single; class 0x06 = Connection Manager,
    # instance 0x01, attribute 0x01.
    return comm.Message(0x0E, 0x06, 0x01, 0x01)

169

```

170

171

### Advanced Diagnostic Operations

172

173

Use custom CIP messages for advanced diagnostics and system information gathering.

174

175

```python

176

def advanced_diagnostics(plc_ip):
    """Perform advanced PLC diagnostics using CIP messages.

    Queries the Identity Object, the Connection Manager and class 0x8B and
    records what each one answers.

    Args:
        plc_ip (str): Address assigned to ``PLC.IPAddress``.

    Returns:
        dict: Keys 'identity', 'connection_status', 'memory_info',
        'communication_status', 'time_sync' and 'errors'. 'memory_info' and
        'communication_status' are placeholders this function never fills.
        Failures are appended to 'errors' instead of being raised.
    """
    import struct

    diagnostics = {
        'identity': {},
        'connection_status': {},
        'memory_info': {},
        'communication_status': {},
        'time_sync': {},
        'errors': [],
    }

    with PLC() as comm:
        comm.IPAddress = plc_ip

        try:
            # Get Identity Object details.
            # Service 0x01 = Get_Attributes_All (no attribute number).
            identity_response = comm.Message(0x01, 0x01, 0x01)
            if identity_response.Status == 'Success':
                data = identity_response.Value
                # The reply payload starts at byte 44: vendor, device type and
                # product code (UINT each) followed by major/minor revision
                # bytes, i.e. 8 bytes in total.
                if len(data) >= 52:
                    vendor_id, device_type, product_code, major, minor = (
                        struct.unpack_from('<HHHBB', data, 44)
                    )
                    diagnostics['identity'] = {
                        'vendor_id': vendor_id,
                        'device_type': device_type,
                        'product_code': product_code,
                        'revision': f"{major}.{minor}",
                    }
                    print(f"Identity: Vendor={vendor_id}, Type={device_type}, Code={product_code}")
                else:
                    # Record the short reply instead of skipping it silently.
                    diagnostics['errors'].append(
                        f"Identity response too short: {len(data)} bytes"
                    )
            else:
                diagnostics['errors'].append(f"Identity query failed: {identity_response.Status}")

            # Get Connection Manager status.
            # Service 0x0E = Get_Attribute_Single.
            conn_response = comm.Message(0x0E, 0x06, 0x01, 0x01)
            if conn_response.Status == 'Success':
                diagnostics['connection_status']['raw_data'] = len(conn_response.Value)
                print(f"Connection manager responded with {len(conn_response.Value)} bytes")
            else:
                diagnostics['errors'].append(f"Connection status failed: {conn_response.Status}")

            # Probe class 0x8B (controller clock object); only reachability
            # is recorded, the reply payload is not decoded.
            time_response = comm.Message(0x0E, 0x8B, 0x01, 0x01)
            if time_response.Status == 'Success':
                diagnostics['time_sync'] = {'available': True}
                print("Time synchronization object accessible")
            else:
                diagnostics['time_sync'] = {'available': False}
                print(f"Time sync not available: {time_response.Status}")

        except Exception as e:
            # Diagnostic boundary: report the failure in the result rather
            # than aborting the whole run.
            diagnostics['errors'].append(f"Diagnostic exception: {e}")

    return diagnostics

235

236

# Usage

237

diag_results = advanced_diagnostics('192.168.1.100')

238

print(f"Diagnostic completed with {len(diag_results['errors'])} errors")

239

```

240

241

### Custom Service Implementation

242

243

Implement custom services for specialized PLC interactions.

244

245

```python

246

class CustomPLCService:
    """Custom PLC service implementation using CIP messaging.

    Use as a context manager: the connection object is created on entry and
    closed on exit. Calling any method outside the ``with`` block raises
    RuntimeError.
    """

    def __init__(self, plc_ip):
        self.plc_ip = plc_ip
        self.comm = None

    def __enter__(self):
        self.comm = PLC()
        self.comm.IPAddress = self.plc_ip
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.comm:
            self.comm.Close()
            # Drop the closed connection so later calls hit the
            # "not initialized" guard instead of using a dead object.
            self.comm = None

    def _require_comm(self):
        """Return the active connection or raise if there is none."""
        if not self.comm:
            raise RuntimeError("Service not initialized - use with statement")
        return self.comm

    def read_vendor_specific_data(self, class_id, instance, attribute):
        """Read one attribute and return the raw reply.

        Raises:
            RuntimeError: If not initialized or the reply status is not
                'Success'.
        """
        # Service 0x0E = Get_Attribute_Single.
        response = self._require_comm().Message(0x0E, class_id, instance, attribute)
        if response.Status != 'Success':
            raise RuntimeError(f"Vendor data read failed: {response.Status}")
        return response.Value

    def send_custom_command(self, service_code, class_id, instance, data=b''):
        """Send custom command to PLC and return the response object.

        Raises:
            RuntimeError: If the service is not initialized.
        """
        return self._require_comm().Message(service_code, class_id, instance, data=data)

    def bulk_attribute_read(self, class_id, instance, attribute_list):
        """Read multiple attributes with one Get_Attribute_List request.

        Returns:
            dict: attribute id -> value, as produced by
            ``_parse_attribute_list_response``.

        Raises:
            RuntimeError: If not initialized or the reply status is not
                'Success'.
        """
        # Service 0x03 = Get_Attribute_List.
        response = self._require_comm().Message(0x03, class_id, instance, attribute_list)
        if response.Status != 'Success':
            raise RuntimeError(f"Bulk read failed: {response.Status}")
        return self._parse_attribute_list_response(response.Value, attribute_list)

    def _parse_attribute_list_response(self, data, attribute_list):
        """Parse response from Get_Attribute_List service.

        The payload (from byte 44 of the raw reply) is an attribute count
        followed by one ``id, status[, value]`` record per attribute. This
        simplified parser assumes every value is a 2-byte unsigned integer,
        which holds for Identity attributes 1-3 but not for attributes of
        other sizes.

        Args:
            data (bytes): Raw reply packet.
            attribute_list: Requested ids; kept for interface compatibility,
                the ids in the reply itself are used as keys.

        Returns:
            dict: attribute id -> value for every attribute that reported
            status 0 and fits in ``data``.
        """
        import struct

        attributes = {}
        offset = 44  # Skip encapsulation and CIP reply header

        if len(data) < offset + 2:
            return attributes
        (count,) = struct.unpack_from('<H', data, offset)
        offset += 2

        for _ in range(count):
            if offset + 4 > len(data):
                break
            attr_id, status = struct.unpack_from('<HH', data, offset)
            offset += 4
            if status != 0:
                # A failed attribute carries no value bytes.
                continue
            if offset + 2 > len(data):
                break
            attributes[attr_id] = struct.unpack_from('<H', data, offset)[0]
            offset += 2

        return attributes

309

310

# Usage example

311

with CustomPLCService('192.168.1.100') as service:
    try:
        # Read vendor-specific information
        vendor_data = service.read_vendor_specific_data(0x01, 0x01, 0x01)
        print(f"Vendor data: {vendor_data}")

        # Read multiple attributes at once
        attributes = service.bulk_attribute_read(0x01, 0x01, [0x01, 0x02, 0x03])
        print(f"Bulk attributes: {attributes}")

        # Send custom command
        # WARNING: service 0x05 (Reset) addressed to the Identity Object asks
        # the device to restart. Do not run this against production hardware.
        response = service.send_custom_command(0x05, 0x01, 0x01)  # Reset command
        print(f"Custom command result: {response.Status}")

    except RuntimeError as e:
        print(f"Service error: {e}")

327

```

328

329

### Message Data Parsing

330

331

Utilities for parsing raw CIP message responses.

332

333

```python

334

import struct

335

336

def parse_cip_response(response_data):
    """Parse raw CIP response data.

    Args:
        response_data (bytes): Raw reply packet. ``None`` or anything shorter
            than 44 bytes is reported as too short.

    Returns:
        dict: ``{'error': 'Response too short'}`` for short input, otherwise
        the encapsulation header fields (bytes 0-11), the service code
        (byte 40), the response status (byte 42), ``data_start`` (always 44)
        and ``data`` (everything from byte 44 on, possibly empty).
    """
    # Message() can hand back no payload at all on a failed request.
    if not response_data or len(response_data) < 44:
        return {'error': 'Response too short'}

    # CIP response structure (simplified): the first 12 bytes are the
    # little-endian command, length, session handle and status.
    command, length, session, status = struct.unpack_from('<HHII', response_data, 0)

    return {
        'encap_command': command,
        'encap_length': length,
        'encap_session': session,
        'encap_status': status,
        'service_code': response_data[40],
        'response_status': response_data[42],
        'data_start': 44,
        'data': response_data[44:],
    }

354

355

def extract_attribute_data(response_data, attribute_type='uint16'):
    """Extract attribute data based on type.

    Args:
        response_data (bytes): Raw reply packet, parsed with
            ``parse_cip_response``.
        attribute_type (str): 'uint16', 'uint32', 'string' (2-byte length
            prefix) or 'short_string' (1-byte length prefix). Any other
            value returns the payload bytes unchanged.

    Returns:
        The decoded value, the raw payload for an unknown type, or ``None``
        when the reply is too short, has no payload, or the payload is
        truncated for the requested type.
    """
    parsed = parse_cip_response(response_data)
    if parsed.get('error'):
        return None

    data = parsed['data']
    if not data:
        return None

    if attribute_type == 'uint16':
        return struct.unpack_from('<H', data)[0] if len(data) >= 2 else None

    if attribute_type == 'uint32':
        return struct.unpack_from('<I', data)[0] if len(data) >= 4 else None

    if attribute_type in ('string', 'short_string'):
        prefix = 2 if attribute_type == 'string' else 1
        if len(data) < prefix:
            return None
        str_len = int.from_bytes(data[:prefix], 'little')
        # A truncated string is reported as None rather than falling
        # through to the raw-bytes return below.
        if len(data) < prefix + str_len:
            return None
        return data[prefix:prefix + str_len].decode('utf-8', errors='ignore')

    return data

377

378

# Usage in custom messaging

379

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Get vendor ID using custom parsing
    # Service 0x0E = Get_Attribute_Single, Identity attribute 0x01 = Vendor ID
    response = comm.Message(0x0E, 0x01, 0x01, 0x01)
    if response.Status == 'Success':
        vendor_id = extract_attribute_data(response.Value, 'uint16')
        print(f"Parsed Vendor ID: {vendor_id}")

    # Get product name (Identity attribute 0x07)
    response = comm.Message(0x0E, 0x01, 0x01, 0x07)  # Product name attribute
    if response.Status == 'Success':
        # Product Name is a SHORT_STRING: one length byte, then the text.
        payload = parse_cip_response(response.Value).get('data', b'')
        name_len = payload[0] if payload else 0
        product_name = payload[1:1 + name_len].decode('utf-8', errors='ignore')
        print(f"Product Name: {product_name}")

393

```

394

395

### Error Handling for Advanced Messaging

396

397

Advanced messaging requires comprehensive error handling due to the low-level nature of CIP communication.

398

399

```python

400

def safe_cip_message(comm, service, class_id, instance, attribute=None, data=b'',
                     retries=3, retry_delay=1.0):
    """Send CIP message with error handling and retries.

    Args:
        comm: Object exposing ``Message(service, class, instance, attribute, data)``.
        service, class_id, instance, attribute, data: Forwarded to
            ``comm.Message`` unchanged.
        retries (int): Maximum number of attempts.
        retry_delay (float): Seconds to wait before retrying after an
            exception or a connection-related status.

    Returns:
        The response whose ``Status`` is 'Success', or ``None`` when every
        attempt failed or the device reported a non-retryable error.
    """
    import time

    for attempt in range(retries):
        # There is nothing to wait for once the last attempt has failed.
        is_last_attempt = attempt == retries - 1
        try:
            response = comm.Message(service, class_id, instance, attribute, data)

            if response.Status == 'Success':
                return response

            print(f"CIP message attempt {attempt + 1} failed: {response.Status}")
            status = response.Status.lower()

            # These errors will not change on retry, so give up at once.
            if 'service not supported' in status:
                print("Service not supported by this device")
                break
            if 'path destination unknown' in status:
                print("Invalid class/instance/attribute path")
                break

            if 'connection' in status:
                print("Connection issue - will retry")
                if not is_last_attempt:
                    time.sleep(retry_delay)

        except Exception as e:
            # Retry boundary: any failure of a single attempt is reported
            # and the next attempt is made.
            print(f"CIP message exception on attempt {attempt + 1}: {e}")
            if not is_last_attempt:
                time.sleep(retry_delay)

    return None

432

433

# Usage with error handling

434

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Safe CIP message with retries
    # Service 0x0E = Get_Attribute_Single, Identity attribute 0x01 = Vendor ID
    response = safe_cip_message(comm, 0x0E, 0x01, 0x01, 0x01)
    if response:
        print("CIP message successful")
        # Process response...
    else:
        print("CIP message failed after all retries")

444

```

445

446

### Performance Considerations

447

448

Advanced messaging performance optimization techniques.

449

450

```python

451

def optimized_bulk_operations(plc_ip, operations):
    """Perform bulk CIP operations over a single PLC connection.

    Args:
        plc_ip (str): Address assigned to ``PLC.IPAddress``.
        operations (iterable of dict): Each needs 'service', 'class' and
            'instance'; 'attribute' and 'data' are optional.

    Returns:
        list of dict: One ``{'operation', 'response', 'success'}`` entry per
        operation. Entries are grouped by class in first-seen class order,
        so the order can differ from the input order.
    """
    results = []

    with PLC() as comm:
        comm.IPAddress = plc_ip
        comm.SocketTimeout = 30.0  # Longer timeout for bulk operations

        # Group operations by class so each class is processed in one run.
        operations_by_class = {}
        for op in operations:
            operations_by_class.setdefault(op['class'], []).append(op)

        # Process operations class by class
        for class_id, class_ops in operations_by_class.items():
            print(f"Processing {len(class_ops)} operations for class 0x{class_id:02x}")

            for op in class_ops:
                response = comm.Message(
                    op['service'],
                    op['class'],
                    op['instance'],
                    op.get('attribute'),
                    op.get('data', b''),
                )
                results.append({
                    'operation': op,
                    'response': response,
                    'success': response.Status == 'Success',
                })

    return results

488

489

# Usage for bulk operations

490

# Service 0x0E = Get_Attribute_Single for every entry.
bulk_ops = [
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x01},  # Vendor ID
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x02},  # Device Type
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x03},  # Product Code
    {'service': 0x0E, 'class': 0x8B, 'instance': 0x01, 'attribute': 0x0B},  # Time status
]

results = optimized_bulk_operations('192.168.1.100', bulk_ops)
successful_ops = sum(1 for r in results if r['success'])
print(f"Completed {successful_ops}/{len(results)} operations successfully")

500

```