Communication driver for reading and writing data from Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over Ethernet I/P
npx @tessl/cli install tessl/pypi-pylogix@1.1.0A Python communication driver that enables reading and writing data values from tags in Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over Ethernet I/P. PyLogix provides a simple, Pythonic API for PLC communication without external dependencies, making it ideal for industrial automation applications, data acquisition systems, and monitoring tools.
pip install pylogixfrom pylogix import PLCAlternative imports for specific components:
from pylogix.eip import PLC
from pylogix.lgx_response import Response
from pylogix.lgx_tag import Tag, UDT
from pylogix.lgx_device import Devicefrom pylogix import PLC
# Basic read/write operations
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
# Read a single tag
response = comm.Read('MyTagName')
print(f"Tag: {response.TagName}, Value: {response.Value}, Status: {response.Status}")
# Write a value to a tag
response = comm.Write('OutputTag', 42)
if response.Status == 'Success':
print("Write successful")
# Read multiple tags
tags = ['Tag1', 'Tag2', 'Tag3']
responses = comm.Read(tags)
for resp in responses:
print(f"{resp.TagName}: {resp.Value}")
# Manual connection management
comm = PLC()
comm.IPAddress = '192.168.1.100'
comm.ProcessorSlot = 0 # Default slot
response = comm.Read('MyTag')
comm.Close() # Always close when donePyLogix follows a layered architecture designed for reliability and ease of use:
The library supports both connected and unconnected messaging, automatic data type detection, and handles complex scenarios like bit manipulation, array operations, and User Defined Types (UDTs).
Essential read and write operations for PLC tags, supporting individual tags, arrays, and batch operations. Includes automatic data type detection and conversion.
def Read(tag, count=1, datatype=None):
"""
Read tag values from the PLC.
Args:
tag (str or list): Tag name(s) to read
count (int): Number of elements for array reads
datatype (int, optional): Force specific data type
Returns:
Response or list[Response]: Read results
"""
def Write(tag, value=None, datatype=None):
"""
Write values to PLC tags.
Args:
tag (str or list): Tag name(s) to write
value: Value(s) to write
datatype (int, optional): Force specific data type
Returns:
Response or list[Response]: Write results
"""Functions for getting and setting the PLC's internal clock, useful for time synchronization and timestamp operations.
def GetPLCTime(raw=False):
"""
Get the PLC's current time.
Args:
raw (bool): Return raw microseconds if True, datetime if False
Returns:
Response: PLC time information
"""
def SetPLCTime(dst=None):
"""
Set the PLC's clock to current system time.
Args:
dst (bool, optional): Daylight saving time flag
Returns:
Response: Set time operation result
"""Capabilities for discovering available tags, programs, and retrieving tag metadata including data types and structure information.
def GetTagList(allTags=True):
"""
Retrieve the complete tag list from the PLC.
Args:
allTags (bool): Include program tags if True, controller only if False
Returns:
Response: List of Tag objects with metadata
"""
def GetProgramTagList(programName):
"""
Get tags for a specific program.
Args:
programName (str): Program name (e.g., "Program:MainProgram")
Returns:
Response: List of program-specific Tag objects
"""
def GetProgramsList():
"""
Get list of available programs in the PLC.
Returns:
Response: List of program names
"""Network device discovery and device property retrieval for system diagnostics and configuration.
def Discover():
"""
Discover Ethernet I/P devices on the network.
Returns:
Response: List of Device objects with device information
"""
def GetDeviceProperties():
"""
Get properties of the connected device.
Returns:
Response: Device object with detailed properties
"""
def GetModuleProperties(slot):
"""
Get properties of a module in a specific slot.
Args:
slot (int): Module slot number
Returns:
Response: Device object for the module
"""Low-level CIP messaging capabilities for custom communication and advanced PLC interactions.
def Message(cip_service, cip_class, cip_instance, cip_attribute=None, data=b''):
"""
Send custom CIP message to the PLC.
Args:
cip_service (int): CIP service code
cip_class (int): CIP class code
cip_instance (int): CIP instance number
cip_attribute (int, optional): CIP attribute number
data (bytes): Message data payload
Returns:
Response: Raw response from PLC
"""
def ReceiveMessage(ip_address, callback):
"""
Listen for incoming CIP messages.
Args:
ip_address (str): IP address to listen on
callback (function): Callback function for received messages
Returns:
Response: Listener status
"""class PLC:
def __init__(ip_address="", slot=0, timeout=5.0, Micro800=False, port=44818):
"""
Initialize PLC connection.
Args:
ip_address (str): PLC IP address
slot (int): Processor slot number (default 0)
timeout (float): Socket timeout in seconds
Micro800 (bool): True for Micro800 series PLCs
port (int): Communication port (default 44818)
"""
# Properties
IPAddress: str # PLC IP address
Port: int # Communication port
ProcessorSlot: int # Processor slot
SocketTimeout: float # Socket timeout
Micro800: bool # Micro800 flag
Route: object # Routing configuration
@property
def ConnectionSize(self) -> int:
"""
Connection packet size for Forward Open requests.
Default behavior attempts Large Forward Open (508 bytes) followed by
Small Forward Open if the first fails. For Explicit (Unconnected)
sessions, uses a sensible default size.
Returns:
int: Connection size in bytes (default: 508)
"""
@ConnectionSize.setter
def ConnectionSize(self, connection_size: int):
"""
Set the connection packet size.
Args:
connection_size (int): Desired packet size in bytes
"""All PLC operations return Response objects containing the results and status information.
class Response:
def __init__(self, tag_name, value, status):
"""
Response object for PLC operations.
Args:
tag_name (str): Tag name associated with the operation
value: Returned value or data
status (str or int): Operation status (string or CIP error code)
"""
TagName: str # Tag name associated with the operation
Value: any # Operation result value (tag data, lists, objects, etc.)
Status: str # Operation status ("Success" or descriptive error message)
@staticmethod
def get_error_code(status) -> str:
"""
Convert CIP error code to descriptive error message.
Args:
status: CIP error code (int) or error message (str)
Returns:
str: Descriptive error message
"""class Tag:
"""Tag metadata object representing a PLC tag with all its properties."""
TagName: str # Tag name
InstanceID: int # Instance identifier
SymbolType: int # Symbol type code
DataTypeValue: int # Data type value
DataType: str # Human-readable data type
Array: int # Array flag (0=not array, >0=array)
Struct: int # Structure flag (0=not struct, 1=struct)
Size: int # Tag size for arrays
AccessRight: int # Access rights
Internal: bool # Internal flag
Meta: object # Metadata information
Scope0: object # Scope level 0
Scope1: object # Scope level 1
Bytes: bytes # Raw byte data
@staticmethod
def in_filter(tag: str) -> bool:
"""
Check if the provided tag is in the filter list.
Args:
tag (str): Tag name to check
Returns:
bool: True if tag should be filtered out
"""
@staticmethod
def parse(packet: bytes, program_name: str) -> 'Tag':
"""
Parse a tag from raw packet data.
Args:
packet (bytes): Raw packet data from PLC
program_name (str): Program name for scoping
Returns:
Tag: Parsed tag object
"""
class UDT:
"""User Defined Type structure."""
Type: int # UDT type identifier
Name: str # UDT name
Fields: list[Tag] # List of field Tag objects
FieldsByName: dict[str, Tag] # Dictionary mapping field names to Tags
class Device:
"""Network device information."""
Length: int # Packet length
EncapsulationVersion: int # Encapsulation version
IPAddress: str # Device IP address
VendorID: int # Vendor identifier
Vendor: str # Vendor name
DeviceID: int # Device identifier
DeviceType: str # Device type description
ProductCode: int # Product code
ProductName: str # Product name
Revision: str # Device revision
Status: int # Device status
SerialNumber: str # Serial number (hex format)
ProductNameLength: int # Length of product name
State: int # Device state
@staticmethod
def get_device(device_id: int) -> str:
"""
Get device type description from device ID.
Args:
device_id (int): Device ID code
Returns:
str: Device type description or "Unknown"
"""
@staticmethod
def get_vendor(vendor_id: int) -> str:
"""
Get vendor name from vendor ID.
Args:
vendor_id (int): Vendor ID code
Returns:
str: Vendor name or "Unknown"
"""
@staticmethod
def parse(data: bytes, ip_address: str = None) -> 'Device':
"""
Parse a device from raw packet data.
Args:
data (bytes): Raw packet data
ip_address (str, optional): Override IP address
Returns:
Device: Parsed device object
"""PyLogix uses structured error handling through Response objects. The Status field contains either "Success" or a descriptive error message. Error codes are automatically converted from CIP (Common Industrial Protocol) status codes to human-readable messages.
# CIP Error Codes (hex: description)
0x00: 'Success'
0x01: 'Connection failure'
0x02: 'Resource unavailable'
0x03: 'Invalid parameter value'
0x04: 'Path segment error'
0x05: 'Path destination unknown'
0x06: 'Partial transfer'
0x07: 'Connection lost'
0x08: 'Service not supported'
0x09: 'Invalid Attribute'
0x0A: 'Attribute list error'
0x0B: 'Already in requested mode/state'
0x0C: 'Object state conflict'
0x0D: 'Object already exists'
0x0E: 'Attribute not settable'
0x0F: 'Privilege violation'
0x10: 'Device state conflict'
0x11: 'Reply data too large'
0x12: 'Fragmentation of a primitive value'
0x13: 'Not enough data'
0x14: 'Attribute not supported'
0x15: 'Too much data'
0x16: 'Object does not exist'
0x17: 'Service fragmentation sequence not in progress'
0x18: 'No stored attribute data'
0x19: 'Store operation failure'
0x1A: 'Routing failure, request packet too large'
0x1B: 'Routing failure, response packet too large'
0x1C: 'Missing attribute list entry data'
0x1D: 'Invalid attribute value list'
0x1E: 'Embedded service error'
0x1F: 'Vendor specific'
0x20: 'Invalid Parameter'
0x21: 'Write once value or medium already written'
0x22: 'Invalid reply received'
0x23: 'Buffer overflow'
0x24: 'Invalid message format'
0x25: 'Key failure in path'
0x26: 'Path size invalid'
0x27: 'Unexpected attribute in list'
0x28: 'Invalid member ID'
0x29: 'Member not settable'
0x2A: 'Group 2 only server general failure'
0x2B: 'Unknown Modbus error'
0x2C: 'Attribute not gettable'from pylogix import PLC
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
# Handle connection errors
response = comm.Read('MyTag')
if response.Status == 'Success':
print(f"Value: {response.Value}")
elif response.Status == 'Connection failure':
print("Could not connect to PLC - check IP address and network")
elif response.Status == 'Path destination unknown':
print("Invalid tag name or PLC path")
elif response.Status == 'Object does not exist':
print("Tag not found in PLC")
else:
print(f"Unexpected error: {response.Status}")