Interactive packet manipulation program and library for network security research and testing
—
Network I/O functions for sending packets, receiving responses, capturing traffic, and managing network communication across different platforms and socket types.
Core functions for sending packets and collecting responses, enabling interactive network communication and analysis.
def sr(x, promisc: bool = None, filter: str = None, timeout: float = None,
       inter: float = 0, verbose: int = None, chainCC: bool = False,
       retry: int = 0, multi: bool = False, **kwargs) -> tuple[SndRcvList, PacketList]:
    """
    Send packets and receive answers.

    Parameters:
    - x: Packet(s) to send
    - promisc: Enable promiscuous mode
    - filter: BPF filter for received packets
    - timeout: Timeout in seconds (None for no timeout)
    - inter: Inter-packet interval in seconds
    - verbose: Verbosity level (0=quiet, 1=normal, 2=verbose)
    - chainCC: If True, a KeyboardInterrupt (Ctrl-C) raised during the
      exchange is forwarded to the caller instead of being absorbed
    - retry: Number of retries for unanswered packets
    - multi: Allow multiple answers per packet
    - **kwargs: Additional arguments for socket

    Returns:
    tuple: (answered_packets, unanswered_packets)
    """
def sr1(x, promisc: bool = None, filter: str = None, timeout: float = None,
        verbose: int = None, retry: int = 0, **kwargs) -> Packet:
    """
    Send packets and receive first answer.

    Parameters:
    - x: Packet(s) to send
    - promisc: Enable promiscuous mode
    - filter: BPF filter for received packets
    - timeout: Timeout in seconds
    - verbose: Verbosity level
    - retry: Number of retries
    - **kwargs: Additional socket arguments. `iface` (interface to use) is
      not a named parameter of this function and is supplied this way.

    Returns:
    Packet or None: First received packet or None if timeout
    """
def srp(x, timeout: float = None, verbose: int = None, retry: int = 3,
        multi: bool = False, filter: str = None, iface: str = None,
        **kwargs) -> tuple[SndRcvList, PacketList]:
    """
    Layer 2 send-and-receive: emit the packets and collect the answers.

    NOTE(review): `retry` defaults to 3 here but to 0 in `sr` and `sr1`;
    confirm the value against the installed Scapy version.

    Parameters:
    - x: One packet or several packets to send
    - timeout: How long to wait, in seconds
    - verbose: Level of verbosity
    - retry: How many times to retry
    - multi: Accept more than one answer for a packet
    - filter: BPF expression applied to received packets
    - iface: Network interface to use
    - **kwargs: Extra arguments for the socket

    Returns:
    tuple: (answered_packets, unanswered_packets)
    """
def srp1(x, timeout: float = None, verbose: int = None, retry: int = 3,
         filter: str = None, iface: str = None, **kwargs) -> Packet:
    """
    Send packets at layer 2 and receive first answer.

    NOTE(review): `retry` defaults to 3 here but to 0 in `sr` and `sr1`;
    confirm the value against the installed Scapy version.

    Parameters:
    - x: Packet(s) to send
    - timeout: Timeout in seconds
    - verbose: Verbosity level
    - retry: Number of retries
    - filter: BPF filter for received packets
    - iface: Interface to use
    - **kwargs: Additional socket arguments

    Returns:
    Packet or None: First received packet or None if timeout
    """


# Functions for sending packets without waiting for responses, useful for
# traffic generation and one-way communication.
def send(x, inter: float = 0, loop: int = 0, count: int = None,
         verbose: int = None, realtime: bool = None, return_packets: bool = False,
         socket: 'SuperSocket' = None, **kwargs) -> 'PacketList | None':
    """
    Send packets at layer 3.

    Parameters:
    - x: Packet(s) to send
    - inter: Inter-packet interval in seconds
    - loop: Loop forever if 1, or number of loops
    - count: Number of packets to send
    - verbose: Verbosity level
    - realtime: Send at real-time intervals
    - return_packets: Return sent packets
    - socket: Socket to use for sending
    - **kwargs: Additional arguments

    Returns:
    PacketList or None: The sent packets when return_packets is True,
    otherwise None
    """
def sendp(x, inter: float = 0, loop: int = 0, count: int = None,
          verbose: int = None, realtime: bool = None, return_packets: bool = False,
          iface: str = None, socket: 'SuperSocket' = None, **kwargs) -> 'PacketList | None':
    """
    Send packets at layer 2.

    Parameters:
    - x: Packet(s) to send
    - inter: Inter-packet interval in seconds
    - loop: Loop forever if 1, or number of loops
    - count: Number of packets to send
    - verbose: Verbosity level
    - realtime: Send at real-time intervals
    - return_packets: Return sent packets
    - iface: Interface to send on
    - socket: Socket to use for sending
    - **kwargs: Additional arguments

    Returns:
    PacketList or None: The sent packets when return_packets is True,
    otherwise None
    """
