IBM Cloudant Python client library providing comprehensive interface for Cloudant and CouchDB databases
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Custom HTTP transport adapters for handling rate limiting, retries, and connection management in Cloudant and CouchDB client connections.
Handle HTTP 429 "Too Many Requests" responses with automatic retry and exponential backoff.
class Replay429Adapter(HTTPAdapter):
    """
    HTTP adapter that replays requests receiving 429 Too Many Requests responses.

    Implements an exponential backoff strategy where the sleep duration doubles
    for each consecutive 429 response until the maximum number of retries is
    reached.
    """

    def __init__(self, retries=3, initialBackoff=0.25):
        """
        Initialize the rate limiting adapter.

        Parameters:
        - retries (int): Maximum number of retry attempts (default: 3)
        - initialBackoff (float): Initial backoff time in seconds (default: 0.25)

        Behavior:
        - Only retries on HTTP 429 status codes
        - Supports all CouchDB HTTP methods (GET, HEAD, PUT, POST, DELETE, COPY)
        - Uses exponential backoff: 0.25s, 0.5s, 1.0s, 2.0s, etc.
        - No retries for connection or read errors (only HTTP 429)
        """
        # NOTE(review): the implementation body is not shown in this
        # documentation extract; the parameters are presumably stored for use
        # by the adapter's send() override — confirm against the cloudant
        # library source.

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
# Create adapter with default settings (3 retries, 0.25s initial backoff)
adapter = Replay429Adapter()
with cloudant(username, password, account=account_name, adapter=adapter) as client:
db = client['my_database']
# Client will automatically retry on 429 responses
# with exponential backoff: 0.25s, 0.5s, 1.0s
for i in range(100):
doc = {'_id': f'doc_{i}', 'data': f'value_{i}'}
db.create_document(doc)
print(f"Created document {i}")

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
# Custom adapter with more aggressive retry policy
# 5 retries starting with 1 second backoff
adapter = Replay429Adapter(retries=5, initialBackoff=1.0)
with cloudant(username, password, account=account_name, adapter=adapter) as client:
db = client['my_database']
# This will retry up to 5 times on 429 responses
# with backoff: 1.0s, 2.0s, 4.0s, 8.0s, 16.0s
try:
# Bulk operations that might hit rate limits
docs = [{'_id': f'bulk_{i}', 'value': i} for i in range(1000)]
result = db.bulk_docs(docs)
print(f"Bulk insert completed: {len(result)} documents")
except Exception as e:
print(f"Bulk insert failed after retries: {e}")

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
# Conservative adapter for high-volume applications
# Longer initial backoff to be more respectful of rate limits
adapter = Replay429Adapter(retries=2, initialBackoff=2.0)
with cloudant(username, password, account=account_name, adapter=adapter) as client:
db = client['my_database']
# Slower but more reliable for sustained high-volume operations
# Backoff pattern: 2.0s, 4.0s
query_result = db.get_query_result(
selector={'type': 'user'},
limit=10000 # Large query that might trigger rate limiting
)
users = list(query_result)
print(f"Retrieved {len(users)} users")

from cloudant.client import Cloudant
from cloudant.adapters import Replay429Adapter
# Configure adapter directly with client
adapter = Replay429Adapter(retries=4, initialBackoff=0.5)
client = Cloudant(username, password, account=account_name, adapter=adapter)
client.connect()
try:
# All requests through this client will use the rate limiting adapter
databases = client.all_dbs()
print(f"Found {len(databases)} databases")
for db_name in databases[:5]: # Process first 5 databases
db = client[db_name]
doc_count = db.doc_count()
print(f"Database {db_name}: {doc_count} documents")
finally:
client.disconnect()

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
import time
# Adapter optimized for high-volume operations
adapter = Replay429Adapter(retries=10, initialBackoff=0.1)
with cloudant(username, password, account=account_name, adapter=adapter) as client:
db = client['large_database']
# Process large dataset with rate limit protection
batch_size = 100
total_processed = 0
# Get all document IDs
all_docs = db.all_docs()
doc_ids = [row['id'] for row in all_docs]
# Process in batches
for i in range(0, len(doc_ids), batch_size):
batch_ids = doc_ids[i:i + batch_size]
try:
# Bulk fetch documents
docs_result = db.all_docs(keys=batch_ids, include_docs=True)
docs = [row['doc'] for row in docs_result if 'doc' in row]
# Process documents
for doc in docs:
# Simulate processing
doc['processed'] = True
doc['processed_at'] = time.time()
# Bulk update
db.bulk_docs(docs)
total_processed += len(docs)
print(f"Processed batch {i//batch_size + 1}: {len(docs)} documents")
except Exception as e:
print(f"Batch {i//batch_size + 1} failed: {e}")
continue
print(f"Total documents processed: {total_processed}")

