A simple Modbus/TCP client library for Python
—
The pyModbusTCP.client module provides TCP client functionality for connecting to and communicating with Modbus servers. The ModbusClient class provides synchronous communication with automatic connection management, comprehensive error handling, and support for the standard Modbus function codes documented below (1, 2, 3, 4, 5, 6, 15, 16, 23 and 43).
ModbusClient is the main client class for Modbus/TCP communication, with automatic connection management and comprehensive error handling.
class ModbusClient:
"""
Modbus TCP client with automatic connection management.
Parameters:
host (str): Hostname or IPv4/IPv6 address (default: 'localhost')
port (int): TCP port number (default: 502)
unit_id (int): Unit ID for requests (default: 1)
timeout (float): Socket timeout in seconds (default: 30.0)
auto_open (bool): Automatically open connection when needed (default: True)
auto_close (bool): Automatically close connection after each request (default: False)
"""
def __init__(self, host='localhost', port=502, unit_id=1, timeout=30.0, auto_open=True, auto_close=False):
"""Initialize Modbus TCP client."""
def open(self):
"""
Open TCP connection to server.
Returns:
bool: True if connection successful, False otherwise
"""
def close(self):
"""Close TCP connection."""
@property
def is_open(self):
"""bool: True if TCP connection is open, False otherwise (read-only)."""
@property
def host(self):
"""str: Server hostname or IP address."""
@property
def port(self):
"""int: TCP port number."""
@property
def unit_id(self):
"""int: Unit ID for Modbus requests."""
@property
def timeout(self):
"""float: Socket timeout in seconds."""
@property
def auto_open(self):
"""bool: Automatic connection opening enabled."""
@property
def auto_close(self):
"""bool: Automatic connection closing enabled."""
@property
def version(self):
"""str: Package version (read-only)."""
@property
def last_error(self):
"""int: Last error code (read-only)."""
@property
def last_error_as_txt(self):
"""str: Last error as text (read-only)."""
@property
def last_except(self):
"""int: Last Modbus exception code (read-only)."""
@property
def last_except_as_txt(self):
"""str: Last exception as short text (read-only)."""
@property
def last_except_as_full_txt(self):
"""str: Last exception as detailed text (read-only)."""

Functions for reading data from Modbus servers using standard function codes.
def read_coils(self, bit_addr, bit_nb=1):
"""
Read coils (function code 1).
Parameters:
bit_addr (int): Starting coil address (0-65535)
bit_nb (int): Number of coils to read (1-2000, default: 1)
Returns:
list[bool] or None: List of coil values, None on error
"""
def read_discrete_inputs(self, bit_addr, bit_nb=1):
"""
Read discrete inputs (function code 2).
Parameters:
bit_addr (int): Starting input address (0-65535)
bit_nb (int): Number of inputs to read (1-2000, default: 1)
Returns:
list[bool] or None: List of input values, None on error
"""
def read_holding_registers(self, reg_addr, reg_nb=1):
"""
Read holding registers (function code 3).
Parameters:
reg_addr (int): Starting register address (0-65535)
reg_nb (int): Number of registers to read (1-125, default: 1)
Returns:
list[int] or None: List of register values (0-65535), None on error
"""
def read_input_registers(self, reg_addr, reg_nb=1):
"""
Read input registers (function code 4).
Parameters:
reg_addr (int): Starting register address (0-65535)
reg_nb (int): Number of registers to read (1-125, default: 1)
Returns:
list[int] or None: List of register values (0-65535), None on error
"""

Functions for writing data to Modbus servers using standard function codes.
def write_single_coil(self, bit_addr, bit_value):
"""
Write single coil (function code 5).
Parameters:
bit_addr (int): Coil address (0-65535)
bit_value (bool): Value to write
Returns:
bool: True on success, False on error
"""
def write_single_register(self, reg_addr, reg_value):
"""
Write single register (function code 6).
Parameters:
reg_addr (int): Register address (0-65535)
reg_value (int): Value to write (0-65535)
Returns:
bool: True on success, False on error
"""
def write_multiple_coils(self, bits_addr, bits_value):
"""
Write multiple coils (function code 15).
Parameters:
bits_addr (int): Starting coil address (0-65535)
bits_value (list[bool]): List of coil values to write
Returns:
bool: True on success, False on error
"""
def write_multiple_registers(self, regs_addr, regs_value):
"""
Write multiple registers (function code 16).
Parameters:
regs_addr (int): Starting register address (0-65535)
regs_value (list[int]): List of register values to write (0-65535)
Returns:
bool: True on success, False on error
"""
def write_read_multiple_registers(self, write_addr, write_values, read_addr, read_nb=1):
"""
Write and read multiple registers (function code 23).
Parameters:
write_addr (int): Starting write address (0-65535)
write_values (list[int]): List of values to write (0-65535)
read_addr (int): Starting read address (0-65535)
read_nb (int): Number of registers to read (1-125, default: 1)
Returns:
list[int] or None: List of read register values, None on error
"""

