XML bomb protection for Python stdlib modules
—
Secure XML-RPC client and server protection with gzip bomb prevention. DefusedXML provides defused parsers and decompression limits for XML-RPC communications, protecting against both XML-based attacks and gzip compression bombs that can cause denial of service through excessive memory consumption.
Functions to apply and remove system-wide XML-RPC security patches.
def monkey_patch():
"""
Apply security patches to XML-RPC modules system-wide.
Replaces xmlrpc.client.FastParser with DefusedExpatParser,
xmlrpc.client.GzipDecodedResponse with DefusedGzipDecodedResponse,
and xmlrpc.client.gzip_decode with defused_gzip_decode.
Also patches xmlrpc.server.gzip_decode if available (Python 3).
This is a global monkey patch that affects all XML-RPC usage
in the current Python process.
"""
def unmonkey_patch():
"""
Remove security patches from XML-RPC modules.
Restores original XML-RPC implementations by reverting
monkey patches applied by monkey_patch().
Warning: This removes security protections and should only
be used if defused XML-RPC processing is causing compatibility issues.
"""
Usage Examples:
import defusedxml.xmlrpc as xmlrpc_defused
# Apply system-wide XML-RPC security patches
xmlrpc_defused.monkey_patch()
# Now all XML-RPC operations use secure implementations
import xmlrpc.client
server = xmlrpc.client.ServerProxy('http://example.com/xmlrpc')
result = server.some_method() # Uses defused parser
# Remove patches if needed (not recommended)
xmlrpc_defused.unmonkey_patch()
Secure gzip decompression with configurable size limits to prevent gzip bomb attacks.
def defused_gzip_decode(data, limit=None):
"""
Decompress gzip-encoded data with size limits to prevent gzip bombs.
Args:
data (bytes): Gzip-compressed data to decompress
limit (int, optional): Maximum decompressed size in bytes (default: MAX_DATA)
Returns:
bytes: Decompressed data
Raises:
ValueError: If decompressed data exceeds size limit or data is invalid
NotImplementedError: If gzip module is not available
"""
Usage Examples:
import defusedxml.xmlrpc as xmlrpc_defused
# Decompress with default limit (30MB)
compressed_data = get_gzip_data()
try:
decompressed = xmlrpc_defused.defused_gzip_decode(compressed_data)
print(f"Decompressed {len(compressed_data)} bytes to {len(decompressed)} bytes")
except ValueError as e:
print(f"Gzip decompression failed: {e}")
# Decompress with custom limit (10MB)
try:
decompressed = xmlrpc_defused.defused_gzip_decode(compressed_data, limit=10*1024*1024)
except ValueError as e:
print(f"Data exceeded 10MB limit: {e}")
# Disable limit (not recommended for untrusted data)
decompressed = xmlrpc_defused.defused_gzip_decode(trusted_data, limit=-1)
Secure XML-RPC parser with configurable security restrictions for processing XML-RPC requests and responses.
class DefusedExpatParser:
"""
Secure XML-RPC parser using expat with configurable security restrictions.
Replaces the standard XML-RPC FastParser with security handlers
to prevent XML bomb attacks, DTD processing attacks, and external
entity attacks in XML-RPC communications.
"""
def __init__(self, target, forbid_dtd=False, forbid_entities=True, forbid_external=True):
"""
Initialize DefusedExpatParser for XML-RPC processing.
Args:
target: XML-RPC target handler for processing parsed data
forbid_dtd (bool): Forbid DTD processing (default: False)
forbid_entities (bool): Forbid entity expansion (default: True)
forbid_external (bool): Forbid external references (default: True)
"""
def defused_start_doctype_decl(self, name, sysid, pubid, has_internal_subset):
"""Handler that raises DTDForbidden when DTD processing is forbidden"""
def defused_entity_decl(self, name, is_parameter_entity, value, base, sysid, pubid, notation_name):
"""Handler that raises EntitiesForbidden when entity processing is forbidden"""
def defused_unparsed_entity_decl(self, name, base, sysid, pubid, notation_name):
"""Handler that raises EntitiesForbidden for unparsed entities when forbidden"""
def defused_external_entity_ref_handler(self, context, base, sysid, pubid):
"""Handler that raises ExternalReferenceForbidden when external references are forbidden"""
Secure gzip response handler with size limits for processing compressed XML-RPC responses.
class DefusedGzipDecodedResponse:
"""
Secure gzip-decoded response handler with size limits.
Replaces the standard GzipDecodedResponse with size limits
to prevent gzip bomb attacks that can consume excessive memory
through maliciously crafted compressed responses.
"""
def __init__(self, response, limit=None):
"""
Initialize DefusedGzipDecodedResponse with size limits.
Args:
response: HTTP response object containing gzip-compressed data
limit (int, optional): Maximum decompressed size in bytes (default: MAX_DATA)
Raises:
ValueError: If response data exceeds size limit
NotImplementedError: If gzip module is not available
"""
def read(self, n):
"""
Read and decompress up to n bytes from response.
Args:
n (int): Maximum number of bytes to read
Returns:
bytes: Decompressed data
Raises:
ValueError: If total decompressed data exceeds size limit
"""
def close(self):
"""Close the response and cleanup resources"""
MAX_DATA = 30 * 1024 * 1024 # Maximum data size limit (30MB)
Usage Example:
import defusedxml.xmlrpc as xmlrpc_defused
# Custom size limit
custom_limit = 5 * 1024 * 1024 # 5MB
response_handler = xmlrpc_defused.DefusedGzipDecodedResponse(http_response, limit=custom_limit)
try:
data = response_handler.read(1024)
while data:
process_data(data)
data = response_handler.read(1024)
finally:
response_handler.close()
import defusedxml.xmlrpc as xmlrpc_defused
# Apply protection at application startup
xmlrpc_defused.monkey_patch()
# Now all XML-RPC usage is automatically protected
import xmlrpc.client
def make_xmlrpc_call(server_url, method_name, *args):
    """Make XML-RPC call with automatic security protection.

    The protection itself is not applied here; this helper assumes
    ``monkey_patch()`` was already called at application startup.

    Args:
        server_url: URL of the XML-RPC endpoint.
        method_name: Name of the remote method to invoke.
        *args: Positional arguments forwarded to the remote method.

    Returns:
        The remote method's result, or None if the call failed for any
        reason (the failure is printed rather than raised).
    """
    try:
        server = xmlrpc.client.ServerProxy(server_url)
        return getattr(server, method_name)(*args)
    except xmlrpc.client.Fault as e:
        # The server replied, but with an XML-RPC fault response.
        print(f"XML-RPC fault: {e}")
    except Exception as e:
        # Deliberate best-effort boundary: report every other failure
        # (bad URL, transport error, forbidden XML construct) and fall
        # through to the None return below.
        print(f"XML-RPC error: {e}")
    return None
