Python client for the Impala distributed query engine and HiveServer2 implementations.

Utility functions for data conversion and integration with the Python data science
ecosystem, including pandas DataFrame conversion and database management functions.
Convert query results to pandas DataFrames for seamless integration with the Python
data science stack.
def as_pandas(cursor, coerce_float=False):
    """
    Convert cursor results to a pandas DataFrame.

    Fetches all remaining rows from an executed cursor and builds a pandas
    DataFrame whose columns are named after the cursor's result columns,
    enabling easy integration with the Python data science ecosystem
    including scikit-learn, matplotlib, and other analysis libraries.

    Parameters:
        cursor: Active cursor object with executed query results.
        coerce_float (bool): Coerce decimal-like values to float type.
            Default is False.

    Returns:
        pandas.DataFrame: Query results as a DataFrame with proper column names.

    Raises:
        ImportError: If pandas is not installed.
        ValueError: If the cursor has no results (no query executed).
    """
    # Deferred import so the rest of the module works without pandas installed.
    from pandas import DataFrame

    # DB-API cursors expose description only after a query was executed.
    if cursor.description is None:
        raise ValueError('cursor has no results; execute a query first')
    names = [metadata[0] for metadata in cursor.description]
    return DataFrame.from_records(cursor.fetchall(), columns=names,
                                  coerce_float=coerce_float)


# Administrative functions for managing databases and performing maintenance
# operations.
def force_drop_impala_database(cursor, database_name):
    """
    Force drop an Impala database and all its tables.

    Impala (unlike Hive) cannot drop a non-empty database in one statement,
    so each table is dropped individually before the database itself.
    This is a utility function for development and testing.

    Parameters:
        cursor: Active cursor object.
        database_name (str): Name of the database to drop.

    Warning:
        This operation is destructive and cannot be undone.
    """
    cursor.execute('USE %s' % database_name)
    cursor.execute('SHOW TABLES')
    tables = cursor.fetchall()
    for table in tables:
        # First column of each SHOW TABLES row is the table name.
        cursor.execute('DROP TABLE IF EXISTS %s' % table[0])
    # Leave the doomed database before dropping it.
    cursor.execute('USE default')
    cursor.execute('DROP DATABASE %s' % database_name)
def force_drop_hive_database(cursor, database_name):
    """
    Force drop a Hive database and all its tables.

    Similar to force_drop_impala_database but optimized for Hive-specific
    metadata handling: Hive supports dropping a non-empty database
    atomically with the CASCADE clause, so no per-table drops are needed.

    Parameters:
        cursor: Active cursor object.
        database_name (str): Name of the database to drop.

    Warning:
        This operation is destructive and cannot be undone.
    """
    cursor.execute('DROP DATABASE IF EXISTS %s CASCADE' % database_name)


# Utilities for logging and debugging database operations.
def get_logger_and_init_null(logger_name='impala'):
    """
    Get a logger instance with null handler initialization.

    Returns a properly configured logger for impyla operations with a
    NullHandler attached to prevent "No handlers could be found" noise
    unless logging is explicitly configured by the user.

    Parameters:
        logger_name (str): Name of the logger to fetch. Defaults to
            'impala' (backward compatible: callers may pass no argument).

    Returns:
        logging.Logger: Configured logger instance.
    """
    import logging

    logger = logging.getLogger(logger_name)
    logger.addHandler(logging.NullHandler())
    return logger


