A fully featured modbus protocol stack in python
—
PyModbus provides full-featured Modbus server implementations supporting TCP, UDP, Serial, and TLS transports. Servers can be configured with custom data stores, function handlers, and simulation capabilities for development and testing.
Server implementations for different transport protocols with both blocking and async variants.
class ModbusTcpServer:
def __init__(self, context, framer=None, identity=None, address=None,
handler=None, allow_reuse_address=False, allow_reuse_port=False,
defer_start=False, backlog=20, loop=None, **kwargs): ...
def serve_forever(self): ...
def server_close(self): ...
def shutdown(self): ...
class ModbusUdpServer:
def __init__(self, context, framer=None, identity=None, address=None,
handler=None, allow_reuse_address=False, allow_reuse_port=False,
defer_start=False, backlog=20, loop=None, **kwargs): ...
def serve_forever(self): ...
def server_close(self): ...
def shutdown(self): ...
class ModbusSerialServer:
def __init__(self, context, framer=None, identity=None, port=None,
stopbits=1, bytesize=8, parity='N', baudrate=19200,
timeout=1, handle_local_echo=False, ignore_missing_slaves=False,
broadcast_enable=False, auto_open=True, auto_close=False, **kwargs): ...
def serve_forever(self): ...
def server_close(self): ...
def shutdown(self): ...
class ModbusTlsServer:
def __init__(self, context, framer=None, identity=None, address=None,
sslctx=None, certfile=None, keyfile=None, allow_reuse_address=False,
allow_reuse_port=False, defer_start=False, backlog=20, loop=None, **kwargs): ...
def serve_forever(self): ...
def server_close(self): ...
def shutdown(self): ...Convenience functions for starting servers with simplified configuration.
def StartTcpServer(context=None, identity=None, address=None, custom_functions=None,
**kwargs) -> None:
"""
Start TCP server (blocking).
Parameters:
- context: ModbusServerContext instance
- identity: ModbusDeviceIdentification instance
- address: tuple of (host, port), defaults to ('127.0.0.1', 502)
- custom_functions: list of custom function handlers
- **kwargs: additional server parameters
"""
def StartUdpServer(context=None, identity=None, address=None, custom_functions=None,
**kwargs) -> None:
"""
Start UDP server (blocking).
Parameters:
- context: ModbusServerContext instance
- identity: ModbusDeviceIdentification instance
- address: tuple of (host, port), defaults to ('127.0.0.1', 502)
- custom_functions: list of custom function handlers
- **kwargs: additional server parameters
"""
def StartSerialServer(context=None, identity=None, port=None, custom_functions=None,
**kwargs) -> None:
"""
Start Serial server (blocking).
Parameters:
- context: ModbusServerContext instance
- identity: ModbusDeviceIdentification instance
- port: serial port device (e.g., '/dev/ttyUSB0', 'COM1')
- custom_functions: list of custom function handlers
- **kwargs: additional serial and server parameters
"""
def StartTlsServer(context=None, identity=None, address=None, sslctx=None,
custom_functions=None, **kwargs) -> None:
"""
Start TLS server (blocking).
Parameters:
- context: ModbusServerContext instance
- identity: ModbusDeviceIdentification instance
- address: tuple of (host, port), defaults to ('127.0.0.1', 802)
- sslctx: SSL context or None for default
- custom_functions: list of custom function handlers
- **kwargs: additional server parameters
"""
# Asynchronous server start functions
async def StartAsyncTcpServer(context=None, identity=None, address=None,
custom_functions=None, **kwargs) -> ModbusTcpServer:
"""Start async TCP server."""
async def StartAsyncUdpServer(context=None, identity=None, address=None,
custom_functions=None, **kwargs) -> ModbusUdpServer:
"""Start async UDP server."""
async def StartAsyncSerialServer(context=None, identity=None, port=None,
custom_functions=None, **kwargs) -> ModbusSerialServer:
"""Start async Serial server."""
async def StartAsyncTlsServer(context=None, identity=None, address=None, sslctx=None,
custom_functions=None, **kwargs) -> ModbusTlsServer:
"""Start async TLS server."""Functions for stopping running servers.
def ServerStop() -> None:
"""Stop currently running synchronous server."""
async def ServerAsyncStop() -> None:
"""Stop currently running asynchronous server."""Base classes providing common server functionality.
class ModbusBaseServer:
def __init__(self, context, framer=None, identity=None, **kwargs): ...
def serve_forever(self): ...
def server_close(self): ...
def shutdown(self): ...
def process_request(self, request, client): ...
def validate_device_id(self, device_id): ...HTTP-based simulator server for testing and development.
class ModbusSimulatorServer:
def __init__(self, modbus_server=None, modbus_device=None, http_host="localhost",
http_port=8080, json_file=None, custom_actions_module=None,
web_app_path=None): ...
async def run_forever(self, only_start=False): ...
def stop(self): ...
def build_html(self, html_template, json_data): ...Classes for managing server data and device contexts.
class ModbusServerContext:
def __init__(self, device_default=None, single=False, **kwargs): ...
def __getitem__(self, device_id): ...
def __setitem__(self, device_id, context): ...
def __delitem__(self, device_id): ...
def __contains__(self, device_id): ...
class ModbusDeviceContext:
def __init__(self, co=None, di=None, hr=None, ir=None, zero_mode=False): ...
def validate(self, fx, address, count=1): ...
def getValues(self, fx, address, count=1): ...
def setValues(self, fx, address, values): ...
class ModbusBaseDeviceContext:
def decode(self, fx): ...
def encode(self, fx): ...
def reset(self): ...
def register(self, fx, slave_id=0x00, func=None): ...
def validate(self, fx, address, count=1): ...
def getValues(self, fx, address, count=1): ...
def setValues(self, fx, address, values): ...from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
# Create data stores for different data types
store = ModbusSequentialDataBlock(0, [0] * 100)
# Create device context with data stores
context = ModbusDeviceContext(
co=store, # coils
di=store, # discrete inputs
hr=store, # holding registers
ir=store # input registers
)
# Create server context
server_context = ModbusServerContext(device_default=context, single=True)
# Start TCP server (blocking)
StartTcpServer(
context=server_context,
address=('127.0.0.1', 5020)
)from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
# Create data stores for device 1
device1_store = ModbusSequentialDataBlock(0, [10, 20, 30, 40, 50])
device1_context = ModbusDeviceContext(
co=device1_store,
di=device1_store,
hr=device1_store,
ir=device1_store
)
# Create data stores for device 2
device2_store = ModbusSequentialDataBlock(0, [100, 200, 300, 400, 500])
device2_context = ModbusDeviceContext(
co=device2_store,
di=device2_store,
hr=device2_store,
ir=device2_store
)
# Create server context with multiple devices
server_context = ModbusServerContext(single=False)
server_context[1] = device1_context # Device ID 1
server_context[2] = device2_context # Device ID 2
# Start server
StartTcpServer(context=server_context, address=('0.0.0.0', 502))from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
from pymodbus import ModbusDeviceIdentification
# Create data store
store = ModbusSequentialDataBlock(0, [0] * 100)
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Configure device identity
identity = ModbusDeviceIdentification()
identity.VendorName = 'PyModbus'
identity.ProductCode = 'PM'
identity.VendorUrl = 'http://github.com/riptideio/pymodbus/'
identity.ProductName = 'PyModbus Server'
identity.ModelName = 'PyModbus Server'
identity.MajorMinorRevision = '3.11.1'
# Start server with identity
StartTcpServer(
context=server_context,
identity=identity,
address=('127.0.0.1', 502)
)import asyncio
from pymodbus.server import StartAsyncTcpServer, ServerAsyncStop
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
async def run_async_server():
# Create data store
store = ModbusSequentialDataBlock(0, list(range(100)))
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Start async server
server = await StartAsyncTcpServer(
context=server_context,
address=('127.0.0.1', 5020)
)
try:
# Server runs until interrupted
await asyncio.Event().wait()
except KeyboardInterrupt:
print("Server interrupted, shutting down...")
finally:
await ServerAsyncStop()
# Run async server
asyncio.run(run_async_server())from pymodbus.server import StartSerialServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
from pymodbus import FramerType
# Create data store
store = ModbusSequentialDataBlock(0, [0] * 100)
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Start serial server
StartSerialServer(
context=server_context,
port='/dev/ttyUSB0', # Linux
# port='COM1', # Windows
framer=FramerType.RTU,
baudrate=9600,
bytesize=8,
parity='N',
stopbits=1,
timeout=1
)from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
from pymodbus.pdu import ModbusPDU
from pymodbus import ModbusDeviceIdentification
# Define custom function
class CustomFunction(ModbusPDU):
function_code = 0x40 # Custom function code
def __init__(self, **kwargs):
ModbusPDU.__init__(self, **kwargs)
def encode(self):
return b''
def decode(self, data):
pass
def execute(self, context):
# Custom function logic
return CustomFunctionResponse()
class CustomFunctionResponse(ModbusPDU):
function_code = 0x40
def __init__(self, **kwargs):
ModbusPDU.__init__(self, **kwargs)
def encode(self):
return b'Custom response'
# Create server context
store = ModbusSequentialDataBlock(0, [0] * 100)
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Start server with custom function
StartTcpServer(
context=server_context,
custom_functions=[CustomFunction],
address=('127.0.0.1', 502)
)import ssl
from pymodbus.server import StartTlsServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
# Create SSL context
ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
ssl_context.load_cert_chain(certfile="server.crt", keyfile="server.key")
# Create data store
store = ModbusSequentialDataBlock(0, [0] * 100)
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Start TLS server
StartTlsServer(
context=server_context,
sslctx=ssl_context,
address=('127.0.0.1', 802)
)import asyncio
from pymodbus.server import ModbusSimulatorServer
async def run_simulator():
# Create simulator server
simulator = ModbusSimulatorServer(
http_host="localhost",
http_port=8080,
json_file="simulation_config.json"
)
try:
# Start simulator (includes HTTP interface)
await simulator.run_forever()
except KeyboardInterrupt:
print("Stopping simulator...")
simulator.stop()
# Run simulator
asyncio.run(run_simulator())from pymodbus.server import StartTcpServer
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusServerContext, ModbusDeviceContext
# Create data store
store = ModbusSequentialDataBlock(0, [0] * 100)
context = ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
server_context = ModbusServerContext(device_default=context, single=True)
# Start server with various configuration options
StartTcpServer(
context=server_context,
address=('0.0.0.0', 502),
allow_reuse_address=True,
allow_reuse_port=True,
defer_start=False,
backlog=20,
ignore_missing_slaves=True,
broadcast_enable=False,
timeout=1
)