Proxy connector for aiohttp supporting SOCKS4/5 and HTTP proxies with proxy chaining capabilities
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
aiohttp-socks exposes an exception hierarchy for handling proxy connection failures, timeouts, and authentication errors. All proxy-related exceptions are re-exported from the python-socks library, so they can be imported directly from `aiohttp_socks`.
The library provides a structured exception hierarchy for different types of proxy errors.
# NOTE(review): upstream python-socks appears to derive ProxyConnectionError
# from OSError and ProxyTimeoutError from TimeoutError rather than from
# ProxyError -- confirm against the installed version before relying on
# ``except ProxyError`` as a catch-all.
class ProxyError(Exception):
    """Base exception for proxy-related errors.

    Documented here as the parent class of the proxy exceptions, so that
    ``except ProxyError`` can be used to handle any proxy-related failure.
    """
# NOTE(review): upstream python-socks appears to define this class on top of
# OSError, not ProxyError -- confirm against the installed version.
class ProxyConnectionError(ProxyError):
    """Raised when the connection to the proxy fails.

    Covers the cases where a connection to the proxy server cannot be
    established: network errors, refused connections, and proxy server
    errors.
    """
# NOTE(review): upstream python-socks appears to define this class on top of
# the built-in TimeoutError, not ProxyError -- confirm against the installed
# version.
class ProxyTimeoutError(ProxyError):
    """Raised when a proxy operation times out.

    Covers connection attempts and data transfer through the proxy that
    exceed the specified timeout duration.
    """


import aiohttp
from aiohttp_socks import ProxyConnector, ProxyError, ProxyConnectionError, ProxyTimeoutError
async def fetch_with_error_handling(url):
    """Fetch ``url`` through a local SOCKS5 proxy with per-error reporting.

    Args:
        url: Address to request with HTTP GET.

    Returns:
        The response body as text, or ``None`` when one of the proxy
        exceptions handled below was raised (a message is printed).
        Any other exception propagates to the caller.
    """
    connector = ProxyConnector.from_url('socks5://127.0.0.1:1080')
    # aiohttp documents ``timeout`` as a ClientTimeout; a bare number is only
    # accepted for backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=10)
    try:
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get(url, timeout=request_timeout) as response:
                return await response.text()
    # Most specific handlers first; ProxyError is the generic fallback.
    except ProxyConnectionError as e:
        print(f"Failed to connect to proxy: {e}")
        return None
    except ProxyTimeoutError as e:
        print(f"Proxy operation timed out: {e}")
        return None
    except ProxyError as e:
        print(f"General proxy error: {e}")
        return None


from aiohttp_socks import ChainProxyConnector, ProxyConnector, ProxyError
async def fetch_with_fallback(url, proxy_urls):
    """Try multiple proxies until one works.

    Args:
        url: Address to request with HTTP GET.
        proxy_urls: Iterable of proxy URLs, tried in order.

    Returns:
        The response body as text from the first proxy that succeeds.

    Raises:
        Exception: If every proxy failed (or ``proxy_urls`` was empty).
            The last proxy failure, if any, is attached as ``__cause__``.
    """
    # Imported here so the example is self-contained. Catching all three
    # classes is correct whether or not the connection/timeout errors
    # subclass ProxyError (NOTE(review): upstream python-socks appears to base
    # them on OSError/TimeoutError -- confirm).
    from aiohttp_socks import ProxyConnectionError, ProxyTimeoutError

    proxy_failures = (ProxyError, ProxyConnectionError, ProxyTimeoutError)
    # aiohttp documents ``timeout`` as a ClientTimeout; a bare number is only
    # accepted for backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=10)
    last_error = None

    for proxy_url in proxy_urls:
        try:
            connector = ProxyConnector.from_url(proxy_url)
            async with aiohttp.ClientSession(connector=connector) as session:
                async with session.get(url, timeout=request_timeout) as response:
                    return await response.text()
        except proxy_failures as e:
            print(f"Proxy {proxy_url} failed: {e}")
            last_error = e

    # Chain the final failure so the traceback shows why the last proxy died.
    raise Exception("All proxies failed") from last_error


from aiohttp_socks import ChainProxyConnector, ProxyError
async def fetch_with_chain_fallback(url):
    """Try different proxy chain configurations.

    Each chain is attempted in order; the first one that yields a response
    wins.

    Args:
        url: Address to request with HTTP GET.

    Returns:
        The response body as text from the first chain that succeeds.

    Raises:
        Exception: If every chain failed. The last proxy failure is attached
            as ``__cause__``.
    """
    # Imported here so the example is self-contained. Catching all three
    # classes is correct whether or not the connection/timeout errors
    # subclass ProxyError (NOTE(review): upstream python-socks appears to base
    # them on OSError/TimeoutError -- confirm).
    from aiohttp_socks import ProxyConnectionError, ProxyTimeoutError

    proxy_failures = (ProxyError, ProxyConnectionError, ProxyTimeoutError)
    proxy_chains = [
        ['socks5://proxy1.example.com:1080', 'socks4://proxy2.example.com:1081'],
        ['http://proxy3.example.com:3128'],
        ['socks5://backup-proxy.example.com:1080'],
    ]
    # aiohttp documents ``timeout`` as a ClientTimeout; a bare number is only
    # accepted for backward compatibility.
    request_timeout = aiohttp.ClientTimeout(total=15)
    last_error = None

    for chain_urls in proxy_chains:
        try:
            connector = ChainProxyConnector.from_urls(chain_urls)
            async with aiohttp.ClientSession(connector=connector) as session:
                async with session.get(url, timeout=request_timeout) as response:
                    return await response.text()
        except proxy_failures as e:
            print(f"Proxy chain {chain_urls} failed: {e}")
            last_error = e

    # Chain the final failure so the traceback shows why the last chain died.
    raise Exception("All proxy chains failed") from last_error


# Proxy errors work alongside aiohttp's built-in exception handling:
import aiohttp
from aiohttp_socks import ProxyConnector, ProxyError
async def comprehensive_error_handling(url):
    """Fetch ``url`` via a local SOCKS5 proxy, handling proxy and aiohttp errors.

    Args:
        url: Address to request with HTTP GET.

    Returns:
        The response body as text on success, or ``None`` if any of the
        handled errors occurred (a message is printed for each).
    """
    # Function-scope imports keep the example self-contained.
    import asyncio

    from aiohttp_socks import ProxyConnectionError, ProxyTimeoutError

    connector = ProxyConnector.from_url('socks5://127.0.0.1:1080')
    try:
        async with aiohttp.ClientSession(connector=connector) as session:
            async with session.get(url) as response:
                return await response.text()
    # All three proxy classes are listed so the clause works whether or not
    # the connection/timeout errors subclass ProxyError (NOTE(review):
    # upstream python-socks appears to base them on OSError/TimeoutError).
    except (ProxyError, ProxyConnectionError, ProxyTimeoutError) as e:
        print(f"Proxy error: {e}")
    except aiohttp.ClientConnectorError as e:
        print(f"Connection error: {e}")
    # aiohttp.ClientTimeout is a settings object, not an exception, and naming
    # it in an ``except`` clause raises TypeError. Timeouts surface as
    # asyncio.TimeoutError; this clause must precede ClientError so timeout
    # subclasses of ClientError are reported as timeouts.
    except asyncio.TimeoutError as e:
        print(f"Request timeout: {e}")
    except aiohttp.ClientError as e:
        print(f"Client error: {e}")
    return None


# Install with Tessl CLI
npx tessl i tessl/pypi-aiohttp-socks