CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-pymodbustcp

A simple Modbus/TCP client library for Python

Pending
Overview
Eval results
Files

constants.mddocs/

Constants and Error Handling

Protocol constants, function codes, exception codes, and error handling utilities for robust Modbus communication. These constants define the Modbus protocol specifications and provide comprehensive error reporting capabilities.

Capabilities

Package Constants

Basic package information and protocol defaults.

# Package version string.
VERSION = '0.3.0'
# Default TCP port used for Modbus.
MODBUS_PORT = 502
# Largest allowed Protocol Data Unit size.
MAX_PDU_SIZE = 253

Function Codes

Standard Modbus function codes supported by the library.

# --- Read requests ---
READ_COILS = 0x01  # coils (outputs)
READ_DISCRETE_INPUTS = 0x02  # discrete inputs
READ_HOLDING_REGISTERS = 0x03  # holding registers
READ_INPUT_REGISTERS = 0x04  # input registers

# --- Write requests ---
WRITE_SINGLE_COIL = 0x05  # one coil
WRITE_SINGLE_REGISTER = 0x06  # one register
WRITE_MULTIPLE_COILS = 0x0F  # several coils
WRITE_MULTIPLE_REGISTERS = 0x10  # several registers

# --- Advanced requests ---
WRITE_READ_MULTIPLE_REGISTERS = 0x17  # combined write/read of registers
ENCAPSULATED_INTERFACE_TRANSPORT = 0x2B  # encapsulated interface transport

# Every function code defined above, collected for membership tests.
SUPPORTED_FUNCTION_CODES = (
    READ_COILS,
    READ_DISCRETE_INPUTS,
    READ_HOLDING_REGISTERS,
    READ_INPUT_REGISTERS,
    WRITE_SINGLE_COIL,
    WRITE_SINGLE_REGISTER,
    WRITE_MULTIPLE_COILS,
    WRITE_MULTIPLE_REGISTERS,
    WRITE_READ_MULTIPLE_REGISTERS,
    ENCAPSULATED_INTERFACE_TRANSPORT,
)

MEI Types

Modbus Encapsulated Interface (MEI) type definitions.

MEI_TYPE_READ_DEVICE_ID = 0x0E  # MEI type for read device identification

Modbus Exception Codes

Standard Modbus exception codes returned by servers when errors occur.

# Exception codes, in ascending order. No constant is defined for 0x09 here.
EXP_NONE = 0x00  # no exception
EXP_ILLEGAL_FUNCTION = 0x01  # illegal function code
EXP_DATA_ADDRESS = 0x02  # illegal data address
EXP_DATA_VALUE = 0x03  # illegal data value
EXP_SLAVE_DEVICE_FAILURE = 0x04  # slave device failure
EXP_ACKNOWLEDGE = 0x05  # acknowledge
EXP_SLAVE_DEVICE_BUSY = 0x06  # slave device busy
EXP_NEGATIVE_ACKNOWLEDGE = 0x07  # negative acknowledge
EXP_MEMORY_PARITY_ERROR = 0x08  # memory parity error
EXP_GATEWAY_PATH_UNAVAILABLE = 0x0A  # gateway path unavailable
EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0B  # gateway target did not respond

Exception Text Mappings

Human-readable descriptions for exception codes.

# Short exception descriptions.
# Maps each EXP_* exception code to a brief lowercase label.
EXP_TXT = {
    EXP_NONE: 'no exception',
    EXP_ILLEGAL_FUNCTION: 'illegal function',
    EXP_DATA_ADDRESS: 'illegal data address',
    EXP_DATA_VALUE: 'illegal data value',
    EXP_SLAVE_DEVICE_FAILURE: 'slave device failure',
    EXP_ACKNOWLEDGE: 'acknowledge',
    EXP_SLAVE_DEVICE_BUSY: 'slave device busy',
    EXP_NEGATIVE_ACKNOWLEDGE: 'negative acknowledge',
    EXP_MEMORY_PARITY_ERROR: 'memory parity error',
    EXP_GATEWAY_PATH_UNAVAILABLE: 'gateway path unavailable',
    EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 'gateway target device failed to respond'
}

