CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pylogix

Communication driver for reading and writing data from Rockwell Automation ControlLogix, CompactLogix, and Micro8xx PLCs over Ethernet I/P

Pending
Overview
Eval results
Files

tag-discovery.mddocs/

Tag Discovery and Introspection

Comprehensive capabilities for discovering available tags, programs, and retrieving detailed tag metadata including data types, structure information, and User Defined Types (UDTs). These functions enable dynamic PLC interaction and automated tag management.

Capabilities

Complete Tag List Discovery

Retrieve the complete list of tags available in the PLC, including both controller-scoped and program-scoped tags.

def GetTagList(allTags=True):
    """
    Retrieve the complete tag list from the PLC.
    
    Args:
        allTags (bool): If True, include both controller and program tags.
                       If False, return only controller-scoped tags.
    
    Returns:
        Response: Object containing list of Tag objects with metadata
                 - Value: List of Tag objects
                 - Status: "Success" or error description
    """

Usage Examples:

from pylogix import PLC

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    
    # Get all tags (controller and program tags)
    response = comm.GetTagList(allTags=True)
    if response.Status == 'Success':
        tags = response.Value
        print(f"Found {len(tags)} total tags")
        
        # Display tag information
        for tag in tags[:10]:  # Show first 10 tags
            print(f"Tag: {tag.TagName}")
            print(f"  Type: {tag.DataType}")
            print(f"  Array: {'Yes' if tag.Array else 'No'}")
            print(f"  Struct: {'Yes' if tag.Struct else 'No'}")
            if tag.Array:
                print(f"  Size: {tag.Size}")
            print()
    
    # Get only controller-scoped tags
    response = comm.GetTagList(allTags=False)
    if response.Status == 'Success':
        controller_tags = response.Value
        print(f"Controller tags only: {len(controller_tags)}")

Program-Specific Tag Discovery

Retrieve tags that belong to a specific program within the PLC.

def GetProgramTagList(programName):
    """
    Get tags for a specific program.
    
    Args:
        programName (str): Program name in format "Program:ProgramName"
    
    Returns:
        Response: Object containing list of Tag objects for the program
                 - Value: List of program-specific Tag objects
                 - Status: "Success" or error description
    """

Usage Examples:

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    
    # Get tags for a specific program
    response = comm.GetProgramTagList("Program:MainProgram")
    if response.Status == 'Success':
        program_tags = response.Value
        print(f"Tags in MainProgram: {len(program_tags)}")
        
        for tag in program_tags:
            print(f"  {tag.TagName} ({tag.DataType})")
    else:
        print(f"Failed to get program tags: {response.Status}")
    
    # Try different program
    response = comm.GetProgramTagList("Program:SafetyProgram")
    if response.Status == 'Success':
        safety_tags = response.Value
        print(f"Safety program has {len(safety_tags)} tags")

Program List Discovery

Discover all available programs in the PLC before querying their specific tags.

def GetProgramsList():
    """
    Get list of available programs in the PLC.
    
    Returns:
        Response: Object containing list of program names
                 - Value: List of program name strings
                 - Status: "Success" or error description
    """

Usage Examples:

with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    
    # Discover all programs
    response = comm.GetProgramsList()
    if response.Status == 'Success':
        programs = response.Value
        print(f"Available programs: {len(programs)}")
        
        for program in programs:
            print(f"  - {program}")
        
        # Get tags for each program
        for program in programs:
            prog_response = comm.GetProgramTagList(program)
            if prog_response.Status == 'Success':
                tag_count = len(prog_response.Value)
                print(f"  {program}: {tag_count} tags")
    else:
        print(f"Failed to get programs list: {response.Status}")

Tag Metadata Analysis

Work with detailed tag metadata to understand PLC structure and data organization.

Tag Object Properties:

# After getting tag list, analyze tag properties
response = comm.GetTagList()
if response.Status == 'Success':
    tags = response.Value
    
    for tag in tags:
        print(f"Tag: {tag.TagName}")
        print(f"  Instance ID: {tag.InstanceID}")
        print(f"  Symbol Type: 0x{tag.SymbolType:02x}")
        print(f"  Data Type Value: 0x{tag.DataTypeValue:03x}")
        print(f"  Data Type: {tag.DataType}")
        print(f"  Is Array: {bool(tag.Array)}")
        print(f"  Is Struct: {bool(tag.Struct)}")
        print(f"  Size: {tag.Size}")
        if hasattr(tag, 'AccessRight') and tag.AccessRight is not None:
            print(f"  Access Right: {tag.AccessRight}")
        print()

Data Type Classification

Organize tags by their data types for systematic processing.

