CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-python-iptables

Python bindings for iptables providing programmatic control over Linux netfilter rules

Pending
Overview
Eval results
Files

high-level-interface.mddocs/

High-Level Interface

Simplified dictionary-based interface for common iptables operations provided through the iptc.easy module. This interface eliminates the need for object instantiation and provides convenient functions for table, chain, and rule management using dictionary representations of rules.

Capabilities

Table Management Functions

High-level functions for managing entire tables and their contents.

def flush_all(ipv6: bool = False) -> None:
    """
    Flush all available tables.
    
    Args:
        ipv6: Use IPv6 tables if True
    """

def flush_table(table: str, ipv6: bool = False, raise_exc: bool = True) -> None:
    """
    Flush all rules from a specific table.
    
    Args:
        table: Table name (filter, nat, mangle, raw, security)
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
    """

def flush_chain(table: str, chain: str, ipv6: bool = False, raise_exc: bool = True) -> None:
    """
    Flush all rules from a specific chain.
    
    Args:
        table: Table name
        chain: Chain name
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
    """

def zero_all(ipv6: bool = False) -> None:
    """
    Zero all counters in all available tables.
    
    Args:
        ipv6: Use IPv6 tables if True
    """

def zero_table(table: str, ipv6: bool = False, raise_exc: bool = True) -> None:
    """
    Zero all counters in a specific table.
    
    Args:
        table: Table name
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
    """

def zero_chain(table: str, chain: str, ipv6: bool = False, raise_exc: bool = True) -> None:
    """
    Zero all counters in a specific chain.
    
    Args:
        table: Table name
        chain: Chain name
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
    """

def get_tables(ipv6: bool = False) -> list:
    """
    Get list of available table names.
    
    Args:
        ipv6: Use IPv6 tables if True
        
    Returns:
        List of table name strings
    """

Chain Management Functions

Functions for creating, deleting, and managing chains within tables.

def get_chains(table: str, ipv6: bool = False) -> list:
    """
    Get list of chain names in a table.
    
    Args:
        table: Table name
        ipv6: Use IPv6 tables if True
        
    Returns:
        List of chain name strings
    """

def has_chain(table: str, chain: str, ipv6: bool = False) -> bool:
    """
    Check if a chain exists in a table.
    
    Args:
        table: Table name
        chain: Chain name
        ipv6: Use IPv6 tables if True
        
    Returns:
        True if chain exists
    """

def add_chain(table: str, chain: str, ipv6: bool = False, raise_exc: bool = True) -> bool:
    """
    Create a new chain in a table.
    
    Args:
        table: Table name
        chain: Chain name
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
        
    Returns:
        True if chain was created successfully
    """

def delete_chain(table: str, chain: str, ipv6: bool = False, flush: bool = False, raise_exc: bool = True) -> None:
    """
    Delete a chain from a table.
    
    Args:
        table: Table name
        chain: Chain name
        ipv6: Use IPv6 tables if True
        flush: Flush chain before deleting if True
        raise_exc: Raise exception on failure if True
    """

Rule Management Functions

Functions for adding, removing, and querying rules using dictionary representations.

def has_rule(table: str, chain: str, rule_d: dict, ipv6: bool = False) -> bool:
    """
    Check if a rule exists in a chain.
    
    Args:
        table: Table name
        chain: Chain name
        rule_d: Rule dictionary specification
        ipv6: Use IPv6 tables if True
        
    Returns:
        True if rule exists
    """

def add_rule(table: str, chain: str, rule_d: dict, position: int = 0, ipv6: bool = False) -> None:
    """
    Add a rule to a chain.
    
    Args:
        table: Table name
        chain: Chain name
        rule_d: Rule dictionary specification
        position: Position to insert (0 = append to end)
        ipv6: Use IPv6 tables if True
    """

def insert_rule(table: str, chain: str, rule_d: dict, ipv6: bool = False) -> None:
    """
    Insert a rule at the beginning of a chain.
    
    Args:
        table: Table name
        chain: Chain name
        rule_d: Rule dictionary specification
        ipv6: Use IPv6 tables if True
    """

def delete_rule(table: str, chain: str, rule_d: dict, ipv6: bool = False, raise_exc: bool = True) -> None:
    """
    Delete a rule from a chain.
    
    Args:
        table: Table name
        chain: Chain name
        rule_d: Rule dictionary specification
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
    """

