Amazon Redshift connector for Python implementing Python Database API Specification 2.0
—
Complete DB-API 2.0 exception hierarchy providing structured error handling for different types of database and interface errors. The redshift_connector implements a comprehensive exception system that enables applications to handle errors appropriately based on their type and severity.
DB-API 2.0 compliant exception hierarchy with specific error types for different failure scenarios.
class Warning(Exception):
"""
Generic exception raised for important database warnings like data truncations.
This exception is not currently used.
This exception is part of the DB-API 2.0 specification.
"""
class Error(Exception):
"""
Generic exception that is the base exception of all other error exceptions.
This exception is part of the DB-API 2.0 specification.
"""
class InterfaceError(Error):
"""
Generic exception raised for errors that are related to the database
interface rather than the database itself. For example, if the interface
attempts to use an SSL connection but the server refuses, an InterfaceError
will be raised.
This exception is part of the DB-API 2.0 specification.
"""
class DatabaseError(Error):
"""
Generic exception raised for errors that are related to the database.
This exception is currently never raised.
This exception is part of the DB-API 2.0 specification.
"""
class DataError(DatabaseError):
"""
Generic exception raised for errors that are due to problems with the
processed data. This exception is not currently raised.
This exception is part of the DB-API 2.0 specification.
"""
class OperationalError(DatabaseError):
"""
Generic exception raised for errors that are related to the database's
operation and not necessarily under the control of the programmer.
This exception is currently never raised.
This exception is part of the DB-API 2.0 specification.
"""
class IntegrityError(DatabaseError):
"""
Generic exception raised when the relational integrity of the database is
affected. This exception is not currently raised.
This exception is part of the DB-API 2.0 specification.
"""
class InternalError(DatabaseError):
"""
Generic exception raised when the database encounters an internal error.
This is currently only raised when unexpected state occurs in the
interface itself, and is typically the result of a interface bug.
This exception is part of the DB-API 2.0 specification.
"""
class ProgrammingError(DatabaseError):
"""
Generic exception raised for programming errors. For example, this
exception is raised if more parameter fields are in a query string than
there are available parameters.
This exception is part of the DB-API 2.0 specification.
"""
class NotSupportedError(DatabaseError):
"""
Generic exception raised in case a method or database API was used which
is not supported by the database.
This exception is part of the DB-API 2.0 specification.
"""Specialized exceptions for array data type operations and validation.
class ArrayContentNotSupportedError(NotSupportedError):
"""
Raised when attempting to transmit an array where the base type is not
supported for binary data transfer by the interface.
"""
class ArrayContentNotHomogenousError(ProgrammingError):
"""
Raised when attempting to transmit an array that doesn't contain only a
single type of object.
"""
class ArrayDimensionsNotConsistentError(ProgrammingError):
"""
Raised when attempting to transmit an array that has inconsistent
multi-dimension sizes.
"""Common error handling patterns and best practices for different scenarios.
import redshift_connector
from redshift_connector import (
Error, InterfaceError, DatabaseError, ProgrammingError,
OperationalError, DataError, NotSupportedError
)
# Basic error handling pattern
try:
conn = redshift_connector.connect(
host='invalid-host.redshift.amazonaws.com',
database='dev',
user='user',
password='password'
)
except InterfaceError as e:
print(f"Connection interface error: {e}")
# Handle connection issues (network, SSL, authentication)
except Error as e:
print(f"General database error: {e}")
# Catch-all for other database errors
# Query execution error handling
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM nonexistent_table")
results = cursor.fetchall()
except ProgrammingError as e:
print(f"SQL programming error: {e}")
# Handle SQL syntax errors, missing tables/columns, etc.
except DatabaseError as e:
print(f"Database error: {e}")
# Handle database-level errors
except Error as e:
print(f"General error: {e}")
# Parameter handling errors
try:
cursor.execute("INSERT INTO table (col1, col2) VALUES (%s, %s)", ('value1',)) # Missing parameter
except ProgrammingError as e:
print(f"Parameter mismatch: {e}")
# Array handling errors
try:
cursor.execute("INSERT INTO array_table (arr_col) VALUES (%s)", ([[1, 2], [3, 4, 5]])) # Inconsistent dimensions
except ArrayDimensionsNotConsistentError as e:
print(f"Array dimension error: {e}")
try:
cursor.execute("INSERT INTO array_table (arr_col) VALUES (%s)", ([1, 'mixed', 3.14])) # Mixed types
except ArrayContentNotHomogenousError as e:
print(f"Array content error: {e}")
# Data science integration errors
try:
df = cursor.fetch_dataframe()
except InterfaceError as e:
if "pandas" in str(e):
print("pandas not available. Install with: pip install redshift_connector[full]")
elif "numpy" in str(e):
print("numpy not available. Install with: pip install redshift_connector[full]")
# Authentication errors
try:
conn = redshift_connector.connect(
iam=True,
ssl=False, # Invalid: IAM requires SSL
cluster_identifier='cluster'
)
except InterfaceError as e:
print(f"Authentication configuration error: {e}")
# Connection cleanup with error handling
try:
# Database operations
cursor = conn.cursor()
cursor.execute("SELECT 1")
result = cursor.fetchone()
except DatabaseError as e:
print(f"Database operation failed: {e}")
finally:
# Always clean up connections
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()Using context managers for automatic resource cleanup with proper error handling.
# Connection context manager with error handling
try:
with redshift_connector.connect(
host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',
database='dev',
user='awsuser',
password='password'
) as conn:
with conn.cursor() as cursor:
cursor.execute("SELECT COUNT(*) FROM large_table")
count = cursor.fetchone()[0]
print(f"Table has {count} rows")
except InterfaceError as e:
print(f"Connection or interface error: {e}")
except ProgrammingError as e:
print(f"SQL or programming error: {e}")
except DatabaseError as e:
print(f"Database error: {e}")
# Connection and cursor automatically closed even if exceptions occurError handling patterns for transaction management and rollback scenarios.
import redshift_connector
from redshift_connector import DatabaseError, ProgrammingError
conn = redshift_connector.connect(...)
try:
# Start transaction (autocommit=False is default)
cursor = conn.cursor()
# Perform multiple operations
cursor.execute("INSERT INTO orders (customer_id, amount) VALUES (%s, %s)", (123, 99.99))
cursor.execute("UPDATE inventory SET quantity = quantity - 1 WHERE product_id = %s", (456,))
cursor.execute("INSERT INTO audit_log (action, timestamp) VALUES (%s, %s)", ('order_placed', '2023-01-01'))
# Commit transaction
conn.commit()
print("Transaction completed successfully")
except ProgrammingError as e:
print(f"SQL error in transaction: {e}")
conn.rollback()
print("Transaction rolled back")
except DatabaseError as e:
print(f"Database error in transaction: {e}")
conn.rollback()
print("Transaction rolled back")
except Exception as e:
print(f"Unexpected error: {e}")
conn.rollback()
print("Transaction rolled back")
finally:
cursor.close()
conn.close()Specific error handling for various authentication scenarios and configurations.
import redshift_connector
from redshift_connector import InterfaceError
# IAM authentication error handling
try:
conn = redshift_connector.connect(
iam=True,
cluster_identifier='my-cluster',
db_user='testuser',
access_key_id='invalid_key',
secret_access_key='invalid_secret',
region='us-west-2'
)
except InterfaceError as e:
if "SSL must be enabled when using IAM" in str(e):
print("IAM authentication requires SSL to be enabled")
elif "Invalid connection property" in str(e):
print("Invalid authentication configuration")
elif "credentials" in str(e).lower():
print("Invalid AWS credentials")
else:
print(f"Authentication error: {e}")
# Identity provider error handling
try:
conn = redshift_connector.connect(
credentials_provider='BrowserIdcAuthPlugin',
iam=True # Invalid: IdC plugins cannot be used with IAM
)
except InterfaceError as e:
if "can not use this authentication plugin with IAM enabled" in str(e):
print("Identity Center plugins cannot be used with IAM=True")
# SSL configuration error handling
try:
conn = redshift_connector.connect(
credentials_provider='BrowserAzureOAuth2CredentialsProvider',
ssl=False # Invalid: Authentication plugins require SSL
)
except InterfaceError as e:
if "Authentication must use an SSL connection" in str(e):
print("Authentication plugins require SSL to be enabled")Error logging and debugging utilities for troubleshooting connection and query issues.
import logging
import redshift_connector
from redshift_connector.utils import make_divider_block, mask_secure_info_in_props
# Enable logging for debugging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('redshift_connector')
try:
conn = redshift_connector.connect(
host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',
database='dev',
user='awsuser',
password='password'
)
# Connection details are automatically logged (with sensitive info masked)
except Exception as e:
logger.error(f"Connection failed: {e}")
# Detailed error information available in logs
# Utility functions for secure logging
def log_connection_attempt(conn_params):
"""Log connection attempt with sensitive information masked."""
from redshift_connector.redshift_property import RedshiftProperty
info = RedshiftProperty()
for key, value in conn_params.items():
info.put(key, value)
masked_info = mask_secure_info_in_props(info)
logger.info(make_divider_block())
logger.info("Connection attempt with parameters:")
logger.info(str(masked_info))
logger.info(make_divider_block())Patterns for handling transient errors and implementing retry logic.
import time
import redshift_connector
from redshift_connector import OperationalError, InterfaceError
def connect_with_retry(max_attempts=3, retry_delay=1, **conn_params):
"""
Attempt connection with exponential backoff retry logic.
"""
for attempt in range(max_attempts):
try:
conn = redshift_connector.connect(**conn_params)
return conn
except (OperationalError, InterfaceError) as e:
if attempt == max_attempts - 1:
raise # Re-raise on final attempt
print(f"Connection attempt {attempt + 1} failed: {e}")
time.sleep(retry_delay * (2 ** attempt)) # Exponential backoff
def execute_with_retry(cursor, sql, params=None, max_attempts=3):
"""
Execute query with retry logic for transient errors.
"""
for attempt in range(max_attempts):
try:
if params:
cursor.execute(sql, params)
else:
cursor.execute(sql)
return cursor.fetchall()
except OperationalError as e:
if "timeout" in str(e).lower() and attempt < max_attempts - 1:
print(f"Query timeout, retrying... (attempt {attempt + 1})")
time.sleep(2 ** attempt)
continue
raise
except DatabaseError as e:
# Don't retry programming errors or constraint violations
raise
# Usage example
try:
conn = connect_with_retry(
host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',
database='dev',
user='awsuser',
password='password',
max_attempts=3
)
cursor = conn.cursor()
results = execute_with_retry(cursor, "SELECT COUNT(*) FROM large_table")
print(f"Query result: {results}")
except Exception as e:
print(f"Failed after retries: {e}")
finally:
if 'cursor' in locals():
cursor.close()
if 'conn' in locals():
conn.close()Install with Tessl CLI
npx tessl i tessl/pypi-redshift-connector