A fully featured Modbus protocol stack in Python
—
PyModbus provides comprehensive client implementations supporting all standard Modbus function codes. Clients are available in both synchronous and asynchronous variants across multiple transport protocols (TCP, UDP, Serial, TLS).
PyModbus offers dedicated client classes for each transport protocol in both synchronous and asynchronous variants.
# Synchronous client classes
class ModbusTcpClient:
    """Synchronous Modbus client for the TCP transport (signature stub).

    Defaults target 127.0.0.1:502 with the socket framer, ``timeout=3`` and
    ``retries=3``; additional options are accepted through ``**kwargs``.
    """

    # NOTE(review): timeout is presumably in seconds -- confirm.
    def __init__(self, host="127.0.0.1", port=502, framer=FramerType.SOCKET,
                 timeout=3, retries=3, retry_on_empty=False,
                 close_comm_on_error=False, strict=True, source_address=None,
                 **kwargs): ...

    def connect(self) -> bool: ...

    def close(self) -> None: ...
class ModbusUdpClient:
    """Synchronous Modbus client for the UDP transport (signature stub).

    Defaults target 127.0.0.1:502 with the socket framer, ``timeout=3`` and
    ``retries=3``; additional options are accepted through ``**kwargs``.
    """

    def __init__(self, host="127.0.0.1", port=502, framer=FramerType.SOCKET,
                 timeout=3, retries=3, retry_on_empty=False, **kwargs): ...

    def connect(self) -> bool: ...

    def close(self) -> None: ...
class ModbusSerialClient:
    """Synchronous Modbus client for serial lines (signature stub).

    Defaults: RTU framer, 19200 baud, 8 data bits, no parity ("N"), 1 stop
    bit, ``timeout=3`` and ``retries=3``. ``port`` has no usable default
    (``None``) and is supplied by the caller, e.g. ``'/dev/ttyUSB0'``.
    """

    def __init__(self, port=None, framer=FramerType.RTU, timeout=3, retries=3,
                 retry_on_empty=False, close_comm_on_error=False, strict=True,
                 baudrate=19200, bytesize=8, parity="N", stopbits=1,
                 **kwargs): ...

    def connect(self) -> bool: ...

    def close(self) -> None: ...
class ModbusTlsClient:
    """Synchronous Modbus client for the TLS transport (signature stub).

    Defaults target 127.0.0.1:802 with the TLS framer, ``timeout=3`` and
    ``retries=3``. TLS material is passed via ``sslctx`` or via
    ``certfile`` / ``keyfile`` / ``password``; all default to ``None``.
    """

    def __init__(self, host="127.0.0.1", port=802, framer=FramerType.TLS,
                 timeout=3, retries=3, sslctx=None, certfile=None,
                 keyfile=None, password=None, server_hostname=None,
                 **kwargs): ...

    def connect(self) -> bool: ...

    def close(self) -> None: ...
# Asynchronous client classes
class AsyncModbusTcpClient:
    """Asynchronous Modbus client for the TCP transport (signature stub).

    Same constructor arguments as the synchronous TCP client; ``connect`` is
    a coroutine and must be awaited, while ``close`` is a plain method.
    """

    def __init__(self, host="127.0.0.1", port=502, framer=FramerType.SOCKET,
                 timeout=3, retries=3, retry_on_empty=False,
                 close_comm_on_error=False, strict=True, source_address=None,
                 **kwargs): ...

    async def connect(self) -> bool: ...

    def close(self) -> None: ...
class AsyncModbusUdpClient:
    """Asynchronous Modbus client for the UDP transport (signature stub).

    Same constructor arguments as the synchronous UDP client; ``connect`` is
    a coroutine and must be awaited, while ``close`` is a plain method.
    """

    def __init__(self, host="127.0.0.1", port=502, framer=FramerType.SOCKET,
                 timeout=3, retries=3, retry_on_empty=False, **kwargs): ...

    async def connect(self) -> bool: ...

    def close(self) -> None: ...
class AsyncModbusSerialClient:
    """Asynchronous Modbus client for serial lines (signature stub).

    Same constructor arguments as the synchronous serial client; ``connect``
    is a coroutine and must be awaited, while ``close`` is a plain method.
    """

    def __init__(self, port=None, framer=FramerType.RTU, timeout=3, retries=3,
                 retry_on_empty=False, close_comm_on_error=False, strict=True,
                 baudrate=19200, bytesize=8, parity="N", stopbits=1,
                 **kwargs): ...

    async def connect(self) -> bool: ...

    def close(self) -> None: ...