def get_rule(table: str, chain: str, position: int = 0, ipv6: bool = False, raise_exc: bool = True) -> dict:
    """
    Get rule(s) from a chain.
    
    Args:
        table: Table name
        chain: Chain name
        position: Rule position (0 = all rules, >0 = specific rule)
        ipv6: Use IPv6 tables if True
        raise_exc: Raise exception on failure if True
        
    Returns:
        Rule dictionary (if position > 0) or list of rule dictionaries (if position = 0)
    """

def replace_rule(table: str, chain: str, old_rule_d: dict, new_rule_d: dict, ipv6: bool = False) -> None:
    """
    Replace an existing rule with a new rule.
    
    Args:
        table: Table name
        chain: Chain name
        old_rule_d: Existing rule dictionary to replace
        new_rule_d: New rule dictionary
        ipv6: Use IPv6 tables if True
    """

def get_rule_counters(table: str, chain: str, rule_d: dict, ipv6: bool = False) -> tuple:
    """
    Get packet and byte counters for a specific rule.
    
    Args:
        table: Table name
        chain: Chain name
        rule_d: Rule dictionary specification
        ipv6: Use IPv6 tables if True
        
    Returns:
        Tuple of (packets, bytes)
    """

def get_rule_position(table: str, chain: str, rule_d: dict, ipv6: bool = False) -> int:
    """
    Get the position of a rule in a chain.
    
    Args:
        table: Table name
        chain: Chain name
        rule_d: Rule dictionary specification
        ipv6: Use IPv6 tables if True
        
    Returns:
        Rule position (1-based index)
    """

Policy Management Functions

Functions for managing chain default policies.

def get_policy(table: str, chain: str, ipv6: bool = False) -> str:
    """
    Get the default policy for a chain.
    
    Args:
        table: Table name
        chain: Chain name (built-in chains only)
        ipv6: Use IPv6 tables if True
        
    Returns:
        Policy name string (ACCEPT, DROP, etc.)
    """

def set_policy(table: str, chain: str, policy: str = 'ACCEPT', ipv6: bool = False) -> None:
    """
    Set the default policy for a chain.
    
    Args:
        table: Table name
        chain: Chain name (built-in chains only)
        policy: Policy name (ACCEPT, DROP, QUEUE, RETURN)
        ipv6: Use IPv6 tables if True
    """

Dump Functions

Functions for retrieving complete iptables state as structured data.

def dump_all(ipv6: bool = False) -> dict:
    """
    Get complete iptables state as nested dictionary.
    
    Args:
        ipv6: Use IPv6 tables if True
        
    Returns:
        Dictionary with all tables, chains, and rules
    """

def dump_table(table: str, ipv6: bool = False) -> dict:
    """
    Get table state as dictionary.
    
    Args:
        table: Table name
        ipv6: Use IPv6 tables if True
        
    Returns:
        Dictionary with chains and rules for the table
    """

def dump_chain(table: str, chain: str, ipv6: bool = False) -> list:
    """
    Get all rules in a chain as list of dictionaries.
    
    Args:
        table: Table name
        chain: Chain name
        ipv6: Use IPv6 tables if True
        
    Returns:
        List of rule dictionaries
    """

Batch Operations

Functions for performing multiple operations efficiently with disabled autocommit.

def batch_begin(table: str = None, ipv6: bool = False) -> None:
    """
    Start batch mode by disabling autocommit.
    
    Args:
        table: Table name (if None, affects all tables)
        ipv6: Use IPv6 tables if True
    """

def batch_end(table: str = None, ipv6: bool = False) -> None:
    """
    End batch mode by committing changes and re-enabling autocommit.
    
    Args:
        table: Table name (if None, affects all tables)
        ipv6: Use IPv6 tables if True
    """

def batch_add_rules(table: str, batch_rules: list, ipv6: bool = False) -> None:
    """
    Add multiple rules in batch mode.
    
    Args:
        table: Table name
        batch_rules: List of (chain_name, rule_dict) tuples
        ipv6: Use IPv6 tables if True
    """

Validation Functions

Functions for testing rule validity without actually applying changes.

def test_rule(rule_d: dict, ipv6: bool = False) -> bool:
    """
    Test if a rule dictionary is valid.
    
    Args:
        rule_d: Rule dictionary to test
        ipv6: Use IPv6 validation if True
        
    Returns:
        True if rule is valid
    """

