DNS toolkit for Python supporting almost all record types with high-level and low-level DNS operations
85
Low-level DNS query functions that provide direct control over DNS message exchange with nameservers. These functions handle UDP and TCP communication, zone transfers, and provide fine-grained control over query parameters.
Send DNS queries using UDP protocol with full control over query parameters and response handling.
def udp(q, where, timeout=None, port=53, af=None, source=None, source_port=0,
ignore_unexpected=False, one_rr_per_rrset=False, ignore_trailing=False):
"""
Send a DNS query via UDP.
Args:
q (dns.message.Message): DNS query message
where (str): Nameserver IP address
timeout (float): Query timeout in seconds
port (int): Destination port (default 53)
af (int): Address family (socket.AF_INET or socket.AF_INET6)
source (str): Source IP address
source_port (int): Source port number
ignore_unexpected (bool): Ignore responses from unexpected sources
one_rr_per_rrset (bool): Put each RR in its own RRset
ignore_trailing (bool): Ignore trailing junk in packets
Returns:
dns.message.Message: DNS response message
"""
def send_udp(sock, what, destination, expiration=None):
"""
Send a DNS message to the specified UDP socket destination.
Args:
sock (socket): UDP socket
what (dns.message.Message or bytes): Message to send
destination (tuple): (address, port) tuple
expiration (float): Absolute expiration time
"""
def receive_udp(sock, destination, expiration=None, ignore_unexpected=False,
one_rr_per_rrset=False, keyring=None, request_mac=b'',
ignore_trailing=False):
"""
Read a DNS message from a UDP socket.
Args:
sock (socket): UDP socket
destination (tuple): Expected source (address, port)
expiration (float): Absolute expiration time
ignore_unexpected (bool): Ignore responses from unexpected sources
one_rr_per_rrset (bool): Put each RR in its own RRset
keyring (dict): TSIG keyring for validation
request_mac (bytes): TSIG MAC from request
ignore_trailing (bool): Ignore trailing junk in packets
Returns:
dns.message.Message: DNS response message
"""

Send DNS queries using TCP protocol for reliable delivery and large responses.
def tcp(q, where, timeout=None, port=53, af=None, source=None, source_port=0,
one_rr_per_rrset=False, ignore_trailing=False):
"""
Send a DNS query via TCP.
Args:
q (dns.message.Message): DNS query message
where (str): Nameserver IP address
timeout (float): Query timeout in seconds
port (int): Destination port (default 53)
af (int): Address family (socket.AF_INET or socket.AF_INET6)
source (str): Source IP address
source_port (int): Source port number
one_rr_per_rrset (bool): Put each RR in its own RRset
ignore_trailing (bool): Ignore trailing junk in packets
Returns:
dns.message.Message: DNS response message
"""
def send_tcp(sock, what, expiration=None):
"""
Send a DNS message to the specified TCP socket.
Args:
sock (socket): TCP socket
what (dns.message.Message or bytes): Message to send
expiration (float): Absolute expiration time
"""
def receive_tcp(sock, expiration=None, one_rr_per_rrset=False, keyring=None,
request_mac=b'', ignore_trailing=False):
"""
Read a DNS message from a TCP socket.
Args:
sock (socket): TCP socket
expiration (float): Absolute expiration time
one_rr_per_rrset (bool): Put each RR in its own RRset
keyring (dict): TSIG keyring for validation
request_mac (bytes): TSIG MAC from request
ignore_trailing (bool): Ignore trailing junk in packets
Returns:
dns.message.Message: DNS response message
"""

