CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pylogix

Communication driver for reading and writing data from Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over EtherNet/IP

Pending
Overview
Eval results
Files

advanced-messaging.mddocs/

Advanced Messaging

Low-level CIP (Common Industrial Protocol) messaging capabilities for custom communication, diagnostic operations, and advanced PLC interactions beyond standard tag operations. These functions enable direct protocol-level communication and custom service implementation.

Capabilities

Custom CIP Message Sending

Send custom CIP messages directly to the PLC for advanced operations not covered by standard tag read/write functions.

def Message(cip_service, cip_class, cip_instance, cip_attribute=None, data=b''):
    """
    Send a custom CIP message to the PLC and return its raw reply.

    Args:
        cip_service (int): CIP service code (e.g., 0x0E=Get_Attribute_Single,
            0x01=Get_Attributes_All, 0x03=Get_Attribute_List)
        cip_class (int): CIP class code identifying the object class
        cip_instance (int): CIP instance number within the class
        cip_attribute (int or list, optional): CIP attribute number(s) to access
        data (bytes): Message data payload for services that require data

    Returns:
        Response: Object containing raw response from PLC
                 - Value: Raw response data (bytes)
                 - Status: "Success" or error description
    """

Usage Examples:

import struct

from pylogix import PLC

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Identity Object (Class 0x01, Instance 0x01), Attribute 0x01 = Vendor ID.
    # Service 0x0E = Get_Attribute_Single (0x01 would be Get_Attributes_All).
    response = comm.Message(0x0E, 0x01, 0x01, 0x01)
    # Guard the length so a short reply cannot raise struct.error.
    if response.Status == 'Success' and len(response.Value) >= 46:
        # The attribute data follows the 44-byte reply header.
        vendor_id = struct.unpack_from('<H', response.Value, 44)[0]
        print(f"Vendor ID: {vendor_id}")

    # Get multiple attributes at once
    # Service 0x03 = Get_Attribute_List
    response = comm.Message(0x03, 0x01, 0x01, [0x01, 0x02, 0x03])  # Vendor, Device Type, Product Code
    if response.Status == 'Success':
        print(f"Multiple attributes retrieved: {len(response.Value)} bytes")

    # Send custom data to a service.
    # NOTE(review): 0x10 is Set_Attribute_Single, which normally also needs an
    # attribute number; confirm the target attribute before using this on a
    # real device, because it writes to the PLC.
    custom_data = b'\x00\x01\x02\x03'  # Example data
    response = comm.Message(0x10, 0x04, 0x01, data=custom_data)

CIP Message Receiving

Listen for incoming CIP messages, typically used for unsolicited data or event notifications from the PLC.

def ReceiveMessage(ip_address, callback):
    """
    Listen for incoming CIP messages from the PLC.

    Args:
        ip_address (str): IP address to listen on.
            NOTE(review): presumably an address of the machine running this
            code rather than the PLC's own address - confirm.
        callback (function): Called with a single Response argument for each
            received message; the Response carries TagName, Value, and Status.

    Returns:
        Response: Listener setup status (Status is "Success" when the
            listener was started).
    """

Usage Examples:

def message_handler(response):
    """Handle an incoming CIP message by printing its contents.

    Args:
        response: Object with ``Status``, ``TagName`` and ``Value``
            attributes, as passed to the ReceiveMessage callback.

    Returns:
        None. The message (or the error status) is written to stdout.
    """
    # Imported here because the snippet never imported datetime, which made
    # the original raise NameError on the first successful message.
    from datetime import datetime

    if response.Status == 'Success':
        print(f"Received message for tag: {response.TagName}")
        print(f"Value: {response.Value}")
        print(f"Timestamp: {datetime.now()}")
    else:
        print(f"Message receive error: {response.Status}")

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Register the handler and check whether the listener came up
    response = comm.ReceiveMessage('192.168.1.100', message_handler)
    if response.Status != 'Success':
        print(f"Failed to start listener: {response.Status}")
    else:
        print("Message listener started")
        import time
        # Idle in the foreground until the user presses Ctrl+C so the
        # program stays alive to receive messages.
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            print("Stopping message listener")

