Communication driver for reading data from and writing data to Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over EtherNet/IP.
—
Network device discovery and device property retrieval for system diagnostics, configuration, and network management. These functions enable automatic detection of EtherNet/IP devices and retrieval of detailed device information.

Discover all EtherNet/IP devices on the local network using broadcast discovery packets.
def Discover():
    """
    Discover EtherNet/IP devices on the network.

    Note: Not available on MicroPython due to socket limitations.

    Returns:
        Response: Object containing list of discovered devices
            - Value: List of Device objects with device information
            - Status: "Success" or error description
    """

Usage Examples:
from pylogix import PLC

with PLC() as comm:
    # Discover all devices on the network
    response = comm.Discover()

    if response.Status == 'Success':
        # On success, Value holds the list of discovered Device objects
        devices = response.Value
        print(f"Discovered {len(devices)} devices:")

        for device in devices:
            print(f"\nDevice: {device.IPAddress}")
            print(f" Product: {device.ProductName}")
            print(f" Vendor: {device.Vendor}")
            print(f" Device Type: {device.DeviceType}")
            print(f" Product Code: {device.ProductCode}")
            print(f" Revision: {device.Revision}")
            print(f" Serial Number: {device.SerialNumber}")
            # The status word is shown as a zero-padded, 4-digit hex value
            print(f" Status: 0x{device.Status:04x}")
    else:
        # On failure, Status carries the error description
        print(f"Discovery failed: {response.Status}")

Get detailed properties of the currently connected device.
def GetDeviceProperties():
    """
    Get properties of the device at the specified IP address.

    Returns:
        Response: Object containing device information
            - Value: Device object with detailed properties
            - Status: "Success" or error description
    """

Usage Examples:
with PLC() as comm:
    # Address of the device whose properties are requested
    comm.IPAddress = '192.168.1.100'

    # Get properties of the connected device
    response = comm.GetDeviceProperties()

    if response.Status == 'Success':
        # On success, Value is the Device object describing the target
        device = response.Value
        print(f"Device Properties for {device.IPAddress}:")
        print(f" Product Name: {device.ProductName}")
        print(f" Vendor: {device.Vendor} (ID: {device.VendorID})")
        print(f" Device Type: {device.DeviceType} (ID: {device.DeviceID})")
        print(f" Product Code: {device.ProductCode}")
        print(f" Revision: {device.Revision}")
        print(f" Serial Number: {device.SerialNumber}")
        print(f" Encapsulation Version: {device.EncapsulationVersion}")
        # The status word is shown as a zero-padded, 4-digit hex value
        print(f" Status: 0x{device.Status:04x}")
        print(f" State: {device.State}")
    else:
        # On failure, Status carries the error description
        print(f"Failed to get device properties: {response.Status}")

Get properties of modules installed in specific slots of a chassis-based system.
def GetModuleProperties(slot):
    """
    Get properties of a module in a specific slot.

    Args:
        slot (int): Slot number to query (0-based)

    Returns:
        Response: Object containing module information
            - Value: Device object representing the module
            - Status: "Success" or error description
    """

Usage Examples:
with PLC() as comm:
    # Address of the chassis whose slots are queried
    comm.IPAddress = '192.168.1.100'

    # Query modules in different slots
    for slot in range(8):  # Check slots 0-7
        response = comm.GetModuleProperties(slot)

        if response.Status == 'Success':
            module = response.Value
            # An empty product name is treated as "no module in this slot"
            if module.ProductName:  # Module present
                print(f"Slot {slot}: {module.ProductName}")
                print(f" Type: {module.DeviceType}")
                print(f" Vendor: {module.Vendor}")
                print(f" Product Code: {module.ProductCode}")
                print(f" Revision: {module.Revision}")
                print(f" Serial: {module.SerialNumber}")
                print()
        else:
            # The query itself failed; Status carries the error description
            print(f"Slot {slot}: Empty or error - {response.Status}")

