CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pymodbustcp

A simple Modbus/TCP client library for Python

Pending
Overview
Eval results
Files

server.mddocs/

Server Operations

TCP server functionality for creating Modbus servers that can handle client requests. The server components include the main ModbusServer class, data storage management through DataBank, request handling via DataHandler, and device identification configuration.

Capabilities

ModbusServer Class

Main server class that creates a multi-threaded TCP server to handle Modbus client connections.

class ModbusServer:
    """
    Modbus TCP server with multi-threaded client handling.
    
    Parameters:
        host (str): Server bind address (default: "localhost")
        port (int): TCP port number (default: 502)
        no_block (bool): Non-blocking server mode (default: False)
        ipv6 (bool): Enable IPv6 support (default: False)
        data_bank (DataBank): Data storage instance (default: new DataBank())
        data_hdl (DataHandler): Request handler instance (default: new DataHandler())
        ext_engine (object): External engine instance (default: None)
        device_id (DeviceIdentification): Device identification info (default: None)
    """
    def __init__(self, host="localhost", port=502, no_block=False, ipv6=False, data_bank=None, data_hdl=None, ext_engine=None, device_id=None):
        """Initialize Modbus TCP server."""

def start(self):
    """
    Start the server.
    
    Returns:
        bool: True if server started successfully, False otherwise
    """

def stop(self):
    """Stop the server."""

def is_run(self):
    """
    Check if server is running.
    
    Returns:
        bool: True if server is running, False otherwise
    """

Properties

@property
def host(self):
    """str: Server bind address."""

@property
def port(self):
    """int: TCP port number."""

@property
def data_bank(self):
    """DataBank: Data storage instance."""

@property
def data_hdl(self):
    """DataHandler: Request handler instance."""

@property
def device_identification(self):
    """DeviceIdentification: Device identification instance (read-write)."""

DataBank Class

Default data storage implementation providing in-memory storage for all Modbus data types.

class DataBank:
    """
    Default data bank for storing coils, discrete inputs, holding registers, and input registers.
    
    Parameters:
        coils_size (int): Number of coils (default: 0x10000)
        coils_default_value (bool): Default coil value (default: False)
        d_inputs_size (int): Number of discrete inputs (default: 0x10000)
        d_inputs_default_value (bool): Default discrete input value (default: False)
        h_regs_size (int): Number of holding registers (default: 0x10000)
        h_regs_default_value (int): Default holding register value (default: 0)
        i_regs_size (int): Number of input registers (default: 0x10000)
        i_regs_default_value (int): Default input register value (default: 0)
        virtual_mode (bool): Enable virtual mode for external data management (default: False)
    """
    def __init__(self, coils_size=0x10000, coils_default_value=False, 
                d_inputs_size=0x10000, d_inputs_default_value=False,
                h_regs_size=0x10000, h_regs_default_value=0,
                i_regs_size=0x10000, i_regs_default_value=0,
                virtual_mode=False):
        """Initialize data bank with specified sizes and default values."""

Coil Operations

def get_coils(self, address, number=1, srv_info=None):
    """
    Get coils values.
    
    Parameters:
        address (int): Starting coil address (0-65535)
        number (int): Number of coils to read (default: 1)
        srv_info (dict): Server information (optional)
    
    Returns:
        list[bool] or None: List of coil values, None on error
    """

def set_coils(self, address, bit_list, srv_info=None):
    """
    Set coils values.
    
    Parameters:
        address (int): Starting coil address (0-65535)
        bit_list (list[bool]): List of coil values to set
        srv_info (dict): Server information (optional)
    
    Returns:
        bool: True on success, False on error
    """

Discrete Input Operations

def get_discrete_inputs(self, address, number=1, srv_info=None):
    """
    Get discrete inputs values.
    
    Parameters:
        address (int): Starting input address (0-65535)
        number (int): Number of inputs to read (default: 1)
        srv_info (dict): Server information (optional)
    
    Returns:
        list[bool] or None: List of input values, None on error
    """

def set_discrete_inputs(self, address, bit_list):
    """
    Set discrete inputs values.
    
    Parameters:
        address (int): Starting input address (0-65535)
        bit_list (list[bool]): List of input values to set
    
    Returns:
        bool: True on success, False on error
    """

