CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-impyla

Python client for the Impala distributed query engine and HiveServer2 implementations

Pending
Overview
Eval results
Files

data-utilities.mddocs/

Data Utilities

Utility functions for data conversion and integration with the Python data science ecosystem, including pandas DataFrame conversion and database management functions.

Capabilities

Pandas Integration

Convert query results to pandas DataFrames for seamless integration with the Python data science stack.

def as_pandas(cursor, coerce_float=False):
    """
    Convert cursor results to pandas DataFrame.
    
    This function converts the results from an executed cursor into a pandas
    DataFrame, enabling easy integration with the Python data science ecosystem
    including scikit-learn, matplotlib, and other analysis libraries.
    
    Parameters:
        cursor: Active cursor object with executed query results
        coerce_float (bool): Coerce numeric columns to float type. Default is False.
        
    Returns:
        pandas.DataFrame: Query results as a DataFrame with proper column names
        
    Raises:
        ImportError: If pandas is not installed
        ValueError: If cursor has no results or is not executed
    """

Database Management

Administrative functions for managing databases and performing maintenance operations.

def force_drop_impala_database(cursor, database_name):
    """
    Force drop an Impala database and all its tables.
    
    This is a utility function for development and testing that forcibly
    drops a database even if it contains tables.
    
    Parameters:
        cursor: Active cursor object
        database_name (str): Name of the database to drop
        
    Warning:
        This operation is destructive and cannot be undone.
    """

def force_drop_hive_database(cursor, database_name):
    """
    Force drop a Hive database and all its tables.
    
    Similar to force_drop_impala_database but optimized for Hive-specific
    metadata handling.
    
    Parameters:
        cursor: Active cursor object  
        database_name (str): Name of the database to drop
        
    Warning:
        This operation is destructive and cannot be undone.
    """

Logging and Debugging

Utilities for logging and debugging database operations.

def get_logger_and_init_null():
    """
    Get logger instance with null handler initialization.
    
    Returns a properly configured logger for impyla operations with
    a null handler to prevent unwanted log output unless explicitly
    configured by the user.
    
    Returns:
        logging.Logger: Configured logger instance
    """

Internal Utilities

Internal utility functions used by the impyla library for various operations.

def _random_id(prefix='', length=8):
    """
    Generate a random identifier.
    
    Used internally for generating unique identifiers for operations
    and temporary objects.
    
    Parameters:
        prefix (str): Optional prefix for the identifier
        length (int): Length of the random part (default 8)
        
    Returns:
        str: Random identifier string
    """

def _get_table_schema_hack(cursor, table):
    """
    Get table schema information using internal methods.
    
    This is an internal utility for extracting table schema information
    when standard methods are not available.
    
    Parameters:
        cursor: Active cursor object
        table (str): Name of the table
        
    Returns:
        list: Schema information as list of column descriptors
    """

def _gen_safe_random_table_name(cursor, prefix='tmp'):
    """
    Generate a safe random table name that doesn't conflict with existing tables.
    
    Parameters:
        cursor: Active cursor object
        prefix (str): Prefix for the table name (default 'tmp')
        
    Returns:
        str: Safe random table name
    """

def compute_result_schema(cursor, query_string):
    """
    Compute the result schema for a query without executing it.
    
    Parameters:
        cursor: Active cursor object
        query_string (str): SQL query to analyze
        
    Returns:
        list: Schema information for the query result
    """

def get_basic_credentials_for_request_headers(user, password):
    """
    Generate basic authentication credentials for HTTP request headers.
    
    Parameters:
        user (str): Username
        password (str): Password
        
    Returns:
        str: Base64 encoded credentials for Authorization header
    """

HTTP and Cookie Utilities

Utilities for handling HTTP transport and cookie management.

def cookie_matches_path(c, path):
    """
    Check if a cookie matches a given path.
    
    Parameters:
        c: Cookie object
        path (str): URL path to check
        
    Returns:
        bool: True if cookie matches the path
    """

def get_cookie_expiry(c):
    """
    Get the expiry time of a cookie.
    
    Parameters:
        c: Cookie object
        
    Returns:
        datetime or None: Cookie expiry time
    """

def get_cookies(resp_headers):
    """
    Extract cookies from HTTP response headers.
    
    Parameters:
        resp_headers: HTTP response headers
        
    Returns:
        list: List of cookie objects
    """

def get_all_cookies(path, resp_headers):
    """
    Get all cookies from HTTP response headers for a given path.
    
    Parameters:
        path (str): URL path
        resp_headers: HTTP response headers
        
    Returns:
        list: List of cookie objects matching the path
    """

def get_all_matching_cookies(cookie_names, path, resp_headers):
    """
    Get cookies matching specific names from HTTP response headers.
    
    Parameters:
        cookie_names (list): List of cookie names to match
        path (str): URL path
        resp_headers: HTTP response headers
        
    Returns:
        list: List of matching cookie objects
    """

Deprecation and Warning Utilities

Utilities for handling deprecation warnings and protocol warnings.

def warn_protocol_param():
    """
    Issue a warning about deprecated protocol parameters.
    """

def warn_deprecate(functionality='This', alternative=None):
    """
    Issue a deprecation warning for functionality.
    
    Parameters:
        functionality (str): Description of deprecated functionality
        alternative (str): Suggested alternative (optional)
    """

def warn_nontls_jwt():
    """
    Issue a warning about using JWT without TLS.
    """

Usage Examples