def sendpfast(x, pps: int = None, mbps: float = None, realtime: bool = None,
              loop: int = 0, file_cache: bool = False, iface: str = None) -> None:
    """
    Send packets at high speed using tcpreplay.

    Parameters:
    - x: Packet(s) to send
    - pps: Packets per second
    - mbps: Megabits per second
    - realtime: Send at recorded intervals
    - loop: Number of loops
    - file_cache: Use file caching
    - iface: Interface to send on
    """


# Functions for capturing network traffic with flexible filtering and
# processing options.
def sniff(count: int = 0, store: bool = True, offline: str = None,
          prn: callable = None, lfilter: callable = None,
          L2socket: 'SuperSocket' = None, timeout: float = None,
          opened_socket: 'SuperSocket' = None, stop_filter: callable = None,
          iface: str = None, started_callback: callable = None,
          filter: str = None, **kwargs) -> PacketList:
    """
    Sniff packets off the network.

    Parameters:
    - count: How many packets to capture (0 means no limit)
    - store: Keep the captured packets in memory
    - offline: Pcap file to read from instead of the network
    - prn: Callback applied to every packet as it arrives
    - lfilter: Python predicate used to filter packets
    - L2socket: Socket used for the capture
    - timeout: Stop once this many seconds have elapsed
    - opened_socket: Already-open socket to capture from
    - stop_filter: Predicate deciding when to stop
    - iface: Interface to capture on
    - started_callback: Callback invoked when the capture starts
    - filter: BPF filter expression
    - **kwargs: Extra arguments

    Returns:
    PacketList: The captured packets (if store=True)
    """
class AsyncSniffer:
    """
    Asynchronous packet sniffer that runs in background.
    """

    def __init__(self, count: int = 0, store: bool = True, prn: callable = None,
                 filter: str = None, iface: str = None, **kwargs):
        """
        Initialize asynchronous sniffer.

        Parameters:
        - count: Number of packets to capture
        - store: Store packets in memory
        - prn: Function to apply to each packet
        - filter: BPF filter string
        - iface: Interface to capture on
        - **kwargs: Additional sniff arguments
        """

    def start(self) -> None:
        """Start packet capture in background thread."""

    def stop(self) -> None:
        """Stop packet capture."""

    def join(self, timeout: float = None) -> None:
        """Wait for capture to complete."""

    @property
    def results(self) -> PacketList:
        """Get captured packets."""


# Low-level send and receive operations using Scapy's socket abstraction.
def sndrcv(pks: 'SuperSocket', pkt: Packet, timeout: float = 2,
           verbose: int = None) -> tuple[Packet, float]:
    """
    Perform one low-level send followed by a receive.

    Parameters:
    - pks: Socket used to send and receive
    - pkt: The packet to send
    - timeout: How long to wait for the receive
    - verbose: Level of verbosity

    Returns:
    tuple: (received_packet, timestamp)
    """
def sndrcvflood(pks: 'SuperSocket', pkt: Packet, ans: Packet = None,
                stopevent: 'Event' = None, **kwargs) -> None:
    """
    Send packet in flood mode and receive answers.

    Parameters:
    - pks: Socket for operations
    - pkt: Packet to send repeatedly
    - ans: Expected answer packet
    - stopevent: Event to stop flooding
    - **kwargs: Additional arguments
    """


# Integration with external network tools for extended functionality.
def tshark(*args, **kwargs) -> PacketList:
    """
    Capture packets through tshark, with its advanced filtering.

    Parameters:
    - *args: Positional arguments handed to tshark
    - **kwargs: Keyword arguments

    Returns:
    PacketList: The captured packets
    """
def tcpdump(count: int = None, **kwargs) -> PacketList:
    """
    Use tcpdump for packet capture.

    Parameters:
    - count: Number of packets to capture
    - **kwargs: Additional tcpdump arguments

    Returns:
    PacketList: Captured packets
    """


# Debug information container for troubleshooting send/receive operations.
class debug:
    """
    Debug information for send/receive operations.
    """
    recv: PacketList  # Received packets
    sent: PacketList  # Sent packets
    match: list  # Matched packet pairs


class SuperSocket:
    """
    Abstract base class for all Scapy sockets.
    """

    def send(self, x: Packet) -> int:
        """Send a packet."""

    def recv(self, x: int = None) -> Packet:
        """Receive a packet."""

    def close(self) -> None:
        """Close the socket."""


class L2Socket(SuperSocket):
    """Layer 2 socket for Ethernet-level communication."""


