Interactive packet manipulation program and library for network security research and testing
—
Packet collection management, filtering, analysis, and visualization tools for working with captured network traffic, packet sequences, and network communication patterns.
Container classes for managing collections of packets with analysis and manipulation methods.
class PacketList:
"""
List container for network packets with analysis methods.
"""
def __init__(self, res: list = None, name: str = "PacketList"):
"""
Initialize packet list.
Parameters:
- res: List of packets
- name: Name for the packet list
"""
def summary(self, prn: callable = None, lfilter: callable = None) -> None:
"""
Display summary of packets in the list.
Parameters:
- prn: Function to generate custom summary for each packet
- lfilter: Function to filter packets for summary
"""
def show(self, *args, **kwargs) -> None:
"""
Display detailed information for all packets.
Parameters:
- *args: Arguments passed to packet.show()
- **kwargs: Keyword arguments passed to packet.show()
"""
def filter(self, func: callable) -> 'PacketList':
"""
Filter packets using a custom function.
Parameters:
- func: Function that takes a packet and returns bool
Returns:
PacketList: New list with filtered packets
"""
def split(self, func: callable) -> list['PacketList']:
"""
Split packet list into multiple lists based on a function.
Parameters:
- func: Function that returns a key for grouping
Returns:
list[PacketList]: List of packet lists
"""
def plot(self, f: callable = None, lfilter: callable = None, **kwargs):
"""
Plot packet data using matplotlib.
Parameters:
- f: Function to extract plot data from packets
- lfilter: Function to filter packets for plotting
- **kwargs: Additional plotting arguments
"""
def make_table(self, *args, **kwargs) -> str:
"""
Create a formatted table from packet data.
Parameters:
- *args: Column functions or field names
- **kwargs: Formatting options
Returns:
str: Formatted table string
"""
def conversations(self, getsrcdst: callable = None) -> dict:
"""
Extract network conversations from packet list.
Parameters:
- getsrcdst: Function to extract source/destination from packets
Returns:
dict: Dictionary of conversations with statistics
"""
def replace(self, *args, **kwargs) -> 'PacketList':
"""
Replace packet fields with new values.
Parameters:
- *args: Field replacement specifications
- **kwargs: Keyword field replacements
Returns:
PacketList: New list with replaced fields
"""
class SndRcvList(PacketList):
"""
List of sent/received packet pairs from sr() operations.
"""
def __init__(self, res: list = None, name: str = "Results"):
"""
Initialize send/receive list.
Parameters:
- res: List of (sent, received) packet tuples
- name: Name for the list
"""
def make_table(self, *args, **kwargs) -> str:
"""
Create table showing sent vs received packet pairs.
Parameters:
- *args: Column functions for sent/received packets
- **kwargs: Formatting options
Returns:
str: Formatted table string
"""
class QueryAnswer(SndRcvList):
"""
Query/answer packet pairs with additional analysis methods.
"""Functions for reading and writing packet capture files in various formats.
def rdpcap(filename: str, count: int = -1) -> PacketList:
"""
Read packets from a pcap file.
Parameters:
- filename: Path to pcap file
- count: Maximum number of packets to read (-1 for all)
Returns:
PacketList: Packets read from file
"""
def wrpcap(filename: str, pkt: PacketList, linktype: int = None,
gz: bool = False, endianness: str = "=", append: bool = False,
sync: bool = False) -> None:
"""
Write packets to a pcap file.
Parameters:
- filename: Output file path
- pkt: Packets to write
- linktype: Data link type for pcap header
- gz: Compress with gzip
- endianness: Byte order ("=" native, "<" little, ">" big)
- append: Append to existing file
- sync: Sync to disk after each packet
"""
def rdpcapng(filename: str) -> PacketList:
"""
Read packets from a pcapng file.
Parameters:
- filename: Path to pcapng file
Returns:
PacketList: Packets read from file
"""
def wrpcapng(filename: str, pkt: PacketList, **kwargs) -> None:
"""
Write packets to a pcapng file.
Parameters:
- filename: Output file path
- pkt: Packets to write
- **kwargs: Additional options
"""
class PcapReader:
"""
Iterator for reading large pcap files efficiently.
"""
def __init__(self, filename: str):
"""
Initialize pcap reader.
Parameters:
- filename: Path to pcap file
"""
def __iter__(self):
"""Return iterator."""
def __next__(self) -> Packet:
"""Get next packet from file."""
def close(self) -> None:
"""Close the file."""
class PcapWriter:
"""
Writer for creating pcap files incrementally.
"""
def __init__(self, filename: str, append: bool = False, sync: bool = False):
"""
Initialize pcap writer.
Parameters:
- filename: Output file path
- append: Append to existing file
- sync: Sync after each write
"""
def write(self, pkt: Packet) -> None:
"""
Write a packet to the file.
Parameters:
- pkt: Packet to write
"""
def close(self) -> None:
"""Close the file."""Functions for analyzing network traffic patterns and extracting insights from packet data.
def traceroute(target: str, dport: int = 80, minttl: int = 1, maxttl: int = 30,
timeout: float = 2, verbose: int = None, **kwargs) -> TracerouteResult:
"""
Perform traceroute to a target.
Parameters:
- target: Target IP address or hostname
- dport: Destination port
- minttl: Minimum TTL value
- maxttl: Maximum TTL value
- timeout: Timeout per hop
- verbose: Verbosity level
- **kwargs: Additional arguments
Returns:
TracerouteResult: Traceroute results with hop information
"""
class TracerouteResult:
"""
Container for traceroute results with analysis methods.
"""
def show(self) -> None:
"""Display traceroute results."""
def graph(self, **kwargs):
"""Generate graph visualization of the route."""
def world_trace(self) -> None:
"""Show route on world map."""
def arping(net: str, timeout: float = 2, cache: bool = 0, verbose: int = None,
**kwargs) -> tuple[SndRcvList, PacketList]:
"""
Send ARP requests to discover hosts on network.
Parameters:
- net: Network address (e.g., "192.168.1.0/24")
- timeout: Timeout for responses
- cache: Use ARP cache
- verbose: Verbosity level
- **kwargs: Additional arguments
Returns:
tuple: (answered_requests, unanswered_requests)
"""
def arpcachepoison(target: str, victim: str, interval: float = 60) -> None:
"""
Perform ARP cache poisoning attack.
Parameters:
- target: Target IP address
- victim: Victim IP address
- interval: Poisoning interval in seconds
"""Tools for reconstructing network sessions and conversations from packet captures.
class DefaultSession:
"""
Default session for basic packet processing.
"""
def on_packet_received(self, pkt: Packet) -> Packet:
"""Process received packet."""
class TCPSession:
"""
TCP session reconstruction and stream following.
"""
def __init__(self):
"""Initialize TCP session tracker."""
def process(self, pkt: Packet) -> None:
"""Process a packet for session reconstruction."""
def get_sessions(self) -> dict:
"""Get reconstructed TCP sessions."""
class IPSession:
"""
IP-level session management and defragmentation.
"""
def __init__(self):
"""Initialize IP session tracker."""Functions for statistical analysis of network traffic patterns.
def conversations(pkts: PacketList, getsrcdst: callable = None) -> dict:
"""
Extract conversations from packet list.
Parameters:
- pkts: Packet list to analyze
- getsrcdst: Function to extract source/destination
Returns:
dict: Conversation statistics
"""
def sessions(pkts: PacketList) -> dict:
"""
Extract session information from packets.
Parameters:
- pkts: Packet list to analyze
Returns:
dict: Session statistics
"""
def protocols(pkts: PacketList) -> dict:
"""
Analyze protocol distribution in packet list.
Parameters:
- pkts: Packet list to analyze
Returns:
dict: Protocol statistics
"""# Common filter functions for packet analysis
lambda_filters = {
"tcp": lambda pkt: pkt.haslayer(TCP),
"udp": lambda pkt: pkt.haslayer(UDP),
"icmp": lambda pkt: pkt.haslayer(ICMP),
"dns": lambda pkt: pkt.haslayer(DNS),
"http": lambda pkt: pkt.haslayer(TCP) and (pkt[TCP].dport == 80 or pkt[TCP].sport == 80),
"https": lambda pkt: pkt.haslayer(TCP) and (pkt[TCP].dport == 443 or pkt[TCP].sport == 443),
}from scapy.all import *
# Read packets from file
packets = rdpcap("capture.pcap")
print(f"Loaded {len(packets)} packets")
# Display packet summaries
packets.summary()
# Filter for TCP packets only
tcp_packets = packets.filter(lambda pkt: pkt.haslayer(TCP))
print(f"Found {len(tcp_packets)} TCP packets")
# Show detailed view of first 5 packets
tcp_packets[:5].show()# Analyze conversations in capture
conversations = packets.conversations()
for conv, stats in conversations.items():
print(f"Conversation {conv}: {stats['packets']} packets, {stats['bytes']} bytes")
# Create summary table
table = packets.make_table(
lambda pkt: pkt[IP].src if pkt.haslayer(IP) else "N/A",
lambda pkt: pkt[IP].dst if pkt.haslayer(IP) else "N/A",
lambda pkt: pkt[IP].proto if pkt.haslayer(IP) else "N/A"
)
print(table)
# Plot packet sizes over time
packets.plot(lambda pkt: len(pkt), title="Packet Sizes Over Time")# ARP scan of local network
answered, unanswered = arping("192.168.1.0/24")
print("Live hosts:")
for sent, received in answered:
print(f" {received.psrc} -> {received.hwsrc}")
# Traceroute to target
trace_result = traceroute("8.8.8.8")
trace_result.show()
# DNS enumeration
dns_queries = [IP(dst="8.8.8.8")/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname=f"{word}.example.com"))
for word in ["www", "mail", "ftp", "admin", "test"]]
answered, unanswered = sr(dns_queries, timeout=2, verbose=0)# Process large pcap files efficiently
with PcapReader("large_capture.pcap") as reader:
tcp_count = 0
udp_count = 0
for packet in reader:
if packet.haslayer(TCP):
tcp_count += 1
elif packet.haslayer(UDP):
udp_count += 1
if (tcp_count + udp_count) % 1000 == 0:
print(f"Processed {tcp_count + udp_count} packets")
print(f"Final counts: TCP={tcp_count}, UDP={udp_count}")
# Create filtered capture file
web_traffic = packets.filter(lambda pkt: pkt.haslayer(TCP) and
(pkt[TCP].dport in [80, 443] or pkt[TCP].sport in [80, 443]))
wrpcap("web_traffic.pcap", web_traffic)# TCP session reconstruction
sessions = {}
for pkt in packets.filter(lambda p: p.haslayer(TCP)):
if pkt.haslayer(IP):
session_key = (pkt[IP].src, pkt[TCP].sport, pkt[IP].dst, pkt[TCP].dport)
if session_key not in sessions:
sessions[session_key] = []
sessions[session_key].append(pkt)
# Analyze each session
for session_key, session_packets in sessions.items():
src_ip, src_port, dst_ip, dst_port = session_key
print(f"Session {src_ip}:{src_port} -> {dst_ip}:{dst_port}")
print(f" Packets: {len(session_packets)}")
print(f" Duration: {session_packets[-1].time - session_packets[0].time:.2f}s")
# Protocol distribution analysis
protocols = {}
for pkt in packets:
if pkt.haslayer(IP):
proto = pkt[IP].proto
proto_name = {1: "ICMP", 6: "TCP", 17: "UDP"}.get(proto, f"Proto-{proto}")
protocols[proto_name] = protocols.get(proto_name, 0) + 1
print("Protocol distribution:")
for proto, count in sorted(protocols.items(), key=lambda x: x[1], reverse=True):
print(f" {proto}: {count} packets")# Plot traffic over time
import matplotlib.pyplot as plt
timestamps = [float(pkt.time) for pkt in packets]
start_time = min(timestamps)
relative_times = [(t - start_time) for t in timestamps]
plt.figure(figsize=(12, 6))
plt.hist(relative_times, bins=50, alpha=0.7)
plt.xlabel("Time (seconds)")
plt.ylabel("Packet count")
plt.title("Traffic Distribution Over Time")
plt.show()
# Generate traffic report
def generate_report(packets):
report = []
report.append(f"Traffic Analysis Report")
report.append(f"=" * 30)
report.append(f"Total packets: {len(packets)}")
# Protocol breakdown
proto_counts = {}
for pkt in packets:
if pkt.haslayer(IP):
proto = {1: "ICMP", 6: "TCP", 17: "UDP"}.get(pkt[IP].proto, "Other")
proto_counts[proto] = proto_counts.get(proto, 0) + 1
report.append("\\nProtocol Distribution:")
for proto, count in proto_counts.items():
pct = (count / len(packets)) * 100
report.append(f" {proto}: {count} packets ({pct:.1f}%)")
return "\\n".join(report)
print(generate_report(packets))Install with Tessl CLI
npx tessl i tessl/pypi-scapy