A simple Modbus/TCP client library for Python
—
Protocol constants, function codes, exception codes, and error handling utilities for robust Modbus communication. These constants define the Modbus protocol specifications and provide comprehensive error reporting capabilities.
Basic package information and protocol defaults.
VERSION = '0.3.0' # Package version string
MODBUS_PORT = 502 # Default Modbus TCP port
MAX_PDU_SIZE = 253 # Maximum Protocol Data Unit sizeStandard Modbus function codes supported by the library.
# Read functions
READ_COILS = 0x01 # Read coils (outputs)
READ_DISCRETE_INPUTS = 0x02 # Read discrete inputs
READ_HOLDING_REGISTERS = 0x03 # Read holding registers
READ_INPUT_REGISTERS = 0x04 # Read input registers
# Write functions
WRITE_SINGLE_COIL = 0x05 # Write single coil
WRITE_SINGLE_REGISTER = 0x06 # Write single register
WRITE_MULTIPLE_COILS = 0x0F # Write multiple coils
WRITE_MULTIPLE_REGISTERS = 0x10 # Write multiple registers
# Advanced functions
WRITE_READ_MULTIPLE_REGISTERS = 0x17 # Write/read multiple registers
ENCAPSULATED_INTERFACE_TRANSPORT = 0x2B # Encapsulated interface transport
# Supported function codes tuple
SUPPORTED_FUNCTION_CODES = (
READ_COILS, READ_DISCRETE_INPUTS, READ_HOLDING_REGISTERS, READ_INPUT_REGISTERS,
WRITE_SINGLE_COIL, WRITE_SINGLE_REGISTER, WRITE_MULTIPLE_COILS, WRITE_MULTIPLE_REGISTERS,
WRITE_READ_MULTIPLE_REGISTERS, ENCAPSULATED_INTERFACE_TRANSPORT
)Modbus Encapsulated Interface (MEI) type definitions.
MEI_TYPE_READ_DEVICE_ID = 0x0E # MEI type for read device identificationStandard Modbus exception codes returned by servers when errors occur.
EXP_NONE = 0x00 # No exception
EXP_ILLEGAL_FUNCTION = 0x01 # Illegal function code
EXP_DATA_ADDRESS = 0x02 # Illegal data address
EXP_DATA_VALUE = 0x03 # Illegal data value
EXP_SLAVE_DEVICE_FAILURE = 0x04 # Slave device failure
EXP_ACKNOWLEDGE = 0x05 # Acknowledge
EXP_SLAVE_DEVICE_BUSY = 0x06 # Slave device busy
EXP_NEGATIVE_ACKNOWLEDGE = 0x07 # Negative acknowledge
EXP_MEMORY_PARITY_ERROR = 0x08 # Memory parity error
EXP_GATEWAY_PATH_UNAVAILABLE = 0x0A # Gateway path unavailable
EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND = 0x0B # Gateway target device failed to respondHuman-readable descriptions for exception codes.
# Short exception descriptions
EXP_TXT = {
EXP_NONE: 'no exception',
EXP_ILLEGAL_FUNCTION: 'illegal function',
EXP_DATA_ADDRESS: 'illegal data address',
EXP_DATA_VALUE: 'illegal data value',
EXP_SLAVE_DEVICE_FAILURE: 'slave device failure',
EXP_ACKNOWLEDGE: 'acknowledge',
EXP_SLAVE_DEVICE_BUSY: 'slave device busy',
EXP_NEGATIVE_ACKNOWLEDGE: 'negative acknowledge',
EXP_MEMORY_PARITY_ERROR: 'memory parity error',
EXP_GATEWAY_PATH_UNAVAILABLE: 'gateway path unavailable',
EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 'gateway target device failed to respond'
}
# Detailed exception descriptions
EXP_DETAILS = {
EXP_NONE: 'The last request produced no exceptions.',
EXP_ILLEGAL_FUNCTION: 'Function code received in the query is not recognized or allowed by slave.',
EXP_DATA_ADDRESS: 'Data address of some or all the required entities are not allowed or do not exist in slave.',
EXP_DATA_VALUE: 'Value is not accepted by slave.',
EXP_SLAVE_DEVICE_FAILURE: 'Unrecoverable error occurred while slave was attempting to perform requested action.',
EXP_ACKNOWLEDGE: 'Slave has accepted request and is processing it, but a long duration of time is required. This response is returned to prevent a timeout error from occurring in the master. Master can next issue a Poll Program Complete message to determine whether processing is completed.',
EXP_SLAVE_DEVICE_BUSY: 'Slave is engaged in processing a long-duration command. Master should retry later.',
EXP_NEGATIVE_ACKNOWLEDGE: 'Slave cannot perform the programming functions. Master should request diagnostic or error information from slave.',
EXP_MEMORY_PARITY_ERROR: 'Slave detected a parity error in memory. Master can retry the request, but service may be required on the slave device.',
EXP_GATEWAY_PATH_UNAVAILABLE: 'Specialized for Modbus gateways, this indicates a misconfiguration on gateway.',
EXP_GATEWAY_TARGET_DEVICE_FAILED_TO_RESPOND: 'Specialized for Modbus gateways, sent when slave fails to respond.'
}Internal library error codes for network and communication issues.
MB_NO_ERR = 0 # No error
MB_RESOLVE_ERR = 1 # Name resolve error
MB_CONNECT_ERR = 2 # Connect error
MB_SEND_ERR = 3 # Socket send error
MB_RECV_ERR = 4 # Socket receive error
MB_TIMEOUT_ERR = 5 # Receive timeout error
MB_FRAME_ERR = 6 # Frame format error
MB_EXCEPT_ERR = 7 # Modbus exception error
MB_CRC_ERR = 8 # Bad CRC on receive frame
MB_SOCK_CLOSE_ERR = 9 # Socket is closed errorHuman-readable descriptions for module error codes.
MB_ERR_TXT = {
MB_NO_ERR: 'no error',
MB_RESOLVE_ERR: 'name resolve error',
MB_CONNECT_ERR: 'connect error',
MB_SEND_ERR: 'socket send error',
MB_RECV_ERR: 'socket recv error',
MB_TIMEOUT_ERR: 'recv timeout occur',
MB_FRAME_ERR: 'frame format error',
MB_EXCEPT_ERR: 'modbus exception',
MB_CRC_ERR: 'bad CRC on receive frame',
MB_SOCK_CLOSE_ERR: 'socket is closed'
}from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import READ_HOLDING_REGISTERS, WRITE_MULTIPLE_REGISTERS
client = ModbusClient(host="192.168.1.100", auto_open=True)
# Function codes are used internally, but you can reference them
print(f"Reading using function code: {READ_HOLDING_REGISTERS}")
registers = client.read_holding_registers(0, 10)
print(f"Writing using function code: {WRITE_MULTIPLE_REGISTERS}")
success = client.write_multiple_registers(0, [100, 200, 300])from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import (
MB_NO_ERR, MB_CONNECT_ERR, MB_TIMEOUT_ERR, MB_ERR_TXT,
EXP_NONE, EXP_DATA_ADDRESS, EXP_TXT, EXP_DETAILS
)
client = ModbusClient(host="192.168.1.100", auto_open=True)
# Attempt to read registers
registers = client.read_holding_registers(0, 10)
if registers is None:
# Check for network/connection errors
error_code = client.last_error
if error_code != MB_NO_ERR:
print(f"Network error: {MB_ERR_TXT.get(error_code, 'unknown error')}")
if error_code == MB_CONNECT_ERR:
print("Check if server is running and network is accessible")
elif error_code == MB_TIMEOUT_ERR:
print("Server not responding, consider increasing timeout")
# Check for Modbus exceptions
except_code = client.last_except
if except_code != EXP_NONE:
print(f"Modbus exception: {EXP_TXT.get(except_code, 'unknown exception')}")
print(f"Details: {EXP_DETAILS.get(except_code, 'No details available')}")
if except_code == EXP_DATA_ADDRESS:
print("The requested register address may not exist on the server")
else:
print(f"Successfully read registers: {registers}")from pyModbusTCP.server import ModbusServer, DataHandler
from pyModbusTCP.constants import (
EXP_DATA_ADDRESS, EXP_DATA_VALUE, EXP_ILLEGAL_FUNCTION
)
class ValidatingDataHandler(DataHandler):
def read_h_regs(self, address, count, srv_info):
# Validate address range
if address >= 1000:
return self.Return(exp_code=EXP_DATA_ADDRESS)
# Validate count
if count > 100:
return self.Return(exp_code=EXP_DATA_VALUE)
return super().read_h_regs(address, count, srv_info)
def write_h_regs(self, address, words_l, srv_info):
# Validate write address
if address < 100: # Read-only area
return self.Return(exp_code=EXP_DATA_ADDRESS)
# Validate data values
if any(w > 32767 for w in words_l): # Signed 16-bit limit
return self.Return(exp_code=EXP_DATA_VALUE)
return super().write_h_regs(address, words_l, srv_info)
# Use validating handler
handler = ValidatingDataHandler()
server = ModbusServer(data_hdl=handler)
server.start()from pyModbusTCP.constants import SUPPORTED_FUNCTION_CODES
def is_supported_function(func_code):
"""Check if a function code is supported."""
return func_code in SUPPORTED_FUNCTION_CODES
# Check function code support
test_functions = [0x01, 0x03, 0x10, 0x2B, 0xFF]
for func in test_functions:
supported = is_supported_function(func)
print(f"Function code 0x{func:02X}: {'supported' if supported else 'not supported'}")from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import *
def detailed_error_report(client):
"""Generate detailed error report for a client."""
report = []
# Network error information
error_code = client.last_error
if error_code != MB_NO_ERR:
error_text = MB_ERR_TXT.get(error_code, f'Unknown error code: {error_code}')
report.append(f"Network Error ({error_code}): {error_text}")
# Modbus exception information
except_code = client.last_except
if except_code != EXP_NONE:
except_text = EXP_TXT.get(except_code, f'Unknown exception code: {except_code}')
except_details = EXP_DETAILS.get(except_code, 'No additional details available')
report.append(f"Modbus Exception ({except_code}): {except_text}")
report.append(f"Details: {except_details}")
return report if report else ["No errors detected"]
# Example usage
client = ModbusClient(host="192.168.1.100", timeout=5.0, auto_open=True)
# Attempt operation that might fail
result = client.read_holding_registers(9999, 100) # Likely to cause address error
if result is None:
error_info = detailed_error_report(client)
print("Error Report:")
for line in error_info:
print(f" {line}")from pyModbusTCP.server import ModbusServer
from pyModbusTCP.constants import MODBUS_PORT, MAX_PDU_SIZE
# Use standard constants for configuration
server = ModbusServer(
host="0.0.0.0",
port=MODBUS_PORT, # Standard Modbus TCP port
no_block=False
)
print(f"Server configured on port {MODBUS_PORT}")
print(f"Maximum PDU size: {MAX_PDU_SIZE} bytes")
server.start()from pyModbusTCP.client import ModbusClient
from pyModbusTCP.constants import *
def robust_read_registers(client, address, count, max_retries=3):
"""Robust register reading with retry logic."""
for attempt in range(max_retries):
result = client.read_holding_registers(address, count)
if result is not None:
return result
error_code = client.last_error
except_code = client.last_except
if error_code == MB_TIMEOUT_ERR:
print(f"Timeout on attempt {attempt + 1}, retrying...")
continue
elif except_code == EXP_SLAVE_DEVICE_BUSY:
print(f"Device busy on attempt {attempt + 1}, retrying...")
continue
else:
# Non-recoverable error
break
return None
# Usage
client = ModbusClient(host="192.168.1.100", auto_open=True)
registers = robust_read_registers(client, 0, 10)
if registers:
print(f"Successfully read: {registers}")
else:
print("Failed to read registers after retries")from pyModbusTCP.server import ModbusServer, DataHandler
from pyModbusTCP.constants import *
class RobustDataHandler(DataHandler):
def __init__(self, data_bank=None):
super().__init__(data_bank)
self.valid_read_ranges = [(0, 999), (2000, 2999)] # Valid address ranges
self.valid_write_ranges = [(100, 899), (2100, 2899)]
def _validate_read_address(self, address, count):
"""Validate if address range is readable."""
end_address = address + count - 1
for start, end in self.valid_read_ranges:
if start <= address <= end and start <= end_address <= end:
return True
return False
def _validate_write_address(self, address, count):
"""Validate if address range is writable."""
end_address = address + count - 1
for start, end in self.valid_write_ranges:
if start <= address <= end and start <= end_address <= end:
return True
return False
def read_h_regs(self, address, count, srv_info):
if not self._validate_read_address(address, count):
return self.Return(exp_code=EXP_DATA_ADDRESS)
if count > 125: # Modbus limit
return self.Return(exp_code=EXP_DATA_VALUE)
return super().read_h_regs(address, count, srv_info)
def write_h_regs(self, address, words_l, srv_info):
if not self._validate_write_address(address, len(words_l)):
return self.Return(exp_code=EXP_DATA_ADDRESS)
# Validate data values
for word in words_l:
if not (0 <= word <= 65535):
return self.Return(exp_code=EXP_DATA_VALUE)
return super().write_h_regs(address, words_l, srv_info)
# Use robust handler
handler = RobustDataHandler()
server = ModbusServer(data_hdl=handler)
server.start()