CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-cloudant

IBM Cloudant Python client library providing comprehensive interface for Cloudant and CouchDB databases

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

http-adapters.mddocs/

HTTP Adapters

Custom HTTP transport adapters for handling rate limiting, retries, and connection management in Cloudant and CouchDB client connections.

Capabilities

Rate Limiting Adapter

Handle HTTP 429 "Too Many Requests" responses with automatic retry and exponential backoff.

class Replay429Adapter(HTTPAdapter):
    """
    HTTP adapter that replays requests receiving 429 Too Many Requests responses.
    
    Implements exponential backoff strategy where sleep duration doubles
    for each consecutive 429 response until maximum retries reached.
    """
    
    def __init__(self, retries=3, initialBackoff=0.25):
        """
        Initialize rate limiting adapter.
        
        Parameters:
        - retries (int): Maximum number of retry attempts (default: 3)
        - initialBackoff (float): Initial backoff time in seconds (default: 0.25)
        
        Behavior:
        - Only retries on HTTP 429 status codes
        - Supports all CouchDB HTTP methods (GET, HEAD, PUT, POST, DELETE, COPY)
        - Uses exponential backoff: 0.25s, 0.5s, 1.0s, 2.0s, etc.
        - No retries for connection or read errors (only HTTP 429)
        """

Usage Examples

Basic Rate Limiting

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter

# Create adapter with default settings (3 retries, 0.25s initial backoff)
adapter = Replay429Adapter()

with cloudant(username, password, account=account_name, adapter=adapter) as client:
    db = client['my_database']
    
    # Client will automatically retry on 429 responses
    # with exponential backoff: 0.25s, 0.5s, 1.0s
    for i in range(100):
        doc = {'_id': f'doc_{i}', 'data': f'value_{i}'}
        db.create_document(doc)
        print(f"Created document {i}")

Custom Retry Configuration

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter

# Custom adapter with more aggressive retry policy
# 5 retries starting with 1 second backoff
adapter = Replay429Adapter(retries=5, initialBackoff=1.0)

with cloudant(username, password, account=account_name, adapter=adapter) as client:
    db = client['my_database']
    
    # This will retry up to 5 times on 429 responses
    # with backoff: 1.0s, 2.0s, 4.0s, 8.0s, 16.0s
    try:
        # Bulk operations that might hit rate limits
        docs = [{'_id': f'bulk_{i}', 'value': i} for i in range(1000)]
        result = db.bulk_docs(docs)
        print(f"Bulk insert completed: {len(result)} documents")
        
    except Exception as e:
        print(f"Bulk insert failed after retries: {e}")

Conservative Rate Limiting

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter

# Conservative adapter for high-volume applications
# Longer initial backoff to be more respectful of rate limits
adapter = Replay429Adapter(retries=2, initialBackoff=2.0)

with cloudant(username, password, account=account_name, adapter=adapter) as client:
    db = client['my_database']
    
    # Slower but more reliable for sustained high-volume operations
    # Backoff pattern: 2.0s, 4.0s
    query_result = db.get_query_result(
        selector={'type': 'user'},
        limit=10000  # Large query that might trigger rate limiting
    )
    
    users = list(query_result)
    print(f"Retrieved {len(users)} users")

Direct Client Configuration

from cloudant.client import Cloudant
from cloudant.adapters import Replay429Adapter

# Configure adapter directly with client
adapter = Replay429Adapter(retries=4, initialBackoff=0.5)

client = Cloudant(username, password, account=account_name, adapter=adapter)
client.connect()

try:
    # All requests through this client will use the rate limiting adapter
    databases = client.all_dbs()
    print(f"Found {len(databases)} databases")
    
    for db_name in databases[:5]:  # Process first 5 databases
        db = client[db_name]
        doc_count = db.doc_count()
        print(f"Database {db_name}: {doc_count} documents")
        
finally:
    client.disconnect()

High-Volume Data Processing

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
import time

# Adapter optimized for high-volume operations
adapter = Replay429Adapter(retries=10, initialBackoff=0.1)

