Simple library to encode/decode DNS wire-format packets
Ready-to-use resolver implementations including proxy resolvers, fixed response resolvers, zone file resolvers, and shell script resolvers. These implementations provide complete DNS server functionality for different use cases.
DNS proxy resolver that forwards queries to upstream servers and returns responses.
class ProxyResolver(BaseResolver):
"""
DNS proxy resolver forwarding queries to upstream server.
Args:
address (str): Upstream DNS server address
port (int, optional): Upstream DNS server port (default: 53)
timeout (int, optional): Query timeout in seconds (default: 5)
tcp (bool, optional): Use TCP for upstream queries (default: False)
"""
def __init__(self, address, port=53, timeout=5, tcp=False): ...
def resolve(self, request, handler):
"""
Forward query to upstream server and return response.
Args:
request (DNSRecord): DNS query to forward
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: Response from upstream server
"""

Resolver that returns fixed responses to all queries, useful for testing and blocking.
class FixedResolver(BaseResolver):
"""
Resolver returning fixed responses to all queries.
Args:
zone (str): Zone file format responses to return
ttl (int, optional): TTL for responses (default: 60)
"""
def __init__(self, zone, ttl=60): ...
def resolve(self, request, handler):
"""
Return fixed response regardless of query.
Args:
request (DNSRecord): DNS query (ignored)
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: Fixed DNS response
"""

Resolver that serves DNS responses from zone file data with wildcard support.
class ZoneResolver(BaseResolver):
"""
Resolver serving responses from zone file data.
Args:
zone (str): Zone file format data
glob (bool, optional): Enable wildcard matching (default: False)
origin (str, optional): Default origin domain
ttl (int, optional): Default TTL value
"""
def __init__(self, zone, glob=False, origin=None, ttl=None): ...
def resolve(self, request, handler):
"""
Resolve query from zone file data.
Args:
request (DNSRecord): DNS query
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: DNS response from zone data or NXDOMAIN
"""

Resolver that calls external shell scripts to generate dynamic responses.
class ShellResolver(BaseResolver):
"""
Resolver calling shell scripts for dynamic responses.
Args:
script (str): Path to shell script
timeout (int, optional): Script execution timeout (default: 10)
shell (bool, optional): Use shell for execution (default: True)
"""
def __init__(self, script, timeout=10, shell=True): ...
def resolve(self, request, handler):
"""
Execute shell script and parse response.
Args:
request (DNSRecord): DNS query
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: DNS response from script output
"""

Advanced proxy resolver that can intercept and modify responses for specific domains.
class InterceptResolver(BaseResolver):
"""
Intercepting proxy resolver with response modification.
Args:
address (str): Upstream DNS server address
port (int, optional): Upstream DNS server port (default: 53)
timeout (int, optional): Query timeout in seconds (default: 5)
intercept (dict, optional): Domain interception rules
"""
def __init__(self, address, port=53, timeout=5, intercept=None): ...
def resolve(self, request, handler):
"""
Forward query with optional response interception.
Args:
request (DNSRecord): DNS query
handler (DNSHandler): Request handler instance
Returns:
DNSRecord: Original or intercepted DNS response
"""
def add_intercept(self, domain, qtype, response):
"""
Add domain interception rule.
Args:
domain (str): Domain to intercept
qtype (str or int): Query type to intercept
response (str or DNSRecord): Replacement response
"""
def remove_intercept(self, domain, qtype=None):
"""
Remove domain interception rule.
Args:
domain (str): Domain to stop intercepting
qtype (str or int, optional): Specific query type
"""

Each resolver can be run as a standalone DNS server from the command line.
# Basic proxy server
python -m dnslib.proxy
# Proxy with specific upstream server
python -m dnslib.proxy --upstream 8.8.8.8
# Proxy with custom port and TCP
python -m dnslib.proxy --port 5353 --upstream 1.1.1.1 --tcp

# Fixed response server
python -m dnslib.fixedresolver
# Fixed response with custom data
python -m dnslib.fixedresolver --zone "example.com 300 IN A 192.0.2.1"
# Fixed response on custom port
python -m dnslib.fixedresolver --port 5353