# Detailed exception descriptions.
# Maps each EXP_* exception code to a full-sentence explanation.
EXP_DETAILS = {
    EXP_NONE: 'The last request produced no exceptions.',
    EXP_ILLEGAL_FUNCTION: 'Function code received in the query is not recognized or allowed by slave.',
    EXP_DATA_ADDRESS: 'Data address of some or all the required entities are not allowed or do not exist in slave.',
    EXP_DATA_VALUE: 'Value is not accepted by slave.',
    EXP_SLAVE_DEVICE_FAILURE: 'Unrecoverable error occurred while slave was attempting to perform requested action.',
    EXP_ACKNOWLEDGE: 'Slave has accepted request and is processing it, but a long duration of time is required. This response is returned to prevent a timeout error from occurring in the master. Master can next issue a Poll Program Complete message to determine whether processing is completed.',
    EXP_SLAVE_DEVICE_BUSY: 'Slave is engaged in processing a long-duration command. Master should retry later.',
    EXP_NEGATIVE_ACKNOWLEDGE: 'Slave cannot perform the programming functions. Master should request diagnostic or error information from slave.',
    EXP_MEMORY_PARITY_ERROR: 'Slave detected a parity error in memory. Master can retry the request, but service may be required on the slave device.',
    EXP_GATEWAY_PATH_UNAVAILABLE: 'Specialized for Modbus gateways, this indicates a misconfiguration on gateway.',
    EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 'Specialized for Modbus gateways, sent when slave fails to respond.'
}

Module Error Codes

Internal library error codes for network and communication issues.

# Module error codes are consecutive integers starting at zero.
(
    MB_NO_ERR,          # no error
    MB_RESOLVE_ERR,     # name resolve error
    MB_CONNECT_ERR,     # connect error
    MB_SEND_ERR,        # socket send error
    MB_RECV_ERR,        # socket receive error
    MB_TIMEOUT_ERR,     # receive timeout error
    MB_FRAME_ERR,       # frame format error
    MB_EXCEPT_ERR,      # modbus exception error
    MB_CRC_ERR,         # bad CRC on receive frame
    MB_SOCK_CLOSE_ERR,  # socket is closed error
) = range(10)

Module Error Text Mapping

Human-readable descriptions for module error codes.

# Maps each MB_* module error code to a short lowercase description.
MB_ERR_TXT = {
    MB_NO_ERR: 'no error',
    MB_RESOLVE_ERR: 'name resolve error',
    MB_CONNECT_ERR: 'connect error',
    MB_SEND_ERR: 'socket send error',
    MB_RECV_ERR: 'socket recv error',
    MB_TIMEOUT_ERR: 'recv timeout occur',
    MB_FRAME_ERR: 'frame format error',
    MB_EXCEPT_ERR: 'modbus exception',
    MB_CRC_ERR: 'bad CRC on receive frame',
    MB_SOCK_CLOSE_ERR: 'socket is closed'
}

Usage Examples

Using Function Codes

from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import READ_HOLDING_REGISTERS, WRITE_MULTIPLE_REGISTERS

client = ModbusClient(host="192.168.1.100", auto_open=True)

# Function codes are used internally, but you can reference them
print(f"Reading using function code: {READ_HOLDING_REGISTERS}")
# Read 10 holding registers starting at address 0
registers = client.read_holding_registers(0, 10)

print(f"Writing using function code: {WRITE_MULTIPLE_REGISTERS}")
# Write three register values starting at address 0
success = client.write_multiple_registers(0, [100, 200, 300])

Error Handling with Constants

from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import (
    MB_NO_ERR, MB_CONNECT_ERR, MB_TIMEOUT_ERR, MB_ERR_TXT,
    EXP_NONE, EXP_DATA_ADDRESS, EXP_TXT, EXP_DETAILS
)

client = ModbusClient(host="192.168.1.100", auto_open=True)

# Try to fetch ten holding registers starting at address 0
registers = client.read_holding_registers(0, 10)

if registers is not None:
    print(f"Successfully read registers: {registers}")
else:
    # First look at the library-level (network/connection) error code
    error_code = client.last_error
    if error_code != MB_NO_ERR:
        print(f"Network error: {MB_ERR_TXT.get(error_code, 'unknown error')}")

        # Extra troubleshooting hint for the two codes handled specially
        if error_code == MB_CONNECT_ERR:
            print("Check if server is running and network is accessible")
        elif error_code == MB_TIMEOUT_ERR:
            print("Server not responding, consider increasing timeout")

    # Then look at the Modbus exception code of the last request
    except_code = client.last_except
    if except_code != EXP_NONE:
        print(f"Modbus exception: {EXP_TXT.get(except_code, 'unknown exception')}")
        print(f"Details: {EXP_DETAILS.get(except_code, 'No details available')}")

        if except_code == EXP_DATA_ADDRESS:
            print("The requested register address may not exist on the server")

Server Exception Handling

from pyModbusTCP.server import ModbusServer, DataHandler
from pyModbusTCP.constants import (
    EXP_DATA_ADDRESS, EXP_DATA_VALUE, EXP_ILLEGAL_FUNCTION
)

