Simple library to encode/decode DNS wire-format packets
Additional utilities for DNS development including intercepting proxy functionality, helper functions for TCP/UDP communication, and specialized request handlers for advanced use cases.
Advanced resolver that can intercept specific queries locally while proxying others to upstream servers.
class InterceptResolver(BaseResolver):
"""
Intercepting resolver that matches queries against patterns and handles
them locally or forwards to upstream server.
Args:
address (str): Upstream DNS server address (required)
port (int): Upstream DNS server port (required; 53 is the standard DNS port)
ttl (str): Default TTL for intercepted records (required); dnslib parses it as a time spec such as "60s"
intercept (list): List of records to intercept, written in zone-file format with glob-style names (required)
skip (list): List of domains/patterns to skip interception (required; pass [] for none)
nxdomain (list): List of domains to return NXDOMAIN (required; pass [] for none)
forward (list): List of domains to always forward (required; pass [] for none)
all_qtypes (bool): Intercept all query types (required)
timeout (int, optional): Upstream timeout in seconds (default: 0)
"""
def __init__(self, address, port, ttl, intercept, skip, nxdomain, forward, all_qtypes, timeout=0): ...
def resolve(self, request, handler):
"""
Resolve query with interception logic.
Args:
request (DNSRecord): DNS query to resolve
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: DNS response (intercepted or proxied)
"""

Specialized DNS handler that passes requests directly to the upstream server without decoding/encoding for maximum performance.
class PassthroughDNSHandler(DNSHandler):
"""
DNS handler that forwards requests directly to upstream server
without parsing DNS packets for improved performance.
"""
def get_reply(self, data):
"""
Forward raw DNS packet to upstream server.
Args:
data (bytes): Raw DNS query packet
Returns:
bytes: Raw DNS response packet from upstream
"""

Helper functions for TCP and UDP DNS communication.
def send_tcp(data, host, port):
"""
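Send DNS query over TCP and return response.
Args:
data (bytes): DNS query packet, already prefixed with the 2-byte big-endian length that DNS-over-TCP framing requires
host (str): Target server address
port (int): Target server port
Returns:
bytes: DNS response packet, still carrying its 2-byte length prefix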
"""
def send_udp(data, host, port):
"""
Send DNS query over UDP and return response.
Args:
data (bytes): DNS query packet
host (str): Target server address
port (int): Target server port
Returns:
bytes: DNS response packet
"""

Parser for DiG output format for compatibility with standard DNS tools.
class DigParser:
"""
Parser for DiG (Domain Information Groper) output format.
Enables parsing of DiG command output for DNS record extraction.
"""
def __init__(self): ...
def parse(self, dig_output):
"""
Parse DiG output and extract DNS records.
Args:
dig_output (str): DiG command output
Returns:
DNSRecord: Parsed DNS record from DiG output
"""
def parse_response(self, response_text):
"""
Parse DiG response section.
Args:
response_text (str): DiG response section text
Returns:
list[RR]: List of parsed resource records
"""

Exception classes for error handling in DNS utilities.
class DNSError(Exception):
"""Base exception class for DNS-related errors."""
class DNSLabelError(Exception):
"""Exception for DNS label processing errors."""
class BufferError(Exception):
"""Exception for buffer operation errors."""
class BimapError(Exception):
"""Exception for bidirectional mapping errors."""

from dnslib import *
from dnslib.server import DNSServer
from dnslib.intercept import InterceptResolver
# Create intercepting resolver.
# Every argument except ``timeout`` is a required positional parameter of
# InterceptResolver.__init__, so ``skip`` must be passed even when there is
# nothing to skip; leaving it out raises TypeError.
resolver = InterceptResolver(
    address="8.8.8.8",
    port=53,
    # dnslib parses the TTL as a time spec (number with optional s/m/h/d/w suffix)
    ttl="300s",
    # Intercept entries are zone-format records; the owner name may be a glob
    intercept=[
        "example.com IN A 127.0.0.1",
        "*.local IN A 127.0.0.1",
    ],
    skip=[],
    nxdomain=["blocked.com"],
    forward=["important.com"],
    all_qtypes=True,
    timeout=5,
)

# Start DNS server with intercepting resolver
server = DNSServer(resolver, port=5353, address="127.0.0.1")
server.start_thread()
print("Intercepting DNS server running on 127.0.0.1:5353")

from dnslib import *
from dnslib.proxy import send_tcp, send_udp
# Create DNS query
query = DNSRecord.question("example.com", "A")
query_data = query.pack()

# Send via UDP
try:
    response_data = send_udp(query_data, "8.8.8.8", 53)
    response = DNSRecord.parse(response_data)
    print(f"UDP Response: {response.a}")
except Exception as e:
    print(f"UDP query failed: {e}")

# Send via TCP for large responses.
# DNS over TCP frames every message with a 2-byte big-endian length field.
# send_tcp transmits and returns the framed bytes as-is, so the prefix has to
# be added to the query and stripped from the reply before parsing.
try:
    framed_query = len(query_data).to_bytes(2, "big") + query_data
    response_data = send_tcp(framed_query, "8.8.8.8", 53)
    response = DNSRecord.parse(response_data[2:])
    print(f"TCP Response: {response.a}")
except Exception as e:
    print(f"TCP query failed: {e}")

from dnslib import *
from dnslib.server import DNSServer
from dnslib.proxy import ProxyResolver, PassthroughDNSHandler
# Upstream resolver that every incoming query is relayed to
resolver = ProxyResolver("8.8.8.8", 53, timeout=5)

# PassthroughDNSHandler forwards the raw packets without parsing them,
# which is what makes this setup the high-performance variant
server = DNSServer(
    handler=PassthroughDNSHandler,
    address="127.0.0.1",
    port=5353,
    resolver=resolver,
)
server.start_thread()
print("High-performance proxy server running on 127.0.0.1:5353")

from dnslib.digparser import DigParser
import subprocess
# Run DiG command and capture output
dig_cmd = ["dig", "@8.8.8.8", "example.com", "A"]
parser = DigParser()
try:
    # check=True turns a non-zero dig exit status into an exception instead of
    # handing empty or error output to the parser; the timeout keeps an
    # unreachable server from hanging the script.
    result = subprocess.run(
        dig_cmd,
        capture_output=True,
        text=True,
        check=True,
        timeout=10,
    )
    # Parse DiG output
    dns_record = parser.parse(result.stdout)
    print(f"Parsed from DiG: {dns_record}")
    # Extract answer records
    for rr in dns_record.rr:
        print(f"Answer: {rr}")
except (OSError, subprocess.SubprocessError) as e:
    # dig is not installed, exited with an error, or timed out
    print(f"Failed to run dig: {e}")
except Exception as e:
    print(f"Failed to parse DiG output: {e}")

from dnslib import *
from dnslib.label import DNSLabelError
try:
# Try to create DNS record with invalid data
query = DNSRecord.question("invalid..domain", "A")
response_data = query.send("8.8.8.8", 53)
response = DNSRecord.parse(response_data)
except DNSError as e:
print(f"DNS error: {e}")
except DNSLabelError as e:
print(f"DNS label error: {e}")
except BufferError as e:
print(f"Buffer error: {e}")
except Exception as e:
print(f"Unexpected error: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-dnslib