# Internal utility functions used by the impyla library for various operations.
def _random_id(prefix='', length=8):
"""
Generate a random identifier.
Used internally for generating unique identifiers for operations
and temporary objects.
Parameters:
prefix (str): Optional prefix for the identifier
length (int): Length of the random part (default 8)
Returns:
str: Random identifier string
"""
def _get_table_schema_hack(cursor, table):
"""
Get table schema information using internal methods.
This is an internal utility for extracting table schema information
when standard methods are not available.
Parameters:
cursor: Active cursor object
table (str): Name of the table
Returns:
list: Schema information as list of column descriptors
"""
def _gen_safe_random_table_name(cursor, prefix='tmp'):
"""
Generate a safe random table name that doesn't conflict with existing tables.
Parameters:
cursor: Active cursor object
prefix (str): Prefix for the table name (default 'tmp')
Returns:
str: Safe random table name
"""
def compute_result_schema(cursor, query_string):
    """
    Compute the result schema for a query without materializing its rows.

    Wraps the query in a ``LIMIT 0`` probe so the engine plans it and
    populates the cursor description, but returns no data.

    Parameters:
        cursor: Active cursor object.
        query_string (str): SQL query to analyze.

    Returns:
        list: Schema as a list of (column_name, type) tuples.
    """
    # NOTE(review): assumes query_string is a plain SELECT that can be used
    # as a derived table — confirm against callers.
    cursor.execute('SELECT * FROM (%s) AS __schema_probe LIMIT 0'
                   % query_string)
    return [(col[0], col[1]) for col in cursor.description]
def get_basic_credentials_for_request_headers(user, password):
    """
    Generate basic authentication credentials for HTTP request headers.

    Parameters:
        user (str): Username.
        password (str): Password.

    Returns:
        str: Base64-encoded 'user:password' token suitable for an
        'Authorization: Basic <token>' header (RFC 7617).
    """
    import base64

    token = '%s:%s' % (user, password)
    # b64encode yields ASCII bytes; decode to str for header assembly.
    return base64.b64encode(token.encode('utf-8')).decode('ascii')


# Utilities for handling HTTP transport and cookie management.
def cookie_matches_path(c, path):
    """
    Check if a cookie matches a given path.

    A cookie with no Path attribute matches every path; otherwise the
    request path must fall under the cookie's path (segment-wise prefix).

    Parameters:
        c: Cookie object (http.cookies.Morsel-like mapping with a 'path' key).
        path (str): URL path to check.

    Returns:
        bool: True if the cookie matches the path.
    """
    if 'path' not in c or not c['path']:
        # No path restriction on the cookie.
        return True
    cookie_path = c['path']
    # Normalize both sides to '/.../' so startswith compares whole segments
    # ('/api' must not match a cookie scoped to '/apix').
    if not cookie_path.startswith('/'):
        cookie_path = '/' + cookie_path
    if not cookie_path.endswith('/'):
        cookie_path += '/'
    request_path = path if path.startswith('/') else '/' + path
    if not request_path.endswith('/'):
        request_path += '/'
    return request_path.startswith(cookie_path)
def get_cookie_expiry(c):
    """
    Get the expiry time of a cookie.

    Computes the absolute expiry from the cookie's Max-Age attribute.
    Cookies without a (valid, integer) Max-Age are treated as session
    cookies and yield None.

    Parameters:
        c: Cookie object (http.cookies.Morsel-like mapping).

    Returns:
        datetime.datetime or None: Cookie expiry time.
    """
    import datetime

    if 'max-age' in c and c['max-age']:
        try:
            max_age_sec = int(c['max-age'])
        except ValueError:
            # Malformed Max-Age: fall through to "no expiry known".
            return None
        return datetime.datetime.now() + datetime.timedelta(seconds=max_age_sec)
    return None
def get_cookies(resp_headers):
    """
    Extract cookies from HTTP response headers.

    Parses every 'Set-Cookie' header value into cookie morsels. Malformed
    cookie strings are skipped (best effort).

    Parameters:
        resp_headers: HTTP response headers — either an
            http.client.HTTPMessage (supports get_all) or a plain mapping
            with a single 'Set-Cookie' value.

    Returns:
        list: List of cookie (Morsel) objects; empty if none present.
    """
    from http.cookies import SimpleCookie

    jar = SimpleCookie()
    try:
        # http.client.HTTPMessage supports repeated headers via get_all.
        header_values = resp_headers.get_all('Set-Cookie')
    except AttributeError:
        # Fallback: assume a plain mapping with one Set-Cookie value.
        value = resp_headers.get('Set-Cookie') if hasattr(resp_headers, 'get') else None
        header_values = [value] if value else None
    if not header_values:
        return []
    for value in header_values:
        try:
            jar.load(value)
        except Exception:
            pass  # best effort: ignore malformed cookies
    return list(jar.values())
