Communication driver for reading and writing data from Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over EtherNet/IP
—
Low-level CIP (Common Industrial Protocol) messaging capabilities for custom communication, diagnostic operations, and advanced PLC interactions beyond standard tag operations. These functions enable direct protocol-level communication and custom service implementation.
Send custom CIP messages directly to the PLC for advanced operations not covered by standard tag read/write functions.
def Message(cip_service, cip_class, cip_instance, cip_attribute=None, data=b''):
    """
    Send custom CIP message to the PLC.

    This is an API signature listing: the body is documentation only.

    Args:
        cip_service (int): CIP service code (e.g., 0x0E=Get_Attribute_Single, 0x03=Get_Attribute_List)
        cip_class (int): CIP class code identifying the object class
        cip_instance (int): CIP instance number within the class
        cip_attribute (int or list, optional): CIP attribute number(s) to access
        data (bytes): Message data payload for services that require data

    Returns:
        Response: Object containing raw response from PLC
            - Value: Raw response data (bytes)
            - Status: "Success" or error description
    """


# Usage Examples:
import struct

from pylogix import PLC

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Get Identity Object (Class 0x01, Instance 0x01)
    # Service 0x0E = Get_Attribute_Single, Attribute 0x01 = Vendor ID
    response = comm.Message(0x0E, 0x01, 0x01, 0x01)
    if response.Status == 'Success':
        # Response data contains the vendor ID; the payload is read from
        # byte 44 of the raw response in this example.
        vendor_id = struct.unpack('<H', response.Value[44:46])[0]  # Extract vendor ID
        print(f"Vendor ID: {vendor_id}")

    # Get multiple attributes at once
    # Service 0x03 = Get_Attribute_List
    response = comm.Message(0x03, 0x01, 0x01, [0x01, 0x02, 0x03])  # Vendor, Device Type, Product Code
    if response.Status == 'Success':
        print(f"Multiple attributes retrieved: {len(response.Value)} bytes")

    # Send custom data to a service
    # NOTE(review): 0x10 is Set_Attribute_Single and no attribute is given
    # here - treat the codes below as placeholders and confirm against the
    # target object before use.
    custom_data = b'\x00\x01\x02\x03'  # Example data
    response = comm.Message(0x10, 0x04, 0x01, data=custom_data)


# Listen for incoming CIP messages, typically used for unsolicited data or event notifications from the PLC.
def ReceiveMessage(ip_address, callback):
    """
    Listen for incoming CIP messages from the PLC.

    This is an API signature listing: the body is documentation only.

    Args:
        ip_address (str): IP address to listen on
        callback (function): Callback function to handle received messages
            Signature: callback(Response) where Response contains
            TagName, Value, and Status

    Returns:
        Response: Listener setup status
    """


# Usage Examples:
def message_handler(response):
    """Handle incoming CIP messages.

    Prints the tag name, value, and the local time of receipt when the
    message arrived successfully; otherwise prints the error status.

    Args:
        response: Object exposing ``Status``, ``TagName`` and ``Value``.
    """
    # Imported here so the example is self-contained; the original snippet
    # used datetime without importing it.
    from datetime import datetime

    if response.Status == 'Success':
        print(f"Received message for tag: {response.TagName}")
        print(f"Value: {response.Value}")
        print(f"Timestamp: {datetime.now()}")
    else:
        print(f"Message receive error: {response.Status}")
import time

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Start listening for messages
    # NOTE(review): the parameter is documented as the address to listen on;
    # confirm whether this should be the local machine's address rather than
    # the PLC's.
    response = comm.ReceiveMessage('192.168.1.100', message_handler)
    if response.Status == 'Success':
        print("Message listener started")

        # Keep the program running to receive messages
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping message listener")
    else:
        print(f"Failed to start listener: {response.Status}")