Perform zone transfers (AXFR/IXFR) to retrieve complete zone data from authoritative servers.
def xfr(where, zone, rdtype='AXFR', rdclass='IN', timeout=None, port=53,
keyring=None, keyname=None, relativize=True, af=None, lifetime=None,
source=None, source_port=0, serial=0, use_udp=False,
keyalgorithm='HMAC-MD5.SIG-ALG.REG.INT'):
"""
Perform a zone transfer and return a generator yielding messages.
Args:
where (str): Nameserver IP address
zone (str or dns.name.Name): Zone name
rdtype (str or int): Transfer type ('AXFR' or 'IXFR')
rdclass (str or int): Record class (default 'IN')
timeout (float): Query timeout in seconds
port (int): Destination port (default 53)
keyring (dict): TSIG keyring for authentication
keyname (str or dns.name.Name): TSIG key name
relativize (bool): Relativize names to zone origin
af (int): Address family
lifetime (float): Total transfer lifetime
source (str): Source IP address
source_port (int): Source port number
serial (int): Serial number for IXFR
use_udp (bool): Use UDP for IXFR
keyalgorithm (str): TSIG algorithm name
Yields:
dns.message.Message: Transfer response messages
"""

Utility functions for working with sockets and query timing.
def _compute_expiration(timeout):
"""
Compute absolute expiration time from timeout.
Args:
timeout (float): Timeout in seconds
Returns:
float: Absolute expiration time
"""
def _matches_destination(af, address, destination, ignore_unexpected):
"""
Check if address matches expected destination.
Args:
af (int): Address family
address (tuple): Actual source address
destination (tuple): Expected destination
ignore_unexpected (bool): Ignore unexpected sources
Returns:
bool: True if addresses match or should be ignored
"""

import dns.message
import dns.query
import dns.name
import dns.rdatatype
# Create a query message
qname = dns.name.from_text('example.com')
q = dns.message.make_query(qname, dns.rdatatype.A)
# Send UDP query
response = dns.query.udp(q, '8.8.8.8', timeout=10)
# Process response
for rrset in response.answer:
    for rdata in rrset:
        if rrset.rdtype == dns.rdatatype.A:
            print(f"IP address: {rdata.address}")

import dns.message
import dns.query
import dns.name
import dns.rdatatype
# Create query with EDNS
qname = dns.name.from_text('large-response.example.com')
q = dns.message.make_query(qname, dns.rdatatype.TXT, use_edns=0, payload=4096)
# Send TCP query with custom source
response = dns.query.tcp(q, '8.8.8.8', timeout=30, source='192.168.1.100',
source_port=12345)
# Check response
print(f"Response code: {response.rcode()}")
print(f"Answer count: {len(response.answer)}")

import dns.query
import dns.zone
import dns.name
# Perform AXFR zone transfer
zone_name = dns.name.from_text('example.com')
# xfr() takes the nameserver's IP address (see `where` above), not a hostname
zone_messages = dns.query.xfr('192.0.2.53', zone_name,
                              timeout=300, lifetime=600)
# Build zone from transfer
zone = dns.zone.from_xfr(zone_messages)
# Iterate through zone records
for name, node in zone.nodes.items():
    for rdataset in node.rdatasets:
        print(f"{name} {rdataset.ttl} {rdataset.rdclass} {rdataset.rdtype}")
        for rdata in rdataset:
            print(f" {rdata}")

import dns.message
import dns.query
import dns.tsigkeyring
import dns.name
import dns.rdatatype
# Create TSIG keyring
keyring = dns.tsigkeyring.from_text({
'key-name': 'base64-encoded-key'
})
# Create authenticated query
qname = dns.name.from_text('secure.example.com')
q = dns.message.make_query(qname, dns.rdatatype.A)
q.use_tsig(keyring, 'key-name')
# Send authenticated query
response = dns.query.udp(q, '192.168.1.1', timeout=10)
# Verify response authentication
if response.tsig_error == 0:
    print("Response verified successfully")
else:
    print(f"TSIG verification failed: {response.tsig_error}")

class UnexpectedSource(DNSException):
"""A DNS query response came from an unexpected address or port."""
class BadResponse(DNSException):
"""A DNS query response does not respond to the question asked."""
class TransferError(DNSException):
"""A zone transfer response had a non-zero rcode."""

Install with Tessl CLI
npx tessl i tessl/pypi-dnspython
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10