# Zone file server
python -m dnslib.zoneresolver --zone-file /path/to/zone.txt
# Zone with wildcard support
python -m dnslib.zoneresolver --zone-file zone.txt --glob
# Zone server on custom port
python -m dnslib.zoneresolver --zone-file zone.txt --port 5353

# Shell script resolver
python -m dnslib.shellresolver --script /path/to/resolver.sh
# Shell resolver with timeout
python -m dnslib.shellresolver --script resolver.sh --timeout 5
# Shell resolver on custom address
python -m dnslib.shellresolver --script resolver.sh --address 127.0.0.1 --port 5353

from dnslib.server import DNSServer
from dnslib.proxy import ProxyResolver
# Create proxy resolver
resolver = ProxyResolver("8.8.8.8", port=53, timeout=10)
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting DNS proxy server on 127.0.0.1:5353")
print("Forwarding to 8.8.8.8:53")
server.start()

from dnslib.server import DNSServer
from dnslib.fixedresolver import FixedResolver
# Fixed responses for blocking ads
fixed_zone = """
ads.example.com. 300 IN A 127.0.0.1
tracker.example.com. 300 IN A 127.0.0.1
malware.example.com. 300 IN A 127.0.0.1
"""
# Create resolver
resolver = FixedResolver(fixed_zone.strip())
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting ad-blocking DNS server on 127.0.0.1:5353")
server.start()

from dnslib.server import DNSServer
from dnslib.zoneresolver import ZoneResolver
# Zone file data
zone_data = """
$TTL 300
$ORIGIN example.local.
@ IN SOA ns1.example.local. admin.example.local. (
2023010101 ; Serial
3600 ; Refresh
1800 ; Retry
604800 ; Expire
86400 ) ; Minimum
IN NS ns1.example.local.
IN A 192.168.1.1
IN MX 10 mail.example.local.
ns1 IN A 192.168.1.1
mail IN A 192.168.1.10
www IN A 192.168.1.20
ftp IN CNAME www.example.local.
; Wildcard support
*.subdomain IN A 192.168.1.100
"""
# Create resolver with wildcard support
resolver = ZoneResolver(zone_data.strip(), glob=True)
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting zone file DNS server on 127.0.0.1:5353")
server.start()

from dnslib.server import DNSServer
from dnslib.shellresolver import ShellResolver
# Create shell script resolver
# Script should output zone file format responses
resolver = ShellResolver("/path/to/dns_resolver.sh", timeout=5)
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting shell script DNS server on 127.0.0.1:5353")
server.start()

Example shell script (dns_resolver.sh):
#!/bin/bash
# DNS resolver shell script
# Arguments: QNAME QTYPE QCLASS
QNAME=$1
QTYPE=$2
QCLASS=$3
case "$QNAME" in
"time.local.")
echo "time.local. 60 IN TXT \"Current time: $(date)\""
;;
"ip.local.")
echo "ip.local. 60 IN TXT \"Your IP: $REMOTE_ADDR\""
;;
"random.local.")
IP="192.168.1.$((RANDOM % 254 + 1))"
echo "random.local. 60 IN A $IP"
;;
*)
# Return NXDOMAIN for unknown domains
exit 3
;;
esac

from dnslib.server import DNSServer
from dnslib.intercept import InterceptResolver
# Create intercepting proxy
resolver = InterceptResolver("8.8.8.8", port=53)
# Add interception rules
resolver.add_intercept("blocked.com", "A", "127.0.0.1")
resolver.add_intercept("ads.example.com", "A", "0.0.0.0")
resolver.add_intercept("tracking.com", "*", "127.0.0.1")
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting intercepting DNS proxy on 127.0.0.1:5353")
print("Intercepting blocked domains")
server.start()