Perform a complete network scan to map all available devices and their capabilities.
def _collect_modules(comm, slot_limit=16):
    """Probe slots ``0 .. slot_limit - 1`` and return the populated ones.

    A slot counts as populated when the query succeeds and the returned
    module has a non-empty ``ProductName``.  Each hit is printed as it is
    found.

    Returns:
        list[dict]: One dict per populated slot with the keys ``slot``,
        ``product_name``, ``device_type`` and ``product_code``.
    """
    modules = []
    for slot in range(slot_limit):
        module_response = comm.GetModuleProperties(slot)
        if module_response.Status != 'Success':
            continue
        module = module_response.Value
        if not module.ProductName:
            continue
        modules.append({
            'slot': slot,
            'product_name': module.ProductName,
            'device_type': module.DeviceType,
            'product_code': module.ProductCode
        })
        print(f" Slot {slot}: {module.ProductName}")
    return modules


def network_scan_and_analysis():
    """Perform comprehensive network device analysis.

    Discovers all devices, prints the identity of each one, tries to read
    its properties directly, flags it as a PLC when its device type contains
    a known PLC type name, and - for PLCs that answered the direct query -
    probes slots 0-15 for installed modules.  A summary is printed last.

    Returns:
        list[dict] | None: One dict per discovered device with the keys
        ``ip_address``, ``product_name``, ``vendor``, ``device_type``,
        ``product_code``, ``revision``, ``serial_number``, ``can_connect``,
        ``is_plc``, ``slot_count`` (number of populated slots found) and
        ``modules``.  ``None`` when network discovery fails.
    """
    # Substrings that mark a device type as a PLC; matched case-insensitively.
    plc_types = ('programmable logic controller', 'controllogix', 'compactlogix')

    # Discover all devices
    with PLC() as comm:
        discovery_response = comm.Discover()

    if discovery_response.Status != 'Success':
        print(f"Network discovery failed: {discovery_response.Status}")
        return None

    devices = discovery_response.Value
    print(f"Network Scan Results: {len(devices)} devices found\n")

    device_analysis = []
    for i, device in enumerate(devices, 1):
        print(f"Device {i}: {device.IPAddress}")
        print(f" Product: {device.ProductName}")
        print(f" Vendor: {device.Vendor}")
        print(f" Type: {device.DeviceType}")
        print(f" Revision: {device.Revision}")

        # Tolerate a missing device type instead of failing on None.lower().
        device_type_lower = (device.DeviceType or '').lower()

        # Use a dedicated PLC instance per device.  Re-pointing one shared
        # instance at a new IPAddress risks re-using a socket that is still
        # open to the previous device.
        # NOTE(review): confirm against the pylogix version in use.
        with PLC() as device_comm:
            # Try to connect and get detailed properties
            device_comm.IPAddress = device.IPAddress
            detailed_response = device_comm.GetDeviceProperties()
            can_connect = detailed_response.Status == 'Success'

            device_info = {
                'ip_address': device.IPAddress,
                'product_name': device.ProductName,
                'vendor': device.Vendor,
                'device_type': device.DeviceType,
                'product_code': device.ProductCode,
                'revision': device.Revision,
                'serial_number': device.SerialNumber,
                'can_connect': can_connect,
                'is_plc': False,
                'slot_count': 0,
                'modules': []
            }

            # Check if this is a PLC by looking for common PLC device types
            if any(plc_type in device_type_lower for plc_type in plc_types):
                device_info['is_plc'] = True
                print(" -> Identified as PLC")

                # If it's a reachable PLC, check for modules in slots
                if can_connect:
                    device_info['modules'] = _collect_modules(device_comm, 16)
                    modules_found = len(device_info['modules'])
                    device_info['slot_count'] = modules_found
                    if modules_found > 0:
                        print(f" -> Found {modules_found} modules")

        device_analysis.append(device_info)
        print()

    # Summary analysis
    print("=== Network Analysis Summary ===")
    plc_count = sum(1 for d in device_analysis if d['is_plc'])
    connectable_count = sum(1 for d in device_analysis if d['can_connect'])

    print(f"Total devices: {len(device_analysis)}")
    print(f"PLCs identified: {plc_count}")
    print(f"Connectable devices: {connectable_count}")

    if plc_count > 0:
        print("\nPLC Details:")
        for device in device_analysis:
            if device['is_plc']:
                print(f" {device['ip_address']}: {device['product_name']}")
                if device['modules']:
                    print(f" Modules: {len(device['modules'])}")

    return device_analysis