# Understanding standard CIP services for effective custom messaging.
Standard CIP Services:
# Common CIP service codes.
# The single/all codes follow the CIP common-service numbering:
# 0x01/0x02 operate on ALL attributes, 0x0E/0x10 on a SINGLE attribute.
CIP_SERVICES = {
    0x01: 'Get_Attributes_All',       # Read all attributes
    0x02: 'Set_Attributes_All',       # Write all attributes
    0x03: 'Get_Attribute_List',       # Read multiple attributes
    0x04: 'Set_Attribute_List',       # Write multiple attributes
    0x05: 'Reset',                    # Reset service
    0x06: 'Start',                    # Start service
    0x07: 'Stop',                     # Stop service
    0x08: 'Create',                   # Create instance
    0x09: 'Delete',                   # Delete instance
    0x0A: 'Multiple_Service_Packet',  # Multiple services in one message
    0x0E: 'Get_Attribute_Single',     # Read single attribute
    0x10: 'Set_Attribute_Single',     # Write single attribute
    0x4C: 'Read_Tag',                 # Tag read service (PyLogix internal)
    0x4D: 'Write_Tag',                # Tag write service (PyLogix internal)
    0x52: 'Read_Tag_Fragmented',      # Fragmented read (PyLogix internal)
    0x53: 'Write_Tag_Fragmented',     # Fragmented write (PyLogix internal)
}
# Example: Get all attributes of Identity Object
# (`comm` is an open PLC connection, as in the earlier examples.)
response = comm.Message(0x01, 0x01, 0x01)  # Get_Attributes_All from Identity Object


# Understanding CIP object classes for targeting specific PLC functionality.
Standard CIP Classes:
# Common CIP class codes.
# 0x6B, 0x6C and 0x8B are Rockwell/Logix-specific classes; the rest are
# standard CIP objects.
CIP_CLASSES = {
    0x01: 'Identity',            # Device identity information
    0x02: 'Message Router',      # Message routing
    0x04: 'Assembly',            # I/O assembly objects
    0x05: 'Connection',          # Connection objects
    0x06: 'Connection Manager',  # Connection management
    0x0F: 'Parameter',           # Parameter objects
    0x37: 'File',                # File objects
    0x43: 'Time Sync',           # Time synchronization
    0x6B: 'Symbol',              # Tag/symbol objects
    0x6C: 'Template',            # UDT template objects
    0x8B: 'Wall Clock Time',     # Controller wall clock
}
# Examples of accessing different classes
def get_device_identity(comm):
    """Get complete device identity information.

    Args:
        comm: Open connection object exposing ``Message``.

    Returns:
        Whatever ``comm.Message`` returns for the request.
    """
    # 0x01 = Get_Attributes_All, class 0x01 = Identity, instance 1
    response = comm.Message(0x01, 0x01, 0x01)  # Get all Identity attributes
    return response
def get_connection_info(comm):
    """Get connection manager information.

    Args:
        comm: Open connection object exposing ``Message``.

    Returns:
        Whatever ``comm.Message`` returns for the request.
    """
    # 0x0E = Get_Attribute_Single, class 0x06 = Connection Manager,
    # instance 1, attribute 1.
    # NOTE(review): confirm what attribute 1 reports on the target device.
    response = comm.Message(0x0E, 0x06, 0x01, 0x01)  # Get connection manager status
    return response


# Use custom CIP messages for advanced diagnostics and system information gathering.
def advanced_diagnostics(plc_ip):
    """Perform advanced PLC diagnostics using CIP messages.

    Queries the Identity object (class 0x01), the Connection Manager
    (class 0x06) and class 0x8B, collecting results and error descriptions
    instead of raising.

    Args:
        plc_ip (str): IP address assigned to the PLC connection.

    Returns:
        dict: Keys 'identity', 'connection_status', 'memory_info',
        'communication_status', 'time_sync' and 'errors' (list of str).
        'memory_info' and 'communication_status' are left empty by this
        example.
    """
    import struct

    # Offset of the CIP reply payload inside the raw response, matching the
    # other examples on this page (byte positions may vary).
    payload_offset = 44
    # Vendor ID, device type, product code (UINT each), then major/minor
    # revision (one byte each).
    identity_format = '<HHHBB'

    diagnostics = {
        'identity': {},
        'connection_status': {},
        'memory_info': {},
        'communication_status': {},
        'time_sync': {},
        'errors': []
    }

    with PLC() as comm:
        comm.IPAddress = plc_ip

        try:
            # Get Identity Object details (0x01 = Get_Attributes_All)
            identity_response = comm.Message(0x01, 0x01, 0x01)
            if identity_response.Status == 'Success':
                # Parse identity data (simplified)
                data = identity_response.Value
                if len(data) >= payload_offset + struct.calcsize(identity_format):
                    vendor_id, device_type, product_code, major, minor = struct.unpack_from(
                        identity_format, data, payload_offset
                    )
                    diagnostics['identity'] = {
                        'vendor_id': vendor_id,
                        'device_type': device_type,
                        'product_code': product_code,
                        'revision': f"{major}.{minor}"
                    }
                    print(f"Identity: Vendor={vendor_id}, Type={device_type}, Code={product_code}")
                else:
                    # Report the short reply instead of silently skipping it.
                    diagnostics['errors'].append("Identity response too short to parse")
            else:
                diagnostics['errors'].append(f"Identity query failed: {identity_response.Status}")

            # Get Connection Manager status (0x0E = Get_Attribute_Single)
            conn_response = comm.Message(0x0E, 0x06, 0x01, 0x01)
            if conn_response.Status == 'Success':
                diagnostics['connection_status']['raw_data'] = len(conn_response.Value)
                print(f"Connection manager responded with {len(conn_response.Value)} bytes")
            else:
                diagnostics['errors'].append(f"Connection status failed: {conn_response.Status}")

            # Probe class 0x8B (0x0E = Get_Attribute_Single) to see whether
            # the time object is reachable.
            time_response = comm.Message(0x0E, 0x8B, 0x01, 0x01)
            if time_response.Status == 'Success':
                diagnostics['time_sync'] = {'available': True}
                print("Time synchronization object accessible")
            else:
                diagnostics['time_sync'] = {'available': False}
                print(f"Time sync not available: {time_response.Status}")

        except Exception as e:
            # Diagnostics are best-effort: record the failure and return
            # whatever was gathered so far.
            diagnostics['errors'].append(f"Diagnostic exception: {e}")

    return diagnostics