Holding Register Operations

def get_holding_registers(self, address, number=1, srv_info=None):
    """
    Get holding registers values.
    
    Parameters:
        address (int): Starting register address (0-65535)
        number (int): Number of registers to read (default: 1)
        srv_info (dict): Server information (optional)
    
    Returns:
        list[int] or None: List of register values (0-65535), None on error
    """

def set_holding_registers(self, address, word_list, srv_info=None):
    """
    Set holding registers values.
    
    Parameters:
        address (int): Starting register address (0-65535)
        word_list (list[int]): List of register values to set (0-65535)
        srv_info (dict): Server information (optional)
    
    Returns:
        bool: True on success, False on error
    """

Input Register Operations

def get_input_registers(self, address, number=1, srv_info=None):
    """
    Get input registers values.
    
    Parameters:
        address (int): Starting register address (0-65535)
        number (int): Number of registers to read (default: 1)
        srv_info (dict): Server information (optional)
    
    Returns:
        list[int] or None: List of register values (0-65535), None on error
    """

def set_input_registers(self, address, word_list):
    """
    Set input registers values.
    
    Parameters:
        address (int): Starting register address (0-65535)
        word_list (list[int]): List of register values to set (0-65535)
    
    Returns:
        bool: True on success, False on error
    """

Change Callbacks

def on_coils_change(self, address, from_value, to_value, srv_info):
    """
    Callback called when coils are modified (override in subclass).
    
    Parameters:
        address (int): Starting address that changed
        from_value (list[bool]): Previous values
        to_value (list[bool]): New values
        srv_info (dict): Server information
    """

def on_holding_registers_change(self, address, from_value, to_value, srv_info):
    """
    Callback called when holding registers are modified (override in subclass).
    
    Parameters:
        address (int): Starting address that changed
        from_value (list[int]): Previous values
        to_value (list[int]): New values
        srv_info (dict): Server information
    """

Static Methods (Override Points)

@staticmethod
def get_bits(*_args, **_kwargs):
    """Override this method for custom bit access logic."""

@staticmethod
def set_bits(*_args, **_kwargs):
    """Override this method for custom bit write logic."""

@staticmethod
def get_words(*_args, **_kwargs):
    """Override this method for custom word access logic."""

@staticmethod
def set_words(*_args, **_kwargs):
    """Override this method for custom word write logic."""

DataHandler Class

Request handler that processes Modbus function codes and interacts with the data bank.

class DataHandler:
    """
    Modbus request handler for processing function codes.
    
    Parameters:
        data_bank (DataBank): Data storage instance (default: new DataBank())
    """
    def __init__(self, data_bank=None):
        """Initialize request handler with data bank."""

Request Processing Methods

def read_coils(self, address, count, srv_info):
    """
    Handle read coils request (function code 1).
    
    Parameters:
        address (int): Starting coil address
        count (int): Number of coils to read
        srv_info (dict): Server information
    
    Returns:
        DataHandler.Return: Response object with success/error status and data
    """

def write_coils(self, address, bits_l, srv_info):
    """
    Handle write coils request (function code 15).
    
    Parameters:
        address (int): Starting coil address
        bits_l (list[bool]): List of coil values to write
        srv_info (dict): Server information
    
    Returns:
        DataHandler.Return: Response object with success/error status
    """

def read_d_inputs(self, address, count, srv_info):
    """
    Handle read discrete inputs request (function code 2).
    
    Parameters:
        address (int): Starting input address
        count (int): Number of inputs to read
        srv_info (dict): Server information
    
    Returns:
        DataHandler.Return: Response object with success/error status and data
    """

def read_h_regs(self, address, count, srv_info):
    """
    Handle read holding registers request (function code 3).
    
    Parameters:
        address (int): Starting register address
        count (int): Number of registers to read
        srv_info (dict): Server information
    
    Returns:
        DataHandler.Return: Response object with success/error status and data
    """

def write_h_regs(self, address, words_l, srv_info):
    """
    Handle write holding registers request (function code 16).
    
    Parameters:
        address (int): Starting register address
        words_l (list[int]): List of register values to write
        srv_info (dict): Server information
    
    Returns:
        DataHandler.Return: Response object with success/error status
    """

