CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pymodbus

A fully featured modbus protocol stack in python

Pending
Overview
Eval results
Files

pdu.mddocs/

PDU Messages

PyModbus provides complete implementation of all standard Modbus Protocol Data Units (PDUs) including bit operations, register operations, file operations, diagnostic functions, and device information queries. PDU classes can be used directly for custom protocol implementations or advanced use cases.

Capabilities

Base PDU Classes

Core PDU classes providing the foundation for all Modbus messages.

class ModbusPDU:
    def __init__(self, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...
    def calculate_rtu_frame_size(data: bytes) -> int: ...

class ExceptionResponse(ModbusPDU):
    function_code = 0x80
    def __init__(self, function_code, exception_code=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class DecodePDU:
    def __init__(self, data: bytes, **kwargs): ...
    def decode(self) -> ModbusPDU: ...

Bit Access Operations

PDU messages for reading and writing coil (binary) values.

# Read Coils (FC 01)
class ReadCoilsRequest(ModbusPDU):
    function_code = 0x01
    def __init__(self, address=None, count=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadCoilsResponse(ModbusPDU):
    function_code = 0x01
    def __init__(self, values=None, **kwargs): ...
    @property
    def bits(self) -> list: ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read Discrete Inputs (FC 02)
class ReadDiscreteInputsRequest(ModbusPDU):
    function_code = 0x02
    def __init__(self, address=None, count=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadDiscreteInputsResponse(ModbusPDU):
    function_code = 0x02
    def __init__(self, values=None, **kwargs): ...
    @property
    def bits(self) -> list: ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Write Single Coil (FC 05)
class WriteSingleCoilRequest(ModbusPDU):
    function_code = 0x05
    def __init__(self, address=None, value=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class WriteSingleCoilResponse(ModbusPDU):
    function_code = 0x05
    def __init__(self, address=None, value=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Write Multiple Coils (FC 15)
class WriteMultipleCoilsRequest(ModbusPDU):
    function_code = 0x0F
    def __init__(self, address=None, values=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class WriteMultipleCoilsResponse(ModbusPDU):
    function_code = 0x0F
    def __init__(self, address=None, count=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

Register Access Operations

PDU messages for reading and writing register (16-bit integer) values.

# Read Holding Registers (FC 03)
class ReadHoldingRegistersRequest(ModbusPDU):
    function_code = 0x03
    def __init__(self, address=None, count=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadHoldingRegistersResponse(ModbusPDU):
    function_code = 0x03
    def __init__(self, values=None, **kwargs): ...
    @property
    def registers(self) -> list: ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read Input Registers (FC 04)
class ReadInputRegistersRequest(ModbusPDU):
    function_code = 0x04
    def __init__(self, address=None, count=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadInputRegistersResponse(ModbusPDU):
    function_code = 0x04
    def __init__(self, values=None, **kwargs): ...
    @property
    def registers(self) -> list: ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Write Single Register (FC 06)
class WriteSingleRegisterRequest(ModbusPDU):
    function_code = 0x06
    def __init__(self, address=None, value=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class WriteSingleRegisterResponse(ModbusPDU):
    function_code = 0x06
    def __init__(self, address=None, value=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Write Multiple Registers (FC 16)
class WriteMultipleRegistersRequest(ModbusPDU):
    function_code = 0x10
    def __init__(self, address=None, values=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class WriteMultipleRegistersResponse(ModbusPDU):
    function_code = 0x10
    def __init__(self, address=None, count=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read/Write Multiple Registers (FC 23)
class ReadWriteMultipleRegistersRequest(ModbusPDU):
    function_code = 0x17
    def __init__(self, read_address=None, read_count=None, write_address=None, 
                 write_registers=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadWriteMultipleRegistersResponse(ModbusPDU):
    function_code = 0x17
    def __init__(self, values=None, **kwargs): ...
    @property
    def registers(self) -> list: ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Mask Write Register (FC 22)
class MaskWriteRegisterRequest(ModbusPDU):
    function_code = 0x16
    def __init__(self, address=None, and_mask=0xFFFF, or_mask=0x0000, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class MaskWriteRegisterResponse(ModbusPDU):
    function_code = 0x16
    def __init__(self, address=None, and_mask=0xFFFF, or_mask=0x0000, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

File Record Operations

PDU messages for file record access operations.

# File Record data structure
class FileRecord:
    def __init__(self, file_number=0x00, record_number=0x0000, record_data=None): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read File Record (FC 20)
class ReadFileRecordRequest(ModbusPDU):
    function_code = 0x14
    def __init__(self, records=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadFileRecordResponse(ModbusPDU):
    function_code = 0x14
    def __init__(self, records=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Write File Record (FC 21)
class WriteFileRecordRequest(ModbusPDU):
    function_code = 0x15
    def __init__(self, records=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class WriteFileRecordResponse(ModbusPDU):
    function_code = 0x15
    def __init__(self, records=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read FIFO Queue (FC 24)
class ReadFifoQueueRequest(ModbusPDU):
    function_code = 0x18
    def __init__(self, address=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadFifoQueueResponse(ModbusPDU):
    function_code = 0x18
    def __init__(self, values=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

Diagnostic Operations

PDU messages for diagnostic and status operations.

# Diagnostic Status (FC 08)
class DiagnosticStatusRequest(ModbusPDU):
    function_code = 0x08
    def __init__(self, sub_function_code=0x0000, data=None, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class DiagnosticStatusResponse(ModbusPDU):
    function_code = 0x08
    def __init__(self, sub_function_code=0x0000, data=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read Exception Status (FC 07)
class ReadExceptionStatusRequest(ModbusPDU):
    function_code = 0x07
    def __init__(self, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadExceptionStatusResponse(ModbusPDU):
    function_code = 0x07
    def __init__(self, status=0x00, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Get Communication Event Counter (FC 11)
class GetCommEventCounterRequest(ModbusPDU):
    function_code = 0x0B
    def __init__(self, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class GetCommEventCounterResponse(ModbusPDU):
    function_code = 0x0B
    def __init__(self, status=0x0000, count=0x0000, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Get Communication Event Log (FC 12)
class GetCommEventLogRequest(ModbusPDU):
    function_code = 0x0C
    def __init__(self, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class GetCommEventLogResponse(ModbusPDU):
    function_code = 0x0C
    def __init__(self, status=0x0000, event_count=0x0000, message_count=0x0000, 
                 events=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

Device Information Operations

PDU messages for device identification and server information.

# Report Server ID (FC 17)
class ReportServerIdRequest(ModbusPDU):
    function_code = 0x11
    def __init__(self, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReportServerIdResponse(ModbusPDU):
    function_code = 0x11
    def __init__(self, identifier=None, status=0x00, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

# Read Device Information (FC 43)
class ReadDeviceInformationRequest(ModbusPDU):
    function_code = 0x2B
    def __init__(self, read_code=None, object_id=0x00, slave=0, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

class ReadDeviceInformationResponse(ModbusPDU):
    function_code = 0x2B
    def __init__(self, read_code=None, conformity=0x01, more_follows=0x00,
                 next_object_id=0x00, number_of_objects=0x00, information=None, **kwargs): ...
    def encode(self) -> bytes: ...
    def decode(self, data: bytes): ...

Device Support Classes

Supporting classes for device identification and statistics.

class ModbusDeviceIdentification:
    def __init__(self, info=None): ...
    def update(self, value: dict): ...
    def __getitem__(self, key): ...
    def __setitem__(self, key, value): ...
    def __iter__(self): ...
    def summary(self) -> dict: ...

class ModbusPlusStatistics:
    def __init__(self): ...
    def reset(self): ...
    def summary(self) -> dict: ...

Usage Examples

Direct PDU Usage

from pymodbus.pdu.bit_message import ReadCoilsRequest, ReadCoilsResponse
from pymodbus.pdu.register_message import ReadHoldingRegistersRequest

# Create a read coils request
request = ReadCoilsRequest(address=0, count=8, slave=1)
encoded_request = request.encode()
print(f"Encoded request: {encoded_request.hex()}")

# Decode the request
decoded_request = ReadCoilsRequest()
decoded_request.decode(encoded_request)
print(f"Address: {decoded_request.address}, Count: {decoded_request.count}")

# Create a response
response = ReadCoilsResponse(values=[True, False, True, True, False, False, True, False])
encoded_response = response.encode()
print(f"Encoded response: {encoded_response.hex()}")
print(f"Coil bits: {response.bits}")

Custom PDU Implementation

from pymodbus.pdu import ModbusPDU
import struct

class CustomReadRequest(ModbusPDU):
    function_code = 0x40  # Custom function code
    
    def __init__(self, address=None, count=None, slave=0, **kwargs):
        ModbusPDU.__init__(self, **kwargs)
        self.address = address
        self.count = count
        self.slave = slave
    
    def encode(self):
        return struct.pack('>HH', self.address, self.count)
    
    def decode(self, data):
        self.address, self.count = struct.unpack('>HH', data)

class CustomReadResponse(ModbusPDU):
    function_code = 0x40
    
    def __init__(self, values=None, **kwargs):
        ModbusPDU.__init__(self, **kwargs)
        self.values = values or []
    
    def encode(self):
        result = struct.pack('B', len(self.values))
        for value in self.values:
            result += struct.pack('>H', value)
        return result
    
    def decode(self, data):
        count = struct.unpack('B', data[0:1])[0]
        self.values = []
        for i in range(count):
            value = struct.unpack('>H', data[1 + i*2:3 + i*2])[0]
            self.values.append(value)

# Usage
request = CustomReadRequest(address=100, count=5, slave=1)
encoded = request.encode()
print(f"Custom request: {encoded.hex()}")

response = CustomReadResponse(values=[1000, 2000, 3000, 4000, 5000])
encoded = response.encode()
print(f"Custom response: {encoded.hex()}")

Exception Response Handling

from pymodbus.pdu.exceptionresponse import ExceptionResponse
from pymodbus.constants import ExcCodes

# Create exception response
exception = ExceptionResponse(function_code=0x03, exception_code=ExcCodes.ILLEGAL_ADDRESS)
encoded = exception.encode()
print(f"Exception response: {encoded.hex()}")

# Decode exception
decoded_exception = ExceptionResponse()
decoded_exception.decode(encoded)
print(f"Function code: {decoded_exception.function_code}")
print(f"Exception code: {decoded_exception.exception_code}")

PDU Message Factory

from pymodbus.pdu.decoders import DecodePDU
from pymodbus.pdu.bit_message import ReadCoilsRequest
from pymodbus.pdu.register_message import ReadHoldingRegistersRequest

# Register message types
DecodePDU.register(ReadCoilsRequest)
DecodePDU.register(ReadHoldingRegistersRequest)

# Create and encode different message types
coil_request = ReadCoilsRequest(address=0, count=10, slave=1)
register_request = ReadHoldingRegistersRequest(address=100, count=5, slave=2)

# Decode messages dynamically
decoder = DecodePDU()
decoded_coil = decoder.decode(coil_request.encode())
decoded_register = decoder.decode(register_request.encode())

print(f"Decoded coil request: FC={decoded_coil.function_code}, addr={decoded_coil.address}")
print(f"Decoded register request: FC={decoded_register.function_code}, addr={decoded_register.address}")

File Record Operations

from pymodbus.pdu.file_message import FileRecord, ReadFileRecordRequest, WriteFileRecordRequest

# Create file records
records_to_read = [
    FileRecord(file_number=1, record_number=0, record_data=None),
    FileRecord(file_number=1, record_number=1, record_data=None)
]

# Read file record request
read_request = ReadFileRecordRequest(records=records_to_read, slave=1)
encoded_read = read_request.encode()
print(f"Read file record request: {encoded_read.hex()}")

# Write file record request
records_to_write = [
    FileRecord(file_number=1, record_number=0, record_data=[0x1234, 0x5678])
]
write_request = WriteFileRecordRequest(records=records_to_write, slave=1)
encoded_write = write_request.encode()
print(f"Write file record request: {encoded_write.hex()}")

Device Information Queries

from pymodbus.pdu.mei_message import ReadDeviceInformationRequest
from pymodbus import ModbusDeviceIdentification

# Read device information request
device_info_request = ReadDeviceInformationRequest(read_code=0x01, object_id=0x00, slave=1)
encoded = device_info_request.encode()
print(f"Device info request: {encoded.hex()}")

# Create device identification
device_id = ModbusDeviceIdentification()
device_id.VendorName = 'PyModbus'
device_id.ProductCode = 'PM' 
device_id.VendorUrl = 'http://github.com/riptideio/pymodbus/'
device_id.ProductName = 'PyModbus Server'
device_id.ModelName = 'PyModbus Server'
device_id.MajorMinorRevision = '3.11.1'

print(f"Device identification: {device_id.summary()}")

Install with Tessl CLI

npx tessl i tessl/pypi-pymodbus

docs

client.md

datastore.md

index.md

pdu.md

server.md

tile.json