Basic Pandas Conversion

from impala.dbapi import connect
from impala.util import as_pandas

# Connect and execute query
conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()

cursor.execute("""
    SELECT 
        customer_id,
        order_date,
        total_amount,
        status
    FROM orders 
    WHERE order_date >= '2023-01-01'
    LIMIT 1000
""")

# Convert to pandas DataFrame
df = as_pandas(cursor)

print(f"DataFrame shape: {df.shape}")
print("\nColumn types:")
print(df.dtypes)

print("\nFirst few rows:")
print(df.head())

cursor.close()
conn.close()

Data Analysis with Pandas

from impala.dbapi import connect
from impala.util import as_pandas
import matplotlib.pyplot as plt

conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()

# Query sales data
cursor.execute("""
    SELECT 
        DATE_TRUNC('month', order_date) as month,
        SUM(total_amount) as monthly_sales,
        COUNT(*) as order_count,
        AVG(total_amount) as avg_order_value
    FROM orders 
    WHERE order_date >= '2023-01-01'
    GROUP BY DATE_TRUNC('month', order_date)
    ORDER BY month
""")

# Convert to DataFrame
df = as_pandas(cursor, coerce_float=True)

# Perform analysis
print("Monthly Sales Summary:")
print(df.describe())

# Plot monthly trends
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(df['month'], df['monthly_sales'])
plt.title('Monthly Sales')
plt.xticks(rotation=45)

plt.subplot(1, 2, 2)
plt.plot(df['month'], df['avg_order_value'])
plt.title('Average Order Value')
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

cursor.close()
conn.close()

Working with Complex Data Types

from impala.dbapi import connect
from impala.util import as_pandas
import json

conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()

# Query with complex data types (arrays, structs)
cursor.execute("""
    SELECT 
        user_id,
        profile.name,
        profile.email,
        tags,
        metadata
    FROM user_profiles 
    LIMIT 100
""")

df = as_pandas(cursor)

# Handle complex types
print("Data types:")
print(df.dtypes)

# Work with array columns
if 'tags' in df.columns:
    # Convert string representation of arrays to Python lists
    df['tags_list'] = df['tags'].apply(
        lambda x: json.loads(x) if x and x != 'NULL' else []
    )
    
    # Analyze tag frequency
    all_tags = []
    for tags in df['tags_list']:
        all_tags.extend(tags)
    
    from collections import Counter
    tag_counts = Counter(all_tags)
    print("\nTop 10 most common tags:")
    for tag, count in tag_counts.most_common(10):
        print(f"{tag}: {count}")

cursor.close()
conn.close()

Database Management Operations

from impala.dbapi import connect
from impala.util import force_drop_impala_database

# Connect with administrative privileges
conn = connect(
    host='impala-host', 
    port=21050,
    auth_mechanism='GSSAPI'  # Admin access
)
cursor = conn.cursor()

# Create test database
cursor.execute("CREATE DATABASE IF NOT EXISTS test_analytics")

# Create some test tables
cursor.execute("""
    CREATE TABLE IF NOT EXISTS test_analytics.sample_data (
        id INT,
        name STRING,
        value DOUBLE
    )
""")

cursor.execute("""
    INSERT INTO test_analytics.sample_data 
    VALUES (1, 'test', 123.45)
""")

# List tables in the database
cursor.execute("SHOW TABLES IN test_analytics")
tables = cursor.fetchall()
print(f"Tables in test_analytics: {[t[0] for t in tables]}")

# Force drop the entire database (use with caution!)
try:
    force_drop_impala_database(cursor, 'test_analytics')
    print("Database test_analytics dropped successfully")
except Exception as e:
    print(f"Error dropping database: {e}")

cursor.close()
conn.close()

Custom Data Processing Pipeline

from impala.dbapi import connect
from impala.util import as_pandas
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

def create_ml_pipeline(host, query, target_column):
    """Create a machine learning pipeline from Impala data."""
    
    # Connect and fetch data
    conn = connect(host=host, port=21050)
    cursor = conn.cursor()
    
    cursor.execute(query)
    df = as_pandas(cursor, coerce_float=True)
    
    print(f"Loaded {len(df)} rows from Impala")
    
    # Prepare features and target
    feature_columns = [col for col in df.columns if col != target_column]
    X = df[feature_columns]
    y = df[target_column]
    
    # Handle missing values
    X = X.fillna(X.mean())
    
    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)
    
    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_scaled, y)
    
    print(f"Model trained with accuracy: {model.score(X_scaled, y):.3f}")
    
    # Feature importance
    feature_importance = pd.DataFrame({
        'feature': feature_columns,
        'importance': model.feature_importances_
    }).sort_values('importance', ascending=False)
    
    print("\nTop 5 important features:")
    print(feature_importance.head())
    
    cursor.close()
    conn.close()
    
    return model, scaler, feature_importance

# Usage
ml_query = """
    SELECT 
        age, income, education_years, experience_years,
        CASE WHEN salary > 50000 THEN 1 ELSE 0 END as high_earner
    FROM employee_data 
    WHERE age IS NOT NULL 
    AND income IS NOT NULL
"""

model, scaler, importance = create_ml_pipeline(
    'impala-host', 
    ml_query, 
    'high_earner'
)

Install with Tessl CLI

npx tessl i tessl/pypi-impyla

docs

core-database-api.md

data-utilities.md

error-handling.md

index.md

sqlalchemy-integration.md

tile.json