CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-scapy

Interactive packet manipulation program and library for network security research and testing

Pending
Overview
Eval results
Files

docs/send-receive.md

Send/Receive Operations

Network I/O functions for sending packets, receiving responses, capturing traffic, and managing network communication across different platforms and socket types.

Capabilities

Send and Receive with Answers

Core functions for sending packets and collecting responses, enabling interactive network communication and analysis.

def sr(x, promisc: bool = None, filter: str = None, timeout: float = None,
       inter: float = 0, verbose: int = None, chainCC: bool = False,
       retry: int = 0, multi: bool = False, **kwargs) -> tuple[SndRcvList, PacketList]:
    """
    Send packets at layer 3 and receive answers.

    Parameters:
    - x: Packet(s) to send
    - promisc: Enable promiscuous mode
    - filter: BPF filter for received packets
    - timeout: Time to wait, in seconds, after the last packet has been sent
      (None for no timeout)
    - inter: Inter-packet interval in seconds
    - verbose: Verbosity level (0=quiet, 1=normal, 2=verbose)
    - chainCC: If True, a KeyboardInterrupt (Ctrl-C) raised during the
      exchange is forwarded to the caller
    - retry: Number of retries for unanswered packets
    - multi: Allow multiple answers per packet
    - **kwargs: Additional arguments for socket

    Returns:
    tuple: (answered_packets, unanswered_packets); answered_packets is a list
    of (sent, received) pairs
    """

def sr1(x, promisc: bool = None, filter: str = None, timeout: float = None,
        verbose: int = None, retry: int = 0, iface: str = None,
        **kwargs) -> Packet:
    """
    Send packets at layer 3 and return only the first answer.

    Parameters:
    - x: Packet(s) to send
    - promisc: Enable promiscuous mode
    - filter: BPF filter for received packets
    - timeout: Timeout in seconds
    - verbose: Verbosity level
    - retry: Number of retries
    - iface: Interface to use
    - **kwargs: Additional socket arguments

    Returns:
    Packet or None: First received packet or None if timeout
    """

def srp(x, timeout: float = None, verbose: int = None, retry: int = 0,
        multi: bool = False, filter: str = None, iface: str = None,
        **kwargs) -> tuple[SndRcvList, PacketList]:
    """
    Send packets at layer 2 and receive answers.

    Parameters:
    - x: Packet(s) to send (including the link-layer header, e.g. Ether)
    - timeout: Timeout in seconds
    - verbose: Verbosity level
    - retry: Number of retries for unanswered packets (0 = send once)
    - multi: Allow multiple answers per packet
    - filter: BPF filter for received packets
    - iface: Interface to use
    - **kwargs: Additional socket arguments

    Returns:
    tuple: (answered_packets, unanswered_packets)
    """

def srp1(x, timeout: float = None, verbose: int = None, retry: int = 0,
         filter: str = None, iface: str = None, **kwargs) -> Packet:
    """
    Send packets at layer 2 and return only the first answer.

    Parameters:
    - x: Packet(s) to send (including the link-layer header, e.g. Ether)
    - timeout: Timeout in seconds
    - verbose: Verbosity level
    - retry: Number of retries for unanswered packets (0 = send once)
    - filter: BPF filter for received packets
    - iface: Interface to use
    - **kwargs: Additional socket arguments

    Returns:
    Packet or None: First received packet or None if timeout
    """

Send-Only Operations

Functions for sending packets without waiting for responses, useful for traffic generation and one-way communication.

def send(x, inter: float = 0, loop: int = 0, count: int = None,
         verbose: int = None, realtime: bool = None, return_packets: bool = False,
         socket: 'SuperSocket' = None, **kwargs) -> 'PacketList | None':
    """
    Send packets at layer 3.

    Parameters:
    - x: Packet(s) to send
    - inter: Inter-packet interval in seconds
    - loop: If non-zero, keep re-sending the packets until interrupted
      (Ctrl-C); use count for a fixed number of repetitions
    - count: Number of times to send the packets (None = once)
    - verbose: Verbosity level
    - realtime: Send at real-time intervals
    - return_packets: Return sent packets
    - socket: Socket to use for sending
    - **kwargs: Additional arguments

    Returns:
    PacketList or None: The sent packets if return_packets=True, otherwise None
    """