Common CIP Services

The standard CIP service codes below are the ones you will most often need when building custom messages.

Standard CIP Services:

# Common CIP service codes.
# Names follow the CIP common-services table: 0x01/0x02 operate on ALL
# attributes of an instance, while 0x0E/0x10 operate on a SINGLE attribute.
CIP_SERVICES = {
    0x01: 'Get_Attributes_All',        # Read every attribute of an instance
    0x02: 'Set_Attributes_All',        # Write every attribute of an instance
    0x03: 'Get_Attribute_List',        # Read multiple attributes
    0x04: 'Set_Attribute_List',        # Write multiple attributes
    0x05: 'Reset',                     # Reset service
    0x06: 'Start',                     # Start service
    0x07: 'Stop',                      # Stop service
    0x08: 'Create',                    # Create instance
    0x09: 'Delete',                    # Delete instance
    0x0A: 'Multiple_Service_Packet',   # Multiple services in one message
    0x0E: 'Get_Attribute_Single',      # Read single attribute
    0x10: 'Set_Attribute_Single',      # Write single attribute
    0x4C: 'Read_Tag',                  # Tag read service (PyLogix internal)
    0x4D: 'Write_Tag',                 # Tag write service (PyLogix internal)
    0x52: 'Read_Tag_Fragmented',       # Fragmented read (PyLogix internal)
    0x53: 'Write_Tag_Fragmented',      # Fragmented write (PyLogix internal)
}

# Example: Get all attributes of Identity Object.
# Get_Attributes_All is service 0x01 (0x0E is Get_Attribute_Single).
response = comm.Message(0x01, 0x01, 0x01)  # Get_Attributes_All from Identity Object

Common CIP Classes

The CIP object classes below are the ones custom messages most often target to reach specific PLC functionality.

Standard CIP Classes:

# Common CIP class codes.
# 0x6B, 0x6C and 0x8B are Rockwell (vendor-specific) objects; the rest are
# defined by the CIP specification.
CIP_CLASSES = {
    0x01: 'Identity',                  # Device identity information
    0x02: 'Message Router',            # Message routing
    0x04: 'Assembly',                  # I/O assembly objects
    0x05: 'Connection',                # Connection objects
    0x06: 'Connection Manager',        # Connection management
    0x0F: 'Parameter',                 # Parameter objects
    0x37: 'File',                      # File objects
    0x43: 'Time Sync',                 # Time synchronization (CIP Sync)
    0x6B: 'Symbol',                    # Tag/symbol objects
    0x6C: 'Template',                  # UDT template objects
    0x8B: 'Wall Clock Time',           # Controller clock
}

# Examples of accessing different classes
def get_device_identity(comm):
    """Get complete device identity information.

    Args:
        comm: Connected object exposing ``Message(service, class, instance)``.

    Returns:
        Whatever ``comm.Message`` returns for the request.
    """
    # Get_Attributes_All is service 0x01; 0x0E would read a single attribute.
    return comm.Message(0x01, 0x01, 0x01)  # Get all Identity attributes

def get_connection_info(comm):
    """Get connection manager information.

    Args:
        comm: Connected object exposing
            ``Message(service, class, instance, attribute)``.

    Returns:
        Whatever ``comm.Message`` returns for the request.
    """
    # Reading one attribute requires Get_Attribute_Single (0x0E), not 0x01.
    return comm.Message(0x0E, 0x06, 0x01, 0x01)  # Connection Manager, attribute 0x01

Advanced Diagnostic Operations

Use custom CIP messages for advanced diagnostics and system information gathering.

