IBM Cloudant Python client library providing comprehensive interface for Cloudant and CouchDB databases
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Comprehensive exception hierarchy for handling different types of errors with specific exceptions for client, database, document, and operation-specific failures.
All Cloudant library exceptions inherit from the base CloudantException class.
class CloudantException(Exception):
"""
Base exception class for all Cloudant library errors.
All other Cloudant exceptions inherit from this class, allowing
for comprehensive error handling with a single catch block.
"""
class CloudantArgumentError(CloudantException):
"""
Exception for invalid arguments passed to library functions.
Raised when:
- Required parameters are missing
- Parameter values are invalid or out of range
- Conflicting parameters are provided
"""
class ResultException(CloudantException):
"""
Exception for errors in result processing and iteration.
Raised when:
- Result iteration fails
- Result data is malformed
- Pagination errors occur
"""
class CloudantClientException(CloudantException):
"""
Exception for client connection and authentication errors.
Raised when:
- Authentication fails
- Connection to server fails
- Session management errors occur
- Network connectivity issues
"""
class CloudantDatabaseException(CloudantException):
"""
Exception for database-level operations.
Raised when:
- Database creation/deletion fails
- Database access denied
- Database metadata operations fail
- Query execution errors
"""
class CloudantDesignDocumentException(CloudantException):
"""
Exception for design document operations.
Raised when:
- Design document creation/update fails
- View definition errors
- Show/list function execution fails
- Update handler errors
"""
class CloudantDocumentException(CloudantException):
"""
Exception for document-level operations.
Raised when:
- Document CRUD operations fail
- Document conflict errors
- Attachment operations fail
- Document validation errors
"""
class CloudantFeedException(CloudantException):
"""
Exception for feed-related operations.
Raised when:
- Change feed connection fails
- Feed consumption errors
- Feed parameter validation fails
"""
class CloudantIndexException(CloudantException):
"""
Exception for index management operations.
Raised when:
- Index creation/deletion fails
- Index definition errors
- Index query optimization issues
"""
class CloudantReplicatorException(CloudantException):
"""
Exception for replication operations.
Raised when:
- Replication setup fails
- Replication monitoring errors
- Replication state management issues
"""
class CloudantViewException(CloudantException):
"""
Exception for view-related operations.
Raised when:
- View query execution fails
- View parameter validation errors
- MapReduce processing issues
"""from cloudant import cloudant
from cloudant.error import CloudantException
with cloudant('user', 'pass', account='myaccount') as client:
try:
# Any Cloudant operation
db = client['my_database']
doc = db['my_document']
doc.fetch()
except CloudantException as e:
print(f"Cloudant operation failed: {e}")
print(f"Error type: {type(e).__name__}")from cloudant import cloudant
from cloudant.error import (
CloudantClientException,
CloudantDatabaseException,
CloudantDocumentException,
CloudantArgumentError
)
try:
with cloudant('invalid_user', 'invalid_pass', account='myaccount') as client:
# This will fail at connection time
db = client['my_database']
except CloudantClientException as e:
print(f"Authentication failed: {e}")
# Handle authentication error
# Could prompt for new credentials, use fallback auth, etc.
try:
with cloudant('user', 'pass', account='myaccount') as client:
# Try to access non-existent database
db = client['non_existent_database']
if not db.exists():
db.create()
except CloudantDatabaseException as e:
print(f"Database operation failed: {e}")
# Handle database error
# Could create database, use different database, etc.
try:
with cloudant('user', 'pass', account='myaccount') as client:
db = client['my_database']
doc = db['non_existent_document']
doc.fetch() # This will fail
except CloudantDocumentException as e:
print(f"Document operation failed: {e}")
# Handle document error
# Could create document, use default values, etc.
try:
with cloudant('user', 'pass', account='myaccount') as client:
# Invalid parameter
db = client.create_database('') # Empty name
except CloudantArgumentError as e:
print(f"Invalid argument: {e}")
# Handle argument error
# Could validate inputs, provide defaults, etc.from cloudant import cloudant
from cloudant.error import CloudantDocumentException
import time
import random
def save_with_retry(doc, max_retries=5):
"""Save document with automatic conflict resolution."""
for attempt in range(max_retries):
try:
doc.save()
print(f"Document saved successfully on attempt {attempt + 1}")
return True
except CloudantDocumentException as e:
error_msg = str(e).lower()
if 'conflict' in error_msg or '409' in str(e):
print(f"Conflict detected on attempt {attempt + 1}")
if attempt < max_retries - 1:
# Fetch latest version and retry
doc.fetch()
# Add random delay to reduce conflict probability
time.sleep(random.uniform(0.1, 0.5))
continue
else:
print(f"Max retries reached, conflict not resolved")
raise
else:
# Non-conflict error, don't retry
print(f"Non-conflict error: {e}")
raise
return False
# Usage
with cloudant('user', 'pass', account='myaccount') as client:
db = client['my_database']
doc = db['conflicted_document']
doc['updated_field'] = 'new_value'
doc['timestamp'] = time.time()
save_with_retry(doc)from cloudant import cloudant
from cloudant.error import CloudantClientException
import time
def robust_operation(client, operation_func, max_retries=3):
"""Execute operation with retry on network errors."""
for attempt in range(max_retries):
try:
return operation_func(client)
except CloudantClientException as e:
error_msg = str(e).lower()
# Check if it's a network-related error
if any(keyword in error_msg for keyword in [
'timeout', 'connection', 'network', 'unreachable'
]):
print(f"Network error on attempt {attempt + 1}: {e}")
if attempt < max_retries - 1:
# Exponential backoff
wait_time = 2 ** attempt
print(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
continue
else:
print("Max retries reached for network error")
raise
else:
# Non-network error, don't retry
print(f"Non-network client error: {e}")
raise
def my_database_operation(client):
"""Example database operation."""
db = client['my_database']
return db.all_docs(limit=10)
# Usage
with cloudant('user', 'pass', account='myaccount') as client:
try:
result = robust_operation(client, my_database_operation)
print(f"Operation successful: {len(list(result))} documents")
except CloudantClientException as e:
print(f"Operation failed permanently: {e}")from cloudant import cloudant
from cloudant.error import CloudantDatabaseException, CloudantIndexException
def safe_query(db, selector, **kwargs):
"""Execute query with comprehensive error handling."""
try:
return db.get_query_result(selector, **kwargs)
except CloudantDatabaseException as e:
error_msg = str(e).lower()
if 'no_usable_index' in error_msg:
print("No suitable index found for query")
print("Suggestion: Create an index on the queried fields")
# Try to suggest index creation
fields = list(selector.keys())
print(f"Consider creating index on fields: {fields}")
# Could automatically create index here if desired
# db.create_query_index(fields=fields)
elif 'invalid_selector' in error_msg:
print(f"Invalid query selector: {selector}")
print("Check selector syntax and field names")
elif 'request_timeout' in error_msg:
print("Query timed out")
print("Try adding a limit or creating better indexes")
raise # Re-raise after logging
def safe_index_creation(db, fields, index_name):
"""Create index with error handling."""
try:
return db.create_query_index(
fields=fields,
index_name=index_name
)
except CloudantIndexException as e:
error_msg = str(e).lower()
if 'index_exists' in error_msg or 'conflict' in error_msg:
print(f"Index {index_name} already exists")
return None
elif 'invalid_fields' in error_msg:
print(f"Invalid index fields: {fields}")
print("Check field names and types")
raise
# Usage
with cloudant('user', 'pass', account='myaccount') as client:
db = client['my_database']
# Try query with error handling
try:
selector = {'type': 'user', 'status': 'active'}
result = safe_query(db, selector, limit=50)
for doc in result:
print(f"User: {doc.get('name', 'N/A')}")
except CloudantDatabaseException as e:
print(f"Query failed: {e}")
# Try index creation with error handling
try:
safe_index_creation(db, ['type', 'status'], 'type_status_idx')
print("Index created successfully")
except CloudantIndexException as e:
print(f"Index creation failed: {e}")from cloudant import cloudant
from cloudant.replicator import Replicator
from cloudant.error import CloudantReplicatorException
import time
def monitor_replication_with_error_handling(replicator, repl_id, timeout=300):
"""Monitor replication with comprehensive error handling."""
start_time = time.time()
while time.time() - start_time < timeout:
try:
state = replicator.replication_state(repl_id)
repl_state = state.get('_replication_state', 'unknown')
if repl_state == 'completed':
stats = state.get('_replication_stats', {})
docs_written = stats.get('docs_written', 0)
print(f"Replication completed: {docs_written} documents")
return True
elif repl_state == 'error':
error_reason = state.get('_replication_state_reason', 'Unknown error')
print(f"Replication failed: {error_reason}")
# Handle specific replication errors
if 'unauthorized' in error_reason.lower():
print("Authentication error - check credentials")
elif 'not_found' in error_reason.lower():
print("Database not found - check database names")
elif 'timeout' in error_reason.lower():
print("Network timeout - check connectivity")
return False
elif repl_state in ['triggered', 'running']:
stats = state.get('_replication_stats', {})
docs_read = stats.get('docs_read', 0)
docs_written = stats.get('docs_written', 0)
print(f"Replication progress: {docs_written}/{docs_read}")
time.sleep(5) # Check every 5 seconds
except CloudantReplicatorException as e:
print(f"Error checking replication state: {e}")
# If replication document was deleted, it might have completed
if 'not_found' in str(e).lower():
print("Replication document not found - may have completed")
return None
time.sleep(10) # Wait longer on error
print(f"Replication monitoring timed out after {timeout} seconds")
return None
# Usage
with cloudant('user', 'pass', account='myaccount') as client:
replicator = Replicator(client)
try:
# Start replication with error handling
repl_doc = replicator.create_replication(
source_db='source_db',
target_db='target_db',
create_target=True
)
repl_id = repl_doc['_id']
print(f"Started replication: {repl_id}")
# Monitor with error handling
success = monitor_replication_with_error_handling(replicator, repl_id)
if success:
print("Replication completed successfully")
elif success is False:
print("Replication failed")
else:
print("Replication status uncertain")
except CloudantReplicatorException as e:
print(f"Failed to start replication: {e}")from cloudant import cloudant
from cloudant.error import CloudantFeedException
import time
def robust_change_feed(db, max_reconnects=5):
"""Change feed with automatic reconnection on errors."""
reconnect_count = 0
last_seq = '0'
while reconnect_count < max_reconnects:
try:
print(f"Starting change feed from sequence: {last_seq}")
changes = db.changes(
since=last_seq,
feed='continuous',
include_docs=True,
heartbeat=30000,
timeout=60000
)
for change in changes:
if change: # Skip heartbeat messages
doc_id = change['id']
last_seq = change['seq']
print(f"Change detected: {doc_id} (seq: {last_seq})")
# Process change here
if change.get('deleted'):
print(f"Document deleted: {doc_id}")
else:
doc = change.get('doc', {})
print(f"Document updated: {doc_id}")
# If we reach here, feed ended normally
print("Change feed ended normally")
break
except CloudantFeedException as e:
reconnect_count += 1
error_msg = str(e).lower()
print(f"Feed error (attempt {reconnect_count}/{max_reconnects}): {e}")
if 'timeout' in error_msg or 'connection' in error_msg:
print("Network-related error, will retry")
elif 'unauthorized' in error_msg:
print("Authentication error, check credentials")
break
elif 'not_found' in error_msg:
print("Database not found")
break
if reconnect_count < max_reconnects:
wait_time = min(2 ** reconnect_count, 60) # Cap at 60 seconds
print(f"Reconnecting in {wait_time} seconds...")
time.sleep(wait_time)
else:
print("Max reconnection attempts reached")
raise
except KeyboardInterrupt:
print("Change feed interrupted by user")
break
except Exception as e:
print(f"Unexpected error in change feed: {e}")
break
# Usage
with cloudant('user', 'pass', account='myaccount') as client:
db = client['my_database']
try:
robust_change_feed(db)
except CloudantFeedException as e:
print(f"Change feed failed permanently: {e}")from cloudant import cloudant
from cloudant.error import CloudantException
import traceback
import logging
# Set up logging for better error tracking
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('cloudant_app')
def detailed_error_handling():
"""Example of detailed error context extraction."""
try:
with cloudant('user', 'pass', account='myaccount') as client:
db = client['my_database']
# Some operation that might fail
doc = db['problem_document']
doc.fetch()
except CloudantException as e:
# Log detailed error information
logger.error(f"Cloudant operation failed:")
logger.error(f" Error type: {type(e).__name__}")
logger.error(f" Error message: {str(e)}")
# Check if there's additional error context
if hasattr(e, 'response'):
logger.error(f" HTTP status: {e.response.status_code}")
logger.error(f" Response headers: {dict(e.response.headers)}")
if hasattr(e, 'request'):
logger.error(f" Request URL: {e.request.url}")
logger.error(f" Request method: {e.request.method}")
# Full stack trace for debugging
logger.debug("Full stack trace:")
logger.debug(traceback.format_exc())
# Re-raise with additional context
raise type(e)(f"Operation failed with context: {str(e)}") from e
def error_recovery_example():
"""Example of implementing error recovery strategies."""
max_retries = 3
retry_count = 0
while retry_count < max_retries:
try:
with cloudant('user', 'pass', account='myaccount') as client:
db = client['my_database']
# Critical operation
doc = db.create_document({
'type': 'important_record',
'data': 'critical_data',
'timestamp': time.time()
})
logger.info(f"Document created successfully: {doc['_id']}")
return doc
except CloudantDocumentException as e:
retry_count += 1
logger.warning(f"Document operation failed (attempt {retry_count}): {e}")
if retry_count < max_retries:
# Implement recovery strategy
if 'conflict' in str(e).lower():
# Generate new ID for conflicts
logger.info("Generating new document ID due to conflict")
time.sleep(0.5) # Brief delay
else:
# General retry with exponential backoff
wait_time = 2 ** retry_count
logger.info(f"Retrying in {wait_time} seconds...")
time.sleep(wait_time)
else:
logger.error("Max retries exceeded, operation failed permanently")
raise
except CloudantClientException as e:
logger.error(f"Client error, cannot retry: {e}")
raise
except Exception as e:
logger.error(f"Unexpected error: {e}")
raise
return None
# Usage examples
if __name__ == "__main__":
try:
detailed_error_handling()
except CloudantException as e:
print(f"Final error: {e}")
try:
doc = error_recovery_example()
if doc:
print(f"Operation succeeded: {doc['_id']}")
except CloudantException as e:
print(f"Operation failed permanently: {e}")Use Specific Exceptions: Catch specific exception types rather than the generic CloudantException when you need different handling logic.
Implement Retry Logic: For network-related errors, implement exponential backoff retry strategies.
Log Error Context: Include error type, message, and relevant context (HTTP status, request details) in logs.
Graceful Degradation: Design your application to continue functioning with reduced capabilities when possible.
Monitor Error Patterns: Track error frequencies and types to identify system issues early.
Validate Inputs: Use CloudantArgumentError to validate parameters before making requests.
Handle Conflicts: Implement conflict resolution strategies for document operations in multi-user environments.
Set Appropriate Timeouts: Use reasonable timeout values to prevent indefinite blocking while allowing sufficient time for operations.
Resource Cleanup: Always ensure proper cleanup of resources (connections, feeds) in error scenarios.
User-Friendly Messages: Convert technical error messages into user-friendly explanations when appropriate.
Install with Tessl CLI
npx tessl i tessl/pypi-cloudant