def sendp(x, inter: float = 0, loop: int = 0, count: int = None,
          verbose: int = None, realtime: bool = None, return_packets: bool = False,
          iface: str = None, socket: 'SuperSocket' = None, **kwargs) -> 'PacketList | None':
    """
    Send packets at layer 2.

    Parameters:
    - x: Packet(s) to send (including the link-layer header, e.g. Ether)
    - inter: Inter-packet interval in seconds
    - loop: If non-zero, keep re-sending the packets until interrupted
      (Ctrl-C); use count for a fixed number of repetitions
    - count: Number of times to send the packets (None = once)
    - verbose: Verbosity level
    - realtime: Send at real-time intervals
    - return_packets: Return sent packets
    - iface: Interface to send on
    - socket: Socket to use for sending
    - **kwargs: Additional arguments

    Returns:
    PacketList or None: The sent packets if return_packets=True, otherwise None
    """

def sendpfast(x, pps: int = None, mbps: float = None, realtime: bool = None,
              loop: int = 0, file_cache: bool = False, iface: str = None) -> None:
    """
    Replay packets onto an interface at high rate by delegating to tcpreplay.

    Parameters:
    - x: Packet(s) to transmit
    - pps: Target rate in packets per second
    - mbps: Target rate in megabits per second
    - realtime: Keep the intervals recorded with the packets
    - loop: How many times to loop over the packets
    - file_cache: Enable file caching
    - iface: Interface the packets are sent on
    """

Packet Capture

Functions for capturing network traffic with flexible filtering and processing options.

def sniff(count: int = 0, store: bool = True, offline: str = None,
          prn: callable = None, lfilter: callable = None,
          L2socket: 'SuperSocket' = None, timeout: float = None,
          opened_socket: 'SuperSocket' = None, stop_filter: callable = None,
          iface: str = None, started_callback: callable = None,
          filter: str = None, **kwargs) -> PacketList:
    """
    Sniff packets from the network (or from a capture file).

    Parameters:
    - count: How many packets to capture; 0 means no limit
    - store: Keep the captured packets in memory
    - offline: Pcap file to read from instead of sniffing live
    - prn: Callback applied to every packet as it arrives
    - lfilter: Python predicate deciding which packets are kept
    - L2socket: Socket used for the capture
    - timeout: Stop sniffing after this many seconds
    - opened_socket: Already-open socket to read from
    - stop_filter: Predicate that ends the capture
    - iface: Interface to sniff on
    - started_callback: Callback invoked once the capture has started
    - filter: BPF filter expression
    - **kwargs: Additional arguments

    Returns:
    PacketList: The captured packets (when store=True)
    """

class AsyncSniffer:
    """
    Asynchronous packet sniffer that runs in background.

    Takes the same arguments as sniff(); the capture only begins once
    start() is called.
    """
    def __init__(self, count: int = 0, store: bool = True, prn: callable = None,
                 filter: str = None, iface: str = None, **kwargs):
        """
        Initialize asynchronous sniffer.

        Parameters:
        - count: Number of packets to capture
        - store: Store packets in memory
        - prn: Function to apply to each packet
        - filter: BPF filter string
        - iface: Interface to capture on
        - **kwargs: Additional sniff arguments
        """

    def start(self) -> None:
        """Start packet capture in background thread."""

    def stop(self, join: bool = True) -> 'PacketList | None':
        """
        Stop packet capture.

        Parameters:
        - join: Wait for the capture thread to finish before returning

        Returns:
        PacketList or None: Captured packets when join=True, otherwise None
        """

    def join(self, timeout: float = None) -> None:
        """Wait for capture to complete."""

    @property
    def results(self) -> PacketList:
        """Get captured packets."""

Socket Interface

Low-level send and receive operations using Scapy's socket abstraction.

