CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-dnslib

Simple library to encode/decode DNS wire-format packets

Overview
Eval results
Files

dns-server.mddocs/

DNS Server Framework

Framework for creating custom DNS resolvers with UDP/TCP server support, request handling, and logging capabilities. The framework provides a simple interface for building DNS servers by subclassing BaseResolver and implementing custom resolution logic.

Capabilities

DNS Server

Main DNS server class providing UDP and TCP protocol support with threading and logging.

class DNSServer:
    """
    DNS server with UDP/TCP protocol support and request handling.
    
    Args:
        resolver (BaseResolver): Resolver instance for handling queries
        port (int, optional): Server port (default: 53)
        address (str, optional): Server bind address (default: "")
        logger (DNSLogger, optional): Logger instance
        tcp (bool, optional): Enable TCP server (default: False)
        ipv6 (bool, optional): Enable IPv6 support (default: False)
        timeout (int, optional): Request timeout in seconds (default: 5)
        handler (class, optional): Custom request handler class
    """
    def __init__(self, resolver, port=53, address="", logger=None, **kwargs): ...
    
    def start(self):
        """
        Start DNS server and handle requests (blocking).
        Server runs until interrupted.
        """
        
    def start_thread(self):
        """
        Start DNS server in background thread (non-blocking).
        
        Returns:
            threading.Thread: Server thread object
        """
        
    def stop(self):
        """
        Stop DNS server and cleanup resources.
        """
        
    def isAlive(self):
        """
        Check if server thread is running.
        
        Returns:
            bool: True if server is running
        """

DNS Request Handler

Base request handler for processing DNS queries and generating responses.

class DNSHandler:
    """
    Base DNS request handler for UDP and TCP protocols.
    Processes incoming DNS queries and generates responses.
    """
    def handle(self):
        """
        Handle incoming DNS request.
        Parses request, calls resolver, and sends response.
        """
        
    def get_reply(self, data):
        """
        Process DNS query data and generate response.
        
        Args:
            data (bytes): Raw DNS query packet
            
        Returns:
            bytes: DNS response packet
        """

Protocol Servers

Low-level UDP and TCP server implementations with threading support.

class UDPServer:
    """
    Threaded UDP server for DNS queries.
    
    Args:
        server_address (tuple): (address, port) tuple
        handler (class): Request handler class
    """
    def __init__(self, server_address, handler): ...
    
    def serve_forever(self):
        """Start serving requests until interrupted."""
        
    def shutdown(self):
        """Shutdown server and close socket."""

class TCPServer:
    """
    Threaded TCP server for DNS queries.
    
    Args:
        server_address (tuple): (address, port) tuple
        handler (class): Request handler class
    """
    def __init__(self, server_address, handler): ...
    
    def serve_forever(self):
        """Start serving requests until interrupted."""
        
    def shutdown(self):
        """Shutdown server and close socket."""

Base Resolver

Abstract base class for implementing custom DNS resolvers.

class BaseResolver:
    """
    Abstract base class for DNS resolvers.
    Subclass this and implement resolve() method for custom logic.
    """
    def resolve(self, request, handler):
        """
        Resolve DNS query and return response.
        Must be implemented by subclasses.
        
        Args:
            request (DNSRecord): Parsed DNS query
            handler (DNSHandler): Request handler instance
            
        Returns:
            DNSRecord: DNS response record
        """
        raise NotImplementedError

DNS Logger

Configurable logging system for DNS server operations with request/response tracking.

class DNSLogger:
    """
    DNS server logger with configurable output and formatting.
    
    Args:
        log (str, optional): Comma-separated log types 
                           ("request", "reply", "truncated", "error", "recv", "send")
                           (default: "request,reply,truncated,error")
        prefix (bool, optional): Include timestamp prefix (default: True)
        logf (callable, optional): Custom log function (default: print)
    """
    def __init__(self, log="request,reply,truncated,error", prefix=True, logf=None): ...
    
    def log_recv(self, handler, data):
        """
        Log received data.
        
        Args:
            handler (DNSHandler): Request handler
            data (bytes): Received data
        """
        
    def log_send(self, handler, data):
        """
        Log sent data.
        
        Args:
            handler (DNSHandler): Request handler  
            data (bytes): Sent data
        """
        
    def log_request(self, handler, request):
        """
        Log DNS request.
        
        Args:
            handler (DNSHandler): Request handler
            request (DNSRecord): DNS request
        """
        
    def log_reply(self, handler, reply):
        """
        Log DNS reply.
        
        Args:
            handler (DNSHandler): Request handler
            reply (DNSRecord): DNS reply
        """
        
    def log_truncated(self, handler, reply):
        """
        Log truncated response.
        
        Args:
            handler (DNSHandler): Request handler
            reply (DNSRecord): Truncated DNS reply
        """
        
    def log_error(self, handler, e):
        """
        Log error during request processing.
        
        Args:
            handler (DNSHandler): Request handler
            e (Exception): Exception that occurred
        """