class ValidatingDataHandler(DataHandler):
    """Data handler that screens holding-register requests before serving them.

    Reads must stay entirely below address 1000 and span at most 100
    registers. Writes must start at address 100 or above (lower addresses are
    a read-only area) and every value must be at most 32767. Requests that
    pass the checks are delegated to the base ``DataHandler``.
    """

    def read_h_regs(self, address, count, srv_info):
        """Validate a holding-register read, then delegate to the base class.

        Args:
            address: First register address requested.
            count: Number of registers requested.
            srv_info: Passed through unchanged to the base implementation.

        Returns:
            ``self.Return(exp_code=EXP_DATA_ADDRESS)`` when any requested
            register lies at address 1000 or above,
            ``self.Return(exp_code=EXP_DATA_VALUE)`` when more than 100
            registers are requested, otherwise the base class result.
        """
        # Validate start address
        if address >= 1000:
            return self.Return(exp_code=EXP_DATA_ADDRESS)

        # Validate count
        if count > 100:
            return self.Return(exp_code=EXP_DATA_VALUE)

        # Validate the end of the range too: a request that starts below 1000
        # must not run past the limit (e.g. address 999 with count 10).
        if address + count > 1000:
            return self.Return(exp_code=EXP_DATA_ADDRESS)

        return super().read_h_regs(address, count, srv_info)

    def write_h_regs(self, address, words_l, srv_info):
        """Validate a holding-register write, then delegate to the base class.

        Args:
            address: First register address to write.
            words_l: Sequence of register values to write.
            srv_info: Passed through unchanged to the base implementation.

        Returns:
            ``self.Return(exp_code=EXP_DATA_ADDRESS)`` when the write starts
            in the read-only area, ``self.Return(exp_code=EXP_DATA_VALUE)``
            when any value exceeds 32767, otherwise the base class result.
        """
        # Validate write address
        if address < 100:  # Read-only area
            return self.Return(exp_code=EXP_DATA_ADDRESS)

        # Validate data values
        if any(w > 32767 for w in words_l):  # Signed 16-bit limit
            return self.Return(exp_code=EXP_DATA_VALUE)

        return super().write_h_regs(address, words_l, srv_info)

# Use validating handler: create it and hand it to the server via data_hdl
handler = ValidatingDataHandler()
server = ModbusServer(data_hdl=handler)
# NOTE(review): host, port and blocking mode are left at the ModbusServer defaults — confirm they suit the deployment
server.start()

Function Code Validation

from pyModbusTCP.constants import SUPPORTED_FUNCTION_CODES

def is_supported_function(func_code):
    """Return True when *func_code* is a member of SUPPORTED_FUNCTION_CODES."""
    is_known = func_code in SUPPORTED_FUNCTION_CODES
    return is_known

# Report, for a few sample codes, whether each one is supported
test_functions = [0x01, 0x03, 0x10, 0x2B, 0xFF]
for func in test_functions:
    supported = is_supported_function(func)
    label = 'supported' if supported else 'not supported'
    print(f"Function code 0x{func:02X}: {label}")

Comprehensive Error Reporting

from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import *

def detailed_error_report(client):
    """Build human-readable lines describing the client's last failure.

    Reads ``client.last_error`` and ``client.last_except`` and returns a list
    of strings: one line for a network error, two lines (summary and details)
    for a Modbus exception, or ``["No errors detected"]`` when neither is set.
    """
    lines = []

    # Library-level error code, if one is set
    err = client.last_error
    if err != MB_NO_ERR:
        description = MB_ERR_TXT.get(err, f'Unknown error code: {err}')
        lines.append(f"Network Error ({err}): {description}")

    # Modbus exception code, if one is set
    exc = client.last_except
    if exc != EXP_NONE:
        summary = EXP_TXT.get(exc, f'Unknown exception code: {exc}')
        details = EXP_DETAILS.get(exc, 'No additional details available')
        lines.extend([
            f"Modbus Exception ({exc}): {summary}",
            f"Details: {details}",
        ])

    if not lines:
        return ["No errors detected"]
    return lines

# Example usage
client = ModbusClient(host="192.168.1.100", timeout=5.0, auto_open=True)

# Attempt operation that might fail
result = client.read_holding_registers(9999, 100)  # Likely to cause address error

# A None result is treated as failure: print the report lines under a header,
# each indented by two spaces
if result is None:
    error_info = detailed_error_report(client)
    print("Error Report:")
    for line in error_info:
        print(f"  {line}")

Protocol Configuration

from pyModbusTCP.server import ModbusServer
from pyModbusTCP.constants import MODBUS_PORT, MAX_PDU_SIZE