def get_all_cookies(path, resp_headers):
    """
    Get all cookies from HTTP response headers for a given path.

    Parameters:
        path (str): URL path.
        resp_headers: HTTP response headers.

    Returns:
        list: List of cookie objects whose Path matches `path`.
    """
    # Delegate parsing and path matching to the sibling helpers.
    return [c for c in get_cookies(resp_headers)
            if cookie_matches_path(c, path)]
def get_all_matching_cookies(cookie_names, path, resp_headers):
    """
    Get cookies matching specific names from HTTP response headers.

    Parameters:
        cookie_names (list): List of cookie names to match.
        path (str): URL path the cookies must apply to.
        resp_headers: HTTP response headers.

    Returns:
        list: List of matching cookie objects.
    """
    # Set lookup keeps this O(cookies + names) instead of O(cookies * names).
    wanted = set(cookie_names)
    return [c for c in get_cookies(resp_headers)
            if c.key in wanted and cookie_matches_path(c, path)]


# Utilities for handling deprecation warnings and protocol warnings.
def warn_protocol_param():
    """
    Issue a warning about the deprecated 'protocol' connection parameter.
    """
    import warnings

    warnings.warn(
        "The 'protocol' parameter is deprecated and ignored; "
        "HiveServer2 is the only supported protocol.",
        DeprecationWarning)
def warn_deprecate(functionality='This', alternative=None):
    """
    Issue a deprecation warning for functionality.

    Parameters:
        functionality (str): Description of the deprecated functionality.
        alternative (str): Suggested alternative (optional); appended to
            the message when provided.
    """
    import warnings

    msg = ('%s functionality is deprecated and will be removed in a '
           'future release' % functionality)
    if alternative:
        msg += '; please use %s instead' % alternative
    warnings.warn(msg, DeprecationWarning)
def warn_nontls_jwt():
    """
    Issue a warning about using JWT authentication without TLS.
    """
    import warnings

    warnings.warn(
        'Using JWT authentication over a connection that is not protected '
        'by TLS; the token may be exposed on the network.',
        UserWarning)


# --- Usage examples -------------------------------------------------------

from impala.dbapi import connect
from impala.util import as_pandas

# Example 1: connect, run a query, and convert the results to a DataFrame.
conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()
try:
    cursor.execute("""
        SELECT
            customer_id,
            order_date,
            total_amount,
            status
        FROM orders
        WHERE order_date >= '2023-01-01'
        LIMIT 1000
    """)
    # Convert to pandas DataFrame
    df = as_pandas(cursor)
finally:
    # Always release server-side resources, even if the query fails.
    cursor.close()
    conn.close()

print(f"DataFrame shape: {df.shape}")
print("\nColumn types:")
print(df.dtypes)
print("\nFirst few rows:")
print(df.head())

from impala.dbapi import connect
from impala.util import as_pandas
import matplotlib.pyplot as plt

# Example 2: monthly sales aggregation and plotting.
conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()
try:
    # Query sales data
    cursor.execute("""
        SELECT
            DATE_TRUNC('month', order_date) as month,
            SUM(total_amount) as monthly_sales,
            COUNT(*) as order_count,
            AVG(total_amount) as avg_order_value
        FROM orders
        WHERE order_date >= '2023-01-01'
        GROUP BY DATE_TRUNC('month', order_date)
        ORDER BY month
    """)
    # coerce_float=True converts DECIMAL aggregates to plain floats.
    df = as_pandas(cursor, coerce_float=True)
finally:
    cursor.close()
    conn.close()

# Perform analysis
print("Monthly Sales Summary:")
print(df.describe())

# Plot monthly trends
plt.figure(figsize=(12, 6))
plt.subplot(1, 2, 1)
plt.plot(df['month'], df['monthly_sales'])
plt.title('Monthly Sales')
plt.xticks(rotation=45)
plt.subplot(1, 2, 2)
plt.plot(df['month'], df['avg_order_value'])
plt.title('Average Order Value')
plt.xticks(rotation=45)
plt.tight_layout()
plt.show()

from impala.dbapi import connect
from impala.util import as_pandas
import json
from collections import Counter  # hoisted out of the loop body

