Communication driver for reading and writing data from Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over EtherNet/IP.
—
pylogix provides comprehensive capabilities for discovering available tags and programs, and for retrieving detailed tag metadata, including data types, structure information, and User Defined Types (UDTs). These functions enable dynamic PLC interaction and automated tag management.
Retrieve the complete list of tags available in the PLC, including both controller-scoped and program-scoped tags.
def GetTagList(allTags=True):
"""
Retrieve the complete tag list from the PLC.
Args:
allTags (bool): If True, include both controller and program tags.
If False, return only controller-scoped tags.
Returns:
Response: Object containing list of Tag objects with metadata
- Value: List of Tag objects
- Status: "Success" or error description
"""
Usage Examples:
from pylogix import PLC
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
# Get all tags (controller and program tags)
response = comm.GetTagList(allTags=True)
if response.Status == 'Success':
tags = response.Value
print(f"Found {len(tags)} total tags")
# Display tag information
for tag in tags[:10]: # Show first 10 tags
print(f"Tag: {tag.TagName}")
print(f" Type: {tag.DataType}")
print(f" Array: {'Yes' if tag.Array else 'No'}")
print(f" Struct: {'Yes' if tag.Struct else 'No'}")
if tag.Array:
print(f" Size: {tag.Size}")
print()
# Get only controller-scoped tags
response = comm.GetTagList(allTags=False)
if response.Status == 'Success':
controller_tags = response.Value
print(f"Controller tags only: {len(controller_tags)}")
Retrieve tags that belong to a specific program within the PLC.
def GetProgramTagList(programName):
"""
Get tags for a specific program.
Args:
programName (str): Program name in format "Program:ProgramName"
Returns:
Response: Object containing list of Tag objects for the program
- Value: List of program-specific Tag objects
- Status: "Success" or error description
"""
Usage Examples:
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
# Get tags for a specific program
response = comm.GetProgramTagList("Program:MainProgram")
if response.Status == 'Success':
program_tags = response.Value
print(f"Tags in MainProgram: {len(program_tags)}")
for tag in program_tags:
print(f" {tag.TagName} ({tag.DataType})")
else:
print(f"Failed to get program tags: {response.Status}")
# Try different program
response = comm.GetProgramTagList("Program:SafetyProgram")
if response.Status == 'Success':
safety_tags = response.Value
print(f"Safety program has {len(safety_tags)} tags")
Discover all available programs in the PLC before querying their specific tags.
def GetProgramsList():
"""
Get list of available programs in the PLC.
Returns:
Response: Object containing list of program names
- Value: List of program name strings
- Status: "Success" or error description
"""
Usage Examples:
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
# Discover all programs
response = comm.GetProgramsList()
if response.Status == 'Success':
programs = response.Value
print(f"Available programs: {len(programs)}")
for program in programs:
print(f" - {program}")
# Get tags for each program
for program in programs:
prog_response = comm.GetProgramTagList(program)
if prog_response.Status == 'Success':
tag_count = len(prog_response.Value)
print(f" {program}: {tag_count} tags")
else:
print(f"Failed to get programs list: {response.Status}")
Work with detailed tag metadata to understand PLC structure and data organization.
Tag Object Properties:
# After getting tag list, analyze tag properties
response = comm.GetTagList()
if response.Status == 'Success':
tags = response.Value
for tag in tags:
print(f"Tag: {tag.TagName}")
print(f" Instance ID: {tag.InstanceID}")
print(f" Symbol Type: 0x{tag.SymbolType:02x}")
print(f" Data Type Value: 0x{tag.DataTypeValue:03x}")
print(f" Data Type: {tag.DataType}")
print(f" Is Array: {bool(tag.Array)}")
print(f" Is Struct: {bool(tag.Struct)}")
print(f" Size: {tag.Size}")
if hasattr(tag, 'AccessRight') and tag.AccessRight is not None:
print(f" Access Right: {tag.AccessRight}")
print()
Organize tags by their data types for systematic processing.
def classify_tags_by_type(tags):
    """Group tags into coarse categories based on their metadata.

    Args:
        tags: Iterable of tag objects exposing ``DataType``, ``Array`` and
            ``Struct`` attributes (for example the ``Value`` of a
            successful ``GetTagList`` response).

    Returns:
        dict: Maps each category name ('Integers', 'Floats', 'Booleans',
        'Strings', 'Arrays', 'Structures', 'Unknown') to the list of tags
        placed in it. All seven keys are always present and every tag is
        appended to exactly one list, in input order.
    """
    integer_types = ('DINT', 'INT', 'SINT', 'UDINT', 'UINT', 'USINT', 'LINT')
    float_types = ('REAL', 'LREAL')

    classification = {
        'Integers': [],
        'Floats': [],
        'Booleans': [],
        'Strings': [],
        'Arrays': [],
        'Structures': [],
        'Unknown': [],
    }

    for tag in tags:
        # Array is checked before the scalar type names: the two attributes
        # are independent, so a tag flagged as an array whose DataType is
        # e.g. 'DINT' would otherwise be filed under 'Integers' and the
        # 'Arrays' bucket would stay practically empty.
        if tag.Array:
            bucket = 'Arrays'
        elif tag.DataType in integer_types:
            bucket = 'Integers'
        elif tag.DataType in float_types:
            bucket = 'Floats'
        elif tag.DataType == 'BOOL':
            bucket = 'Booleans'
        elif tag.DataType == 'STRING':
            # Tested before Struct so string tags keep their own bucket
            # even when they are also flagged as structures.
            bucket = 'Strings'
        elif tag.Struct:
            bucket = 'Structures'
        else:
            bucket = 'Unknown'
        classification[bucket].append(tag)

    return classification