class AsyncModbusTlsClient:
def __init__(self, host="127.0.0.1", port=802, framer=FramerType.TLS,
timeout=3, retries=3, sslctx=None, certfile=None, keyfile=None,
password=None, server_hostname=None, **kwargs): ...
async def connect(self) -> bool: ...
def close(self) -> None: ...All client classes inherit core functionality from base classes and the ModbusClientMixin providing standard Modbus operations.
# Base client classes
class ModbusBaseClient:
    """Base asynchronous client class."""

    def __init__(self, framer=None, **kwargs): ...

    # NOTE(review): the async transport clients declare ``async def connect``;
    # confirm whether this base signature should be a coroutine as well.
    def connect(self) -> bool: ...

    def close(self) -> None: ...

    def execute(self, request) -> ModbusResponse: ...

    def register(self, custom_response_class): ...

    def set_max_no_responses(self, count: int): ...
class ModbusBaseSyncClient:
    """Base synchronous client class."""

    def __init__(self, framer=None, **kwargs): ...

    def connect(self) -> bool: ...

    def close(self) -> None: ...

    def execute(self, request) -> ModbusResponse: ...

    def register(self, custom_response_class): ...

    def set_max_no_responses(self, count: int): ...
class ModbusClientMixin:
"""
Mixin class providing all Modbus function code methods.
All client classes inherit from this mixin to provide standard operations.
"""
# Note: All the operation methods below (coils, registers, etc.)
# are provided by this mixin classdef connect(self) -> bool: ...
def close(self) -> None: ...
def is_socket_open(self) -> bool: ...Functions for reading and writing coil (binary) values.
def read_coils(self, address, count=1, device_id=0) -> ReadCoilsResponse:
    """
    Read coil status (FC 01).

    Parameters:
    - address (int): Starting address
    - count (int): Number of coils to read (default 1)
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadCoilsResponse with .bits attribute containing list of boolean values
    """
def write_coil(self, address, value, device_id=0) -> WriteSingleCoilResponse:
    """
    Write single coil (FC 05).

    Parameters:
    - address (int): Coil address
    - value (bool): Coil value
    - device_id (int): Device identifier (default 0)

    Returns:
    WriteSingleCoilResponse
    """
def write_coils(self, address, values, device_id=0) -> WriteMultipleCoilsResponse:
"""
Write multiple coils (FC 15).
Parameters:
- address (int): Starting address
- values (list): List of boolean values
- device_id (int): Device identifier
Returns:
WriteMultipleCoilsResponse
"""Functions for reading discrete input values.
def read_discrete_inputs(self, address, count=1, device_id=0) -> ReadDiscreteInputsResponse:
"""
Read discrete input status (FC 02).
Parameters:
- address (int): Starting address
- count (int): Number of inputs to read
- device_id (int): Device identifier
Returns:
ReadDiscreteInputsResponse with .bits attribute containing list of boolean values
"""Functions for reading and writing register (16-bit integer) values.
def read_holding_registers(self, address, count=1, device_id=0) -> ReadHoldingRegistersResponse:
    """
    Read holding registers (FC 03).

    Parameters:
    - address (int): Starting address
    - count (int): Number of registers to read (default 1)
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadHoldingRegistersResponse with .registers attribute containing list of int values
    """
def read_input_registers(self, address, count=1, device_id=0) -> ReadInputRegistersResponse:
    """
    Read input registers (FC 04).

    Parameters:
    - address (int): Starting address
    - count (int): Number of registers to read (default 1)
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadInputRegistersResponse with .registers attribute containing list of int values
    """
def write_register(self, address, value, device_id=0) -> WriteSingleRegisterResponse:
    """
    Write single register (FC 06).

    Parameters:
    - address (int): Register address
    - value (int): Register value (0-65535)
    - device_id (int): Device identifier (default 0)

    Returns:
    WriteSingleRegisterResponse
    """