def classify_tags_by_type(tags):
    """Classify tags by their data types."""
    classification = {
        'Integers': [],
        'Floats': [],
        'Booleans': [],
        'Strings': [],
        'Arrays': [],
        'Structures': [],
        'Unknown': []
    }
    
    for tag in tags:
        if tag.DataType in ['DINT', 'INT', 'SINT', 'UDINT', 'UINT', 'USINT', 'LINT']:
            classification['Integers'].append(tag)
        elif tag.DataType in ['REAL', 'LREAL']:
            classification['Floats'].append(tag)
        elif tag.DataType == 'BOOL':
            classification['Booleans'].append(tag)
        elif tag.DataType == 'STRING':
            classification['Strings'].append(tag)
        elif tag.Array:
            classification['Arrays'].append(tag)
        elif tag.Struct:
            classification['Structures'].append(tag)
        else:
            classification['Unknown'].append(tag)
    
    return classification

# Usage
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    response = comm.GetTagList()
    
    if response.Status == 'Success':
        classified = classify_tags_by_type(response.Value)
        
        for category, tag_list in classified.items():
            if tag_list:
                print(f"{category}: {len(tag_list)} tags")
                for tag in tag_list[:3]:  # Show first 3 in each category
                    print(f"  - {tag.TagName}")
                if len(tag_list) > 3:
                    print(f"  ... and {len(tag_list) - 3} more")
                print()

User Defined Type (UDT) Discovery

Discover and analyze User Defined Types (UDTs) and their field structures.

def discover_udts(plc_comm):
    """Discover all UDTs and their structures."""
    response = plc_comm.GetTagList()
    if response.Status != 'Success':
        return None
    
    tags = response.Value
    udts = {}
    
    # Find all UDT instances
    for tag in tags:
        if tag.Struct and tag.DataType:
            if tag.DataType not in udts:
                udts[tag.DataType] = {
                    'instances': [],
                    'fields': set()
                }
            udts[tag.DataType]['instances'].append(tag.TagName)
    
    # Access UDT definitions from PLC object
    if hasattr(plc_comm, 'UDTByName'):
        for udt_name, udt_info in udts.items():
            if udt_name in plc_comm.UDTByName:
                udt_def = plc_comm.UDTByName[udt_name]
                udt_info['definition'] = udt_def
                udt_info['fields'] = [field.TagName for field in udt_def.Fields]
    
    return udts

# Usage
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    
    udts = discover_udts(comm)
    if udts:
        print("Discovered UDTs:")
        for udt_name, info in udts.items():
            print(f"\nUDT: {udt_name}")
            print(f"  Instances: {len(info['instances'])}")
            for instance in info['instances'][:3]:
                print(f"    - {instance}")
            if len(info['instances']) > 3:
                print(f"    ... and {len(info['instances']) - 3} more")
            
            if 'fields' in info and info['fields']:
                print(f"  Fields: {len(info['fields'])}")
                for field in info['fields'][:5]:  # Show first 5 fields
                    print(f"    - {field}")
                if len(info['fields']) > 5:
                    print(f"    ... and {len(info['fields']) - 5} more")

Tag Filtering and Search

Implement search and filtering capabilities for large tag lists.

def search_tags(tags, search_pattern, case_sensitive=False):
    """Search tags by name pattern."""
    import re
    
    if not case_sensitive:
        pattern = re.compile(search_pattern, re.IGNORECASE)
    else:
        pattern = re.compile(search_pattern)
    
    matching_tags = []
    for tag in tags:
        if pattern.search(tag.TagName):
            matching_tags.append(tag)
    
    return matching_tags

def filter_tags_by_type(tags, data_types):
    """Filter tags by data type."""
    if isinstance(data_types, str):
        data_types = [data_types]
    
    filtered_tags = []
    for tag in tags:
        if tag.DataType in data_types:
            filtered_tags.append(tag)
    
    return filtered_tags

# Usage examples
with PLC() as comm:
    comm.IPAddress = '192.168.1.100'
    response = comm.GetTagList()
    
    if response.Status == 'Success':
        all_tags = response.Value
        
        # Search for motor-related tags
        motor_tags = search_tags(all_tags, r'motor|drive|speed', case_sensitive=False)
        print(f"Motor-related tags: {len(motor_tags)}")
        
        # Find all temperature tags
        temp_tags = search_tags(all_tags, r'temp|temperature', case_sensitive=False)
        print(f"Temperature tags: {len(temp_tags)}")
        
        # Get all REAL (float) tags
        float_tags = filter_tags_by_type(all_tags, 'REAL')
        print(f"Float tags: {len(float_tags)}")
        
        # Get all boolean tags
        bool_tags = filter_tags_by_type(all_tags, 'BOOL')
        print(f"Boolean tags: {len(bool_tags)}")