UDP Server

UDP DNS server implementation with threading support.

class UDPServer:
    """
    UDP DNS server implementation.
    Inherits from socketserver.ThreadingUDPServer.
    
    Args:
        server_address (tuple): (address, port) tuple
        RequestHandlerClass (class): Request handler class
        resolver (BaseResolver): DNS resolver instance
        logger (DNSLogger, optional): Logger instance
        timeout (int, optional): Request timeout
    """
    def __init__(self, server_address, RequestHandlerClass, resolver, logger=None, timeout=5): ...

TCP Server

TCP DNS server implementation with threading support.

class TCPServer:
    """
    TCP DNS server implementation.
    Inherits from socketserver.ThreadingTCPServer.
    
    Args:
        server_address (tuple): (address, port) tuple
        RequestHandlerClass (class): Request handler class
        resolver (BaseResolver): DNS resolver instance  
        logger (DNSLogger, optional): Logger instance
        timeout (int, optional): Request timeout
    """
    def __init__(self, server_address, RequestHandlerClass, resolver, logger=None, timeout=5): ...

Usage Examples

Basic Custom Resolver

from dnslib import *
from dnslib.server import DNSServer, BaseResolver

class SimpleResolver(BaseResolver):
    """Simple resolver that returns fixed IP for all A queries."""
    
    def resolve(self, request, handler):
        # Create reply based on request
        reply = request.reply()
        
        # Get the query
        qname = request.q.qname
        qtype = request.q.qtype
        
        # Handle A record queries
        if qtype == QTYPE.A:
            # Add answer record
            reply.add_answer(RR(qname, QTYPE.A, rdata=A("192.0.2.1"), ttl=300))
        else:
            # Set NXDOMAIN for other types
            reply.header.rcode = RCODE.NXDOMAIN
            
        return reply

# Create and start server
resolver = SimpleResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting DNS server on 127.0.0.1:5353")
server.start()

Advanced Custom Resolver with Logging

from dnslib import *
from dnslib.server import DNSServer, BaseResolver, DNSLogger
import time

class TimestampResolver(BaseResolver):
    """Resolver that returns current timestamp in TXT records."""
    
    def resolve(self, request, handler):
        reply = request.reply()
        qname = request.q.qname
        qtype = request.q.qtype
        
        if qtype == QTYPE.TXT:
            # Return current timestamp
            timestamp = str(int(time.time()))
            reply.add_answer(RR(qname, QTYPE.TXT, rdata=TXT(f"timestamp={timestamp}"), ttl=1))
        elif qtype == QTYPE.A:
            # Return fixed IP
            reply.add_answer(RR(qname, QTYPE.A, rdata=A("203.0.113.1"), ttl=300))
        else:
            reply.header.rcode = RCODE.NXDOMAIN
            
        return reply

# Create logger with custom configuration
logger = DNSLogger(log="request,reply,error", prefix=True)

# Create resolver and server
resolver = TimestampResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1", logger=logger)

print("Starting DNS server with logging on 127.0.0.1:5353")
server.start()

Multi-Protocol Server (UDP + TCP)

from dnslib import *
from dnslib.server import DNSServer, BaseResolver
import threading

class EchoResolver(BaseResolver):
    """Resolver that echoes query information."""
    
    def resolve(self, request, handler):
        reply = request.reply()
        qname = request.q.qname
        qtype = request.q.qtype
        
        # Create echo response in TXT record
        echo_data = f"You queried {qname} for {QTYPE[qtype]}"
        reply.add_answer(RR(qname, QTYPE.TXT, rdata=TXT(echo_data), ttl=60))
        
        return reply

# Create resolver
resolver = EchoResolver()

# Start UDP server in thread
udp_server = DNSServer(resolver, port=5353, address="127.0.0.1", tcp=False)
udp_thread = udp_server.start_thread()

# Start TCP server in thread  
tcp_server = DNSServer(resolver, port=5353, address="127.0.0.1", tcp=True)
tcp_thread = tcp_server.start_thread()

print("DNS servers running on UDP and TCP port 5353")
print("Press Ctrl+C to stop")

try:
    # Keep main thread alive
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("Stopping servers...")

Zone-Based Resolver

from dnslib import *
from dnslib.server import DNSServer, BaseResolver