def read_i_regs(self, address, count, srv_info):
    """
    Handle read input registers request (function code 4).
    
    Parameters:
        address (int): Starting register address
        count (int): Number of registers to read
        srv_info (dict): Server information
    
    Returns:
        DataHandler.Return: Response object with success/error status and data
    """

DeviceIdentification Class

Container for device identification objects used in function code 43 responses.

class DeviceIdentification:
    """
    Device identification information container.
    
    Parameters:
        vendor_name (bytes): Vendor name (object ID 0x00, default: b'')
        product_code (bytes): Product code (object ID 0x01, default: b'')
        major_minor_revision (bytes): Major/minor revision (object ID 0x02, default: b'')
        vendor_url (bytes): Vendor URL (object ID 0x03, default: b'')
        product_name (bytes): Product name (object ID 0x04, default: b'')
        model_name (bytes): Model name (object ID 0x05, default: b'')
        user_application_name (bytes): User application name (object ID 0x06, default: b'')
        objects_id (dict): Additional identification objects (object_id: bytes_value, default: None)
    """
    def __init__(self, vendor_name=b'', product_code=b'', major_minor_revision=b'', 
                vendor_url=b'', product_name=b'', model_name=b'', user_application_name=b'', objects_id=None):
        """Initialize device identification with standard objects and optional additional objects."""

def __getitem__(self, key):
    """
    Get identification object by ID.
    
    Parameters:
        key (int): Object ID (0-255)
    
    Returns:
        bytes: Object value
    """

def __setitem__(self, key, value):
    """
    Set identification object by ID.
    
    Parameters:
        key (int): Object ID (0-255)
        value (bytes): Object value
    """

def items(self):
    """
    Get all identification objects as key-value pairs.
    
    Returns:
        dict_items: All object ID and value pairs
    """

@property
def vendor_name(self):
    """bytes: Vendor name (object ID 0x00, read-only)."""

@property
def product_code(self):
    """bytes: Product code (object ID 0x01, read-only)."""

@property
def major_minor_revision(self):
    """bytes: Major/minor revision (object ID 0x02, read-only)."""

@property
def vendor_url(self):
    """bytes: Vendor URL (object ID 0x03, read-only)."""

@property
def product_name(self):
    """bytes: Product name (object ID 0x04, read-only)."""

@property
def model_name(self):
    """bytes: Model name (object ID 0x05, read-only)."""

@property
def user_application_name(self):
    """bytes: User application name (object ID 0x06, read-only)."""

Usage Examples

Basic Server Setup

from pyModbusTCP.server import ModbusServer, DataBank

# Create data bank with initial values
data_bank = DataBank()
data_bank.set_holding_registers(0, [100, 200, 300, 400, 500])
data_bank.set_coils(0, [True, False, True, False])

# Create and start server
server = ModbusServer(host="0.0.0.0", port=502, data_bank=data_bank)
if server.start():
    print("Server started successfully")
    # Server runs in background thread
    # Call server.stop() when done
else:
    print("Failed to start server")

Custom Data Bank with Change Monitoring

from pyModbusTCP.server import ModbusServer, DataBank

class MonitoredDataBank(DataBank):
    def on_holding_registers_change(self, address, from_value, to_value, srv_info):
        print(f"Registers {address}-{address+len(to_value)-1} changed:")
        print(f"  From: {from_value}")
        print(f"  To: {to_value}")
        print(f"  Client: {srv_info.get('client_addr', 'unknown')}")
    
    def on_coils_change(self, address, from_value, to_value, srv_info):
        print(f"Coils {address}-{address+len(to_value)-1} changed:")
        print(f"  From: {from_value}")
        print(f"  To: {to_value}")

# Use custom data bank
custom_data_bank = MonitoredDataBank()
server = ModbusServer(data_bank=custom_data_bank)
server.start()

Server with Device Identification

from pyModbusTCP.server import ModbusServer, DataBank, DeviceIdentification

# Create device identification
device_id = DeviceIdentification(
    vendor_name=b'My Company',
    product_code=b'MC-001',
    major_minor_revision=b'1.0',
    vendor_url=b'https://mycompany.com',
    product_name=b'Industrial Controller',
    model_name=b'IC-2024',
    user_application_name=b'Factory Automation'
)