# Usage
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
response = comm.GetTagList()
if response.Status == 'Success':
classified = classify_tags_by_type(response.Value)
for category, tag_list in classified.items():
if tag_list:
print(f"{category}: {len(tag_list)} tags")
for tag in tag_list[:3]: # Show first 3 in each category
print(f" - {tag.TagName}")
if len(tag_list) > 3:
print(f" ... and {len(tag_list) - 3} more")
print()
Discover and analyze User Defined Types (UDTs) and their field structures.
def discover_udts(plc_comm):
    """Collect the UDT names used by structure tags and their instances.

    Args:
        plc_comm: Connected PLC object. It must provide ``GetTagList()``
            returning a response with ``Status`` and ``Value``; an optional
            ``UDTByName`` mapping is used to attach UDT definitions.

    Returns:
        dict | None: ``None`` when the tag list request does not report
        'Success'. Otherwise a dict keyed by UDT name where each value has:
          - 'instances': list of tag names that use the UDT
          - 'fields': list of field names (empty when no definition is known)
          - 'definition': the UDT definition object (only when found in
            ``plc_comm.UDTByName``)
    """
    response = plc_comm.GetTagList()
    if response.Status != 'Success':
        return None

    udts = {}

    # Group structure tags by the name of the type they instantiate.
    for tag in response.Value:
        if tag.Struct and tag.DataType:
            # 'fields' starts as a list (not a set) so it has the same type
            # whether or not a definition is attached later; callers can
            # slice or index it unconditionally.
            entry = udts.setdefault(tag.DataType, {'instances': [], 'fields': []})
            entry['instances'].append(tag.TagName)

    # Attach definitions when the PLC object exposes them; a missing or
    # empty mapping simply leaves every 'fields' list empty.
    definitions = getattr(plc_comm, 'UDTByName', None)
    if definitions:
        for udt_name, udt_info in udts.items():
            if udt_name in definitions:
                udt_def = definitions[udt_name]
                udt_info['definition'] = udt_def
                udt_info['fields'] = [field.TagName for field in udt_def.Fields]

    return udts