def advanced_diagnostics(plc_ip):
    """Perform advanced PLC diagnostics using CIP messages.

    Queries the Identity object, the Connection Manager and the 0x8B clock
    object, collecting the results in a dictionary.

    Args:
        plc_ip (str): IP address of the PLC to query.

    Returns:
        dict: Keys ``identity``, ``connection_status``, ``memory_info``,
        ``communication_status``, ``time_sync`` and ``errors``. ``errors`` is
        a list of human-readable messages for every step that failed.
    """
    import struct

    diagnostics = {
        'identity': {},
        'connection_status': {},
        'memory_info': {},
        'communication_status': {},
        # Present from the start so callers can index it even when an
        # exception aborts the run before the clock query.
        'time_sync': {},
        'errors': []
    }

    with PLC() as comm:
        comm.IPAddress = plc_ip

        try:
            # Identity Object details: Get_Attributes_All is service 0x01
            identity_response = comm.Message(0x01, 0x01, 0x01)
            if identity_response.Status == 'Success':
                data = identity_response.Value
                # Reply data starts after the 44-byte header. The first four
                # identity fields are three UINTs plus a two-byte revision,
                # i.e. 8 bytes in total.
                if len(data) >= 52:
                    vendor_id, device_type, product_code, major, minor = \
                        struct.unpack_from('<HHHBB', data, 44)

                    diagnostics['identity'] = {
                        'vendor_id': vendor_id,
                        'device_type': device_type,
                        'product_code': product_code,
                        'revision': f"{major}.{minor}"
                    }
                    print(f"Identity: Vendor={vendor_id}, Type={device_type}, Code={product_code}")
                else:
                    # Report the short reply instead of silently leaving the
                    # identity section empty.
                    diagnostics['errors'].append("Identity response too short")
            else:
                diagnostics['errors'].append(f"Identity query failed: {identity_response.Status}")

            # Connection Manager status: single attribute -> service 0x0E
            conn_response = comm.Message(0x0E, 0x06, 0x01, 0x01)
            if conn_response.Status == 'Success':
                diagnostics['connection_status']['raw_data'] = len(conn_response.Value)
                print(f"Connection manager responded with {len(conn_response.Value)} bytes")
            else:
                diagnostics['errors'].append(f"Connection status failed: {conn_response.Status}")

            # Clock object (class 0x8B) probe: single attribute -> service 0x0E
            time_response = comm.Message(0x0E, 0x8B, 0x01, 0x01)
            if time_response.Status == 'Success':
                diagnostics['time_sync'] = {'available': True}
                print("Time synchronization object accessible")
            else:
                diagnostics['time_sync'] = {'available': False}
                print(f"Time sync not available: {time_response.Status}")

        except Exception as e:
            # Diagnostic boundary: record the failure rather than crash.
            diagnostics['errors'].append(f"Diagnostic exception: {e}")

    return diagnostics

# Usage: run the diagnostics once and report how many steps failed
diag_results = advanced_diagnostics('192.168.1.100')
error_count = len(diag_results['errors'])
print(f"Diagnostic completed with {error_count} errors")

Custom Service Implementation

Implement custom services for specialized PLC interactions.

