or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

advanced-messaging.mdcore-operations.mddevice-discovery.mdindex.mdtag-discovery.mdtime-operations.md
tile.json

tessl/pypi-pylogix

Communication driver for reading and writing data from Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over Ethernet I/P

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pylogix@1.1.x

To install, run

npx @tessl/cli install tessl/pypi-pylogix@1.1.0

index.mddocs/

PyLogix

A Python communication driver that enables reading and writing data values from tags in Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over Ethernet I/P. PyLogix provides a simple, Pythonic API for PLC communication without external dependencies, making it ideal for industrial automation applications, data acquisition systems, and monitoring tools.

Package Information

  • Package Name: pylogix
  • Language: Python
  • Installation: pip install pylogix
  • Compatibility: Python 2.7, Python 3.x, MicroPython 1.20.0+
  • Dependencies: None (standard library only)

Core Imports

from pylogix import PLC

Alternative imports for specific components:

from pylogix.eip import PLC
from pylogix.lgx_response import Response
from pylogix.lgx_tag import Tag, UDT
from pylogix.lgx_device import Device

Basic Usage

from pylogix import PLC

# Basic read/write operations
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    
    # Read a single tag
    response = comm.Read('MyTagName')
    print(f"Tag: {response.TagName}, Value: {response.Value}, Status: {response.Status}")
    
    # Write a value to a tag
    response = comm.Write('OutputTag', 42)
    if response.Status == 'Success':
        print("Write successful")
    
    # Read multiple tags
    tags = ['Tag1', 'Tag2', 'Tag3']
    responses = comm.Read(tags)
    for resp in responses:
        print(f"{resp.TagName}: {resp.Value}")

# Manual connection management
comm = PLC()
comm.IPAddress = '192.168.1.100'
comm.ProcessorSlot = 0  # Default slot
response = comm.Read('MyTag')
comm.Close()  # Always close when done

Architecture

PyLogix follows a layered architecture designed for reliability and ease of use:

  • PLC Class: High-level interface providing read/write operations and device management
  • Connection Layer: Handles Ethernet I/P communication, connection pooling, and error recovery
  • CIP Protocol Layer: Implements Common Industrial Protocol (CIP) for Rockwell devices
  • Response System: Structured response objects with status codes and error handling
  • Tag Management: Automatic data type discovery, UDT handling, and tag metadata caching

The library supports both connected and unconnected messaging, automatic data type detection, and handles complex scenarios like bit manipulation, array operations, and User Defined Types (UDTs).

Capabilities

Core Tag Operations

Essential read and write operations for PLC tags, supporting individual tags, arrays, and batch operations. Includes automatic data type detection and conversion.

def Read(tag, count=1, datatype=None):
    """
    Read tag values from the PLC.
    
    Args:
        tag (str or list): Tag name(s) to read
        count (int): Number of elements for array reads
        datatype (int, optional): Force specific data type
    
    Returns:
        Response or list[Response]: Read results
    """

def Write(tag, value=None, datatype=None):
    """
    Write values to PLC tags.
    
    Args:
        tag (str or list): Tag name(s) to write
        value: Value(s) to write
        datatype (int, optional): Force specific data type
    
    Returns:
        Response or list[Response]: Write results
    """

Core Operations

PLC Time Management

Functions for getting and setting the PLC's internal clock, useful for time synchronization and timestamp operations.

def GetPLCTime(raw=False):
    """
    Get the PLC's current time.
    
    Args:
        raw (bool): Return raw microseconds if True, datetime if False
    
    Returns:
        Response: PLC time information
    """

def SetPLCTime(dst=None):
    """
    Set the PLC's clock to current system time.
    
    Args:
        dst (bool, optional): Daylight saving time flag
    
    Returns:
        Response: Set time operation result
    """

Time Operations

Tag Discovery and Introspection

Capabilities for discovering available tags, programs, and retrieving tag metadata including data types and structure information.

def GetTagList(allTags=True):
    """
    Retrieve the complete tag list from the PLC.
    
    Args:
        allTags (bool): Include program tags if True, controller only if False
    
    Returns:
        Response: List of Tag objects with metadata
    """

def GetProgramTagList(programName):
    """
    Get tags for a specific program.
    
    Args:
        programName (str): Program name (e.g., "Program:MainProgram")
    
    Returns:
        Response: List of program-specific Tag objects
    """

