Data Science Integration

Native integration with pandas and numpy for efficient data transfer between Redshift and Python data science workflows. The redshift_connector package provides optimized methods for working with DataFrames and numpy arrays, so query results can move directly into analysis pipelines.

Capabilities

Pandas DataFrame Integration

Direct support for reading query results into pandas DataFrames and writing DataFrames to Redshift tables, with automatic type conversion.

class Cursor:
    def fetch_dataframe(self, num: typing.Optional[int] = None) -> 'pandas.DataFrame':
        """
        Fetch query results as a pandas DataFrame.
        
        Parameters:
        - num: Maximum number of rows to fetch (None for all remaining rows)
        
        Returns:
        pandas.DataFrame with query results
        
        Raises:
        InterfaceError: If pandas is not available
        """
    
    def write_dataframe(self, df: 'pandas.DataFrame', table: str) -> None:
        """
        Write a pandas DataFrame to a Redshift table.
        
        Parameters:
        - df: pandas DataFrame to write
        - table: Target table name
        
        The method automatically handles type conversion and creates
        appropriate INSERT statements for efficient data loading.
        
        Raises:
        InterfaceError: If pandas is not available
        ProgrammingError: If table doesn't exist or schema mismatch
        """

NumPy Array Integration

Support for fetching query results as numpy arrays for numerical computing and scientific analysis workflows.

class Cursor:
    def fetch_numpy_array(self, num: typing.Optional[int] = None) -> 'numpy.ndarray':
        """
        Fetch query results as a numpy ndarray.
        
        Parameters:
        - num: Maximum number of rows to fetch (None for all remaining rows)
        
        Returns:
        numpy.ndarray containing query results with appropriate dtypes
        
        The method automatically converts Redshift data types to
        compatible numpy dtypes for efficient numerical operations.
        
        Raises:
        InterfaceError: If numpy is not available
        """

Installation for Data Science Features

Data science integration requires optional dependencies that can be installed separately.

# Install with data science dependencies (quote the extra under zsh)
# pip install "redshift_connector[full]"

# This includes:
# - pandas >= 1.0.0
# - numpy >= 1.15.0

# Error handling for missing dependencies
MISSING_MODULE_ERROR_MSG: str = (
    "redshift_connector requires {module} support for this functionality. "
    "Please install redshift_connector[full] for {module} support"
)
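
To fail fast with a clear message, a small pre-flight check can verify the optional dependencies before any DataFrame or array methods are called. This helper is a sketch, not part of the connector's API:

import importlib.util

def require_optional(module: str) -> None:
    # Mirrors the wording of MISSING_MODULE_ERROR_MSG above
    if importlib.util.find_spec(module) is None:
        raise ImportError(
            f"redshift_connector requires {module} support for this functionality. "
            f"Please install redshift_connector[full] for {module} support"
        )

require_optional("pandas")
require_optional("numpy")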

Data Type Conversion

Automatic conversion between Redshift data types and pandas/numpy-compatible types.

# Redshift to pandas/numpy type mapping:
# 
# BIGINT -> int64
# INTEGER -> int32  
# SMALLINT -> int16
# BOOLEAN -> bool
# REAL -> float32
# DOUBLE PRECISION -> float64
# NUMERIC/DECIMAL -> float64 (or Decimal if numeric_to_float=False)
# VARCHAR/CHAR/TEXT -> object (string)
# DATE -> datetime64[D] 
# TIMESTAMP -> datetime64[ns]
# TIME -> object (datetime.time)
# SUPER (JSON) -> object
# ARRAY types -> object (Python lists)

# Configuration for numeric conversion
conn = redshift_connector.connect(
    # ... connection parameters
    numeric_to_float=True  # Convert NUMERIC to float (default: False, uses Decimal)
)
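
The effect of numeric_to_float is easiest to see on a NUMERIC column; a sketch with a hypothetical products table:

cursor = conn.cursor()
cursor.execute("SELECT price FROM products LIMIT 5")
df = cursor.fetch_dataframe()

print(df['price'].dtype)
# numeric_to_float=True  -> float64
# numeric_to_float=False -> object (decimal.Decimal values)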

Usage Examples

Practical examples demonstrating data science integration patterns.

import redshift_connector
import pandas as pd
import numpy as np

# Connect to Redshift
conn = redshift_connector.connect(
    host='examplecluster.abc123xyz789.us-west-1.redshift.amazonaws.com',
    database='dev',
    user='awsuser',
    password='password',
    numeric_to_float=True  # Convert NUMERIC to float for pandas compatibility
)

cursor = conn.cursor()

# Fetch data as DataFrame
cursor.execute("""
    SELECT customer_id, order_date, total_amount, product_category
    FROM sales_data 
    WHERE order_date >= '2023-01-01'
    ORDER BY order_date
""")

# Get results as pandas DataFrame
df = cursor.fetch_dataframe()
print(f"Loaded {len(df)} rows into DataFrame")
print(df.dtypes)
print(df.head())

# Perform data analysis
# fetch_dataframe may return DATE columns as datetime.date objects,
# so normalize to datetime64 before using the .dt accessor
df['order_date'] = pd.to_datetime(df['order_date'])
monthly_sales = df.groupby(df['order_date'].dt.to_period('M'))['total_amount'].sum()
category_stats = df.groupby('product_category')['total_amount'].agg(['mean', 'sum', 'count'])

# Write processed data back to Redshift
# First create target table
cursor.execute("""
    CREATE TABLE IF NOT EXISTS monthly_sales_summary (
        month VARCHAR(10),
        total_sales DECIMAL(15,2)
    )
""")

# Convert Series to DataFrame for writing
summary_df = monthly_sales.reset_index()
summary_df.columns = ['month', 'total_sales']
summary_df['month'] = summary_df['month'].astype(str)

# Write DataFrame to table, then commit (autocommit is off by default per DBAPI 2.0)
cursor.write_dataframe(summary_df, 'monthly_sales_summary')
conn.commit()

# Fetch numerical data as numpy array for analysis
cursor.execute("SELECT total_amount, quantity, discount FROM order_details")
data_array = cursor.fetch_numpy_array()

# Perform numerical analysis
mean_amount = np.mean(data_array[:, 0])  # total_amount column
correlation_matrix = np.corrcoef(data_array.T)
print(f"Mean order amount: {mean_amount}")
print("Correlation matrix:")
print(correlation_matrix)

cursor.close()
conn.close()

Performance Considerations

Optimization strategies for efficient data transfer between Redshift and Python data science libraries.

# Performance Tips:

# 1. Use columnar operations when possible
cursor.execute("SELECT col1, col2, col3 FROM large_table WHERE condition")
df = cursor.fetch_dataframe()  # More efficient than row-by-row processing

# 2. Limit result sets for large tables
cursor.execute("SELECT * FROM large_table LIMIT 100000")
df = cursor.fetch_dataframe()

# 3. Use appropriate data types
conn = redshift_connector.connect(
    # ... 
    numeric_to_float=True  # Faster than Decimal for numerical analysis
)

# 4. Batch operations for large DataFrames
# Write large DataFrames in chunks if needed, committing once at the end
chunk_size = 10000
for i in range(0, len(large_df), chunk_size):
    chunk = large_df[i:i + chunk_size]
    cursor.write_dataframe(chunk, 'target_table')
conn.commit()

# 5. Fetch very large result sets incrementally
# Pull rows in batches with fetch_dataframe(num=...) rather than
# loading the entire result into memory at once
cursor.execute("SELECT * FROM very_large_table")
while True:
    batch = cursor.fetch_dataframe(num=50000)
    if batch is None or batch.empty:
        break
    # ... process each batch, then discard it

Error Handling for Data Science Operations

Specific error handling patterns for data science integration features.

import redshift_connector
from redshift_connector import InterfaceError, ProgrammingError

try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM my_table")
    df = cursor.fetch_dataframe()
except InterfaceError as e:
    if "pandas" in str(e):
        print("pandas not available. Install with: pip install redshift_connector[full]")
    elif "numpy" in str(e):
        print("numpy not available. Install with: pip install redshift_connector[full]")
    else:
        print(f"Interface error: {e}")
except ProgrammingError as e:
    print(f"SQL or schema error: {e}")

try:
    cursor.write_dataframe(df, 'nonexistent_table')
except ProgrammingError as e:
    print(f"Table write error: {e}")
    # Consider creating table first or checking schema compatibility
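
A defensive version of that write follows the create-the-table-first suggestion; the schema here is hypothetical and must match the DataFrame's columns:

cursor.execute("""
    CREATE TABLE IF NOT EXISTS target_table (
        id INTEGER,
        value DOUBLE PRECISION
    )
""")
cursor.write_dataframe(df, 'target_table')
conn.commit()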

Integration with Data Science Ecosystem

The redshift_connector integrates cleanly with the broader Python data science ecosystem.

# Common integration patterns:

# 1. With Jupyter Notebooks
# (%%sql magic is available via the sqlalchemy-redshift dialect)
import pandas as pd
import redshift_connector

conn = redshift_connector.connect(...)
# pandas accepts a DBAPI connection here, though it emits a warning
# that only SQLAlchemy connectables are fully supported
df = pd.read_sql("SELECT * FROM my_table", conn)

# 2. With Matplotlib/Seaborn for visualization
import matplotlib.pyplot as plt
import seaborn as sns

cursor = conn.cursor()
cursor.execute("SELECT category, AVG(value) as avg_value FROM data GROUP BY category")
df = cursor.fetch_dataframe()

plt.figure(figsize=(10, 6))
sns.barplot(data=df, x='category', y='avg_value')
plt.title('Average Values by Category')
plt.show()

# 3. With Scikit-learn for machine learning
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression

cursor.execute("SELECT feature1, feature2, target FROM ml_data")
data = cursor.fetch_numpy_array()

# Cast defensively: without numeric_to_float=True, NUMERIC columns
# arrive as Decimal and the array may have object dtype
X = data[:, :2].astype(float)  # features
y = data[:, 2].astype(float)   # target

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)
model = LinearRegression()
model.fit(X_train, y_train)

# 4. With Apache Airflow for data pipelines
from airflow.operators.python import PythonOperator

def extract_transform_load():
    conn = redshift_connector.connect(...)
    cursor = conn.cursor()
    
    # Extract
    cursor.execute("SELECT * FROM source_table")
    df = cursor.fetch_dataframe()
    
    # Transform
    df['processed_date'] = pd.to_datetime(df['date_column'])
    df['calculated_field'] = df['field1'] * df['field2']
    
    # Load
    cursor.write_dataframe(df, 'target_table')
    conn.commit()  # autocommit is off by default
    conn.close()

# Register the task on an existing DAG (a `dag` object defined elsewhere)
extract_task = PythonOperator(
    task_id='extract_transform_load',
    python_callable=extract_transform_load,
    dag=dag
)