class CustomPLCService:
    """Custom PLC service implementation using CIP messaging.

    Use as a context manager: the PLC connection object is created on entry
    and closed (and dropped) on exit. Every public method raises
    ``RuntimeError`` when called outside the ``with`` block.
    """

    def __init__(self, plc_ip):
        self.plc_ip = plc_ip
        self.comm = None

    def __enter__(self):
        self.comm = PLC()
        self.comm.IPAddress = self.plc_ip
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.comm:
            try:
                self.comm.Close()
            finally:
                # Drop the closed connection so later calls fail the
                # "not initialized" guard instead of using a stale object.
                self.comm = None

    def _require_comm(self):
        """Return the active connection or raise if the service is not open."""
        if not self.comm:
            raise RuntimeError("Service not initialized - use with statement")
        return self.comm

    def read_vendor_specific_data(self, class_id, instance, attribute):
        """Read one attribute and return the raw response bytes.

        Raises:
            RuntimeError: if the service is not open or the PLC reports a
                non-success status.
        """
        comm = self._require_comm()

        # Single-attribute read is Get_Attribute_Single (0x0E); 0x01 would be
        # Get_Attributes_All.
        response = comm.Message(0x0E, class_id, instance, attribute)
        if response.Status != 'Success':
            raise RuntimeError(f"Vendor data read failed: {response.Status}")
        return response.Value

    def send_custom_command(self, service_code, class_id, instance, data=b''):
        """Send an arbitrary service request and return the Response object.

        Raises:
            RuntimeError: if the service is not open.
        """
        comm = self._require_comm()
        return comm.Message(service_code, class_id, instance, data=data)

    def bulk_attribute_read(self, class_id, instance, attribute_list):
        """Read several attributes with one Get_Attribute_List (0x03) request.

        Returns:
            dict: attribute id -> value, see ``_parse_attribute_list_response``.

        Raises:
            RuntimeError: if the service is not open or the PLC reports a
                non-success status.
        """
        comm = self._require_comm()

        response = comm.Message(0x03, class_id, instance, attribute_list)
        if response.Status != 'Success':
            raise RuntimeError(f"Bulk read failed: {response.Status}")
        return self._parse_attribute_list_response(response.Value, attribute_list)

    def _parse_attribute_list_response(self, data, attribute_list):
        """Parse a Get_Attribute_List reply into ``{attribute_id: value}``.

        The reply data (after the 44-byte header) is an attribute count
        followed by one record per attribute: id (UINT), status (UINT) and,
        only when the status is 0, the attribute value.

        Simplification: every value is assumed to be a 16-bit unsigned
        integer, so attributes of other types need type-specific parsing.
        Attributes that failed, or that were not requested, are omitted;
        parsing stops quietly at the first truncated record.
        """
        import struct

        attributes = {}
        offset = 44  # Skip encapsulation + CIP reply header
        if len(data) < offset + 2:
            return attributes

        (count,) = struct.unpack_from('<H', data, offset)
        offset += 2
        requested = set(attribute_list)

        for _ in range(count):
            if offset + 4 > len(data):
                break
            attr_id, status = struct.unpack_from('<HH', data, offset)
            offset += 4
            if status != 0:
                # A failed attribute carries no value bytes.
                continue
            if offset + 2 > len(data):
                break
            (value,) = struct.unpack_from('<H', data, offset)
            offset += 2
            if attr_id in requested:
                attributes[attr_id] = value

        return attributes

# Usage example: open the service, run three requests in order, and report
# any RuntimeError raised by the service methods.
with CustomPLCService('192.168.1.100') as service:
    try:
        # Read vendor-specific information (class 0x01, instance 0x01, attribute 0x01)
        vendor_data = service.read_vendor_specific_data(0x01, 0x01, 0x01)
        print(f"Vendor data: {vendor_data}")

        # Read multiple attributes at once
        attributes = service.bulk_attribute_read(0x01, 0x01, [0x01, 0x02, 0x03])
        print(f"Bulk attributes: {attributes}")

        # Send custom command
        # WARNING(review): service 0x05 addressed to class 0x01 instance 0x01
        # is a Reset of the Identity object; on real hardware this presumably
        # restarts the device - confirm before running against a live PLC.
        response = service.send_custom_command(0x05, 0x01, 0x01)  # Reset command
        print(f"Custom command result: {response.Status}")

    except RuntimeError as e:
        print(f"Service error: {e}")

Message Data Parsing

Utilities for parsing raw CIP message responses.

import struct

