Simple library to encode/decode DNS wire-format packets
Framework for creating custom DNS resolvers with UDP/TCP server support, request handling, and logging capabilities. The framework provides a simple interface for building DNS servers by subclassing BaseResolver and implementing custom resolution logic.
Main DNS server class providing UDP and TCP protocol support with threading and logging.
class DNSServer:
"""
DNS server with UDP/TCP protocol support and request handling.
Args:
resolver (BaseResolver): Resolver instance for handling queries
port (int, optional): Server port (default: 53)
address (str, optional): Server bind address (default: "")
logger (DNSLogger, optional): Logger instance
tcp (bool, optional): Enable TCP server (default: False)
ipv6 (bool, optional): Enable IPv6 support (default: False)
timeout (int, optional): Request timeout in seconds (default: 5)
handler (class, optional): Custom request handler class
"""
def __init__(self, resolver, port=53, address="", logger=None, **kwargs): ...
def start(self):
"""
Start DNS server and handle requests (blocking).
Server runs until interrupted.
"""
def start_thread(self):
"""
Start DNS server in background thread (non-blocking).
Returns:
threading.Thread: Server thread object
"""
def stop(self):
"""
Stop DNS server and cleanup resources.
"""
def isAlive(self):
"""
Check if server thread is running.
Returns:
bool: True if server is running
"""Base request handler for processing DNS queries and generating responses.
class DNSHandler:
"""
Base DNS request handler for UDP and TCP protocols.
Processes incoming DNS queries and generates responses.
"""
def handle(self):
"""
Handle incoming DNS request.
Parses request, calls resolver, and sends response.
"""
def get_reply(self, data):
"""
Process DNS query data and generate response.
Args:
data (bytes): Raw DNS query packet
Returns:
bytes: DNS response packet
"""Low-level UDP and TCP server implementations with threading support.
class UDPServer:
"""
Threaded UDP server for DNS queries.
Args:
server_address (tuple): (address, port) tuple
handler (class): Request handler class
"""
def __init__(self, server_address, handler): ...
def serve_forever(self):
"""Start serving requests until interrupted."""
def shutdown(self):
"""Shutdown server and close socket."""
class TCPServer:
"""
Threaded TCP server for DNS queries.
Args:
server_address (tuple): (address, port) tuple
handler (class): Request handler class
"""
def __init__(self, server_address, handler): ...
def serve_forever(self):
"""Start serving requests until interrupted."""
def shutdown(self):
"""Shutdown server and close socket."""Abstract base class for implementing custom DNS resolvers.
class BaseResolver:
"""
Abstract base class for DNS resolvers.
Subclass this and implement resolve() method for custom logic.
"""
def resolve(self, request, handler):
"""
Resolve DNS query and return response.
Must be implemented by subclasses.
Args:
request (DNSRecord): Parsed DNS query
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: DNS response record
"""
raise NotImplementedErrorConfigurable logging system for DNS server operations with request/response tracking.
class DNSLogger:
"""
DNS server logger with configurable output and formatting.
Args:
log (str, optional): Comma-separated log types
("request", "reply", "truncated", "error", "recv", "send")
(default: "request,reply,truncated,error")
prefix (bool, optional): Include timestamp prefix (default: True)
logf (callable, optional): Custom log function (default: print)
"""
def __init__(self, log="request,reply,truncated,error", prefix=True, logf=None): ...
def log_recv(self, handler, data):
"""
Log received data.
Args:
handler (DNSHandler): Request handler
data (bytes): Received data
"""
def log_send(self, handler, data):
"""
Log sent data.
Args:
handler (DNSHandler): Request handler
data (bytes): Sent data
"""
def log_request(self, handler, request):
"""
Log DNS request.
Args:
handler (DNSHandler): Request handler
request (DNSRecord): DNS request
"""
def log_reply(self, handler, reply):
"""
Log DNS reply.
Args:
handler (DNSHandler): Request handler
reply (DNSRecord): DNS reply
"""
def log_truncated(self, handler, reply):
"""
Log truncated response.
Args:
handler (DNSHandler): Request handler
reply (DNSRecord): Truncated DNS reply
"""
def log_error(self, handler, e):
"""
Log error during request processing.
Args:
handler (DNSHandler): Request handler
e (Exception): Exception that occurred
"""UDP DNS server implementation with threading support.
class UDPServer:
"""
UDP DNS server implementation.
Inherits from socketserver.ThreadingUDPServer.
Args:
server_address (tuple): (address, port) tuple
RequestHandlerClass (class): Request handler class
resolver (BaseResolver): DNS resolver instance
logger (DNSLogger, optional): Logger instance
timeout (int, optional): Request timeout
"""
def __init__(self, server_address, RequestHandlerClass, resolver, logger=None, timeout=5): ...TCP DNS server implementation with threading support.
class TCPServer:
"""
TCP DNS server implementation.
Inherits from socketserver.ThreadingTCPServer.
Args:
server_address (tuple): (address, port) tuple
RequestHandlerClass (class): Request handler class
resolver (BaseResolver): DNS resolver instance
logger (DNSLogger, optional): Logger instance
timeout (int, optional): Request timeout
"""
def __init__(self, server_address, RequestHandlerClass, resolver, logger=None, timeout=5): ...from dnslib import *
from dnslib.server import DNSServer, BaseResolver
class SimpleResolver(BaseResolver):
"""Simple resolver that returns fixed IP for all A queries."""
def resolve(self, request, handler):
# Create reply based on request
reply = request.reply()
# Get the query
qname = request.q.qname
qtype = request.q.qtype
# Handle A record queries
if qtype == QTYPE.A:
# Add answer record
reply.add_answer(RR(qname, QTYPE.A, rdata=A("192.0.2.1"), ttl=300))
else:
# Set NXDOMAIN for other types
reply.header.rcode = RCODE.NXDOMAIN
return reply
# Create and start server
resolver = SimpleResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting DNS server on 127.0.0.1:5353")
server.start()from dnslib import *
from dnslib.server import DNSServer, BaseResolver, DNSLogger
import time
class TimestampResolver(BaseResolver):
"""Resolver that returns current timestamp in TXT records."""
def resolve(self, request, handler):
reply = request.reply()
qname = request.q.qname
qtype = request.q.qtype
if qtype == QTYPE.TXT:
# Return current timestamp
timestamp = str(int(time.time()))
reply.add_answer(RR(qname, QTYPE.TXT, rdata=TXT(f"timestamp={timestamp}"), ttl=1))
elif qtype == QTYPE.A:
# Return fixed IP
reply.add_answer(RR(qname, QTYPE.A, rdata=A("203.0.113.1"), ttl=300))
else:
reply.header.rcode = RCODE.NXDOMAIN
return reply
# Create logger with custom configuration
logger = DNSLogger(log="request,reply,error", prefix=True)
# Create resolver and server
resolver = TimestampResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1", logger=logger)
print("Starting DNS server with logging on 127.0.0.1:5353")
server.start()from dnslib import *
from dnslib.server import DNSServer, BaseResolver
import threading
class EchoResolver(BaseResolver):
"""Resolver that echoes query information."""
def resolve(self, request, handler):
reply = request.reply()
qname = request.q.qname
qtype = request.q.qtype
# Create echo response in TXT record
echo_data = f"You queried {qname} for {QTYPE[qtype]}"
reply.add_answer(RR(qname, QTYPE.TXT, rdata=TXT(echo_data), ttl=60))
return reply
# Create resolver
resolver = EchoResolver()
# Start UDP server in thread
udp_server = DNSServer(resolver, port=5353, address="127.0.0.1", tcp=False)
udp_thread = udp_server.start_thread()
# Start TCP server in thread
tcp_server = DNSServer(resolver, port=5353, address="127.0.0.1", tcp=True)
tcp_thread = tcp_server.start_thread()
print("DNS servers running on UDP and TCP port 5353")
print("Press Ctrl+C to stop")
try:
# Keep main thread alive
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping servers...")from dnslib import *
from dnslib.server import DNSServer, BaseResolver
class ZoneResolver(BaseResolver):
"""Resolver that serves from zone data."""
def __init__(self, zone_data):
self.records = {}
# Parse zone data and index by name/type
for rr in RR.fromZone(zone_data):
key = (str(rr.rname), rr.rtype)
if key not in self.records:
self.records[key] = []
self.records[key].append(rr)
def resolve(self, request, handler):
reply = request.reply()
qname = str(request.q.qname)
qtype = request.q.qtype
# Look for exact match
key = (qname, qtype)
if key in self.records:
for rr in self.records[key]:
reply.add_answer(rr)
else:
# Check for ANY query
if qtype == QTYPE.ANY:
found = False
for (name, rtype), records in self.records.items():
if name == qname:
for rr in records:
reply.add_answer(rr)
found = True
if not found:
reply.header.rcode = RCODE.NXDOMAIN
else:
reply.header.rcode = RCODE.NXDOMAIN
return reply
# Zone data
zone_data = """
$TTL 300
example.com. IN A 192.0.2.1
example.com. IN MX 10 mail.example.com.
www.example.com. IN A 192.0.2.2
mail.example.com. IN A 192.0.2.3
ftp.example.com. IN CNAME www.example.com.
"""
# Create and start server
resolver = ZoneResolver(zone_data.strip())
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting zone-based DNS server on 127.0.0.1:5353")
server.start()from dnslib import *
from dnslib.server import DNSServer, BaseResolver, DNSHandler
import socketserver
class CustomDNSHandler(DNSHandler):
"""Custom handler with request filtering."""
def handle(self):
# Get client address
client_addr = self.client_address[0]
# Simple rate limiting example
if hasattr(self.server, 'client_counts'):
if client_addr not in self.server.client_counts:
self.server.client_counts[client_addr] = 0
self.server.client_counts[client_addr] += 1
# Reject if too many requests
if self.server.client_counts[client_addr] > 10:
print(f"Rate limiting client {client_addr}")
return
# Call parent handler
super().handle()
class RateLimitedResolver(BaseResolver):
"""Simple resolver with rate limiting."""
def resolve(self, request, handler):
reply = request.reply()
qname = request.q.qname
# Simple response
reply.add_answer(RR(qname, QTYPE.A, rdata=A("198.51.100.1"), ttl=300))
return reply
# Create server with custom handler
resolver = RateLimitedResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1", handler=CustomDNSHandler)
# Add client tracking
server.server.client_counts = {}
print("Starting rate-limited DNS server on 127.0.0.1:5353")
server.start()from dnslib import *
from dnslib.server import DNSServer, BaseResolver, DNSLogger
class RobustResolver(BaseResolver):
"""Resolver with comprehensive error handling."""
def resolve(self, request, handler):
try:
reply = request.reply()
qname = request.q.qname
qtype = request.q.qtype
# Validate query
if len(str(qname)) > 253: # Max domain length
reply.header.rcode = RCODE.FORMERR
return reply
# Handle different query types
if qtype == QTYPE.A:
reply.add_answer(RR(qname, QTYPE.A, rdata=A("203.0.113.10"), ttl=300))
elif qtype == QTYPE.AAAA:
reply.add_answer(RR(qname, QTYPE.AAAA, rdata=AAAA("2001:db8::10"), ttl=300))
elif qtype == QTYPE.TXT:
reply.add_answer(RR(qname, QTYPE.TXT, rdata=TXT("Hello from robust resolver"), ttl=60))
else:
reply.header.rcode = RCODE.NXDOMAIN
return reply
except Exception as e:
# Log error and return SERVFAIL
print(f"Error in resolver: {e}")
reply = request.reply()
reply.header.rcode = RCODE.SERVFAIL
return reply
# Create logger for debugging
logger = DNSLogger(log="request,reply,error,truncated", prefix=True)
# Create and start server
resolver = RobustResolver()
server = DNSServer(resolver, port=5353, address="127.0.0.1", logger=logger)
print("Starting robust DNS server with error handling on 127.0.0.1:5353")
server.start()Install with Tessl CLI
npx tessl i tessl/pypi-dnslib