class ZoneResolver(BaseResolver):
    """Resolver that serves from zone data."""
    
    def __init__(self, zone_data):
        self.records = {}
        # Parse zone data and index by name/type
        for rr in RR.fromZone(zone_data):
            key = (str(rr.rname), rr.rtype)
            if key not in self.records:
                self.records[key] = []
            self.records[key].append(rr)
    
    def resolve(self, request, handler):
        reply = request.reply()
        qname = str(request.q.qname)
        qtype = request.q.qtype
        
        # Look for exact match
        key = (qname, qtype)
        if key in self.records:
            for rr in self.records[key]:
                reply.add_answer(rr)
        else:
            # Check for ANY query
            if qtype == QTYPE.ANY:
                found = False
                for (name, rtype), records in self.records.items():
                    if name == qname:
                        for rr in records:
                            reply.add_answer(rr)
                        found = True
                if not found:
                    reply.header.rcode = RCODE.NXDOMAIN
            else:
                reply.header.rcode = RCODE.NXDOMAIN
                
        return reply

# Zone data
zone_data = """
$TTL 300
example.com.        IN      A       192.0.2.1
example.com.        IN      MX      10  mail.example.com.
www.example.com.    IN      A       192.0.2.2
mail.example.com.   IN      A       192.0.2.3
ftp.example.com.    IN      CNAME   www.example.com.
"""

# Create and start server
resolver = ZoneResolver(zone_data.strip())
server = DNSServer(resolver, port=5353, address="127.0.0.1")

print("Starting zone-based DNS server on 127.0.0.1:5353")
server.start()

Server with Custom Request Handler

from dnslib import *
from dnslib.server import DNSServer, BaseResolver, DNSHandler
import socketserver

class CustomDNSHandler(DNSHandler):
    """Custom handler with request filtering."""
    
    def handle(self):
        # Get client address
        client_addr = self.client_address[0]
        
        # Simple rate limiting example
        if hasattr(self.server, 'client_counts'):
            if client_addr not in self.server.client_counts:
                self.server.client_counts[client_addr] = 0
            self.server.client_counts[client_addr] += 1
            
            # Reject if too many requests
            if self.server.client_counts[client_addr] > 10:
                print(f"Rate limiting client {client_addr}")
                return
        
        # Call parent handler
        super().handle()

class RateLimitedResolver(BaseResolver):
    """Simple resolver with rate limiting."""
    
    def resolve(self, request, handler):
        reply = request.reply()
        qname = request.q.qname
        
        # Simple response
        reply.add_answer(RR(qname, QTYPE.A, rdata=A("198.51.100.1"), ttl=300))
        
        return reply

# Create server with custom handler
resolver = RateLimitedResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1", handler=CustomDNSHandler)

# Add client tracking
server.server.client_counts = {}

print("Starting rate-limited DNS server on 127.0.0.1:5353")
server.start()

Server with Error Handling

from dnslib import *
from dnslib.server import DNSServer, BaseResolver, DNSLogger

class RobustResolver(BaseResolver):
    """Resolver with comprehensive error handling."""
    
    def resolve(self, request, handler):
        try:
            reply = request.reply()
            qname = request.q.qname
            qtype = request.q.qtype
            
            # Validate query
            if len(str(qname)) > 253:  # Max domain length
                reply.header.rcode = RCODE.FORMERR
                return reply
                
            # Handle different query types
            if qtype == QTYPE.A:
                reply.add_answer(RR(qname, QTYPE.A, rdata=A("203.0.113.10"), ttl=300))
            elif qtype == QTYPE.AAAA:
                reply.add_answer(RR(qname, QTYPE.AAAA, rdata=AAAA("2001:db8::10"), ttl=300))
            elif qtype == QTYPE.TXT:
                reply.add_answer(RR(qname, QTYPE.TXT, rdata=TXT("Hello from robust resolver"), ttl=60))
            else:
                reply.header.rcode = RCODE.NXDOMAIN
                
            return reply
            
        except Exception as e:
            # Log error and return SERVFAIL
            print(f"Error in resolver: {e}")
            reply = request.reply()
            reply.header.rcode = RCODE.SERVFAIL
            return reply

# Create logger for debugging
logger = DNSLogger(log="request,reply,error,truncated", prefix=True)

# Create and start server
resolver = RobustResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1", logger=logger)

print("Starting robust DNS server with error handling on 127.0.0.1:5353")
server.start()

Install with Tessl CLI

npx tessl i tessl/pypi-dnslib@0.9.2

docs

dns-client.md

dns-core.md

dns-records.md

dns-resolvers.md

dns-server.md

dns-utils.md

index.md

tile.json