with cloudant(username, password, account=account_name, adapter=adapter) as client:
    db = client['large_database']
    
    # Process large dataset with rate limit protection
    batch_size = 100
    total_processed = 0
    
    # Get all document IDs
    all_docs = db.all_docs()
    doc_ids = [row['id'] for row in all_docs]
    
    # Process in batches
    for i in range(0, len(doc_ids), batch_size):
        batch_ids = doc_ids[i:i + batch_size]
        
        try:
            # Bulk fetch documents
            docs_result = db.all_docs(keys=batch_ids, include_docs=True)
            docs = [row['doc'] for row in docs_result if 'doc' in row]
            
            # Process documents
            for doc in docs:
                # Simulate processing
                doc['processed'] = True
                doc['processed_at'] = time.time()
            
            # Bulk update
            db.bulk_docs(docs)
            total_processed += len(docs)
            
            print(f"Processed batch {i//batch_size + 1}: {len(docs)} documents")
            
        except Exception as e:
            print(f"Batch {i//batch_size + 1} failed: {e}")
            continue
    
    print(f"Total documents processed: {total_processed}")

Integration with Connection Pools

from cloudant.client import Cloudant
from cloudant.adapters import Replay429Adapter
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

# Create custom adapter combining rate limiting with connection pooling
class PooledReplay429Adapter(Replay429Adapter):
    def __init__(self, retries=3, initialBackoff=0.25, pool_connections=10, pool_maxsize=10):
        super().__init__(retries=retries, initialBackoff=initialBackoff)
        self.init_poolmanager(pool_connections, pool_maxsize)

# Use enhanced adapter
adapter = PooledReplay429Adapter(
    retries=5,
    initialBackoff=0.5,
    pool_connections=20,  # More connections for concurrent requests
    pool_maxsize=20
)

client = Cloudant(username, password, account=account_name, adapter=adapter)
client.connect()

try:
    # Client now has both rate limiting and optimized connection pooling
    databases = client.all_dbs()
    
    # Concurrent operations will benefit from connection pooling
    # while still being protected against rate limiting
    
finally:
    client.disconnect()

Monitoring Adapter Behavior

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
import logging

# Enable urllib3 debug logging to see retry behavior
logging.basicConfig(level=logging.DEBUG)
urllib3_logger = logging.getLogger('urllib3')
urllib3_logger.setLevel(logging.DEBUG)

# Create adapter
adapter = Replay429Adapter(retries=3, initialBackoff=1.0)

with cloudant(username, password, account=account_name, adapter=adapter) as client:
    db = client['test_database']
    
    try:
        # This operation might trigger rate limiting
        # Check logs to see retry behavior
        for i in range(50):
            doc = {'_id': f'test_doc_{i}', 'timestamp': time.time()}
            result = db.create_document(doc)
            print(f"Document {i}: {result['ok']}")
            
    except Exception as e:
        print(f"Operation failed: {e}")

Error Handling

Rate limiting adapters handle HTTP 429 responses automatically, but other errors should still be caught:

from cloudant import cloudant
from cloudant.adapters import Replay429Adapter
from cloudant.error import CloudantException
from requests.exceptions import RequestException

adapter = Replay429Adapter(retries=3, initialBackoff=0.25)

with cloudant(username, password, account=account_name, adapter=adapter) as client:
    db = client['my_database']
    
    try:
        # Rate limiting is handled automatically
        result = db.create_document({'test': 'data'})
        
    except CloudantException as e:
        # Cloudant-specific errors (authentication, permissions, etc.)
        print(f"Cloudant error: {e}")
        
    except RequestException as e:
        # Network errors, timeouts, etc. that aren't retryable
        print(f"Request error: {e}")
        
    except Exception as e:
        # Other errors
        print(f"Unexpected error: {e}")

Types

# Adapter configuration
AdapterConfig = dict[str, Any]

# HTTP methods supported for retries
SUPPORTED_METHODS = frozenset(['GET', 'HEAD', 'PUT', 'POST', 'DELETE', 'COPY'])

# Retry configuration
RetryConfig = dict[str, Any]

Import Statements

from cloudant.adapters import Replay429Adapter
from requests.adapters import HTTPAdapter

Install with Tessl CLI

npx tessl i tessl/pypi-cloudant

docs

authentication.md

change-feeds.md

database-management.md

document-operations.md

error-handling.md

http-adapters.md

index.md

query-indexing.md

replication.md

scheduler-monitoring.md

security-document.md

views-design-documents.md

tile.json