A fully featured modbus protocol stack in python
npx @tessl/cli install tessl/pypi-pymodbus@3.11.0

A comprehensive Python library providing both client and server implementations of the Modbus protocol. PyModbus supports multiple transport layers (TCP, UDP, Serial, TLS), various framing formats (RTU, ASCII, Socket), and offers both synchronous and asynchronous APIs for industrial automation, SCADA systems, and IoT applications.
pip install pymodbus

Use pip install pymodbus[serial] for serial support, or pip install pymodbus[all] for full features.

import pymodbus.client as ModbusClient
from pymodbus import FramerType, ModbusException, pymodbus_apply_logging_config

Common client imports:
from pymodbus.client import ModbusTcpClient, AsyncModbusTcpClient
from pymodbus.client import ModbusSerialClient, AsyncModbusSerialClient

Server imports:
from pymodbus.server import ModbusTcpServer, StartTcpServer
from pymodbus.datastore import ModbusServerContext, ModbusDeviceContext
from pymodbus.datastore import ModbusSequentialDataBlock

Transport and utility imports:
from pymodbus.transport import CommType, CommParams, ModbusProtocol, NULLMODEM_HOST
from pymodbus.exceptions import (
ConnectionException, ModbusIOException, ParameterException,
NoSuchIdException, NotImplementedException, MessageRegisterException
)

from pymodbus.client import ModbusTcpClient
from pymodbus import FramerType, ModbusException
# Create and connect client
client = ModbusTcpClient('127.0.0.1', port=502, framer=FramerType.SOCKET)
client.connect()
try:
    # Read holding registers
    result = client.read_holding_registers(0, count=10, device_id=1)
    if not result.isError():
        print(f"Register values: {result.registers}")
    # Write single register
    client.write_register(0, 123, device_id=1)
    # Read coils
    result = client.read_coils(0, count=8, device_id=1)
    if not result.isError():
        print(f"Coil values: {result.bits}")
except ModbusException as e:
    print(f"Modbus error: {e}")
finally:
    # Runs on success and on error alike, so the connection is always released
    client.close()

import asyncio
from pymodbus.client import AsyncModbusTcpClient
async def run_async_client():
    """Connect an async TCP client, read input registers, and close the client.

    Issues ``read_input_registers(0, count=5, device_id=1)`` against
    127.0.0.1:502 and prints the registers when the response is not an error.
    """
    client = AsyncModbusTcpClient('127.0.0.1', port=502)
    await client.connect()
    try:
        # Read input registers
        result = await client.read_input_registers(0, count=5, device_id=1)
        if not result.isError():
            print(f"Input registers: {result.registers}")
    finally:
        # Close the client whether or not the read succeeded
        client.close()


# Run the async client
asyncio.run(run_async_client())

from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
# Create datastore with some initial values: one block of 100 zero-valued entries
store = ModbusSequentialDataBlock(0, [0] * 100)
# The same block object backs all four tables:
# co = coils, di = discrete inputs, hr = holding registers, ir = input registers
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Start server
StartTcpServer(context=server_context, address=('127.0.0.1', 5020))

PyModbus uses a layered architecture:
The library supports both synchronous and asynchronous operations, allowing integration with various Python frameworks and applications.
Comprehensive client implementations supporting all standard Modbus function codes with both synchronous and asynchronous APIs across multiple transport protocols.
# Synchronous clients (interface stubs only; no members are listed here)
class ModbusTcpClient:
    pass


class ModbusUdpClient:
    pass


class ModbusSerialClient:
    pass


class ModbusTlsClient:
    pass


# Asynchronous clients (interface stubs only; no members are listed here)
class AsyncModbusTcpClient:
    pass


class AsyncModbusUdpClient:
    pass


class AsyncModbusSerialClient:
    pass
class AsyncModbusTlsClient: ...

Full-featured Modbus server implementations with configurable data stores, custom function handlers, and simulator capabilities for testing and development.
# Server classes (interface stubs only; no members are listed here)
class ModbusTcpServer:
    pass


class ModbusUdpServer:
    pass


class ModbusSerialServer:
    pass


class ModbusTlsServer:
    pass


# Server start functions
def StartTcpServer(context, **kwargs):
    # Signature stub: accepts a context plus arbitrary keyword options, does nothing
    return None
def StartAsyncTcpServer(context, **kwargs): ...

Complete implementation of all standard Modbus function codes including bit operations, register operations, file operations, and diagnostic functions.
# Bit access messages (interface stubs only)
class ReadCoilsRequest:
    pass


class ReadDiscreteInputsRequest:
    pass


class WriteSingleCoilRequest:
    pass


class WriteMultipleCoilsRequest:
    pass


# Register access messages (interface stubs only)
class ReadHoldingRegistersRequest:
    pass


class ReadInputRegistersRequest:
    pass


class WriteSingleRegisterRequest:
    pass
class WriteMultipleRegistersRequest: ...

Flexible data storage implementations supporting both sequential and sparse memory layouts with full customization capabilities.
# Datastore classes (interface stubs only; no members are listed here)
class ModbusServerContext:
    pass


class ModbusDeviceContext:
    pass


class ModbusSequentialDataBlock:
    pass


class ModbusSparseDataBlock:
    pass
class ModbusSimulatorContext: ...

Low-level transport classes for custom protocol implementations and advanced connection handling.
class CommType(int, Enum):
    """Integer-valued enumeration of communication types."""

    TCP = 1
    TLS = 2
    UDP = 3
    SERIAL = 4
