A fully featured Modbus protocol stack in Python
—
PyModbus provides a complete implementation of all standard Modbus Protocol Data Units (PDUs), including bit operations, register operations, file operations, diagnostic functions, and device information queries. PDU classes can be used directly for custom protocol implementations or advanced use cases.
Core PDU classes providing the foundation for all Modbus messages.
class ModbusPDU:
    """Base class that every PDU message class in this reference derives from."""

    def __init__(self, **kwargs): ...

    def encode(self) -> bytes: ...

    def decode(self, data: bytes): ...

    # Takes no instance argument, so it is declared as a classmethod; without
    # the decorator, calling it on an instance would pass the instance as
    # ``data`` and fail with a TypeError.
    @classmethod
    def calculate_rtu_frame_size(cls, data: bytes) -> int: ...
class ExceptionResponse(ModbusPDU):
function_code = 0x80
def __init__(self, function_code, exception_code=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class DecodePDU:
def __init__(self, data: bytes, **kwargs): ...
def decode(self) -> ModbusPDU: ...
PDU messages for reading and writing coil (binary) values.
# Read Coils (FC 01)
class ReadCoilsRequest(ModbusPDU):
function_code = 0x01
def __init__(self, address=None, count=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadCoilsResponse(ModbusPDU):
function_code = 0x01
def __init__(self, values=None, **kwargs): ...
@property
def bits(self) -> list: ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read Discrete Inputs (FC 02)
class ReadDiscreteInputsRequest(ModbusPDU):
function_code = 0x02
def __init__(self, address=None, count=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadDiscreteInputsResponse(ModbusPDU):
function_code = 0x02
def __init__(self, values=None, **kwargs): ...
@property
def bits(self) -> list: ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Write Single Coil (FC 05)
class WriteSingleCoilRequest(ModbusPDU):
function_code = 0x05
def __init__(self, address=None, value=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class WriteSingleCoilResponse(ModbusPDU):
function_code = 0x05
def __init__(self, address=None, value=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Write Multiple Coils (FC 15)
class WriteMultipleCoilsRequest(ModbusPDU):
function_code = 0x0F
def __init__(self, address=None, values=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class WriteMultipleCoilsResponse(ModbusPDU):
function_code = 0x0F
def __init__(self, address=None, count=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
PDU messages for reading and writing register (16-bit integer) values.
# Read Holding Registers (FC 03)
class ReadHoldingRegistersRequest(ModbusPDU):
function_code = 0x03
def __init__(self, address=None, count=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadHoldingRegistersResponse(ModbusPDU):
function_code = 0x03
def __init__(self, values=None, **kwargs): ...
@property
def registers(self) -> list: ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read Input Registers (FC 04)
class ReadInputRegistersRequest(ModbusPDU):
function_code = 0x04
def __init__(self, address=None, count=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadInputRegistersResponse(ModbusPDU):
function_code = 0x04
def __init__(self, values=None, **kwargs): ...
@property
def registers(self) -> list: ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Write Single Register (FC 06)
class WriteSingleRegisterRequest(ModbusPDU):
function_code = 0x06
def __init__(self, address=None, value=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class WriteSingleRegisterResponse(ModbusPDU):
function_code = 0x06
def __init__(self, address=None, value=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Write Multiple Registers (FC 16)
class WriteMultipleRegistersRequest(ModbusPDU):
function_code = 0x10
def __init__(self, address=None, values=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class WriteMultipleRegistersResponse(ModbusPDU):
function_code = 0x10
def __init__(self, address=None, count=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read/Write Multiple Registers (FC 23)
class ReadWriteMultipleRegistersRequest(ModbusPDU):
function_code = 0x17
def __init__(self, read_address=None, read_count=None, write_address=None,
write_registers=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadWriteMultipleRegistersResponse(ModbusPDU):
function_code = 0x17
def __init__(self, values=None, **kwargs): ...
@property
def registers(self) -> list: ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Mask Write Register (FC 22)
class MaskWriteRegisterRequest(ModbusPDU):
function_code = 0x16
def __init__(self, address=None, and_mask=0xFFFF, or_mask=0x0000, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class MaskWriteRegisterResponse(ModbusPDU):
function_code = 0x16
def __init__(self, address=None, and_mask=0xFFFF, or_mask=0x0000, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
PDU messages for file record access operations.
# File Record data structure
class FileRecord:
def __init__(self, file_number=0x00, record_number=0x0000, record_data=None): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read File Record (FC 20)
class ReadFileRecordRequest(ModbusPDU):
function_code = 0x14
def __init__(self, records=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadFileRecordResponse(ModbusPDU):
function_code = 0x14
def __init__(self, records=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Write File Record (FC 21)
class WriteFileRecordRequest(ModbusPDU):
function_code = 0x15
def __init__(self, records=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class WriteFileRecordResponse(ModbusPDU):
function_code = 0x15
def __init__(self, records=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read FIFO Queue (FC 24)
class ReadFifoQueueRequest(ModbusPDU):
function_code = 0x18
def __init__(self, address=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadFifoQueueResponse(ModbusPDU):
function_code = 0x18
def __init__(self, values=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
PDU messages for diagnostic and status operations.
# Diagnostic Status (FC 08)
class DiagnosticStatusRequest(ModbusPDU):
function_code = 0x08
def __init__(self, sub_function_code=0x0000, data=None, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class DiagnosticStatusResponse(ModbusPDU):
function_code = 0x08
def __init__(self, sub_function_code=0x0000, data=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read Exception Status (FC 07)
class ReadExceptionStatusRequest(ModbusPDU):
function_code = 0x07
def __init__(self, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadExceptionStatusResponse(ModbusPDU):
function_code = 0x07
def __init__(self, status=0x00, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Get Communication Event Counter (FC 11)
class GetCommEventCounterRequest(ModbusPDU):
function_code = 0x0B
def __init__(self, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class GetCommEventCounterResponse(ModbusPDU):
function_code = 0x0B
def __init__(self, status=0x0000, count=0x0000, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Get Communication Event Log (FC 12)
class GetCommEventLogRequest(ModbusPDU):
function_code = 0x0C
def __init__(self, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class GetCommEventLogResponse(ModbusPDU):
function_code = 0x0C
def __init__(self, status=0x0000, event_count=0x0000, message_count=0x0000,
events=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
PDU messages for device identification and server information.
# Report Server ID (FC 17)
class ReportServerIdRequest(ModbusPDU):
function_code = 0x11
def __init__(self, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReportServerIdResponse(ModbusPDU):
function_code = 0x11
def __init__(self, identifier=None, status=0x00, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
# Read Device Information (FC 43)
class ReadDeviceInformationRequest(ModbusPDU):
function_code = 0x2B
def __init__(self, read_code=None, object_id=0x00, slave=0, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
class ReadDeviceInformationResponse(ModbusPDU):
function_code = 0x2B
def __init__(self, read_code=None, conformity=0x01, more_follows=0x00,
next_object_id=0x00, number_of_objects=0x00, information=None, **kwargs): ...
def encode(self) -> bytes: ...
def decode(self, data: bytes): ...
Supporting classes for device identification and statistics.
class ModbusDeviceIdentification:
def __init__(self, info=None): ...
def update(self, value: dict): ...
def __getitem__(self, key): ...
def __setitem__(self, key, value): ...
def __iter__(self): ...
def summary(self) -> dict: ...
class ModbusPlusStatistics:
def __init__(self): ...
def reset(self): ...
def summary(self) -> dict: ...
from pymodbus.pdu.bit_message import ReadCoilsRequest, ReadCoilsResponse
from pymodbus.pdu.register_message import ReadHoldingRegistersRequest
# Create a read coils request
request = ReadCoilsRequest(address=0, count=8, slave=1)
encoded_request = request.encode()
print(f"Encoded request: {encoded_request.hex()}")
# Decode the request
decoded_request = ReadCoilsRequest()
decoded_request.decode(encoded_request)
print(f"Address: {decoded_request.address}, Count: {decoded_request.count}")
# Create a response
response = ReadCoilsResponse(values=[True, False, True, True, False, False, True, False])
encoded_response = response.encode()
print(f"Encoded response: {encoded_response.hex()}")
print(f"Coil bits: {response.bits}")
from pymodbus.pdu import ModbusPDU
import struct
class CustomReadRequest(ModbusPDU):
    """Example request PDU for a user-defined function code.

    The payload is two big-endian unsigned 16-bit fields: the address
    followed by the count.
    """

    function_code = 0x40 # Custom function code

    def __init__(self, address=None, count=None, slave=0, **kwargs):
        """Store the request fields after initialising the base PDU."""
        ModbusPDU.__init__(self, **kwargs)
        self.address, self.count, self.slave = address, count, slave

    def encode(self):
        """Return the 4-byte payload built from ``address`` and ``count``."""
        payload = struct.pack('>HH', self.address, self.count)
        return payload

    def decode(self, data):
        """Populate ``address`` and ``count`` from a 4-byte payload."""
        fields = struct.unpack('>HH', data)
        self.address, self.count = fields
class CustomReadResponse(ModbusPDU):
    """Example response PDU for the custom function code 0x40.

    Payload layout used by this example: one unsigned byte holding the
    number of values, followed by that many big-endian unsigned 16-bit
    values.
    """

    function_code = 0x40

    def __init__(self, values=None, **kwargs):
        """Store ``values`` (an empty list when omitted or falsy)."""
        super().__init__(**kwargs)
        self.values = values or []

    def encode(self):
        """Return the count byte followed by every value as 16-bit big-endian.

        Raises:
            struct.error: if there are more than 255 values or a value does
                not fit in an unsigned 16-bit field.
        """
        count = len(self.values)
        # One pack call with a repeat count replaces the per-value ``+=``
        # concatenation; '>' disables padding, so the bytes are identical.
        return struct.pack(f'>B{count}H', count, *self.values)

    def decode(self, data):
        """Populate ``values`` from a payload produced by :meth:`encode`.

        ``values`` is replaced only after the whole payload has been parsed,
        so a truncated payload leaves the previous contents untouched.

        Raises:
            struct.error: if ``data`` is empty or shorter than the count
                byte announces.
        """
        (count,) = struct.unpack_from('B', data)
        self.values = list(struct.unpack_from(f'>{count}H', data, 1))
# Usage
request = CustomReadRequest(address=100, count=5, slave=1)
encoded = request.encode()
print(f"Custom request: {encoded.hex()}")
response = CustomReadResponse(values=[1000, 2000, 3000, 4000, 5000])
encoded = response.encode()
print(f"Custom response: {encoded.hex()}")
from pymodbus.pdu.exceptionresponse import ExceptionResponse
from pymodbus.constants import ExcCodes
# Create exception response
exception = ExceptionResponse(function_code=0x03, exception_code=ExcCodes.ILLEGAL_ADDRESS)
encoded = exception.encode()
print(f"Exception response: {encoded.hex()}")
# Decode exception
# function_code is a required constructor argument of ExceptionResponse, so it
# must be supplied even when the instance is only used as a decode target.
decoded_exception = ExceptionResponse(function_code=0x03)
decoded_exception.decode(encoded)
print(f"Function code: {decoded_exception.function_code}")
print(f"Exception code: {decoded_exception.exception_code}")
from pymodbus.pdu.decoders import DecodePDU
from pymodbus.pdu.bit_message import ReadCoilsRequest
from pymodbus.pdu.register_message import ReadHoldingRegistersRequest
# Register message types
# NOTE(review): `register` is called on the class here, but it does not appear in
# the DecodePDU signature listing in this document — confirm that it exists and
# whether it is a class-level or instance-level method in the targeted release.
DecodePDU.register(ReadCoilsRequest)
DecodePDU.register(ReadHoldingRegistersRequest)
# Create and encode different message types
coil_request = ReadCoilsRequest(address=0, count=10, slave=1)
register_request = ReadHoldingRegistersRequest(address=100, count=5, slave=2)
# Decode messages dynamically
# NOTE(review): the DecodePDU listing in this document shows
# `__init__(self, data, **kwargs)` and `decode(self)`, whereas this example
# constructs the decoder with no arguments and passes the bytes to `decode(...)`.
# One of the two is out of date — verify against the installed pymodbus version.
decoder = DecodePDU()
decoded_coil = decoder.decode(coil_request.encode())
decoded_register = decoder.decode(register_request.encode())
print(f"Decoded coil request: FC={decoded_coil.function_code}, addr={decoded_coil.address}")
print(f"Decoded register request: FC={decoded_register.function_code}, addr={decoded_register.address}")
from pymodbus.pdu.file_message import FileRecord, ReadFileRecordRequest, WriteFileRecordRequest
# Create file records
records_to_read = [
FileRecord(file_number=1, record_number=0, record_data=None),
FileRecord(file_number=1, record_number=1, record_data=None)
]
# Read file record request
read_request = ReadFileRecordRequest(records=records_to_read, slave=1)
encoded_read = read_request.encode()
print(f"Read file record request: {encoded_read.hex()}")
# Write file record request
records_to_write = [
FileRecord(file_number=1, record_number=0, record_data=[0x1234, 0x5678])
]
write_request = WriteFileRecordRequest(records=records_to_write, slave=1)
encoded_write = write_request.encode()
print(f"Write file record request: {encoded_write.hex()}")
from pymodbus.pdu.mei_message import ReadDeviceInformationRequest
from pymodbus import ModbusDeviceIdentification
# Read device information request
device_info_request = ReadDeviceInformationRequest(read_code=0x01, object_id=0x00, slave=1)
encoded = device_info_request.encode()
print(f"Device info request: {encoded.hex()}")
# Create device identification
device_id = ModbusDeviceIdentification()
device_id.VendorName = 'PyModbus'
device_id.ProductCode = 'PM'
device_id.VendorUrl = 'http://github.com/riptideio/pymodbus/'
device_id.ProductName = 'PyModbus Server'
device_id.ModelName = 'PyModbus Server'
device_id.MajorMinorRevision = '3.11.1'
print(f"Device identification: {device_id.summary()}")