def write_registers(self, address, values, device_id=0) -> WriteMultipleRegistersResponse:
    """
    Write multiple registers (FC 16).

    Parameters:
    - address (int): Starting address
    - values (list): List of register values
    - device_id (int): Device identifier (default 0)

    Returns:
    WriteMultipleRegistersResponse
    """
def readwrite_registers(self, read_address, read_count, write_address, write_registers, device_id=0) -> ReadWriteMultipleRegistersResponse:
    """
    Read and write multiple registers in single transaction (FC 23).

    Parameters:
    - read_address (int): Starting read address
    - read_count (int): Number of registers to read
    - write_address (int): Starting write address
    - write_registers (list): Values to write
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadWriteMultipleRegistersResponse with .registers attribute
    """
def mask_write_register(self, address, and_mask, or_mask, device_id=0) -> MaskWriteRegisterResponse:
"""
Mask write register (FC 22).
Parameters:
- address (int): Register address
- and_mask (int): AND mask
- or_mask (int): OR mask
- device_id (int): Device identifier
Returns:
MaskWriteRegisterResponse
"""Functions for reading and writing file records.
def read_file_record(self, records, device_id=0) -> ReadFileRecordResponse:
    """
    Read file record (FC 20).

    Parameters:
    - records (list): List of file record requests
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadFileRecordResponse
    """
def write_file_record(self, records, device_id=0) -> WriteFileRecordResponse:
    """
    Write file record (FC 21).

    Parameters:
    - records (list): List of file record data
    - device_id (int): Device identifier (default 0)

    Returns:
    WriteFileRecordResponse
    """
def read_fifo_queue(self, address, device_id=0) -> ReadFifoQueueResponse:
"""
Read FIFO queue (FC 24).
Parameters:
- address (int): FIFO pointer address
- device_id (int): Device identifier
Returns:
ReadFifoQueueResponse
"""Functions for device diagnostics and status reporting.
def diag_read_diagnostic_register(self, device_id=0) -> DiagnosticStatusResponse:
"""Read diagnostic register."""
def diag_change_ascii_input_delimiter(self, device_id=0) -> DiagnosticStatusResponse:
"""Change ASCII input delimiter."""
def diag_force_listen_only_mode(self, device_id=0) -> DiagnosticStatusResponse:
"""Force listen only mode."""
def diag_clear_counters(self, device_id=0) -> DiagnosticStatusResponse:
"""Clear counters and diagnostic register."""
def diag_read_bus_message_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read bus message count."""
def diag_read_bus_comm_error_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read bus communication error count."""
def diag_read_bus_exception_error_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read bus exception error count."""
def diag_read_device_message_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read device message count."""
def diag_read_device_no_response_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read device no response count."""
def diag_read_device_nak_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read device NAK count."""
def diag_read_device_busy_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read device busy count."""
def diag_read_bus_char_overrun_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read bus character overrun count."""
def diag_read_iop_overrun_count(self, device_id=0) -> DiagnosticStatusResponse:
"""Read IOP overrun count."""
def diag_clear_overrun_counter(self, device_id=0) -> DiagnosticStatusResponse:
"""Clear overrun counter and flag."""
def diag_get_clear_modbus_plus(self, device_id=0) -> DiagnosticStatusResponse:
"""Get/clear Modbus Plus statistics."""Functions for retrieving device information and identification.
def read_device_information(self, read_code=None, object_id=0x00, device_id=0) -> ReadDeviceInformationResponse:
    """
    Read device information (FC 43).

    Parameters:
    - read_code (int): Read device ID code (0x01, 0x02, 0x03, 0x04); default None
    - object_id (int): Object ID to read (default 0x00)
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadDeviceInformationResponse with device information
    """
def report_device_id(self, device_id=0) -> ReportServerIdResponse:
"""
Report device identification (FC 17).
Parameters:
- device_id (int): Device identifier
Returns:
ReportServerIdResponse with device ID information
"""Functions for reading communication events and counters.
def read_exception_status(self, device_id=0) -> ReadExceptionStatusResponse:
    """
    Read exception status (FC 07).

    Parameters:
    - device_id (int): Device identifier (default 0)

    Returns:
    ReadExceptionStatusResponse with status byte
    """
def get_com_event_counter(self, device_id=0) -> GetCommEventCounterResponse:
    """
    Get communication event counter (FC 11).

    Parameters:
    - device_id (int): Device identifier (default 0)

    Returns:
    GetCommEventCounterResponse with event count
    """