def test_match(name: str, value, ipv6: bool = False) -> bool:
    """
    Test if a match specification is valid.
    
    Args:
        name: Match name
        value: Match parameters (string or dict)
        ipv6: Use IPv6 validation if True
        
    Returns:
        True if match is valid
    """

def test_target(name: str, value, ipv6: bool = False) -> bool:
    """
    Test if a target specification is valid.
    
    Args:
        name: Target name
        value: Target parameters (string or dict)
        ipv6: Use IPv6 validation if True
        
    Returns:
        True if target is valid
    """

Rule Dictionary Conversion

Functions for converting between Rule objects and dictionary representations.

def encode_iptc_rule(rule_d: dict) -> 'Rule':
    """
    Convert rule dictionary to Rule object.
    
    Args:
        rule_d: Rule dictionary specification
        
    Returns:
        Rule object
    """

def decode_iptc_rule(rule: 'Rule') -> dict:
    """
    Convert Rule object to dictionary representation.
    
    Args:
        rule: Rule object
        
    Returns:
        Rule dictionary
    """

Usage Examples

Basic Rule Management

import iptc.easy as easy

# Add a simple rule using dictionary
rule_dict = {
    'protocol': 'tcp',
    'src': '192.168.1.0/24',
    'tcp': {'dport': '22'},
    'target': 'ACCEPT',
    'comment': {'comment': 'Allow SSH from LAN'}
}

# Insert rule at beginning of INPUT chain
easy.insert_rule('filter', 'INPUT', rule_dict)

# Check if rule exists
if easy.has_rule('filter', 'INPUT', rule_dict):
    print("Rule exists")

# Get all rules in chain
rules = easy.dump_chain('filter', 'INPUT')
for rule in rules:
    print(rule)

# Delete the rule
easy.delete_rule('filter', 'INPUT', rule_dict)

Chain Management

import iptc.easy as easy

# Create custom chain
easy.add_chain('filter', 'my_custom_chain')

# Check if chain exists
if easy.has_chain('filter', 'my_custom_chain'):
    print("Chain created successfully")

# List all chains in filter table
chains = easy.get_chains('filter')
print("Available chains:", chains)

# Add rule to custom chain
rule_dict = {
    'protocol': 'tcp',
    'tcp': {'dport': '80'},
    'target': 'ACCEPT'
}
easy.add_rule('filter', 'my_custom_chain', rule_dict)

# Delete chain (flush first)
easy.delete_chain('filter', 'my_custom_chain', flush=True)

Policy Management

import iptc.easy as easy

# Set restrictive default policies
easy.set_policy('filter', 'INPUT', 'DROP')
easy.set_policy('filter', 'FORWARD', 'DROP')
easy.set_policy('filter', 'OUTPUT', 'ACCEPT')

# Check current policies
input_policy = easy.get_policy('filter', 'INPUT')
print(f"INPUT policy: {input_policy}")

Complex Rule Examples

import iptc.easy as easy

# Multi-match rule with state and tcp
rule_dict = {
    'protocol': 'tcp',
    'tcp': {'dport': '80'},
    'state': {'state': 'NEW,ESTABLISHED'},
    'target': 'ACCEPT',
    'comment': {'comment': 'Allow new HTTP connections'}
}
easy.add_rule('filter', 'INPUT', rule_dict)

# NAT rule for port forwarding
nat_rule = {
    'protocol': 'tcp',
    'dst': '203.0.113.1',
    'tcp': {'dport': '80'},
    'target': {'DNAT': {'to-destination': '192.168.1.100:8080'}}
}
easy.add_rule('nat', 'PREROUTING', nat_rule)

# Rule with negation
block_rule = {
    'protocol': 'tcp',
    'tcp': {'dport': '22'},
    'mac': {'mac-source': '!00:11:22:33:44:55'},
    'target': 'DROP'
}
easy.add_rule('filter', 'INPUT', block_rule)

Batch Operations

import iptc.easy as easy

# Start batch mode for efficient multiple operations
easy.batch_begin('filter')

# Add multiple rules
rules = [
    ('INPUT', {'protocol': 'tcp', 'tcp': {'dport': '22'}, 'target': 'ACCEPT'}),
    ('INPUT', {'protocol': 'tcp', 'tcp': {'dport': '80'}, 'target': 'ACCEPT'}),
    ('INPUT', {'protocol': 'tcp', 'tcp': {'dport': '443'}, 'target': 'ACCEPT'}),
]