from dnslib.server import DNSServer, BaseResolver
from dnslib import *
import random
import time
class LoadBalancingResolver(BaseResolver):
    """Resolver that answers one service name with a random healthy backend.

    A queries for ``service_name`` are answered with the address of a
    randomly chosen server that passed the most recent TCP connect probe.
    Other query types for that name get an empty NOERROR reply, and every
    other name gets NXDOMAIN.

    Args:
        servers (list of str): Backend IP addresses to balance across
        check_port (int, optional): TCP port probed by the health check
            (default: 80)
        check_interval (int, optional): Minimum seconds between health
            checks (default: 60)
        check_timeout (int, optional): Per-server connect timeout in
            seconds (default: 2)
        service_name (str, optional): Fully qualified name to answer for
            (default: "service.example.com.")
    """

    def __init__(self, servers, check_port=80, check_interval=60,
                 check_timeout=2, service_name="service.example.com."):
        self.servers = servers
        # Every server is assumed healthy until the first probe runs.
        self.healthy_servers = list(servers)
        self.check_port = check_port
        self.check_interval = check_interval
        self.check_timeout = check_timeout
        # DNS names compare case-insensitively; store a lower-cased FQDN.
        normalized = service_name.lower()
        if not normalized.endswith("."):
            normalized += "."
        self.service_name = normalized
        self.last_check = 0

    def health_check(self):
        """Refresh ``healthy_servers`` if the check interval has elapsed.

        Each server is probed with a TCP connect to ``check_port``. Servers
        that refuse, time out or cannot be reached are left out of the
        healthy list.
        """
        # Local import: the surrounding example does not import socket at
        # file level.
        import socket

        if time.time() - self.last_check <= self.check_interval:
            return

        # Build the new list privately and publish it with one assignment so
        # a concurrent resolve() never sees a half-populated list.
        healthy = []
        for server in self.servers:
            try:
                # The context manager closes the socket on every path.
                with socket.create_connection(
                    (server, self.check_port), timeout=self.check_timeout
                ):
                    healthy.append(server)
            except OSError:
                # Refused, timed out, unreachable or unresolvable: unhealthy.
                continue
        self.healthy_servers = healthy
        self.last_check = time.time()

    def resolve(self, request, handler):
        """Answer the query from the healthy server pool.

        Args:
            request (DNSRecord): DNS query
            handler (DNSHandler): Request handler instance

        Returns:
            DNSRecord: Reply carrying one A record, an empty NOERROR reply
            for non-A queries on the service name, SERVFAIL when no server
            is healthy, or NXDOMAIN for any other name
        """
        reply = request.reply()
        qname = request.q.qname

        if str(qname).lower() != self.service_name:
            reply.header.rcode = RCODE.NXDOMAIN
            return reply

        if request.q.qtype != QTYPE.A:
            # The name exists but has no data of this type. NXDOMAIN here
            # would let clients negatively cache the whole name.
            return reply

        self.health_check()
        healthy = self.healthy_servers
        if healthy:
            server_ip = random.choice(healthy)
            reply.add_answer(RR(qname, QTYPE.A, rdata=A(server_ip), ttl=30))
        else:
            reply.header.rcode = RCODE.SERVFAIL
        return reply
# Server pool
server_pool = ["192.168.1.10", "192.168.1.11", "192.168.1.12"]
# Create and start load balancing resolver
resolver = LoadBalancingResolver(server_pool)
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting load balancing DNS server on 127.0.0.1:5353")
server.start()

from dnslib.server import DNSServer, BaseResolver
from dnslib.proxy import ProxyResolver
from dnslib import *
class ConditionalResolver(BaseResolver):
    """Resolver with conditional forwarding rules.

    Queries whose name equals a rule domain, or is a subdomain of it, are
    forwarded to that rule's upstream server. When several rules match, the
    most specific (longest) domain wins. Everything else goes to the
    default upstream.

    Args:
        default_upstream (str): Upstream DNS server for unmatched queries
        rules (dict): Mapping of domain suffix to upstream server address
    """

    def __init__(self, default_upstream, rules):
        self.default_resolver = ProxyResolver(default_upstream)
        # One proxy resolver per rule, keyed by the domain as supplied.
        self.rules = {
            domain: ProxyResolver(upstream)
            for domain, upstream in rules.items()
        }

    def resolve(self, request, handler):
        """Forward the query to the upstream selected by the rules.

        Args:
            request (DNSRecord): DNS query
            handler (DNSHandler): Request handler instance

        Returns:
            DNSRecord: Response from the selected upstream resolver
        """
        qname = str(request.q.qname).lower()
        if not qname.endswith("."):
            qname += "."

        chosen = None
        chosen_len = -1
        for domain, resolver in self.rules.items():
            # Normalise to a lower-cased FQDN so rules written without the
            # trailing dot still match.
            suffix = domain.lower().rstrip(".") + "."
            # Match on whole labels only: "xinternal.company.com." must not
            # hit the "internal.company.com." rule. A bare "." rule matches
            # every name.
            matches = (
                suffix == "."
                or qname == suffix
                or qname.endswith("." + suffix)
            )
            if matches and len(suffix) > chosen_len:
                chosen, chosen_len = resolver, len(suffix)

        if chosen is None:
            chosen = self.default_resolver
        return chosen.resolve(request, handler)