# Configure server with device identification
data_bank = DataBank()
server = ModbusServer(data_bank=data_bank)
server.device_identification = device_id
server.start()

Custom Request Handler

from pyModbusTCP.server import ModbusServer, DataBank, DataHandler
from pyModbusTCP.constants import EXP_ILLEGAL_FUNCTION

class CustomDataHandler(DataHandler):
    def read_h_regs(self, address, count, srv_info):
        # Custom logic for holding register reads
        if address >= 1000:  # Restrict access to certain addresses
            return self.Return(exp_code=EXP_ILLEGAL_FUNCTION)
        
        # Log the request
        client_addr = srv_info.get('client_addr', 'unknown')
        print(f"Client {client_addr} reading {count} registers from {address}")
        
        # Use default implementation
        return super().read_h_regs(address, count, srv_info)
    
    def write_h_regs(self, address, words_l, srv_info):
        # Custom validation for writes
        if any(w > 1000 for w in words_l):  # Limit register values
            return self.Return(exp_code=EXP_DATA_VALUE)
        
        return super().write_h_regs(address, words_l, srv_info)

# Use custom handler
data_bank = DataBank()
custom_handler = CustomDataHandler(data_bank)
server = ModbusServer(data_bank=data_bank, data_hdl=custom_handler)
server.start()

Virtual Data Generation

from pyModbusTCP.server import ModbusServer, DataBank
import time
import threading
import random

class VirtualDataBank(DataBank):
    def __init__(self):
        super().__init__()
        self._running = True
        self._thread = threading.Thread(target=self._update_data)
        self._thread.daemon = True
        self._thread.start()
    
    def _update_data(self):
        """Generate virtual data continuously."""
        while self._running:
            # Simulate sensor readings
            temperature = int(random.uniform(18.0, 25.0) * 10)  # Temperature * 10
            humidity = int(random.uniform(30.0, 70.0))
            pressure = int(random.uniform(990.0, 1020.0))
            
            # Update input registers (sensor readings)
            self.set_input_registers(0, [temperature, humidity, pressure])
            
            # Update status coils
            self.set_discrete_inputs(0, [True, temperature > 220, humidity > 60])
            
            time.sleep(1)
    
    def stop(self):
        self._running = False

# Use virtual data bank
virtual_data = VirtualDataBank()
server = ModbusServer(data_bank=virtual_data)
server.start()

Exception Classes

class ModbusServer.Error(Exception):
    """Base class for server errors."""

class ModbusServer.NetworkError(Error):
    """Network-related server errors."""

class ModbusServer.DataFormatError(Error):
    """Data format validation errors."""

class DataHandler.ModbusExcept(Exception):
    """
    Exception for Modbus protocol errors in request handling.
    
    Attributes:
        exp_code (int): Modbus exception code
        data (bytes): Optional additional data
    """

Container Classes

class ModbusServer.ClientInfo:
    """
    Container for client connection information.
    
    Attributes:
        address (str): Client IP address
        port (int): Client port number
    """

class ModbusServer.ServerInfo:
    """
    Container for server instance information.
    
    Attributes:
        host (str): Server bind address
        port (int): Server port number
    """

class ModbusServer.SessionData:
    """
    Container for server session data and state information.
    
    Attributes:
        client_addr (str): Connected client address
        unit_id (int): Current request unit ID
        transaction_id (int): Current transaction ID
    """

class ModbusServer.Frame:
    """Raw Modbus frame container."""

class ModbusServer.MBAP:
    """Modbus Application Protocol header container."""

class ModbusServer.PDU:
    """Protocol Data Unit container."""

Types

class DataHandler.Return:
    """
    Return object for data handler methods.
    
    Attributes:
        exp_code (int): Exception code (0 for success, other values indicate specific errors)
        data (list): Response data (for read operations, None for write operations)
    
    Methods:
        ok() -> bool: Returns True if operation was successful (exp_code == 0)
    """

Install with Tessl CLI

npx tessl i tessl/pypi-pymodbustcp

docs

client.md

constants.md

index.md

server.md

utils.md

tile.json