# Usage
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
udts = discover_udts(comm)
if udts:
print("Discovered UDTs:")
for udt_name, info in udts.items():
print(f"\nUDT: {udt_name}")
print(f" Instances: {len(info['instances'])}")
for instance in info['instances'][:3]:
print(f" - {instance}")
if len(info['instances']) > 3:
print(f" ... and {len(info['instances']) - 3} more")
if 'fields' in info and info['fields']:
print(f" Fields: {len(info['fields'])}")
for field in info['fields'][:5]: # Show first 5 fields
print(f" - {field}")
if len(info['fields']) > 5:
print(f" ... and {len(info['fields']) - 5} more")
Implement search and filtering capabilities for large tag lists.
def search_tags(tags, search_pattern, case_sensitive=False):
    """Return the tags whose name matches a regular expression.

    Args:
        tags: Iterable of tag objects exposing a ``TagName`` attribute.
        search_pattern: Regular expression searched for anywhere in the name.
        case_sensitive (bool): When false (the default) the match ignores
            letter case.

    Returns:
        list: Matching tags, in their original order.
    """
    import re

    # One compile call; only the flag value depends on case sensitivity.
    flags = 0 if case_sensitive else re.IGNORECASE
    matcher = re.compile(search_pattern, flags)
    return [candidate for candidate in tags if matcher.search(candidate.TagName)]
def filter_tags_by_type(tags, data_types):
    """Return the tags whose ``DataType`` is one of the requested types.

    Args:
        tags: Iterable of tag objects exposing a ``DataType`` attribute.
        data_types: A single type name, or a collection of type names.

    Returns:
        list: Tags with a matching ``DataType``, in their original order.
    """
    # A lone string is wrapped so that membership is an exact name match
    # rather than a substring test.
    wanted = [data_types] if isinstance(data_types, str) else data_types
    return [entry for entry in tags if entry.DataType in wanted]
# Usage examples
with PLC() as comm:
comm.IPAddress = '192.168.1.100'
response = comm.GetTagList()
if response.Status == 'Success':
all_tags = response.Value
# Search for motor-related tags
motor_tags = search_tags(all_tags, r'motor|drive|speed', case_sensitive=False)
print(f"Motor-related tags: {len(motor_tags)}")
# Find all temperature tags
temp_tags = search_tags(all_tags, r'temp|temperature', case_sensitive=False)
print(f"Temperature tags: {len(temp_tags)}")
# Get all REAL (float) tags
float_tags = filter_tags_by_type(all_tags, 'REAL')
print(f"Float tags: {len(float_tags)}")
# Get all boolean tags
bool_tags = filter_tags_by_type(all_tags, 'BOOL')
print(f"Boolean tags: {len(bool_tags)}")
Create a comprehensive inventory of all PLC tags with their properties.
def _tag_record(tag):
    """Return the JSON-serializable summary stored in the inventory for one tag."""
    return {
        'name': tag.TagName,
        'type': tag.DataType,
        'array': bool(tag.Array),
        'struct': bool(tag.Struct),
        'size': tag.Size,
    }