# Usage
network_devices = network_scan_and_analysis()

Analyze device information to understand network topology and capabilities.
def analyze_device_capabilities(device):
    """Analyze device capabilities based on properties.

    Classification is purely textual: the device's ``DeviceType`` and
    ``Vendor`` strings are lower-cased and searched for known keywords.
    A missing (``None``) or empty value simply matches nothing.

    Args:
        device: Object exposing ``DeviceType`` and ``Vendor`` attributes.

    Returns:
        dict: Flags ``is_plc``, ``is_io_module``, ``is_drive``, ``is_hmi``,
        ``supports_messaging`` and ``ethernet_capable`` (always ``True``),
        plus ``vendor_category`` (``'Unknown'`` when no vendor keyword
        matches).  At most one of the four device-class flags is set.
    """
    capabilities = {
        'is_plc': False,
        'is_io_module': False,
        'is_drive': False,
        'is_hmi': False,
        'supports_messaging': False,
        'ethernet_capable': True,  # All discovered devices are Ethernet I/P
        'vendor_category': 'Unknown'
    }

    # Device type analysis; `or ''` keeps an unset type from raising on .lower()
    device_type_lower = (device.DeviceType or '').lower()

    if 'programmable logic controller' in device_type_lower or 'plc' in device_type_lower:
        capabilities['is_plc'] = True
        capabilities['supports_messaging'] = True
    elif 'discrete i/o' in device_type_lower or 'analog i/o' in device_type_lower:
        capabilities['is_io_module'] = True
    elif 'drive' in device_type_lower or 'motor' in device_type_lower:
        capabilities['is_drive'] = True
    elif 'human-machine interface' in device_type_lower or 'hmi' in device_type_lower:
        capabilities['is_hmi'] = True
        capabilities['supports_messaging'] = True

    # Vendor analysis: first keyword group that matches wins
    vendor_lower = (device.Vendor or '').lower()
    vendor_categories = (
        (('rockwell', 'allen-bradley'), 'Rockwell Automation'),
        (('schneider',), 'Schneider Electric'),
        (('siemens',), 'Siemens'),
    )
    for keywords, category in vendor_categories:
        if any(keyword in vendor_lower for keyword in keywords):
            capabilities['vendor_category'] = category
            break

    return capabilities