Complete Tag Inventory

Create a comprehensive inventory of all PLC tags with their properties.

def create_tag_inventory(plc_ip, output_file=None):
    """Create a complete tag inventory."""
    inventory = {
        'plc_ip': plc_ip,
        'discovery_time': None,
        'controller_tags': [],
        'program_tags': {},
        'programs': [],
        'udts': {},
        'summary': {
            'total_tags': 0,
            'controller_tags': 0,
            'program_tags': 0,
            'data_types': {},
            'arrays': 0,
            'structures': 0
        }
    }
    
    with PLC() as comm:
        comm.IPAddress = plc_ip
        
        # Get timestamp
        from datetime import datetime
        inventory['discovery_time'] = datetime.now().isoformat()
        
        # Get all tags
        response = comm.GetTagList(allTags=True)
        if response.Status != 'Success':
            print(f"Failed to get tag list: {response.Status}")
            return None
        
        all_tags = response.Value
        inventory['summary']['total_tags'] = len(all_tags)
        
        # Separate controller and program tags
        for tag in all_tags:
            if 'Program:' in tag.TagName:
                program_name = tag.TagName.split('.')[0]
                if program_name not in inventory['program_tags']:
                    inventory['program_tags'][program_name] = []
                inventory['program_tags'][program_name].append({
                    'name': tag.TagName,
                    'type': tag.DataType,
                    'array': bool(tag.Array),
                    'struct': bool(tag.Struct),
                    'size': tag.Size
                })
                inventory['summary']['program_tags'] += 1
            else:
                inventory['controller_tags'].append({
                    'name': tag.TagName,
                    'type': tag.DataType,
                    'array': bool(tag.Array),
                    'struct': bool(tag.Struct),
                    'size': tag.Size
                })
                inventory['summary']['controller_tags'] += 1
            
            # Update summary statistics
            data_type = tag.DataType or 'Unknown'
            inventory['summary']['data_types'][data_type] = \
                inventory['summary']['data_types'].get(data_type, 0) + 1
            
            if tag.Array:
                inventory['summary']['arrays'] += 1
            if tag.Struct:
                inventory['summary']['structures'] += 1
        
        # Get programs list
        programs_response = comm.GetProgramsList()
        if programs_response.Status == 'Success':
            inventory['programs'] = programs_response.Value
        
        # Export to file if requested
        if output_file:
            import json
            with open(output_file, 'w') as f:
                json.dump(inventory, f, indent=2)
            print(f"Tag inventory saved to {output_file}")
        
        return inventory

# Usage
inventory = create_tag_inventory('192.168.1.100', 'plc_inventory.json')
if inventory:
    print(f"PLC Inventory Summary:")
    print(f"  Total tags: {inventory['summary']['total_tags']}")
    print(f"  Controller tags: {inventory['summary']['controller_tags']}")
    print(f"  Program tags: {inventory['summary']['program_tags']}")
    print(f"  Programs: {len(inventory['programs'])}")
    print(f"  Arrays: {inventory['summary']['arrays']}")
    print(f"  Structures: {inventory['summary']['structures']}")
    print(f"  Data types:")
    for dt, count in inventory['summary']['data_types'].items():
        print(f"    {dt}: {count}")

Error Handling for Discovery Operations

Discovery operations can fail or return partial results. Implement proper error handling:

def robust_tag_discovery(plc_ip, max_retries=3):
    """Robust tag discovery with error handling."""
    
    for attempt in range(max_retries):
        try:
            with PLC() as comm:
                comm.IPAddress = plc_ip
                comm.SocketTimeout = 30.0  # Longer timeout for large tag lists
                
                # Try to get tag list
                response = comm.GetTagList()
                
                if response.Status == 'Success':
                    tags = response.Value
                    if tags:  # Make sure we got some tags
                        print(f"Successfully discovered {len(tags)} tags")
                        return tags
                    else:
                        print("Got empty tag list")
                else:
                    print(f"Tag discovery attempt {attempt + 1} failed: {response.Status}")
                    
        except Exception as e:
            print(f"Exception during discovery attempt {attempt + 1}: {e}")
        
        if attempt < max_retries - 1:
            print(f"Retrying tag discovery...")
            import time
            time.sleep(2)
    
    print(f"Failed to discover tags after {max_retries} attempts")
    return None

# Usage
tags = robust_tag_discovery('192.168.1.100')
if tags:
    print("Tag discovery successful")
    # Process tags...
else:
    print("Tag discovery failed")

Install with Tessl CLI

npx tessl i tessl/pypi-pylogix

docs

advanced-messaging.md

core-operations.md

device-discovery.md

index.md

tag-discovery.md

time-operations.md

tile.json