# Make calls normally - they're automatically secured
result = make_xmlrpc_call('http://example.com/rpc', 'get_data', 'param1')
import defusedxml.xmlrpc as xmlrpc_defused
import xmlrpc.client
class SecureXMLRPCTransport(xmlrpc.client.Transport):
    """Custom XML-RPC transport with defused parser."""

    def getparser(self):
        """Return defused parser instead of standard parser.

        Returns:
            tuple: ``(parser, target)`` where ``parser`` is a
            ``DefusedExpatParser`` feeding ``target``, the unmarshaller
            that collects the decoded response.
        """
        # Honour the datetime / builtin-type options given to the
        # transport constructor, as the stock Transport.getparser does,
        # instead of always building a default Unmarshaller.
        target = xmlrpc.client.Unmarshaller(
            use_datetime=self._use_datetime,
            use_builtin_types=self._use_builtin_types,
        )
        parser = xmlrpc_defused.DefusedExpatParser(
            target,
            forbid_dtd=True,
            forbid_entities=True,
            forbid_external=True,
        )
        return parser, target
# Use custom transport
transport = SecureXMLRPCTransport()
server = xmlrpc.client.ServerProxy('http://example.com/rpc', transport=transport)
result = server.method_name()
import defusedxml.xmlrpc as xmlrpc_defused
def safe_gzip_decode(compressed_data, max_size=None):
    """Safely decompress gzip data with error handling.

    Args:
        compressed_data (bytes): Gzip-compressed payload.
        max_size (int, optional): Maximum decompressed size in bytes.
            ``None`` selects ``xmlrpc_defused.MAX_DATA``.

    Returns:
        bytes | None: The decompressed data, or None when decompression
        failed (the reason is printed).
    """
    # Resolve the limit before the try block so the except handler can
    # always report it, and test against None so an explicit 0 is not
    # silently replaced by the default.
    limit = xmlrpc_defused.MAX_DATA if max_size is None else max_size
    try:
        decompressed = xmlrpc_defused.defused_gzip_decode(compressed_data, limit=limit)
    except ValueError as e:
        message = str(e)
        if "max gzipped payload length exceeded" in message:
            print(f"Gzip bomb detected: decompressed data would exceed {limit} bytes")
        elif "invalid data" in message:
            print("Invalid gzip data format")
        else:
            print(f"Gzip decompression error: {e}")
        return None
    except NotImplementedError:
        print("Gzip module not available")
        return None
    print(f"Successfully decompressed {len(compressed_data)} bytes to {len(decompressed)} bytes")
    return decompressed