def parse_cip_response(response_data):
    """Parse raw CIP response data into its header fields and payload.

    Args:
        response_data (bytes | None): Raw reply as returned in
            ``Response.Value``.

    Returns:
        dict: ``{'error': 'Response too short'}`` when the input is missing
        or shorter than the 44-byte header; otherwise a dict with the
        encapsulation fields (``encap_command``, ``encap_length``,
        ``encap_session``, ``encap_status``), the reply ``service_code``
        (byte 40), ``response_status`` (byte 42), ``data_start`` (always 44)
        and ``data`` (everything from byte 44 on, possibly empty).
    """
    # None/empty input is treated like any other too-short reply.
    if not response_data or len(response_data) < 44:
        return {'error': 'Response too short'}

    # CIP response structure (simplified). The length guard above makes
    # every index below valid, so no per-field length checks are needed.
    command, length, session, status = struct.unpack_from('<HHII', response_data, 0)
    return {
        'encap_command': command,
        'encap_length': length,
        'encap_session': session,
        'encap_status': status,
        'service_code': response_data[40],
        'response_status': response_data[42],
        'data_start': 44,
        'data': response_data[44:],
    }

def extract_attribute_data(response_data, attribute_type='uint16'):
    """Extract attribute data from a raw CIP reply based on its type.

    Args:
        response_data (bytes): Raw reply, parsed with ``parse_cip_response``.
        attribute_type (str): One of
            ``'uint16'`` - little-endian 16-bit unsigned integer,
            ``'uint32'`` - little-endian 32-bit unsigned integer,
            ``'string'`` - 2-byte length prefix followed by the characters,
            ``'short_string'`` - 1-byte length prefix followed by the
            characters.
            Any other value returns the raw payload bytes unchanged.

    Returns:
        The decoded value, or ``None`` when the reply could not be parsed,
        carries no payload, or is too short for the requested type.
    """
    parsed = parse_cip_response(response_data)

    if parsed.get('error'):
        return None

    data = parsed['data']
    if not data:
        return None

    if attribute_type == 'uint16':
        return struct.unpack_from('<H', data)[0] if len(data) >= 2 else None

    if attribute_type == 'uint32':
        return struct.unpack_from('<I', data)[0] if len(data) >= 4 else None

    if attribute_type in ('string', 'short_string'):
        prefix = 2 if attribute_type == 'string' else 1
        if len(data) < prefix:
            return None
        str_len = int.from_bytes(data[:prefix], 'little')
        # A truncated string yields None, like the integer branches, rather
        # than falling through to the raw-bytes fallback.
        if len(data) < prefix + str_len:
            return None
        return data[prefix:prefix + str_len].decode('utf-8', errors='ignore')

    # Unknown type: hand back the raw payload
    return data

# Usage in custom messaging
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Get vendor ID using custom parsing.
    # Service 0x0E = Get_Attribute_Single (0x01 would be Get_Attributes_All).
    response = comm.Message(0x0E, 0x01, 0x01, 0x01)
    if response.Status == 'Success':
        vendor_id = extract_attribute_data(response.Value, 'uint16')
        print(f"Parsed Vendor ID: {vendor_id}")

    # Get product name (Identity attribute 0x07)
    response = comm.Message(0x0E, 0x01, 0x01, 0x07)  # Product name attribute
    if response.Status == 'Success':
        # Product Name is a SHORT_STRING: one length byte followed by the
        # characters, so it is decoded here instead of using the 2-byte
        # length 'string' type.
        payload = parse_cip_response(response.Value).get('data', b'')
        name_len = payload[0] if payload else 0
        product_name = payload[1:1 + name_len].decode('utf-8', errors='ignore')
        print(f"Product Name: {product_name}")

Error Handling for Advanced Messaging

Advanced messaging requires comprehensive error handling due to the low-level nature of CIP communication.