from cloudant.client import Cloudant
from cloudant.adapters import Replay429Adapter
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry
# Create custom adapter combining rate limiting with connection pooling
class PooledReplay429Adapter(Replay429Adapter):
    """Rate-limiting adapter that also configures urllib3 connection pooling."""

    def __init__(self, retries=3, initialBackoff=0.25, pool_connections=10, pool_maxsize=10):
        """
        Initialize the pooled rate limiting adapter.

        Parameters:
        - retries (int): Maximum number of 429 retry attempts (default: 3)
        - initialBackoff (float): Initial backoff time in seconds (default: 0.25)
        - pool_connections (int): Number of urllib3 connection pools to cache (default: 10)
        - pool_maxsize (int): Maximum number of connections to save per pool (default: 10)
        """
        super().__init__(retries=retries, initialBackoff=initialBackoff)
        # Re-initialize the pool manager with the requested sizes; the base
        # HTTPAdapter.__init__ already created one with its own defaults.
        self.init_poolmanager(pool_connections, pool_maxsize)
# Use enhanced adapter
adapter = PooledReplay429Adapter(
retries=5,
initialBackoff=0.5,
pool_connections=20, # More connections for concurrent requests
pool_maxsize=20
)
client = Cloudant(username, password, account=account_name, adapter=adapter)
client.connect()
try:
# Client now has both rate limiting and optimized connection pooling
databases = client.all_dbs()
# Concurrent operations will benefit from connection pooling
# while still being protected against rate limiting
finally:
client.disconnect()

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
import logging
import time  # used below for document timestamps
# Enable urllib3 debug logging to see retry behavior
logging.basicConfig(level=logging.DEBUG)
urllib3_logger = logging.getLogger('urllib3')
urllib3_logger.setLevel(logging.DEBUG)
# Create adapter
adapter = Replay429Adapter(retries=3, initialBackoff=1.0)
with cloudant(username, password, account=account_name, adapter=adapter) as client:
db = client['test_database']
try:
# This operation might trigger rate limiting
# Check logs to see retry behavior
for i in range(50):
doc = {'_id': f'test_doc_{i}', 'timestamp': time.time()}
result = db.create_document(doc)
print(f"Document {i}: {result['ok']}")
except Exception as e:
print(f"Operation failed: {e}")

Rate limiting adapters handle HTTP 429 responses automatically, but other errors should still be caught:
from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
from cloudant.error import CloudantException
from requests.exceptions import RequestException
adapter = Replay429Adapter(retries=3, initialBackoff=0.25)
with cloudant(username, password, account=account_name, adapter=adapter) as client:
db = client['my_database']
try:
# Rate limiting is handled automatically
result = db.create_document({'test': 'data'})
except CloudantException as e:
# Cloudant-specific errors (authentication, permissions, etc.)
print(f"Cloudant error: {e}")
except RequestException as e:
# Network errors, timeouts, etc. that aren't retryable
print(f"Request error: {e}")
except Exception as e:
# Other errors
print(f"Unexpected error: {e}")

# Adapter configuration
from typing import Any

AdapterConfig = dict[str, Any]
# HTTP methods supported for retries
SUPPORTED_METHODS = frozenset(['GET', 'HEAD', 'PUT', 'POST', 'DELETE', 'COPY'])
# Retry configuration
RetryConfig = dict[str, Any]

from cloudant.adapters import Replay429Adapter
from requests.adapters import HTTPAdapter

Install with Tessl CLI
npx tessl i tessl/pypi-cloudant