# Forwarding rules
forwarding_rules = {
    "internal.company.com.": "192.168.1.1",
    "dev.company.com.": "192.168.10.1",
    "staging.company.com.": "192.168.20.1",
}
# Upstream for names that match no rule; defined once so the resolver and
# the banner printed below cannot drift apart
default_upstream = "8.8.8.8"
# Create conditional resolver
resolver = ConditionalResolver(default_upstream, forwarding_rules)
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting conditional forwarding DNS server on 127.0.0.1:5353")
print("Rules:")
for domain, upstream in forwarding_rules.items():
    print(f" {domain} -> {upstream}")
print(f" * -> {default_upstream}")
server.start()

from dnslib.server import DNSServer, BaseResolver
from dnslib.proxy import ProxyResolver
from dnslib import *
import time
class CachingResolver(BaseResolver):
    """Simple caching DNS resolver.

    Successful upstream responses are cached per (name, type, class) for
    the smallest positive TTL among their answer records.

    Args:
        upstream (str): Upstream DNS server address
    """

    def __init__(self, upstream):
        self.upstream_resolver = ProxyResolver(upstream)
        # key -> (packed response bytes, time stored, lifetime in seconds)
        self.cache = {}

    def cache_key(self, request):
        """Generate cache key for request.

        Args:
            request (DNSRecord): DNS query

        Returns:
            tuple: (query name as string, query type, query class)
        """
        return (str(request.q.qname), request.q.qtype, request.q.qclass)

    def resolve(self, request, handler):
        """Answer from the cache when possible, otherwise query upstream.

        Args:
            request (DNSRecord): DNS query
            handler (DNSHandler): Request handler instance

        Returns:
            DNSRecord: Cached response with aged TTLs, or the fresh
            upstream response
        """
        key = self.cache_key(request)
        now = time.time()

        entry = self.cache.get(key)
        if entry is not None:
            packed, stored_at, lifetime = entry
            # Clamp so a wall-clock step backwards cannot inflate TTLs.
            age = max(0, now - stored_at)
            if age < lifetime:
                cached_response = DNSRecord.parse(packed)
                # The stored packet carries the transaction ID of the query
                # that populated the cache; a client discards a reply whose
                # ID does not match its own query.
                cached_response.header.id = request.header.id
                elapsed = int(age)
                sections = (
                    cached_response.rr,
                    cached_response.auth,
                    cached_response.ar,
                )
                for section in sections:
                    for rr in section:
                        # The OPT pseudo-record reuses the TTL field for
                        # EDNS flags, so it must not be aged.
                        if rr.rtype == QTYPE.OPT:
                            continue
                        rr.ttl = max(1, rr.ttl - elapsed)
                return cached_response
            # Expired. pop() tolerates another handler having already
            # evicted the same entry.
            self.cache.pop(key, None)

        response = self.upstream_resolver.resolve(request, handler)

        # Cache successful responses
        if response.header.rcode == RCODE.NOERROR and response.rr:
            # default=0 covers responses where every answer has TTL 0,
            # which would otherwise make min() raise ValueError.
            min_ttl = min(
                (rr.ttl for rr in response.rr if rr.ttl > 0), default=0
            )
            if min_ttl > 0:
                # Store packed bytes, not the live object, so later changes
                # to the returned response cannot alter the cached copy.
                self.cache[key] = (response.pack(), now, min_ttl)
        return response
# Create caching resolver
resolver = CachingResolver("8.8.8.8")
# Start server
server = DNSServer(resolver, port=5353, address="127.0.0.1")
print("Starting caching DNS server on 127.0.0.1:5353")
server.start()

Install with Tessl CLI
npx tessl i tessl/pypi-dnslib