def safe_cip_message(comm, service, class_id, instance, attribute=None, data=b'', retries=3):
    """Send CIP message with error handling and retries.

    Args:
        comm: Connected object exposing ``Message(...)``.
        service (int): CIP service code.
        class_id (int): CIP class code.
        instance (int): CIP instance number.
        attribute (int or list, optional): Attribute number(s).
        data (bytes): Request payload.
        retries (int): Maximum number of attempts.

    Returns:
        The successful Response, or ``None`` when every attempt failed or
        the PLC reported an error that retrying cannot fix ("service not
        supported", "path destination unknown"). Exceptions raised while
        sending are printed and retried, never propagated.
    """
    import time

    for attempt in range(retries):
        # No point in waiting (or announcing a retry) after the last attempt.
        is_last = attempt >= retries - 1
        try:
            response = comm.Message(service, class_id, instance, attribute, data)

            if response.Status == 'Success':
                return response

            print(f"CIP message attempt {attempt + 1} failed: {response.Status}")

            # Check for specific error conditions
            status = response.Status.lower()
            if 'service not supported' in status:
                print("Service not supported by this device")
                break
            if 'path destination unknown' in status:
                print("Invalid class/instance/attribute path")
                break
            if 'connection' in status and not is_last:
                print("Connection issue - will retry")
                time.sleep(1)

        except Exception as e:
            # Best-effort helper: report and retry instead of propagating.
            print(f"CIP message exception on attempt {attempt + 1}: {e}")
            if not is_last:
                time.sleep(1)

    return None

# Usage with error handling
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'

    # Safe CIP message with retries: read Identity attribute 0x01 (Vendor ID)
    # with Get_Attribute_Single (0x0E); 0x01 would be Get_Attributes_All.
    response = safe_cip_message(comm, 0x0E, 0x01, 0x01, 0x01)
    if response is not None:
        print("CIP message successful")
        # Process response...
    else:
        print("CIP message failed after all retries")

Performance Considerations

The example below shows how to run a batch of CIP requests over a single connection, grouping them by object class.

def optimized_bulk_operations(plc_ip, operations):
    """Run a batch of CIP requests over one connection, grouped by class.

    Args:
        plc_ip (str): IP address of the PLC.
        operations (list[dict]): Each entry needs 'service', 'class' and
            'instance'; 'attribute' and 'data' are optional.

    Returns:
        list[dict]: One entry per request with keys 'operation' (the input
        dict), 'response' (the Response object) and 'success' (bool), in
        the order the requests were sent (grouped by class).
    """
    outcomes = []

    with PLC() as comm:
        comm.IPAddress = plc_ip
        comm.SocketTimeout = 30.0  # Longer timeout for bulk operations

        # Bucket the requests by class code, keeping first-seen class order
        grouped = {}
        for request in operations:
            grouped.setdefault(request['class'], []).append(request)

        # Send each bucket in turn so same-class requests go back-to-back
        for class_id in grouped:
            batch = grouped[class_id]
            print(f"Processing {len(batch)} operations for class 0x{class_id:02x}")

            for request in batch:
                reply = comm.Message(
                    request['service'],
                    request['class'],
                    request['instance'],
                    request.get('attribute'),
                    request.get('data', b''),
                )
                succeeded = reply.Status == 'Success'
                outcomes.append({
                    'operation': request,
                    'response': reply,
                    'success': succeeded,
                })

    return outcomes

# Usage for bulk operations.
# Each entry reads ONE attribute, so the service is Get_Attribute_Single
# (0x0E); 0x01 would request Get_Attributes_All.
bulk_ops = [
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x01},  # Vendor ID
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x02},  # Device Type
    {'service': 0x0E, 'class': 0x01, 'instance': 0x01, 'attribute': 0x03},  # Product Code
    {'service': 0x0E, 'class': 0x8B, 'instance': 0x01, 'attribute': 0x0B},  # Time status
]

results = optimized_bulk_operations('192.168.1.100', bulk_ops)
successful_ops = sum(1 for r in results if r['success'])
print(f"Completed {successful_ops}/{len(results)} operations successfully")

Install with Tessl CLI

npx tessl i tessl/pypi-pylogix

docs

advanced-messaging.md

core-operations.md

device-discovery.md

index.md

tag-discovery.md

time-operations.md

tile.json