# Use standard constants for configuration
server = ModbusServer(
    host="0.0.0.0",
    port=MODBUS_PORT,  # Standard Modbus TCP port
    no_block=False
)

# Report the configured values before starting the server
print(f"Server configured on port {MODBUS_PORT}")
print(f"Maximum PDU size: {MAX_PDU_SIZE} bytes")
# NOTE(review): with no_block=False this call presumably blocks until the server stops — confirm
server.start()

Error Handling Best Practices

Client-Side Error Handling

from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import *

def robust_read_registers(client, address, count, max_retries=3, retry_delay=0.0):
    """Read holding registers, retrying on timeouts and busy-device exceptions.

    Args:
        client: Object exposing ``read_holding_registers(address, count)``
            plus the ``last_error`` and ``last_except`` attributes.
        address: First register address to read.
        count: Number of registers to read.
        max_retries: Maximum number of read attempts.
        retry_delay: Seconds to wait before each retry. The default of 0.0
            retries immediately, as the function always did.

    Returns:
        The client's result from the first successful attempt, or None when
        every attempt fails or a non-retryable error occurs.
    """
    import time  # local import keeps this snippet self-contained

    for attempt in range(max_retries):
        result = client.read_holding_registers(address, count)

        if result is not None:
            return result

        error_code = client.last_error
        except_code = client.last_except

        if error_code == MB_TIMEOUT_ERR:
            reason = "Timeout"
        elif except_code == EXP_SLAVE_DEVICE_BUSY:
            reason = "Device busy"
        else:
            # Non-recoverable error
            break

        if attempt + 1 >= max_retries:
            # Out of attempts: do not announce a retry that will not happen
            print(f"{reason} on attempt {attempt + 1}, giving up")
            break

        print(f"{reason} on attempt {attempt + 1}, retrying...")
        if retry_delay > 0:
            # Give a busy or slow device time to recover before asking again
            time.sleep(retry_delay)

    return None

# Usage
client = ModbusClient(host="192.168.1.100", auto_open=True)
registers = robust_read_registers(client, 0, 10)
# The helper signals failure with None, so compare against None explicitly
# rather than relying on truthiness (an empty result is not a failure).
if registers is not None:
    print(f"Successfully read: {registers}")
else:
    print("Failed to read registers after retries")

Server-Side Validation

from pyModbusTCP.server import ModbusServer, DataHandler
from pyModbusTCP.constants import *

class RobustDataHandler(DataHandler):
    """Holding-register handler restricted to fixed address windows.

    Reads and writes are served only when the whole requested span lies inside
    one of the configured inclusive ``(first, last)`` windows; everything else
    is answered with a Modbus exception code.
    """

    def __init__(self, data_bank=None):
        super().__init__(data_bank)
        # Inclusive (first, last) address windows
        self.valid_read_ranges = [(0, 999), (2000, 2999)]
        self.valid_write_ranges = [(100, 899), (2100, 2899)]

    def _validate_read_address(self, address, count):
        """Return True when the span fits inside a single readable window."""
        last = address + count - 1
        return any(
            low <= address <= high and low <= last <= high
            for low, high in self.valid_read_ranges
        )

    def _validate_write_address(self, address, count):
        """Return True when the span fits inside a single writable window."""
        last = address + count - 1
        return any(
            low <= address <= high and low <= last <= high
            for low, high in self.valid_write_ranges
        )

    def read_h_regs(self, address, count, srv_info):
        """Serve a read after checking the address window and the count."""
        if not self._validate_read_address(address, count):
            reply = self.Return(exp_code=EXP_DATA_ADDRESS)
        elif count > 125:  # Modbus limit
            reply = self.Return(exp_code=EXP_DATA_VALUE)
        else:
            reply = super().read_h_regs(address, count, srv_info)
        return reply

    def write_h_regs(self, address, words_l, srv_info):
        """Serve a write after checking the address window and every value."""
        if not self._validate_write_address(address, len(words_l)):
            reply = self.Return(exp_code=EXP_DATA_ADDRESS)
        elif any(not (0 <= word <= 65535) for word in words_l):
            # At least one value falls outside 0..65535
            reply = self.Return(exp_code=EXP_DATA_VALUE)
        else:
            reply = super().write_h_regs(address, words_l, srv_info)
        return reply

# Use robust handler: create it and hand it to the server via data_hdl
handler = RobustDataHandler()
server = ModbusServer(data_hdl=handler)
# NOTE(review): host, port and blocking mode are left at the ModbusServer defaults — confirm they suit the deployment
server.start()

Install with Tessl CLI

npx tessl i tessl/pypi-pymodbustcp

docs

client.md

constants.md

index.md

server.md

utils.md

tile.json