Functions for reading device identification information.
def read_device_identification(self, read_code=1, object_id=0):
"""
Read device identification (function code 43).
Parameters:
read_code (int): Read device ID code (1-4, default: 1)
1: Basic identification (mandatory objects)
2: Regular identification (basic + optional)
3: Extended identification (regular + private)
4: Individual object access
object_id (int): Object ID when read_code=4 (0-255, default: 0)
Returns:
DeviceIdentificationResponse or None: Device identification response, None on error
"""
def custom_request(self, pdu):
"""
Send custom PDU request.
Parameters:
pdu (bytes): Protocol Data Unit to send
Returns:
bytes or None: Response PDU, None on error
"""
def on_tx_rx(self, frame, is_tx):
"""
Hook for transmission/reception events (override in subclass).
Parameters:
frame (bytes): Frame data
is_tx (bool): True for transmission, False for reception
"""

from pyModbusTCP.client import ModbusClient
# Create client with automatic connection
client = ModbusClient(host="192.168.1.100", port=502, unit_id=1, auto_open=True)
# Read holding registers
values = client.read_holding_registers(0, 10)
if values:
print(f"Register values: {values}")
else:
print(f"Error: {client.last_error_as_txt}")
# Write and verify
success = client.write_single_register(100, 1234)
if success:
result = client.read_holding_registers(100, 1)
print(f"Written value: {result[0] if result else 'read failed'}")

from pyModbusTCP.client import ModbusClient
# Create client without automatic connection
client = ModbusClient(host="192.168.1.100", auto_open=False)
# Manual connection management
if client.open():
print("Connected successfully")
# Perform operations
coils = client.read_coils(0, 16)
if coils:
print(f"Coils: {coils}")
# Close when done
client.close()
else:
print(f"Connection failed: {client.last_error_as_txt}")

from pyModbusTCP.client import ModbusClient
client = ModbusClient(host="192.168.1.100", auto_open=True)
# Read basic device identification
device_id = client.read_device_identification(read_code=1)
if device_id:
print(f"Vendor: {device_id.vendor_name}")
print(f"Product: {device_id.product_name}")
print(f"Version: {device_id.major_minor_revision}")
else:
print(f"Device identification failed: {client.last_error_as_txt}")

from pyModbusTCP.client import ModbusClient
from pyModbusTCP.utils import decode_ieee, encode_ieee, word_list_to_long, long_list_to_word
client = ModbusClient(host="192.168.1.100", auto_open=True)
# Read IEEE 754 float from two registers
registers = client.read_holding_registers(0, 2)
if registers:
# Convert registers to 32-bit integer
float_as_int = word_list_to_long(registers, big_endian=True)[0]
# Decode as IEEE 754 float
float_value = decode_ieee(float_as_int)
print(f"Float value: {float_value}")
# Write IEEE 754 float to two registers
float_to_write = 123.456
# Encode as IEEE 754
float_as_int = encode_ieee(float_to_write)
# Convert to register list
registers = long_list_to_word([float_as_int], big_endian=True)
# Write to device
success = client.write_multiple_registers(0, registers)
if success:
print("Float written successfully")

class ModbusClient._InternalError(Exception):
"""Base class for internal client errors."""
class ModbusClient._NetworkError(_InternalError):
"""
Network-related errors during Modbus communication.
Attributes:
code (int): Error code from MB_ERR constants
message (str): Error message
"""
class ModbusClient._ModbusExcept(_InternalError):
"""
Modbus protocol exceptions from server responses.
Attributes:
code (int): Exception code from EXP constants
"""

class DeviceIdentificationResponse:
"""
Response object for device identification queries.
Attributes:
conformity_level (int): Supported access and object type
more_follows (int): Indicates if more objects are available (0x00 or 0xFF)
next_object_id (int): Next object ID for following transaction
objects_by_id (dict): Dictionary with requested objects (key: object_id, value: bytes)
Properties (read-only):
vendor_name (bytes): Vendor name (object ID 0x00)
product_code (bytes): Product code (object ID 0x01)
major_minor_revision (bytes): Major/minor revision (object ID 0x02)
vendor_url (bytes): Vendor URL (object ID 0x03)
product_name (bytes): Product name (object ID 0x04)
model_name (bytes): Model name (object ID 0x05)
user_application_name (bytes): User application name (object ID 0x06)
"""