# Example 3: working with complex column types (arrays, structs).
conn = connect(host='impala-host', port=21050)
cursor = conn.cursor()
try:
    # Query with complex data types (arrays, structs)
    cursor.execute("""
        SELECT
            user_id,
            profile.name,
            profile.email,
            tags,
            metadata
        FROM user_profiles
        LIMIT 100
    """)
    df = as_pandas(cursor)
finally:
    cursor.close()
    conn.close()

# Handle complex types
print("Data types:")
print(df.dtypes)

# Work with array columns
if 'tags' in df.columns:
    # Convert string representation of arrays to Python lists
    df['tags_list'] = df['tags'].apply(
        lambda x: json.loads(x) if x and x != 'NULL' else []
    )
    # Analyze tag frequency
    all_tags = []
    for tags in df['tags_list']:
        all_tags.extend(tags)
    tag_counts = Counter(all_tags)
    print("\nTop 10 most common tags:")
    for tag, count in tag_counts.most_common(10):
        print(f"{tag}: {count}")

from impala.dbapi import connect
from impala.util import force_drop_impala_database

# Example 4: administrative database setup and forced teardown.
# Connect with administrative privileges
conn = connect(
    host='impala-host',
    port=21050,
    auth_mechanism='GSSAPI',  # Kerberos-authenticated admin access
)
cursor = conn.cursor()
try:
    # Create test database
    cursor.execute("CREATE DATABASE IF NOT EXISTS test_analytics")
    # Create some test tables
    cursor.execute("""
        CREATE TABLE IF NOT EXISTS test_analytics.sample_data (
            id INT,
            name STRING,
            value DOUBLE
        )
    """)
    cursor.execute("""
        INSERT INTO test_analytics.sample_data
        VALUES (1, 'test', 123.45)
    """)
    # List tables in the database
    cursor.execute("SHOW TABLES IN test_analytics")
    tables = cursor.fetchall()
    print(f"Tables in test_analytics: {[t[0] for t in tables]}")
    # Force drop the entire database (use with caution!)
    try:
        force_drop_impala_database(cursor, 'test_analytics')
        print("Database test_analytics dropped successfully")
    except Exception as e:
        print(f"Error dropping database: {e}")
finally:
    cursor.close()
    conn.close()

from impala.dbapi import connect
from impala.util import as_pandas
import pandas as pd
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
def create_ml_pipeline(host, query, target_column):
    """
    Create a machine learning pipeline from Impala data.

    Fetches training data over Impala, imputes and scales the features,
    and fits a random-forest classifier.

    Parameters:
        host (str): Impala daemon host (port 21050 assumed).
        query (str): SQL query producing feature columns plus the target.
        target_column (str): Name of the label column in the query result.

    Returns:
        tuple: (fitted model, fitted StandardScaler, feature-importance
        DataFrame sorted by descending importance).
    """
    # Connect and fetch data; close the connection even if the query fails.
    conn = connect(host=host, port=21050)
    cursor = conn.cursor()
    try:
        cursor.execute(query)
        df = as_pandas(cursor, coerce_float=True)
    finally:
        cursor.close()
        conn.close()
    print(f"Loaded {len(df)} rows from Impala")

    # Prepare features and target
    feature_columns = [col for col in df.columns if col != target_column]
    X = df[feature_columns]
    y = df[target_column]

    # Handle missing values with column-mean imputation
    X = X.fillna(X.mean())

    # Scale features
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Train model
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_scaled, y)
    # NOTE(review): this is training-set accuracy, not a validation score.
    print(f"Model trained with accuracy: {model.score(X_scaled, y):.3f}")

    # Feature importance
    feature_importance = pd.DataFrame({
        'feature': feature_columns,
        'importance': model.feature_importances_,
    }).sort_values('importance', ascending=False)
    print("\nTop 5 important features:")
    print(feature_importance.head())

    return model, scaler, feature_importance
# Usage
ml_query = """
    SELECT
        age, income, education_years, experience_years,
        CASE WHEN salary > 50000 THEN 1 ELSE 0 END as high_earner
    FROM employee_data
    WHERE age IS NOT NULL
      AND income IS NOT NULL
"""
model, scaler, importance = create_ml_pipeline(
    'impala-host',
    ml_query,
    'high_earner',
)

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-impyla