def detailed_device_report(plc_ip):
    """Print an identity and capability report for one device.

    For devices classified as PLCs the report also lists the controller tag
    count, up to five program names, and the modules found in slots 0-7.

    Args:
        plc_ip: IP address of the device to query.

    Returns:
        dict | None: A dict with the keys ``device``, ``capabilities`` and
        ``ip_address``, or ``None`` when the device properties cannot be read.
    """
    with PLC() as comm:
        comm.IPAddress = plc_ip

        # Without the identity data there is nothing to report on.
        response = comm.GetDeviceProperties()
        if response.Status != 'Success':
            print(f"Cannot connect to device at {plc_ip}: {response.Status}")
            return None

        device = response.Value
        capabilities = analyze_device_capabilities(device)
        report = {
            'device': device,
            'capabilities': capabilities,
            'ip_address': plc_ip
        }

        print(f"=== Device Report for {plc_ip} ===")
        print(f"Product Name: {device.ProductName}")
        print(f"Vendor: {device.Vendor} ({capabilities['vendor_category']})")
        print(f"Device Type: {device.DeviceType}")
        print(f"Product Code: {device.ProductCode}")
        print(f"Firmware Revision: {device.Revision}")
        print(f"Serial Number: {device.SerialNumber}")
        print(f"Device Status: 0x{device.Status:04x}")
        print(f"Device State: {device.State}")
        print(f"Encapsulation Version: {device.EncapsulationVersion}")

        print(f"\nCapabilities:")
        print(f" PLC: {'Yes' if capabilities['is_plc'] else 'No'}")
        print(f" I/O Module: {'Yes' if capabilities['is_io_module'] else 'No'}")
        print(f" Drive: {'Yes' if capabilities['is_drive'] else 'No'}")
        print(f" HMI: {'Yes' if capabilities['is_hmi'] else 'No'}")
        print(f" Messaging Support: {'Yes' if capabilities['supports_messaging'] else 'No'}")

        # Everything below applies to PLCs only.
        if not capabilities['is_plc']:
            return report

        print(f"\nPLC-Specific Information:")

        # Tag and program listings are best-effort; a failure is reported
        # and the report continues with the module scan.
        try:
            tag_response = comm.GetTagList(allTags=False)  # Controller tags only
            if tag_response.Status == 'Success':
                controller_tags = len(tag_response.Value)
                print(f" Controller Tags: {controller_tags}")

            program_response = comm.GetProgramsList()
            if program_response.Status == 'Success':
                programs = program_response.Value
                print(f" Programs: {len(programs)}")
                for program in programs[:5]:  # Show first 5
                    print(f" - {program}")
                if len(programs) > 5:
                    print(f" ... and {len(programs) - 5} more")
        except Exception as e:
            print(f" Could not retrieve PLC details: {e}")

        # Module scan over slots 0-7
        print(f"\n Module Scan:")
        modules_found = 0
        for slot in range(8):
            module_response = comm.GetModuleProperties(slot)
            if module_response.Status != 'Success':
                continue
            module = module_response.Value
            if not module.ProductName:
                continue
            modules_found += 1
            print(f" Slot {slot}: {module.ProductName} (Rev {module.Revision})")

        if not modules_found:
            print(f" No modules found or standalone PLC")

        return report
# Usage
device_report = detailed_device_report('192.168.1.100')

Discovery operations may fail due to network issues, device compatibility, or access restrictions.
def robust_device_discovery(target_ip=None, timeout=10):
    """Run network discovery and an optional single-device query, collecting errors.

    Failures are never raised to the caller: both failed responses and
    exceptions are recorded in the result and printed.

    Args:
        target_ip: Optional IP address of one device to query after the
            network discovery.  Skipped when falsy.
        timeout: Value assigned to the connection's ``SocketTimeout``.

    Returns:
        dict: ``network_devices`` (list of discovered devices),
        ``target_device`` (device object or ``None``), ``errors`` and
        ``warnings`` (lists of message strings).
    """
    results = {
        'network_devices': [],
        'target_device': None,
        'errors': [],
        'warnings': []
    }

    def record_error(error_msg):
        # Every error is both stored for the caller and echoed immediately.
        results['errors'].append(error_msg)
        print(error_msg)

    with PLC() as comm:
        comm.SocketTimeout = timeout

        # Network discovery
        try:
            print("Performing network discovery...")
            discovery_response = comm.Discover()

            if discovery_response.Status != 'Success':
                record_error(f"Network discovery failed: {discovery_response.Status}")

                # Check for common issues
                status_text = discovery_response.Status.lower()
                if 'micropython' in status_text:
                    results['warnings'].append("Network discovery not available on MicroPython")
                elif 'socket' in status_text:
                    results['warnings'].append("Network discovery requires socket permissions")
            else:
                devices = discovery_response.Value
                results['network_devices'] = devices
                print(f"Network discovery successful: {len(devices)} devices found")
        except Exception as e:
            record_error(f"Network discovery exception: {e}")

        # Target device query
        if target_ip:
            try:
                print(f"Querying target device {target_ip}...")
                comm.IPAddress = target_ip
                device_response = comm.GetDeviceProperties()

                if device_response.Status != 'Success':
                    record_error(f"Target device query failed: {device_response.Status}")
                else:
                    results['target_device'] = device_response.Value
                    print(f"Target device query successful")
            except Exception as e:
                record_error(f"Target device query exception: {e}")

    # Summary
    print(f"\nDiscovery Summary:")
    print(f" Network devices found: {len(results['network_devices'])}")
    print(f" Target device: {'Found' if results['target_device'] else 'Not found'}")
    print(f" Errors: {len(results['errors'])}")
    print(f" Warnings: {len(results['warnings'])}")

    if results['errors']:
        print("Errors encountered:")
        for error in results['errors']:
            print(f" - {error}")

    if results['warnings']:
        print("Warnings:")
        for warning in results['warnings']:
            print(f" - {warning}")

    return results
