Asynchronous Python HTTP requests using concurrent.futures for non-blocking operations
npx @tessl/cli install tessl/pypi-requests-futures@1.0.0Asynchronous Python HTTP requests using concurrent.futures for non-blocking operations. This library provides a small add-on for the popular requests HTTP library, enabling asynchronous HTTP requests by returning Future objects instead of immediate Response objects.
pip install requests-futuresfrom requests_futures.sessions import FuturesSessionAlternative import patterns:
import requests_futures.sessions
# Access as: requests_futures.sessions.FuturesSession
from requests_futures import sessions
# Access as: sessions.FuturesSessionPackage-level imports:
import requests_futures
# Version info: requests_futures.__version__
# Package metadata: requests_futures.__title__, requests_futures.__author__from requests_futures.sessions import FuturesSession
# Create a session (default uses ThreadPoolExecutor with 8 workers)
session = FuturesSession()
# Start requests asynchronously
future_one = session.get('http://httpbin.org/get')
future_two = session.get('http://httpbin.org/get?foo=bar')
# Wait for results when needed
response_one = future_one.result()
response_two = future_two.result()
print(f'Response 1 status: {response_one.status_code}')
print(f'Response 2 status: {response_two.status_code}')requests-futures extends the requests library's Session class to provide asynchronous functionality:
The main interface for asynchronous HTTP requests, extending requests.Session with concurrent.futures support.
class FuturesSession(Session):
def __init__(
self,
executor=None,
max_workers=8,
session=None,
adapter_kwargs=None,
*args,
**kwargs
): ...Parameters:
executor (optional): Custom concurrent.futures executor instance (ThreadPoolExecutor or ProcessPoolExecutor)max_workers (int, default=8): Number of worker threads when no executor is providedsession (optional): Existing requests.Session instance to wrap for asynchronous executionadapter_kwargs (dict, optional): Additional keyword arguments for HTTPAdapter configuration*args, **kwargs: Additional arguments passed to parent requests.Session classUsage Examples:
Basic session creation:
session = FuturesSession()Custom thread pool size:
session = FuturesSession(max_workers=16)Custom executor:
from concurrent.futures import ThreadPoolExecutor
executor = ThreadPoolExecutor(max_workers=10)
session = FuturesSession(executor=executor)Using existing requests session:
import requests
my_session = requests.Session()
my_session.headers.update({'User-Agent': 'MyApp/1.0'})
async_session = FuturesSession(session=my_session)All HTTP methods return concurrent.futures.Future objects that resolve to requests.Response objects.
def request(self, *args, background_callback=None, **kwargs) -> Future[Response]: ...
def get(self, url, **kwargs) -> Future[Response]: ...
def post(self, url, data=None, json=None, **kwargs) -> Future[Response]: ...
def put(self, url, data=None, **kwargs) -> Future[Response]: ...
def patch(self, url, data=None, **kwargs) -> Future[Response]: ...
def delete(self, url, **kwargs) -> Future[Response]: ...
def head(self, url, **kwargs) -> Future[Response]: ...
def options(self, url, **kwargs) -> Future[Response]: ...Parameters:
url (str): Target URL for the HTTP requestdata (optional): Request body data for POST/PUT/PATCH requestsjson (optional): JSON data to send in request body (POST only)background_callback (callable, optional, DEPRECATED): Function called with (session, response) in background thread. Deprecated in favor of hooks system - will be removed in version 1.0**kwargs: All standard requests library parameters (headers, params, timeout, etc.)Returns: concurrent.futures.Future[requests.Response] - Future object that resolves to a Response
Usage Examples:
# GET request
future = session.get('https://api.example.com/users')
response = future.result()
# POST with JSON data
future = session.post('https://api.example.com/users', json={'name': 'John'})
response = future.result()
# Multiple concurrent requests
futures = [
session.get(f'https://api.example.com/users/{i}')
for i in range(5)
]
# Wait for all to complete
responses = [future.result() for future in futures]def close(self) -> None: ...Closes the session and shuts down the owned executor. Called automatically when using the session as a context manager.
Usage Examples:
Manual cleanup:
session = FuturesSession()
# ... use session
session.close()Context manager (recommended):
with FuturesSession() as session:
future = session.get('https://api.example.com/data')
response = future.result()
# Session automatically closedrequests-futures integrates with the requests library's hooks system for background response processing:
def response_hook(resp, *args, **kwargs):
# Parse JSON in background thread
resp.data = resp.json()
session = FuturesSession()
future = session.get('https://api.example.com/data',
hooks={'response': response_hook})
response = future.result()
# response.data is already parsedSession-level hooks:
session = FuturesSession()
session.hooks['response'] = response_hook
future = session.get('https://api.example.com/data')
response = future.result()
# Hook applied to all requestsFor CPU-intensive processing or memory isolation, use ProcessPoolExecutor:
from concurrent.futures import ProcessPoolExecutor
# Python 3.5+ (full support)
session = FuturesSession(executor=ProcessPoolExecutor(max_workers=4))
# Python 3.4 (requires existing session)
import requests
base_session = requests.Session()
session = FuturesSession(
executor=ProcessPoolExecutor(max_workers=4),
session=base_session
)Requirements:
session parameterUsing concurrent.futures.as_completed() for processing responses as they arrive:
from concurrent.futures import as_completed
session = FuturesSession()
futures = [
session.get(f'https://api.example.com/item/{i}')
for i in range(10)
]
for future in as_completed(futures):
response = future.result()
print(f'Status: {response.status_code}, URL: {response.url}')Attaching metadata to futures:
session = FuturesSession()
futures = []
for i in range(3):
future = session.get('https://api.example.com/data')
future.request_id = i # Attach custom metadata
futures.append(future)
for future in as_completed(futures):
response = future.result()
print(f'Request {future.request_id}: {response.status_code}')Exceptions are deferred to the Future.result() call:
session = FuturesSession()
future = session.get('https://invalid-url-example.com')
try:
response = future.result()
except requests.exceptions.ConnectionError as e:
print(f'Connection failed: {e}')
except requests.exceptions.Timeout as e:
print(f'Request timed out: {e}')Threading Support:
Process Pool Support:
session parameterRuntimeError Exceptions:
The package exposes version and metadata information:
# Module-level constants (from requests_futures.__init__)
__title__ = 'requests-futures'
__version__ = '1.0.2'
__author__ = 'Ross McFarland'
__license__ = 'Apache 2.0'
__copyright__ = 'Copyright 2013 Ross McFarland'Usage:
import requests_futures
print(f"Version: {requests_futures.__version__}")
print(f"Author: {requests_futures.__author__}")# From concurrent.futures
class Future:
def result(self, timeout=None) -> Response: ...
def cancel(self) -> bool: ...
def cancelled(self) -> bool: ...
def done(self) -> bool: ...
def add_done_callback(self, fn) -> None: ...
# From requests
class Response:
status_code: int
headers: dict
text: str
content: bytes
url: str
request: Request
def json(self, **kwargs) -> dict: ...
def raise_for_status(self) -> None: ...
class Session:
headers: dict
cookies: dict
hooks: dict
def request(self, method: str, url: str, **kwargs) -> Response: ...
def get(self, url: str, **kwargs) -> Response: ...
def post(self, url: str, **kwargs) -> Response: ...
# ... other HTTP methods