import defusedxml.xmlrpc as xmlrpc_defused
from xmlrpc.server import SimpleXMLRPCServer
# Apply patches before creating server
xmlrpc_defused.monkey_patch()
class SecureXMLRPCServer(SimpleXMLRPCServer):
    """XML-RPC server with defused XML processing."""

    def __init__(self, *args, **kwargs):
        """Create the server; all arguments go to SimpleXMLRPCServer."""
        # Server automatically uses defused parsers due to monkey patching
        super().__init__(*args, **kwargs)

    def _dispatch(self, method, params):
        """Dispatch method calls with additional security logging.

        Args:
            method (str): Name of the requested XML-RPC method.
            params: Sequence of positional parameters for the call.

        Returns:
            Whatever the base-class dispatcher returns for the call.
        """
        print(f"XML-RPC call: {method} with {len(params)} parameters")
        return super()._dispatch(method, params)
# Create and run secure server
server = SecureXMLRPCServer(('localhost', 8000))
server.register_function(lambda x: x * 2, 'double')
print("Starting secure XML-RPC server...")
server.serve_forever()
import defusedxml.xmlrpc as xmlrpc_defused
import os
def configure_xmlrpc_security():
    """Configure XML-RPC security based on environment.

    Reads the ``ENVIRONMENT`` variable: when it equals ``'development'``
    only a notice is printed; in every other case (including when the
    variable is unset) the system-wide patches are applied.
    """
    # Check if we're in a development environment
    if os.getenv('ENVIRONMENT') == 'development':
        print("Development mode: XML-RPC security monitoring only")
        # Could implement logging-only mode here
    else:
        print("Production mode: Applying XML-RPC security patches")
        xmlrpc_defused.monkey_patch()
def safe_xmlrpc_call(url, method, *args, max_response_size=None):
    """Make XML-RPC call with optional response size limits.

    Args:
        url: URL of the XML-RPC endpoint.
        method: Name of the remote method to invoke.
        *args: Positional arguments forwarded to the remote method.
        max_response_size (int, optional): Temporary replacement for
            ``xmlrpc_defused.MAX_DATA`` while the call runs. ``None``
            leaves the module-level limit untouched.

    Returns:
        The remote method's result. Exceptions from the call propagate.

    Note:
        The limit is changed by rebinding the module-level ``MAX_DATA``,
        so concurrent calls in the same process observe the temporary
        value while this call is in flight.
    """
    import xmlrpc.client

    # Compare against None so an explicit 0 is honoured instead of
    # being treated as "no override".
    override = max_response_size is not None
    # Set custom gzip limit if specified
    if override:
        original_max = xmlrpc_defused.MAX_DATA
        xmlrpc_defused.MAX_DATA = max_response_size
    try:
        server = xmlrpc.client.ServerProxy(url)
        return getattr(server, method)(*args)
    finally:
        # Restore original limit
        if override:
            xmlrpc_defused.MAX_DATA = original_max
# Configure security at startup
configure_xmlrpc_security()
# Make calls with custom limits
result = safe_xmlrpc_call('http://api.example.com/rpc', 'get_large_dataset', max_response_size=50*1024*1024)
DefusedXML XML-RPC protection is typically applied via monkey patching:
# Before (vulnerable)
import xmlrpc.client
server = xmlrpc.client.ServerProxy('http://example.com/rpc')
result = server.method()
# After (secure)
import defusedxml.xmlrpc as xmlrpc_defused
xmlrpc_defused.monkey_patch() # Apply protection system-wide
import xmlrpc.client
server = xmlrpc.client.ServerProxy('http://example.com/rpc')
result = server.method() # Now automatically protected
The monkey patching approach ensures all XML-RPC usage in the application is automatically secured without requiring code changes.
Install with Tessl CLI
npx tessl i tessl/pypi-defusedxml