# Usage
diag_results = advanced_diagnostics('192.168.1.100')
print(f"Diagnostic completed with {len(diag_results['errors'])} errors")


# Implement custom services for specialized PLC interactions.
class CustomPLCService:
    """Custom PLC service implementation using CIP messaging.

    Use as a context manager: the connection object is created on entry and
    closed on exit. Every request method raises RuntimeError when called
    outside the ``with`` block.
    """

    def __init__(self, plc_ip):
        """Remember the target address; no connection object is created yet."""
        self.plc_ip = plc_ip
        self.comm = None

    def __enter__(self):
        """Create the PLC connection object and point it at ``plc_ip``."""
        self.comm = PLC()
        self.comm.IPAddress = self.plc_ip
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the connection, if one was opened."""
        if self.comm:
            self.comm.Close()
            # Drop the handle so later calls fail with the clear
            # "not initialized" error instead of using a closed connection.
            self.comm = None

    def read_vendor_specific_data(self, class_id, instance, attribute):
        """Read vendor-specific data using custom CIP messages.

        Args:
            class_id (int): CIP class code.
            instance (int): CIP instance number.
            attribute (int): CIP attribute number.

        Returns:
            The raw ``Value`` of the response.

        Raises:
            RuntimeError: If used outside a ``with`` block or the read fails.
        """
        if not self.comm:
            raise RuntimeError("Service not initialized - use with statement")

        # 0x0E = Get_Attribute_Single
        response = self.comm.Message(0x0E, class_id, instance, attribute)
        if response.Status == 'Success':
            return response.Value
        raise RuntimeError(f"Vendor data read failed: {response.Status}")

    def send_custom_command(self, service_code, class_id, instance, data=b''):
        """Send custom command to PLC.

        Returns:
            The response object, whatever its status.

        Raises:
            RuntimeError: If used outside a ``with`` block.
        """
        if not self.comm:
            raise RuntimeError("Service not initialized - use with statement")

        response = self.comm.Message(service_code, class_id, instance, data=data)
        return response

    def bulk_attribute_read(self, class_id, instance, attribute_list):
        """Read multiple attributes efficiently.

        Args:
            class_id (int): CIP class code.
            instance (int): CIP instance number.
            attribute_list (list): Attribute numbers to request.

        Returns:
            dict: Attribute number -> parsed value.

        Raises:
            RuntimeError: If used outside a ``with`` block or the read fails.
        """
        if not self.comm:
            raise RuntimeError("Service not initialized - use with statement")

        # Use Get_Attribute_List service (0x03)
        response = self.comm.Message(0x03, class_id, instance, attribute_list)
        if response.Status == 'Success':
            # Parse response data to extract individual attribute values
            return self._parse_attribute_list_response(response.Value, attribute_list)
        raise RuntimeError(f"Bulk read failed: {response.Status}")

    def _parse_attribute_list_response(self, data, attribute_list):
        """Parse response from Get_Attribute_List service.

        The reply payload is read as a 16-bit attribute count followed by one
        record per attribute: attribute id (16-bit), status (16-bit), value.

        This is a simplified example: every value is assumed to be a 16-bit
        unsigned integer, which only holds for attributes of that type.

        Args:
            data (bytes): Raw response, payload starting at byte 44.
            attribute_list (list): Attribute numbers that were requested;
                only these are returned.

        Returns:
            dict: Attribute number -> value for each attribute reported with
            status 0. Empty if the response is missing or truncated.
        """
        import struct

        attributes = {}
        offset = 44  # Skip CIP header
        if not data or len(data) < offset + 2:
            return attributes

        (count,) = struct.unpack_from('<H', data, offset)
        offset += 2
        requested = set(attribute_list)

        for _ in range(count):
            if offset + 4 > len(data):
                break
            attr_id, status = struct.unpack_from('<HH', data, offset)
            offset += 4
            if status != 0:
                # Assumes no value bytes follow a failed attribute - TODO confirm
                continue
            if offset + 2 > len(data):
                break
            (value,) = struct.unpack_from('<H', data, offset)
            offset += 2
            if attr_id in requested:
                attributes[attr_id] = value

        return attributes
# Usage example
with CustomPLCService('192.168.1.100') as service:
    try:
        # Read vendor-specific information
        vendor_data = service.read_vendor_specific_data(0x01, 0x01, 0x01)
        print(f"Vendor data: {vendor_data}")

        # Read multiple attributes at once
        attributes = service.bulk_attribute_read(0x01, 0x01, [0x01, 0x02, 0x03])
        print(f"Bulk attributes: {attributes}")

        # Send custom command
        # WARNING: this sends the Reset service (0x05) to the Identity
        # object; do not run it against equipment that must keep running.
        response = service.send_custom_command(0x05, 0x01, 0x01)  # Reset command
        print(f"Custom command result: {response.Status}")

    except RuntimeError as e:
        print(f"Service error: {e}")


# Utilities for parsing raw CIP message responses.
import struct
def parse_cip_response(response_data):
    """Parse raw CIP response data.

    Layout assumed (simplified): a 12-byte prefix of the encapsulation header
    (command, length, session handle, status), the reply service code at
    byte 40, the general status at byte 42, and the additional-status size
    (in 16-bit words) at byte 43. The payload follows any additional status.

    Args:
        response_data (bytes): Raw response; ``None`` or empty is accepted.

    Returns:
        dict: ``{'error': 'Response too short'}`` when fewer than 44 bytes
        are available, otherwise the parsed header fields plus
        'data_start' and 'data'.
    """
    if not response_data or len(response_data) < 44:
        return {'error': 'Response too short'}

    command, length, session, status = struct.unpack_from('<HHII', response_data, 0)

    # Skip additional status words so 'data' starts at the real payload.
    # With no additional status (the normal success case) this is byte 44.
    data_start = 44 + 2 * response_data[43]

    # CIP response structure (simplified)
    return {
        'encap_command': command,
        'encap_length': length,
        'encap_session': session,
        'encap_status': status,
        'service_code': response_data[40],
        'response_status': response_data[42],
        'data_start': data_start,
        'data': response_data[data_start:]
    }
def extract_attribute_data(response_data, attribute_type='uint16'):
    """Extract attribute data based on type.

    Args:
        response_data (bytes): Raw response, passed to parse_cip_response.
        attribute_type (str): 'uint16', 'uint32' or 'string' (16-bit length
            prefix followed by that many bytes). Any other value returns the
            payload bytes unchanged.

    Returns:
        int, str, bytes or None: The decoded value; None when the response
        could not be parsed, has no payload, or the payload is too short for
        the requested type.
    """
    parsed = parse_cip_response(response_data)
    if parsed.get('error'):
        return None

    data = parsed['data']
    if not data:
        return None

    if attribute_type == 'uint16':
        return struct.unpack('<H', data[0:2])[0] if len(data) >= 2 else None

    if attribute_type == 'uint32':
        return struct.unpack('<I', data[0:4])[0] if len(data) >= 4 else None

    if attribute_type == 'string':
        # A truncated string yields None, like the numeric types, rather
        # than leaking the raw bytes to a caller expecting text.
        if len(data) < 2:
            return None
        str_len = struct.unpack('<H', data[0:2])[0]
        if len(data) < 2 + str_len:
            return None
        return data[2:2 + str_len].decode('utf-8', errors='ignore')

    # Unknown type: hand back the raw payload.
    return data
# Usage in custom messaging
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Get vendor ID using custom parsing (0x0E = Get_Attribute_Single)
    response = comm.Message(0x0E, 0x01, 0x01, 0x01)
    if response.Status == 'Success':
        vendor_id = extract_attribute_data(response.Value, 'uint16')
        print(f"Parsed Vendor ID: {vendor_id}")

    # Get product name (string attribute)
    # NOTE(review): 'string' expects a 16-bit length prefix; the Identity
    # product name may use a 1-byte length prefix - confirm against the device.
    response = comm.Message(0x0E, 0x01, 0x01, 0x07)  # Product name attribute
    if response.Status == 'Success':
        product_name = extract_attribute_data(response.Value, 'string')
        print(f"Product Name: {product_name}")


# Advanced messaging requires comprehensive error handling due to the low-level nature of CIP communication.
def safe_cip_message(comm, service, class_id, instance, attribute=None, data=b'', retries=3, retry_delay=1.0):
    """Send CIP message with error handling and retries.

    Args:
        comm: Connection object exposing ``Message``.
        service (int): CIP service code.
        class_id (int): CIP class code.
        instance (int): CIP instance number.
        attribute (int or list, optional): CIP attribute number(s).
        data (bytes): Request payload.
        retries (int): Maximum number of attempts.
        retry_delay (float): Seconds to wait before retrying after a
            connection problem or an exception.

    Returns:
        The successful response, or None if every attempt failed or the
        error is one that retrying cannot fix.
    """
    import time

    for attempt in range(retries):
        # No point sleeping once there is no attempt left to wait for.
        is_last_attempt = attempt == retries - 1

        try:
            response = comm.Message(service, class_id, instance, attribute, data)
        except Exception as e:
            print(f"CIP message exception on attempt {attempt + 1}: {e}")
            if not is_last_attempt:
                time.sleep(retry_delay)
            continue

        if response.Status == 'Success':
            return response

        print(f"CIP message attempt {attempt + 1} failed: {response.Status}")
        status_text = str(response.Status).lower()

        # Check for specific error conditions
        if 'service not supported' in status_text:
            print("Service not supported by this device")
            break
        if 'path destination unknown' in status_text:
            print("Invalid class/instance/attribute path")
            break
        if 'connection' in status_text:
            print("Connection issue - will retry")
            if not is_last_attempt:
                time.sleep(retry_delay)

    return None
# Usage with error handling
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Safe CIP message with retries (0x0E = Get_Attribute_Single)
    response = safe_cip_message(comm, 0x0E, 0x01, 0x01, 0x01)
    if response:
        print("CIP message successful")
        # Process response...
    else:
        print("CIP message failed after all retries")


# Advanced messaging performance optimization techniques.
def optimized_bulk_operations(plc_ip, operations):
    """Perform bulk CIP operations efficiently.

    Operations are grouped by class and sent group by group over a single
    connection, so results come back in grouped order rather than input
    order.

    Args:
        plc_ip (str): IP address assigned to the PLC connection.
        operations (list): Dicts with 'service', 'class' and 'instance' keys,
            and optional 'attribute' and 'data' keys.

    Returns:
        list: One dict per operation with 'operation', 'response' and
        'success' keys. Empty when there are no operations.
    """
    results = []
    if not operations:
        # Nothing to send, so do not create a connection at all.
        return results

    # Group operations by class to minimize context switching
    operations_by_class = {}
    for op in operations:
        operations_by_class.setdefault(op['class'], []).append(op)

    with PLC() as comm:
        comm.IPAddress = plc_ip
        comm.SocketTimeout = 30.0  # Longer timeout for bulk operations

        # Process operations class by class
        for class_id, class_ops in operations_by_class.items():
            print(f"Processing {len(class_ops)} operations for class 0x{class_id:02x}")

            for op in class_ops:
                response = comm.Message(
                    op['service'],
                    op['class'],
                    op['instance'],
                    op.get('attribute'),
                    op.get('data', b'')
                )
                results.append({
                    'operation': op,
                    'response': response,
                    'success': response.Status == 'Success'
                })

    return results
# Usage for bulk operations (0x0E = Get_Attribute_Single)
bulk_ops = [
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x01},  # Vendor ID
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x02},  # Device Type
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x03},  # Product Code
    {'service': 0x0E, 'class': 0x8B, 'instance': 0x01, 'attribute': 0x0B},  # Time status
]

results = optimized_bulk_operations('192.168.1.100', bulk_ops)
successful_ops = sum(1 for r in results if r['success'])
print(f"Completed {successful_ops}/{len(results)} operations successfully")


# Install with Tessl CLI
#     npx tessl i tessl/pypi-pylogix