def GetProgramsList():
    """
    Get list of available programs in the PLC.
    
    Returns:
        Response: List of program names
    """

Tag Discovery

Device Discovery and Information

Network device discovery and device property retrieval for system diagnostics and configuration.

def Discover():
    """
    Discover Ethernet I/P devices on the network.
    
    Returns:
        Response: List of Device objects with device information
    """

def GetDeviceProperties():
    """
    Get properties of the connected device.
    
    Returns:
        Response: Device object with detailed properties
    """

def GetModuleProperties(slot):
    """
    Get properties of a module in a specific slot.
    
    Args:
        slot (int): Module slot number
    
    Returns:
        Response: Device object for the module
    """

Device Discovery

Advanced Messaging

Low-level CIP messaging capabilities for custom communication and advanced PLC interactions.

def Message(cip_service, cip_class, cip_instance, cip_attribute=None, data=b''):
    """
    Send custom CIP message to the PLC.
    
    Args:
        cip_service (int): CIP service code
        cip_class (int): CIP class code
        cip_instance (int): CIP instance number
        cip_attribute (int, optional): CIP attribute number
        data (bytes): Message data payload
    
    Returns:
        Response: Raw response from PLC
    """

def ReceiveMessage(ip_address, callback):
    """
    Listen for incoming CIP messages.
    
    Args:
        ip_address (str): IP address to listen on
        callback (function): Callback function for received messages
    
    Returns:
        Response: Listener status
    """

Advanced Messaging

Connection Configuration

class PLC:
    def __init__(ip_address="", slot=0, timeout=5.0, Micro800=False, port=44818):
        """
        Initialize PLC connection.
        
        Args:
            ip_address (str): PLC IP address
            slot (int): Processor slot number (default 0)
            timeout (float): Socket timeout in seconds
            Micro800 (bool): True for Micro800 series PLCs
            port (int): Communication port (default 44818)
        """
    
    # Properties
    IPAddress: str          # PLC IP address
    Port: int              # Communication port
    ProcessorSlot: int     # Processor slot
    SocketTimeout: float   # Socket timeout
    Micro800: bool         # Micro800 flag
    Route: object          # Routing configuration
    
    @property
    def ConnectionSize(self) -> int:
        """
        Connection packet size for Forward Open requests.
        
        Default behavior attempts Large Forward Open (508 bytes) followed by
        Small Forward Open if the first fails. For Explicit (Unconnected) 
        sessions, uses a sensible default size.
        
        Returns:
            int: Connection size in bytes (default: 508)
        """
        
    @ConnectionSize.setter
    def ConnectionSize(self, connection_size: int):
        """
        Set the connection packet size.
        
        Args:
            connection_size (int): Desired packet size in bytes
        """

Response Objects

All PLC operations return Response objects containing the results and status information.

class Response:
    def __init__(self, tag_name, value, status):
        """
        Response object for PLC operations.
        
        Args:
            tag_name (str): Tag name associated with the operation
            value: Returned value or data
            status (str or int): Operation status (string or CIP error code)
        """
    
    TagName: str    # Tag name associated with the operation
    Value: any      # Operation result value (tag data, lists, objects, etc.)
    Status: str     # Operation status ("Success" or descriptive error message)
    
    @staticmethod
    def get_error_code(status) -> str:
        """
        Convert CIP error code to descriptive error message.
        
        Args:
            status: CIP error code (int) or error message (str)
            
        Returns:
            str: Descriptive error message
        """

Data Types and Structures

class Tag:
    """Tag metadata object representing a PLC tag with all its properties."""
    TagName: str        # Tag name
    InstanceID: int     # Instance identifier  
    SymbolType: int     # Symbol type code
    DataTypeValue: int  # Data type value
    DataType: str       # Human-readable data type
    Array: int          # Array flag (0=not array, >0=array)
    Struct: int         # Structure flag (0=not struct, 1=struct)
    Size: int           # Tag size for arrays
    AccessRight: int    # Access rights
    Internal: bool      # Internal flag
    Meta: object        # Metadata information
    Scope0: object      # Scope level 0
    Scope1: object      # Scope level 1  
    Bytes: bytes        # Raw byte data
    
    @staticmethod
    def in_filter(tag: str) -> bool:
        """
        Check if the provided tag is in the filter list.
        
        Args:
            tag (str): Tag name to check
            
        Returns:
            bool: True if tag should be filtered out
        """
    
    @staticmethod
    def parse(packet: bytes, program_name: str) -> 'Tag':
        """
        Parse a tag from raw packet data.
        
        Args:
            packet (bytes): Raw packet data from PLC
            program_name (str): Program name for scoping
            
        Returns:
            Tag: Parsed tag object
        """