def get_com_event_log(self, device_id=0) -> GetCommEventLogResponse:
"""
Get communication event log (FC 12).
Returns:
GetCommEventLogResponse with event log data
"""Utility methods for converting between Python data types and Modbus register values.
def convert_to_registers(self, value, data_type) -> list:
    """
    Convert Python value to Modbus registers.

    Parameters:
    - value: Python value to convert
    - data_type: Data type constant (INT16, INT32, FLOAT32, etc.)

    Returns:
    List of register values
    """
def convert_from_registers(self, registers, data_type):
    """
    Convert Modbus registers to Python value.

    Parameters:
    - registers (list): List of register values
    - data_type: Data type constant

    Returns:
    Converted Python value
    """

from pymodbus.client import ModbusTcpClient
from pymodbus import FramerType, ModbusException

# Synchronous TCP example: read ten holding registers, then write five.
client = ModbusTcpClient('192.168.1.10', port=502, framer=FramerType.SOCKET)
try:
    if client.connect():
        # Read holding registers
        result = client.read_holding_registers(0, count=10, device_id=1)
        if not result.isError():
            print(f"Registers: {result.registers}")
        else:
            print(f"Error reading registers: {result}")

        # Write multiple registers
        values = [100, 200, 300, 400, 500]
        result = client.write_registers(10, values, device_id=1)
        if result.isError():
            print(f"Error writing registers: {result}")
    else:
        print("Failed to connect to Modbus server")
except ModbusException as e:
    print(f"Modbus exception: {e}")
finally:
    # Always release the connection, whether or not the requests succeeded.
    client.close()

from pymodbus.client import ModbusSerialClient
from pymodbus import FramerType

# Synchronous serial (RTU) example: read sixteen coils, then set coil 5.
client = ModbusSerialClient(
    port='/dev/ttyUSB0',
    baudrate=9600,
    bytesize=8,
    parity='N',
    stopbits=1,
    framer=FramerType.RTU,
    timeout=1
)
if client.connect():
    try:
        # Read coils
        result = client.read_coils(0, count=16, device_id=1)
        if not result.isError():
            print(f"Coil states: {result.bits}")

        # Write single coil
        client.write_coil(5, True, device_id=1)
    finally:
        # Close the port even if one of the requests raises.
        client.close()

import asyncio
from pymodbus.client import AsyncModbusTcpClient


async def async_modbus_client():
    """Connect to a local Modbus TCP server and run two sample requests."""
    client = AsyncModbusTcpClient('127.0.0.1', port=502)
    try:
        # connect() reports success as a bool; stop early rather than issue
        # requests on a link that never came up.
        if not await client.connect():
            print("Failed to connect to Modbus server")
            return

        # Read input registers
        result = await client.read_input_registers(0, count=5, device_id=1)
        if not result.isError():
            print(f"Input registers: {result.registers}")

        # Read/write operation
        result = await client.readwrite_registers(
            read_address=0, read_count=5,
            write_address=10, write_registers=[1, 2, 3],
            device_id=1
        )
        if not result.isError():
            print(f"Read values: {result.registers}")
    finally:
        # close() is a plain (non-awaited) call on the async clients.
        client.close()


# Run async client
asyncio.run(async_modbus_client())

from pymodbus.client import ModbusTcpClient
import struct

# Float32 round trip: store one float in two 16-bit registers and read it back.
client = ModbusTcpClient('127.0.0.1')
if client.connect():
    try:
        # Write float32 value using struct
        float_value = 123.45
        # Pack float as 2 registers (32-bit big-endian): the four packed bytes
        # are reinterpreted as two unsigned 16-bit words, high word first.
        registers = list(struct.unpack('>HH', struct.pack('>f', float_value)))
        client.write_registers(0, registers, device_id=1)

        # Read float32 value
        result = client.read_holding_registers(0, count=2, device_id=1)
        if not result.isError():
            # Unpack registers back to float
            packed = struct.pack('>HH', result.registers[0], result.registers[1])
            float_value = struct.unpack('>f', packed)[0]
            print(f"Float value: {float_value}")
    finally:
        # Release the connection even if a request raises.
        client.close()
else:
    print("Failed to connect to Modbus server")