class L3Socket(SuperSocket):
    """Layer 3 socket for IP-level communication."""


class L2ListenSocket(SuperSocket):
    """Layer 2 listening socket."""
class L3ListenSocket(SuperSocket):
    """Layer 3 listening socket."""


from scapy.all import *
# Send ICMP ping and wait for reply
packet = IP(dst="8.8.8.8") / ICMP()
reply = sr1(packet, timeout=2)
# sr1() yields None on timeout, so both uses of `reply` stay inside the guard.
if reply is not None:
    print(f"Received reply from {reply.src}")
    reply.show()

# Send TCP SYN and collect responses
syn_packets = [IP(dst="example.com")/TCP(dport=p, flags="S") for p in [22,80,443]]
answered, unanswered = sr(syn_packets, timeout=1)
answered.summary()

# Capture 10 HTTP packets
packets = sniff(count=10, filter="tcp port 80")
packets.summary()


# Capture with custom processing
def packet_processor(pkt):
    """Print source and destination endpoints of a TCP-over-IPv4 packet."""
    # The "tcp" BPF filter also matches TCP carried over IPv6, which has no
    # IP layer; require both layers before indexing pkt[IP].
    if pkt.haslayer(IP) and pkt.haslayer(TCP):
        print(f"TCP packet: {pkt[IP].src}:{pkt[TCP].sport} -> {pkt[IP].dst}:{pkt[TCP].dport}")


sniff(prn=packet_processor, filter="tcp", count=20)

# Asynchronous capture
sniffer = AsyncSniffer(filter="icmp", count=5)
sniffer.start()
# ... do other work ...
sniffer.join()
print(f"Captured {len(sniffer.results)} packets")

# Send packets continuously
packets = [IP(dst="192.168.1.1")/UDP(dport=p)/Raw(b"test") for p in range(1000, 2000)]
send(packets, inter=0.001)  # 1ms interval

# Layer 2 sending
eth_packets = [Ether(dst="aa:bb:cc:dd:ee:ff")/IP(dst="192.168.1.1")/ICMP() for _ in range(100)]
sendp(eth_packets, inter=0.01, iface="eth0")

# High-speed sending with tcpreplay
# tcpreplay replays link-layer frames, so the layer-3 packets are wrapped in
# an Ethernet header first (NOTE(review): confirm against the Scapy docs).
sendpfast([Ether()/p for p in packets], pps=1000, iface="eth0")

# Capture with complex filter
def custom_filter(pkt):
    """Accept TCP packets to port 80 that are longer than 100 bytes."""
    return (pkt.haslayer(TCP) and
            pkt[TCP].dport == 80 and
            len(pkt) > 100)


web_traffic = sniff(lfilter=custom_filter, timeout=30)


# Capture until specific condition
def stop_condition(pkt):
    """Stop once a DNS question mentioning "evil.com" is seen."""
    # A DNS message may carry no question record; look the record up
    # directly instead of dereferencing pkt[DNS].qd unconditionally.
    if not (pkt.haslayer(DNS) and pkt.haslayer(DNSQR)):
        return False
    return "evil.com" in str(pkt[DNSQR].qname)


packets = sniff(stop_filter=stop_condition, timeout=60)

# ARP scanning
arp_requests = [Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=f"192.168.1.{i}")
                for i in range(1, 255)]
answered, unanswered = srp(arp_requests, timeout=2, verbose=0)

# DNS queries
dns_queries = [IP(dst="8.8.8.8")/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname=host))
               for host in ["google.com", "facebook.com", "github.com"]]
# sr1() returns only the first reply; sr() collects an answer for every query.
answers, unanswered_queries = sr(dns_queries, timeout=3)
# TCP port scanning
def tcp_scan(target, ports):
    """
    SYN-scan `target` and return the ports that answered with SYN-ACK.

    Parameters:
    - target: Destination host passed to IP(dst=...)
    - ports: Iterable of TCP destination ports to probe

    Returns:
    list: Destination ports of the probes that received a SYN-ACK
    """
    syn_packets = [IP(dst=target)/TCP(dport=port, flags="S") for port in ports]
    answered, unanswered = sr(syn_packets, timeout=1, verbose=0)
    syn_ack = 0x12  # SYN (0x02) | ACK (0x10)
    # Mask the flags rather than comparing for equality so a SYN-ACK that
    # also carries other bits (e.g. ECN) still counts as an open port.
    return [
        sent[TCP].dport
        for sent, received in answered
        if received.haslayer(TCP) and (received[TCP].flags & syn_ack) == syn_ack
    ]
open_ports = tcp_scan("example.com", [22, 80, 443, 8080])
print(f"Open ports: {open_ports}")

Install with Tessl CLI
npx tessl i tessl/pypi-scapy