def sndrcv(pks: 'SuperSocket', pkt: Packet, timeout: float = None,
           verbose: int = None) -> tuple[SndRcvList, PacketList]:
    """
    Low-level send and receive operation on an already-open socket.

    This is the primitive that sr()/srp() build on once they have opened
    a socket.

    Parameters:
    - pks: Socket for sending/receiving
    - pkt: Packet(s) to send
    - timeout: Time to wait, in seconds, after the last packet has been sent
      (None for no timeout)
    - verbose: Verbosity level

    Returns:
    tuple: (answered_packets, unanswered_packets)
    """

def sndrcvflood(pks: 'SuperSocket', pkt: Packet, ans: Packet = None,
                stopevent: 'Event' = None, **kwargs) -> None:
    """
    Flood a socket with a packet while collecting the answers.

    NOTE(review): these parameters could not be cross-checked here; confirm
    them against the installed Scapy version before relying on them.

    Parameters:
    - pks: Socket used for sending and receiving
    - pkt: Packet that is sent over and over
    - ans: Answer packet that is expected
    - stopevent: Event that ends the flood
    - **kwargs: Additional arguments
    """

External Tool Integration

Integration with external network tools for extended functionality.

def tshark(*args, **kwargs) -> None:
    """
    Sniff packets and print a one-line summary of each, tshark-style.

    Despite its name, this helper does not launch the external tshark
    binary: it wraps sniff() and prints pkt.summary() for every packet to
    imitate tshark's text output.

    Parameters:
    - *args: Positional arguments passed to sniff()
    - **kwargs: Keyword arguments passed to sniff() (e.g. iface, filter, count)

    Returns:
    None: Packets are printed as they arrive and are not stored
    """

def tcpdump(count: int = None, **kwargs) -> PacketList:
    """
    Capture packets by way of tcpdump.

    NOTE(review): scapy.utils.tcpdump is documented upstream as running
    tcpdump over an existing packet list or pcap (pktlist=...), not as a
    live-capture helper; confirm this signature against the installed Scapy.

    Parameters:
    - count: How many packets to capture
    - **kwargs: Extra tcpdump arguments

    Returns:
    PacketList: The captured packets
    """

Debug Information

Debug information container for troubleshooting send/receive operations.

class debug:
    """
    Debug information for send/receive operations.

    Class-level containers holding the packets seen by the most recent
    send/receive call, for inspection after the fact.
    """
    recv: PacketList      # Received packets
    sent: PacketList      # Sent packets
    match: SndRcvList     # Matched (sent, received) packet pairs

Socket Types

SuperSocket Base

class SuperSocket:
    """
    Common base class that every Scapy socket derives from.
    """
    def send(self, x: Packet) -> int:
        """Transmit one packet through the socket."""

    def recv(self, x: int = None) -> Packet:
        """Read one packet from the socket."""

    def close(self) -> None:
        """Release the socket."""

Layer-Specific Sockets

class L2Socket(SuperSocket):
    """Socket operating at layer 2 (Ethernet-level frames)."""

class L3Socket(SuperSocket):
    """Socket operating at layer 3 (IP-level packets)."""

class L2ListenSocket(SuperSocket):
    """Listening socket operating at layer 2."""

class L3ListenSocket(SuperSocket):
    """Listening socket operating at layer 3."""

Usage Examples

Basic Send/Receive

from scapy.all import *

# Ping 8.8.8.8 once and wait up to two seconds for the echo reply
echo_request = IP(dst="8.8.8.8") / ICMP()
reply = sr1(echo_request, timeout=2)
if reply:
    print(f"Received reply from {reply.src}")
    reply.show()

# Probe three TCP ports with SYN packets and gather whatever comes back
syn_probes = []
for port in (22, 80, 443):
    syn_probes.append(IP(dst="example.com") / TCP(dport=port, flags="S"))
answered, unanswered = sr(syn_probes, timeout=1)
answered.summary()

Packet Capture

# Capture 10 packets to or from TCP port 80 (HTTP)
packets = sniff(count=10, filter="tcp port 80")
packets.summary()