class CommParams:
    """Parameter container stub; every constructor argument is optional."""

    def __init__(
        self,
        comm_name=None,
        comm_type=None,
        reconnect_delay=None,
        reconnect_delay_max=0.0,
        timeout_connect=0.0,
        host="localhost",
        port=0,
        source_address=None,
        handle_local_echo=False,
        sslctx=None,
        baudrate=-1,
        bytesize=-1,
        parity='',
        stopbits=-1,
    ):
        # Signature listing only: the implementation is elided
        ...
class ModbusProtocol:
    """Protocol interface stub: constructor plus connect/close entry points."""

    def __init__(self, params, is_server=False):
        # Signature listing only: the implementation is elided
        ...

    async def transport_connect(self):
        # Coroutine: callers must await it
        ...

    def transport_close(self):
        # Plain (non-async) method, unlike transport_connect
        ...
# Special testing constant for nullmodem connections
NULLMODEM_HOST = "__pymodbus_nullmodem"

Specific exception classes for different types of Modbus errors and connection issues.
class ModbusException(Exception):
    """Base exception stub; the other exception classes listed here derive from it."""

    def __init__(self, string):
        # Signature listing only: the implementation is elided
        ...

    def isError(self) -> bool:
        # Signature listing only: the implementation is elided
        ...
class ConnectionException(ModbusException):
    """Error resulting from a bad connection."""

    def __init__(self, string=""):
        # Signature listing only: the implementation is elided
        ...
class ModbusIOException(ModbusException):
    """Error resulting from data i/o."""

    def __init__(self, string="", function_code=None):
        # Signature listing only: the implementation is elided
        ...
class ParameterException(ModbusException):
    """Error resulting from invalid parameter."""

    def __init__(self, string=""):
        # Signature listing only: the implementation is elided
        ...
class NoSuchIdException(ModbusException):
    """Error resulting from making a request to an id that does not exist."""

    def __init__(self, string=""):
        # Signature listing only: the implementation is elided
        ...
class NotImplementedException(ModbusException):
    """Error resulting from not implemented function."""

    def __init__(self, string=""):
        # Signature listing only: the implementation is elided
        ...
class MessageRegisterException(ModbusException):
"""Error resulting from failing to register a custom message request/response."""
def __init__(self, string=""): ...

Tools for debugging Modbus communications and configuring logging output.
def pymodbus_apply_logging_config(level=logging.DEBUG, log_file_name=None):
    """Apply basic logging configuration used by default by Pymodbus maintainers.

    Parameters:
    - level: Set log level (str or int)
    - log_file_name: Optional log file name for file output
    """
def pymodbus_get_last_frames() -> str:
"""
Prepare and return last frames for automatic debugging.
Returns:
str: Formatted string of recent frame data for debugging
"""

Web-based Modbus server simulator with REST API and browser interface for testing and development.
class ModbusSimulatorServer:
    """HTTP server for modbus simulator with web interface."""

    def __init__(
        self,
        json_file="setup.json",
        custom_actions_module=None,
        http_host="localhost",
        http_port=8080,
    ):
        # Signature listing only: the implementation is elided
        ...

    async def start_server(self, modbus_server=None, modbus_device=None):
        # Coroutine: callers must await it
        ...

    def stop_server(self):
        # Plain (non-async) method, unlike start_server
        ...
def get_simulator_commandline():
    """Get command line interface for simulator."""


# Frame types
class FramerType(str, Enum):
    """String-valued enumeration of frame types."""

    ASCII = "ascii"
    RTU = "rtu"
    SOCKET = "socket"
    TLS = "tls"
# Status codes
class ModbusStatus(int, Enum):
    """Integer-valued enumeration of status codes."""

    WAITING = 0xFFFF
    READY = 0x0000
    ON = 0xFF00
    # Same value as READY, so Enum makes OFF an alias of READY:
    # ModbusStatus.OFF is ModbusStatus.READY, and iteration does not yield OFF.
    OFF = 0x0000
# Exception codes
class ExcCodes(int, Enum):
    """Integer-valued enumeration of exception codes."""

    ILLEGAL_FUNCTION = 0x01
    ILLEGAL_ADDRESS = 0x02
    ILLEGAL_VALUE = 0x03
    DEVICE_FAILURE = 0x04
    ACKNOWLEDGE = 0x05
    DEVICE_BUSY = 0x06
    MEMORY_PARITY_ERROR = 0x08
    # NOTE(review): "UNAVIABLE" looks like a misspelling of "UNAVAILABLE".
    # Kept as-is because the member name is part of the public interface.
    GATEWAY_PATH_UNAVIABLE = 0x0A
    GATEWAY_NO_RESPONSE = 0x0B
# Device information types
class DeviceInformation(int, Enum):
    """Integer-valued enumeration of device information types."""

    BASIC = 0x01
    REGULAR = 0x02
    EXTENDED = 0x03
    SPECIFIC = 0x04
# More data indicators
class MoreData(int, Enum):
    """Integer-valued enumeration of "more data" indicators."""

    NOTHING = 0x00
    KEEP_READING = 0xFF
# Modbus Plus operations
class ModbusPlusOperation(int, Enum):
    """Integer-valued enumeration of Modbus Plus operations."""

    GET_STATISTICS = 0x03
    CLEAR_STATISTICS = 0x04
# Base exception class (interface stub: adds nothing on top of Exception)
class ModbusException(Exception):
    pass
# Device identification
class ModbusDeviceIdentification:
    """Device identification stub exposing update() and item get/set access."""

    def __init__(self, info=None):
        # Signature listing only: the implementation is elided
        ...

    def update(self, value):
        ...

    def __getitem__(self, key):
        ...

    def __setitem__(self, key, value):
        ...
# Logging configuration
def pymodbus_apply_logging_config(level=logging.DEBUG, log_file_name=None):
    # log_file_name is listed so this summary matches the two-argument
    # signature given for the same function earlier in this document.
    ...