easy.batch_add_rules('filter', rules)

# Commit all changes at once
easy.batch_end('filter')

Complete Firewall Setup

import iptc.easy as easy

def setup_basic_firewall():
    """Set up a basic firewall configuration"""
    
    # Set default policies
    easy.set_policy('filter', 'INPUT', 'DROP')
    easy.set_policy('filter', 'FORWARD', 'DROP')
    easy.set_policy('filter', 'OUTPUT', 'ACCEPT')
    
    # Allow loopback
    easy.add_rule('filter', 'INPUT', {
        'in_interface': 'lo',
        'target': 'ACCEPT'
    })
    
    # Allow established connections
    easy.add_rule('filter', 'INPUT', {
        'state': {'state': 'ESTABLISHED,RELATED'},
        'target': 'ACCEPT'
    })
    
    # Allow SSH from management network
    easy.add_rule('filter', 'INPUT', {
        'protocol': 'tcp',
        'src': '192.168.100.0/24',
        'tcp': {'dport': '22'},
        'target': 'ACCEPT',
        'comment': {'comment': 'SSH from management'}
    })
    
    # Allow HTTP and HTTPS
    for port in ['80', '443']:
        easy.add_rule('filter', 'INPUT', {
            'protocol': 'tcp',
            'tcp': {'dport': port},
            'state': {'state': 'NEW,ESTABLISHED'},
            'target': 'ACCEPT'
        })
    
    print("Basic firewall configured")

setup_basic_firewall()

IPv6 Support

import iptc.easy as easy

# IPv6 rule using the same interface
ipv6_rule = {
    'protocol': 'tcp',
    'src': '2001:db8::/32',
    'tcp': {'dport': '22'},
    'target': 'ACCEPT'
}

# Add IPv6 rule
easy.add_rule('filter', 'INPUT', ipv6_rule, ipv6=True)

# Dump IPv6 rules
ipv6_rules = easy.dump_chain('filter', 'INPUT', ipv6=True)
print("IPv6 rules:", ipv6_rules)

Error Handling and Validation

import iptc.easy as easy

# Test rule validity before adding
rule_dict = {
    'protocol': 'tcp',
    'tcp': {'dport': '80'},
    'target': 'ACCEPT'
}

if easy.test_rule(rule_dict):
    easy.add_rule('filter', 'INPUT', rule_dict)
    print("Rule added successfully")
else:
    print("Invalid rule")

# Test individual matches and targets
if easy.test_match('tcp', {'dport': '80'}):
    print("Valid TCP match")

if easy.test_target('ACCEPT', None):
    print("Valid ACCEPT target")

# Safe operations with exception handling
try:
    easy.delete_rule('filter', 'INPUT', rule_dict, raise_exc=True)
except Exception as e:
    print(f"Failed to delete rule: {e}")

Rule Dictionary Format

The rule dictionary format uses the following structure:

rule_dict = {
    # Basic packet matching
    'src': '192.168.1.0/24',        # Source address/network
    'dst': '10.0.0.1',              # Destination address
    'protocol': 'tcp',              # Protocol (tcp, udp, icmp, etc.)
    'in_interface': 'eth0',         # Input interface
    'out_interface': 'eth1',        # Output interface
    
    # Protocol-specific matches (match name as key)
    'tcp': {
        'sport': '80',              # Source port
        'dport': '443',             # Destination port
        'tcp_flags': 'SYN,ACK SYN'  # TCP flags
    },
    
    'udp': {
        'sport': '53',
        'dport': '53'
    },
    
    'state': {
        'state': 'NEW,ESTABLISHED,RELATED'
    },
    
    'comment': {
        'comment': 'Rule description'
    },
    
    # Target (required)
    'target': 'ACCEPT',             # Simple target
    
    # Or complex target with parameters
    'target': {
        'DNAT': {
            'to-destination': '192.168.1.100:8080'
        }
    },
    
    # Counters (optional)
    'counters': (100, 5000)         # (packets, bytes)
}

Install with Tessl CLI

npx tessl i tessl/pypi-python-iptables

docs

high-level-interface.md

index.md

ipv4-tables-chains.md

ipv6-support.md

rules-matches-targets.md

tile.json