class UDT:
    """User Defined Type structure."""
    Type: int                    # UDT type identifier
    Name: str                   # UDT name
    Fields: list[Tag]           # List of field Tag objects
    FieldsByName: dict[str, Tag] # Dictionary mapping field names to Tags

class Device:
    """Network device information."""
    Length: int             # Packet length
    EncapsulationVersion: int # Encapsulation version
    IPAddress: str          # Device IP address
    VendorID: int          # Vendor identifier
    Vendor: str            # Vendor name
    DeviceID: int          # Device identifier
    DeviceType: str        # Device type description
    ProductCode: int       # Product code
    ProductName: str       # Product name
    Revision: str          # Device revision
    Status: int            # Device status
    SerialNumber: str      # Serial number (hex format)
    ProductNameLength: int # Length of product name
    State: int             # Device state
    
    @staticmethod
    def get_device(device_id: int) -> str:
        """
        Get device type description from device ID.
        
        Args:
            device_id (int): Device ID code
            
        Returns:
            str: Device type description or "Unknown"
        """
    
    @staticmethod
    def get_vendor(vendor_id: int) -> str:
        """
        Get vendor name from vendor ID.
        
        Args:
            vendor_id (int): Vendor ID code
            
        Returns:
            str: Vendor name or "Unknown"
        """
    
    @staticmethod
    def parse(data: bytes, ip_address: str = None) -> 'Device':
        """
        Parse a device from raw packet data.
        
        Args:
            data (bytes): Raw packet data
            ip_address (str, optional): Override IP address
            
        Returns:
            Device: Parsed device object
        """

Error Handling

PyLogix uses structured error handling through Response objects. The Status field contains either "Success" or a descriptive error message. Error codes are automatically converted from CIP (Common Industrial Protocol) status codes to human-readable messages.

Complete CIP Error Code Mapping

# CIP Error Codes (hex: description)
0x00: 'Success'
0x01: 'Connection failure'
0x02: 'Resource unavailable'
0x03: 'Invalid parameter value'
0x04: 'Path segment error'
0x05: 'Path destination unknown'
0x06: 'Partial transfer'
0x07: 'Connection lost'
0x08: 'Service not supported'
0x09: 'Invalid Attribute'
0x0A: 'Attribute list error'
0x0B: 'Already in requested mode/state'
0x0C: 'Object state conflict'
0x0D: 'Object already exists'
0x0E: 'Attribute not settable'
0x0F: 'Privilege violation'
0x10: 'Device state conflict'
0x11: 'Reply data too large'
0x12: 'Fragmentation of a primitive value'
0x13: 'Not enough data'
0x14: 'Attribute not supported'
0x15: 'Too much data'
0x16: 'Object does not exist'
0x17: 'Service fragmentation sequence not in progress'
0x18: 'No stored attribute data'
0x19: 'Store operation failure'
0x1A: 'Routing failure, request packet too large'
0x1B: 'Routing failure, response packet too large'
0x1C: 'Missing attribute list entry data'
0x1D: 'Invalid attribute value list'
0x1E: 'Embedded service error'
0x1F: 'Vendor specific'
0x20: 'Invalid Parameter'
0x21: 'Write once value or medium already written'
0x22: 'Invalid reply received'
0x23: 'Buffer overflow'
0x24: 'Invalid message format'
0x25: 'Key failure in path'
0x26: 'Path size invalid'
0x27: 'Unexpected attribute in list'
0x28: 'Invalid member ID'
0x29: 'Member not settable'
0x2A: 'Group 2 only server general failure'
0x2B: 'Unknown Modbus error'
0x2C: 'Attribute not gettable'

Common Error Scenarios

from pylogix import PLC

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    
    # Handle connection errors
    response = comm.Read('MyTag')
    if response.Status == 'Success':
        print(f"Value: {response.Value}")
    elif response.Status == 'Connection failure':
        print("Could not connect to PLC - check IP address and network")
    elif response.Status == 'Path destination unknown':
        print("Invalid tag name or PLC path")
    elif response.Status == 'Object does not exist':
        print("Tag not found in PLC")
    else:
        print(f"Unexpected error: {response.Status}")