# Usage
discovery_results = robust_device_discovery(target_ip='192.168.1.100')

Understand device status codes and states for diagnostics.
def interpret_device_status(device):
    """Interpret device status codes and provide diagnostic information.

    The status word and state value are decoded according to the CIP
    Identity Object definition (status: bit 0 owned, bit 2 configured,
    bits 8-11 fault flags; state: 0-5 and 255).  All fault flags that are
    set are reported, not only the first one.

    Args:
        device: Object exposing ``Status`` (int status word or ``None``)
            and ``State`` attributes.

    Returns:
        dict: ``status_code`` and ``state_code`` (the raw values),
        ``interpretation`` (``status`` and ``state`` text, plus boolean
        ``owned`` / ``configured`` when a status word is available) and
        ``recommendations`` (list of strings, empty when no fault is set).
    """
    status = device.Status
    state = device.State

    status_info = {
        'status_code': status,
        'state_code': state,
        'interpretation': {},
        'recommendations': []
    }
    interpretation = status_info['interpretation']
    recommendations = status_info['recommendations']

    # (mask, meaning, recommendation) for the fault flags in bits 8-11.
    fault_bits = (
        (0x0100, "Minor recoverable fault", "Check device diagnostics"),
        (0x0200, "Minor unrecoverable fault", "Check device diagnostics"),
        (0x0400, "Major recoverable fault", "Device requires attention"),
        (0x0800, "Major unrecoverable fault", "Device requires attention"),
    )

    if status is None:
        # No status word was reported; bitwise tests on None would raise.
        interpretation['status'] = "Unknown"
    else:
        interpretation['owned'] = bool(status & 0x0001)
        interpretation['configured'] = bool(status & 0x0004)

        faults = []
        for mask, meaning, recommendation in fault_bits:
            if status & mask:
                faults.append(meaning)
                # Several faults can share one recommendation; list it once.
                if recommendation not in recommendations:
                    recommendations.append(recommendation)

        interpretation['status'] = "; ".join(faults) if faults else "Normal operation"

    # Device state interpretation
    state_meanings = {
        0: "Non-existent",
        1: "Device self-testing",
        2: "Standby",
        3: "Operational",
        4: "Major recoverable fault",
        5: "Major unrecoverable fault",
        255: "Default"
    }
    interpretation['state'] = state_meanings.get(state, f"Unknown state ({state})")

    return status_info
# Usage in device reporting
def enhanced_device_report(plc_ip):
    """Print a device's status word and state together with their meanings.

    Args:
        plc_ip: IP address of the device to query.

    Returns:
        None.  The analysis, or the failure reason, is written to stdout.
    """
    with PLC() as comm:
        comm.IPAddress = plc_ip
        response = comm.GetDeviceProperties()

        # Bail out early when the device did not answer.
        if response.Status != 'Success':
            print(f"Cannot get device status for {plc_ip}: {response.Status}")
            return

        device = response.Value
        status_info = interpret_device_status(device)

        print(f"Device Status Analysis for {plc_ip}:")
        print(f" Status Code: 0x{device.Status:04x}")
        print(f" Status Meaning: {status_info['interpretation'].get('status', 'Unknown')}")
        print(f" Device State: {device.State}")
        print(f" State Meaning: {status_info['interpretation'].get('state', 'Unknown')}")

        # The heading is printed only when there is something to list.
        if not status_info['recommendations']:
            return
        print(f" Recommendations:")
        for rec in status_info['recommendations']:
            print(f" - {rec}")
# Usage
enhanced_device_report('192.168.1.100')

Install with Tessl CLI
npx tessl i tessl/pypi-pylogix