def create_tag_inventory(plc_ip, output_file=None):
    """Build an inventory of every tag in a PLC, optionally saved as JSON.

    Args:
        plc_ip (str): IP address assigned to the PLC connection.
        output_file (str | None): When given, the inventory is written to
            this path as indented JSON.

    Returns:
        dict | None: ``None`` when the tag list request does not report
        'Success'. Otherwise a dict with keys 'plc_ip', 'discovery_time'
        (ISO-format local timestamp), 'controller_tags' (list of tag
        records), 'program_tags' (program name -> list of tag records),
        'programs' (left empty if the programs request fails), 'udts'
        (always empty here) and 'summary' (counters by scope, data type,
        arrays and structures).
    """
    import json
    from datetime import datetime

    summary = {
        'total_tags': 0,
        'controller_tags': 0,
        'program_tags': 0,
        'data_types': {},
        'arrays': 0,
        'structures': 0,
    }
    inventory = {
        'plc_ip': plc_ip,
        'discovery_time': None,
        'controller_tags': [],
        'program_tags': {},
        'programs': [],
        'udts': {},
        'summary': summary,
    }

    with PLC() as comm:
        comm.IPAddress = plc_ip
        inventory['discovery_time'] = datetime.now().isoformat()

        response = comm.GetTagList(allTags=True)
        if response.Status != 'Success':
            print(f"Failed to get tag list: {response.Status}")
            return None

        all_tags = response.Value
        summary['total_tags'] = len(all_tags)

        for tag in all_tags:
            # Prefix test (not substring) so it agrees with split('.')[0]
            # below, which takes the leading "Program:<name>" segment as
            # the program name.
            if tag.TagName.startswith('Program:'):
                program_name = tag.TagName.split('.')[0]
                inventory['program_tags'].setdefault(program_name, []).append(
                    _tag_record(tag)
                )
                summary['program_tags'] += 1
            else:
                inventory['controller_tags'].append(_tag_record(tag))
                summary['controller_tags'] += 1

            # Per-type and shape counters cover both scopes.
            data_type = tag.DataType or 'Unknown'
            summary['data_types'][data_type] = (
                summary['data_types'].get(data_type, 0) + 1
            )
            if tag.Array:
                summary['arrays'] += 1
            if tag.Struct:
                summary['structures'] += 1

        programs_response = comm.GetProgramsList()
        if programs_response.Status == 'Success':
            inventory['programs'] = programs_response.Value

    # Written after the PLC context has exited so the connection is not
    # held open during file I/O; explicit encoding keeps the output
    # independent of the platform default.
    if output_file:
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(inventory, f, indent=2)
        print(f"Tag inventory saved to {output_file}")

    return inventory
# Usage
inventory = create_tag_inventory('192.168.1.100', 'plc_inventory.json')
if inventory:
print(f"PLC Inventory Summary:")
print(f" Total tags: {inventory['summary']['total_tags']}")
print(f" Controller tags: {inventory['summary']['controller_tags']}")
print(f" Program tags: {inventory['summary']['program_tags']}")
print(f" Programs: {len(inventory['programs'])}")
print(f" Arrays: {inventory['summary']['arrays']}")
print(f" Structures: {inventory['summary']['structures']}")
print(f" Data types:")
for dt, count in inventory['summary']['data_types'].items():
print(f" {dt}: {count}")
Discovery operations can fail or return partial results. Implement proper error handling:
def robust_tag_discovery(plc_ip, max_retries=3, retry_delay=2.0):
    """Fetch the PLC tag list, retrying on failure, exception or empty result.

    Args:
        plc_ip (str): IP address assigned to the PLC connection.
        max_retries (int): Maximum number of attempts; with 0 or less no
            attempt is made.
        retry_delay (float): Seconds to wait between attempts. Defaults to
            the previously hard-coded 2 seconds.

    Returns:
        list | None: The non-empty tag list from the first successful
        attempt, or ``None`` once every attempt has failed.
    """
    import time

    for attempt in range(max_retries):
        try:
            # A fresh connection is opened for every attempt.
            with PLC() as comm:
                comm.IPAddress = plc_ip
                comm.SocketTimeout = 30.0  # Longer timeout for large tag lists

                response = comm.GetTagList()
                if response.Status == 'Success':
                    tags = response.Value
                    if tags:
                        print(f"Successfully discovered {len(tags)} tags")
                        return tags
                    # A 'Success' status with no tags counts as a failed
                    # attempt and is retried.
                    print("Got empty tag list")
                else:
                    print(f"Tag discovery attempt {attempt + 1} failed: {response.Status}")
        except Exception as e:
            # Deliberately broad: any error in one attempt is reported and
            # the next attempt proceeds.
            print(f"Exception during discovery attempt {attempt + 1}: {e}")

        # No wait after the final attempt.
        if attempt < max_retries - 1:
            print("Retrying tag discovery...")
            time.sleep(retry_delay)

    print(f"Failed to discover tags after {max_retries} attempts")
    return None
# Usage
tags = robust_tag_discovery('192.168.1.100')
if tags:
print("Tag discovery successful")
# Process tags...
else:
print("Tag discovery failed")
Install with Tessl CLI:
npx tessl i tessl/pypi-pylogix