A fully featured modbus protocol stack in python
—
PyModbus provides flexible data storage implementations for server contexts, supporting both sequential and sparse memory layouts with full customization capabilities. The datastore system manages coils, discrete inputs, holding registers, and input registers with configurable addressing modes.
Classes for managing server data contexts and device instances.
class ModbusServerContext:
def __init__(self, device_default=None, single=False, **kwargs):
"""
Server context managing multiple device contexts.
Parameters:
- device_default: Default device context for single device mode
- single (bool): True for single device mode, False for multi-device
- **kwargs: Additional context parameters
"""
def __getitem__(self, device_id: int):
"""Get device context by device ID."""
def __setitem__(self, device_id: int, context):
"""Set device context for device ID."""
def __delitem__(self, device_id: int):
"""Remove device context for device ID."""
def __contains__(self, device_id: int) -> bool:
"""Check if device ID exists in context."""
def __iter__(self):
"""Iterate over device IDs."""
class ModbusDeviceContext:
def __init__(self, co=None, di=None, hr=None, ir=None, zero_mode=False):
"""
Device context managing data stores for different data types.
Parameters:
- co: Coils data store
- di: Discrete inputs data store
- hr: Holding registers data store
- ir: Input registers data store
- zero_mode (bool): Use zero-based addressing if True
"""
def validate(self, fx: int, address: int, count: int = 1) -> bool:
"""
Validate data store access.
Parameters:
- fx (int): Function code (1=coils, 2=discrete inputs, 3=holding regs, 4=input regs)
- address (int): Starting address
- count (int): Number of values
Returns:
bool: True if access is valid
"""
def getValues(self, fx: int, address: int, count: int = 1) -> list:
"""
Get values from data store.
Parameters:
- fx (int): Function code
- address (int): Starting address
- count (int): Number of values to read
Returns:
list: Retrieved values
"""
def setValues(self, fx: int, address: int, values: list):
"""
Set values in data store.
Parameters:
- fx (int): Function code
- address (int): Starting address
- values (list): Values to write
"""
class ModbusBaseDeviceContext:
def decode(self, fx: int) -> str:
"""Decode function code to data type name."""
def encode(self, fx: int) -> int:
"""Encode data type name to function code."""
def reset(self):
"""Reset all data stores to default values."""
def register(self, fx: int, slave_id: int = 0x00, func=None):
"""Register custom function handler."""
def validate(self, fx: int, address: int, count: int = 1) -> bool:
"""Validate data store access."""
def getValues(self, fx: int, address: int, count: int = 1) -> list:
"""Get values from data store."""
def setValues(self, fx: int, address: int, values: list):
"""Set values in data store."""Different data storage backends for various use cases.
class ModbusSequentialDataBlock:
def __init__(self, address: int, values: list):
"""
Sequential data block with contiguous memory layout.
Parameters:
- address (int): Starting address
- values (list): Initial values (list of integers for registers, booleans for coils)
"""
def validate(self, address: int, count: int = 1) -> bool:
"""
Validate address range access.
Parameters:
- address (int): Starting address
- count (int): Number of values
Returns:
bool: True if address range is valid
"""
def getValues(self, address: int, count: int = 1) -> list:
"""
Get values from the data block.
Parameters:
- address (int): Starting address
- count (int): Number of values to read
Returns:
list: Retrieved values
"""
def setValues(self, address: int, values: list):
"""
Set values in the data block.
Parameters:
- address (int): Starting address
- values (list): Values to write
"""
def reset(self):
"""Reset all values to 0."""
class ModbusSparseDataBlock:
def __init__(self, values: dict = None):
"""
Sparse data block with dictionary-based storage.
Parameters:
- values (dict): Dictionary mapping addresses to values
"""
def validate(self, address: int, count: int = 1) -> bool:
"""Validate address range access."""
def getValues(self, address: int, count: int = 1) -> list:
"""Get values from specific addresses."""
def setValues(self, address: int, values: list):
"""Set values at specific addresses."""
def reset(self):
"""Clear all stored values."""Specialized context for simulation with advanced features.
class ModbusSimulatorContext:
def __init__(self, config: dict = None, custom_actions_module=None):
"""
Simulator context with configurable behavior.
Parameters:
- config (dict): Simulation configuration
- custom_actions_module: Module containing custom action functions
"""
def validate(self, fx: int, address: int, count: int = 1) -> bool:
"""Validate access with simulation rules."""
def getValues(self, fx: int, address: int, count: int = 1) -> list:
"""Get values with simulation behavior."""
def setValues(self, fx: int, address: int, values: list):
"""Set values with simulation actions."""
def apply_actions(self, fx: int, address: int, values: list):
"""Apply configured simulation actions."""
def reset(self):
"""Reset simulator to initial state."""from pymodbus.datastore import ModbusSequentialDataBlock, ModbusDeviceContext, ModbusServerContext
# Create data blocks for different data types
coils = ModbusSequentialDataBlock(0, [False] * 100) # 100 coils starting at address 0
discrete_inputs = ModbusSequentialDataBlock(0, [False] * 100) # 100 discrete inputs
holding_registers = ModbusSequentialDataBlock(0, [0] * 100) # 100 holding registers
input_registers = ModbusSequentialDataBlock(0, [0] * 100) # 100 input registers
# Create device context with data stores
device_context = ModbusDeviceContext(
co=coils, # coils
di=discrete_inputs, # discrete inputs
hr=holding_registers, # holding registers
ir=input_registers # input registers
)
# Create server context (single device mode)
server_context = ModbusServerContext(device_default=device_context, single=True)from pymodbus.datastore import ModbusSequentialDataBlock, ModbusDeviceContext, ModbusServerContext
# Create different data stores for different devices
def create_device_context(initial_values):
store = ModbusSequentialDataBlock(0, initial_values)
return ModbusDeviceContext(co=store, di=store, hr=store, ir=store)
# Device 1: Sensor data
device1 = create_device_context([10, 20, 30, 40, 50])
# Device 2: Actuator controls
device2 = create_device_context([100, 200, 300, 400, 500])
# Device 3: System status
device3 = create_device_context([1, 0, 1, 1, 0])
# Create multi-device server context
server_context = ModbusServerContext(single=False)
server_context[1] = device1 # Device ID 1
server_context[2] = device2 # Device ID 2
server_context[3] = device3 # Device ID 3
# Check device availability
if 1 in server_context:
print("Device 1 is available")
# Get device context
device1_context = server_context[1]
values = device1_context.getValues(3, 0, 5) # Read holding registers
print(f"Device 1 holding registers: {values}")from pymodbus.datastore import ModbusSparseDataBlock, ModbusDeviceContext, ModbusServerContext
# Create sparse data blocks with non-contiguous addresses
sparse_coils = ModbusSparseDataBlock({
0: True, # Address 0
10: False, # Address 10
100: True, # Address 100
1000: False # Address 1000
})
sparse_registers = ModbusSparseDataBlock({
0: 12345, # Address 0
50: 67890, # Address 50
500: 11111, # Address 500
5000: 22222 # Address 5000
})
# Create device context with sparse stores
device_context = ModbusDeviceContext(
co=sparse_coils,
di=sparse_coils,
hr=sparse_registers,
ir=sparse_registers
)
server_context = ModbusServerContext(device_default=device_context, single=True)
# Access sparse data
print(f"Coil at address 0: {device_context.getValues(1, 0, 1)}")
print(f"Register at address 50: {device_context.getValues(3, 50, 1)}")from pymodbus.datastore import ModbusDeviceContext, ModbusServerContext
class DatabaseDataBlock:
"""Custom data block that stores values in a database."""
def __init__(self, table_name, db_connection):
self.table_name = table_name
self.db = db_connection
def validate(self, address, count=1):
# Check if addresses exist in database
return True # Simplified validation
def getValues(self, address, count=1):
# Read values from database
cursor = self.db.cursor()
cursor.execute(
f"SELECT value FROM {self.table_name} WHERE address BETWEEN ? AND ? ORDER BY address",
(address, address + count - 1)
)
results = cursor.fetchall()
return [row[0] for row in results]
def setValues(self, address, values):
# Write values to database
cursor = self.db.cursor()
for i, value in enumerate(values):
cursor.execute(
f"INSERT OR REPLACE INTO {self.table_name} (address, value) VALUES (?, ?)",
(address + i, value)
)
self.db.commit()
# Usage (assuming database connection exists)
# db_store = DatabaseDataBlock('modbus_registers', db_connection)
# device_context = ModbusDeviceContext(hr=db_store, ir=db_store)from pymodbus.datastore import ModbusSequentialDataBlock, ModbusDeviceContext
class ValidatedDataBlock(ModbusSequentialDataBlock):
"""Data block with value validation."""
def __init__(self, address, values, min_value=0, max_value=65535):
super().__init__(address, values)
self.min_value = min_value
self.max_value = max_value
def setValues(self, address, values):
# Validate values before storing
validated_values = []
for value in values:
if isinstance(value, bool):
validated_values.append(value)
else:
# Clamp numeric values to valid range
clamped = max(self.min_value, min(self.max_value, int(value)))
validated_values.append(clamped)
super().setValues(address, validated_values)
# Create validated data store
validated_registers = ValidatedDataBlock(0, [0] * 100, min_value=0, max_value=1000)
device_context = ModbusDeviceContext(hr=validated_registers)
# Values will be clamped to valid range
device_context.setValues(3, 0, [500, 1500, -100]) # [500, 1000, 0]from pymodbus.datastore import ModbusSequentialDataBlock, ModbusDeviceContext
# Create data store
registers = ModbusSequentialDataBlock(0, list(range(100)))
# Zero-based addressing (default)
zero_based_context = ModbusDeviceContext(hr=registers, zero_mode=True)
# One-based addressing (traditional Modbus)
one_based_context = ModbusDeviceContext(hr=registers, zero_mode=False)
# With zero_mode=True, address 0 maps to first element
zero_values = zero_based_context.getValues(3, 0, 5) # Gets elements [0,1,2,3,4]
# With zero_mode=False, address 1 maps to first element
one_values = one_based_context.getValues(3, 1, 5) # Gets elements [0,1,2,3,4]
print(f"Zero-based: {zero_values}")
print(f"One-based: {one_values}")import threading
import time
from pymodbus.datastore import ModbusSequentialDataBlock, ModbusDeviceContext, ModbusServerContext
# Create data store
registers = ModbusSequentialDataBlock(0, [0] * 100)
device_context = ModbusDeviceContext(hr=registers)
server_context = ModbusServerContext(device_default=device_context, single=True)
def update_simulation_data():
"""Background thread to update simulation data."""
counter = 0
while True:
# Update some register values
device_context.setValues(3, 0, [counter]) # Counter at address 0
device_context.setValues(3, 1, [counter % 100]) # Modulo counter at address 1
device_context.setValues(3, 2, [int(time.time()) % 65536]) # Timestamp at address 2
counter += 1
time.sleep(1)
# Start background update thread
update_thread = threading.Thread(target=update_simulation_data, daemon=True)
update_thread.start()
# Server context now has dynamically updating datafrom pymodbus.datastore import ModbusSimulatorContext
# Simulator configuration
simulator_config = {
"setup": {
"co size": 100,
"di size": 100,
"hr size": 100,
"ir size": 100
},
"invalid": [
{"device": 1, "function": 3, "address": [5, 6]}, # Invalid addresses
],
"write": [
{"device": 1, "function": 16, "address": 10, "action": "random"} # Random values on write
],
"repeat": [
{"device": 1, "function": 3, "address": 20, "period": 5} # Update every 5 seconds
]
}
# Create simulator context
simulator_context = ModbusSimulatorContext(config=simulator_config)
# Simulator will handle reads/writes according to configurationThe datastore system uses function codes to identify data types:
# Function codes for data store access
FUNCTION_CODES = {
1: "coils", # Read Coils
2: "discrete_inputs", # Read Discrete Inputs
3: "holding_registers", # Read Holding Registers
4: "input_registers", # Read Input Registers
5: "coils", # Write Single Coil
6: "holding_registers", # Write Single Register
15: "coils", # Write Multiple Coils
16: "holding_registers" # Write Multiple Registers
}
# Example of function code usage
device_context = ModbusDeviceContext(hr=registers)
values = device_context.getValues(3, 0, 10) # Function code 3 = holding registers
device_context.setValues(3, 0, [1, 2, 3]) # Write to holding registers