# Capture with custom processing
def packet_processor(pkt):
    """Print source and destination endpoints of a TCP-over-IPv4 packet."""
    # The BPF filter "tcp" also matches TCP over IPv6; such packets carry no
    # IP layer, so pkt[IP] would raise IndexError without this guard.
    if pkt.haslayer(IP) and pkt.haslayer(TCP):
        print(f"TCP packet: {pkt[IP].src}:{pkt[TCP].sport} -> {pkt[IP].dst}:{pkt[TCP].dport}")

sniff(prn=packet_processor, filter="tcp", count=20)

# Asynchronous capture
sniffer = AsyncSniffer(filter="icmp", count=5)
sniffer.start()
# ... do other work ...
sniffer.join()  # blocks until 5 ICMP packets have been captured
print(f"Captured {len(sniffer.results)} packets")

High-Speed Sending

# Send a batch of packets at layer 3, pausing 1 ms between packets
packets = [IP(dst="192.168.1.1")/UDP(dport=p)/Raw(b"test") for p in range(1000, 2000)]
send(packets, inter=0.001)  # 1ms interval

# Layer 2 sending
eth_packets = [Ether(dst="aa:bb:cc:dd:ee:ff")/IP(dst="192.168.1.1")/ICMP() for _ in range(100)]
sendp(eth_packets, inter=0.01, iface="eth0")

# High-speed sending with tcpreplay. sendpfast() writes frames straight onto
# the interface (layer 2), so the layer 3 packets need an Ethernet header first.
fast_packets = [Ether()/pkt for pkt in packets]
sendpfast(fast_packets, pps=1000, iface="eth0")

Advanced Filtering

# Capture with complex filter
def custom_filter(pkt):
    """Keep TCP packets to port 80 that are longer than 100 bytes."""
    return (pkt.haslayer(TCP) and
            pkt[TCP].dport == 80 and
            len(pkt) > 100)

web_traffic = sniff(lfilter=custom_filter, timeout=30)

# Capture until specific condition
def stop_condition(pkt):
    """Stop once a DNS question for evil.com is seen."""
    # Test for the question record itself: a DNS packet may carry no question,
    # and qname is a bytes value, so it is matched against a bytes literal.
    return pkt.haslayer(DNSQR) and b"evil.com" in pkt[DNSQR].qname

packets = sniff(stop_filter=stop_condition, timeout=60)

Protocol-Specific Operations

# ARP scanning
arp_requests = [Ether(dst="ff:ff:ff:ff:ff:ff")/ARP(pdst=f"192.168.1.{i}")
                for i in range(1, 255)]
answered, unanswered = srp(arp_requests, timeout=2, verbose=0)

# DNS queries
dns_queries = [IP(dst="8.8.8.8")/UDP(dport=53)/DNS(rd=1, qd=DNSQR(qname=host))
               for host in ["google.com", "facebook.com", "github.com"]]
# sr1() would return only the first reply; sr() collects one answer per query.
answers, unanswered = sr(dns_queries, timeout=3)

# TCP port scanning
def tcp_scan(target, ports):
    """
    SYN-scan a host and report which ports answered with SYN-ACK.

    Parameters:
    - target: Host name or IP address to probe
    - ports: Iterable of TCP port numbers

    Returns:
    list: Ports from which a SYN-ACK was received
    """
    syn_packets = [IP(dst=target)/TCP(dport=port, flags="S") for port in ports]
    answered, unanswered = sr(syn_packets, timeout=1, verbose=0)

    # Mask instead of comparing for equality so that a SYN-ACK carrying extra
    # bits (for example ECE on ECN-capable hosts) still counts as open.
    syn_ack = 0x12
    return [sent[TCP].dport
            for sent, received in answered
            if received.haslayer(TCP)
            and int(received[TCP].flags) & syn_ack == syn_ack]

open_ports = tcp_scan("example.com", [22, 80, 443, 8080])
print(f"Open ports: {open_ports}")

Install with Tessl CLI

npx tessl i tessl/pypi-scapy

docs

automation.md

config-utilities.md

core-packet-system.md

index.md

packet-analysis.md

protocol-layers.md

send-receive.md

tile.json