Python DB API 2.0 (PEP 249) client for Amazon Athena
---

Quality: Pending — Does it follow best practices?

Impact: Pending — No eval scenarios have been run.
Integration with Athena's Spark execution engine for distributed processing, Jupyter notebook compatibility, and advanced analytics workloads. Enables running Spark code directly on Athena's serverless Spark engine.
Cursor for executing Spark calculations on Athena's Spark engine, providing distributed processing capabilities and integration with Jupyter notebooks.
class SparkCursor:
    """Cursor for executing Spark calculations on Athena's Spark engine.

    Provides distributed processing capabilities and integration with
    Jupyter notebooks. The attributes below mirror Athena's
    calculation-execution metadata for the most recent calculation.
    """

    # Identifier of the Athena Spark session this cursor is bound to.
    session_id: str
    # Metadata of the most recently submitted calculation; None until
    # execute() has been called.
    calculation_id: Optional[str]
    description: Optional[str]
    working_directory: Optional[str]
    state: Optional[str]
    state_change_reason: Optional[str]
    submission_date_time: Optional[datetime]
    completion_date_time: Optional[datetime]
    dpu_execution_in_millis: Optional[int]
    progress: Optional[str]
    # S3 locations of stdout/stderr logs and results for the calculation.
    std_out_s3_uri: Optional[str]
    std_error_s3_uri: Optional[str]
    result_s3_uri: Optional[str]
    result_type: Optional[str]

    def execute(self, code: str, **kwargs) -> SparkCursor:
        """
        Execute Spark code on Athena's Spark engine.

        Parameters:
        - code: Spark code to execute (Python, Scala, or SQL)
        - **kwargs: Additional execution options

        Returns:
        Self for method chaining
        """

    def cancel(self) -> None:
        """Cancel the currently executing Spark calculation."""

    def close(self) -> None:
        """Close the cursor and clean up resources."""

    @property
    def calculation_execution(self) -> Optional[AthenaCalculationExecution]:
        """Get the current calculation execution metadata."""

# Asynchronous version of SparkCursor for non-blocking Spark calculations.
class AsyncSparkCursor:
    """Asynchronous version of SparkCursor for non-blocking Spark calculations."""

    def execute(self, code: str, **kwargs) -> Tuple[str, Future[AthenaCalculationExecution]]:
        """
        Execute Spark code asynchronously.

        Parameters:
        - code: Spark code to execute

        Returns:
        Tuple of (calculation_id, Future[AthenaCalculationExecution])
        """

    def cancel(self, calculation_id: str) -> Future[None]:
        """Cancel Spark calculation by ID asynchronously."""

    def close(self, wait: bool = False) -> None:
        """Close cursor, optionally waiting for running calculations."""

# Models representing Spark calculation execution and status information.
class AthenaCalculationExecution:
    """Model of a Spark calculation execution and its status information."""

    # Lifecycle states reported by Athena for a calculation.
    STATE_CREATING: str = "CREATING"
    STATE_CREATED: str = "CREATED"
    STATE_QUEUED: str = "QUEUED"
    STATE_RUNNING: str = "RUNNING"
    STATE_CANCELLING: str = "CANCELLING"
    STATE_CANCELLED: str = "CANCELLED"
    STATE_COMPLETED: str = "COMPLETED"
    STATE_FAILED: str = "FAILED"

    calculation_execution_id: Optional[str]
    session_id: Optional[str]
    description: Optional[str]
    working_directory: Optional[str]
    state: Optional[str]
    state_change_reason: Optional[str]
    submission_date_time: Optional[datetime]
    completion_date_time: Optional[datetime]
    # DPU (data processing unit) time consumed, in milliseconds.
    dpu_execution_in_millis: Optional[int]
    progress: Optional[str]
    # S3 locations of stdout/stderr logs and result artifacts.
    std_out_s3_uri: Optional[str]
    std_error_s3_uri: Optional[str]
    result_s3_uri: Optional[str]
    result_type: Optional[str]
class AthenaSessionStatus:
    """Model describing the status of an Athena Spark session."""

    session_id: Optional[str]
    description: Optional[str]
    working_directory: Optional[str]
    idle_since_date_time: Optional[datetime]
    last_modified_date_time: Optional[datetime]
    termination_date_time: Optional[datetime]
    notebook_version: Optional[str]
    session_configuration: Optional[Dict]
    status: Optional[Dict]

# from pyathena import connect  -- opening import of the usage example below
from pyathena import connect
from pyathena.spark.cursor import SparkCursor

# Connect with Spark cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=SparkCursor,
    work_group="spark-workgroup"  # Must be configured for Spark
)
cursor = conn.cursor()

# Execute Spark Python code
spark_code = """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("AthenaSparkExample").getOrCreate()
# Read data from S3
df = spark.read.parquet("s3://my-bucket/data/sales/")
# Perform transformations
result = df.groupBy("product_category") \\
.agg({"amount": "sum", "quantity": "count"}) \\
.orderBy("sum(amount)", ascending=False)
result.show()
# Save results
result.write.mode("overwrite").parquet("s3://my-bucket/results/category_summary/")
"""
cursor.execute(spark_code)

# Check execution status
print(f"Session ID: {cursor.session_id}")
print(f"Calculation ID: {cursor.calculation_id}")
print(f"State: {cursor.state}")
print(f"Progress: {cursor.progress}")

# Get output and error logs
if cursor.std_out_s3_uri:
    print(f"Output logs: {cursor.std_out_s3_uri}")
if cursor.std_error_s3_uri:
    print(f"Error logs: {cursor.std_error_s3_uri}")

cursor.close()
conn.close()

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=SparkCursor,
    work_group="spark-workgroup"
)
cursor = conn.cursor()

# Execute Spark SQL
spark_sql = """
-- Create temporary view from S3 data
CREATE OR REPLACE TEMPORARY VIEW sales
USING PARQUET
OPTIONS (
path "s3://my-bucket/data/sales/"
);
-- Perform complex analytics
WITH monthly_metrics AS (
SELECT
DATE_FORMAT(sale_date, 'yyyy-MM') as sale_month,
product_category,
SUM(amount) as total_revenue,
COUNT(*) as transaction_count,
COUNT(DISTINCT customer_id) as unique_customers
FROM sales
WHERE sale_date >= '2023-01-01'
GROUP BY DATE_FORMAT(sale_date, 'yyyy-MM'), product_category
),
category_trends AS (
SELECT
product_category,
sale_month,
total_revenue,
LAG(total_revenue) OVER (
PARTITION BY product_category
ORDER BY sale_month
) as prev_month_revenue,
total_revenue - LAG(total_revenue) OVER (
PARTITION BY product_category
ORDER BY sale_month
) as revenue_change
FROM monthly_metrics
)
SELECT
product_category,
sale_month,
total_revenue,
revenue_change,
CASE
WHEN revenue_change > 0 THEN 'Growth'
WHEN revenue_change < 0 THEN 'Decline'
ELSE 'Stable'
END as trend
FROM category_trends
WHERE prev_month_revenue IS NOT NULL
ORDER BY product_category, sale_month;
"""
cursor.execute(spark_sql)

# No placeholders, so a plain string is correct here (was an empty f-string).
print("Spark SQL execution started")
print(f"Calculation ID: {cursor.calculation_id}")
print(f"Working directory: {cursor.working_directory}")

cursor.close()
conn.close()

from pyathena import connect
import time  # required by the polling loop below; was missing

from pyathena.spark.cursor import SparkCursor

def spark_ml_pipeline():
    """Run a customer-segmentation K-means pipeline on Athena's Spark engine."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=SparkCursor,
        work_group="spark-workgroup"
    )
    cursor = conn.cursor()

    # Machine learning pipeline with Spark MLlib
    ml_code = """
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import col, when, isnan, count, avg
spark = SparkSession.builder.appName("CustomerSegmentation").getOrCreate()
# Load customer data
customers = spark.read.parquet("s3://my-bucket/data/customers/")
# Data preprocessing
print("Original dataset shape:", customers.count(), len(customers.columns))
# Handle missing values
customers_clean = customers.dropna()
print("After removing nulls:", customers_clean.count())
# Feature engineering
feature_cols = ["age", "annual_income", "spending_score", "total_orders", "avg_order_value"]
# Create feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
customers_features = assembler.transform(customers_clean)
# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(customers_features)
customers_scaled = scaler_model.transform(customers_features)
# K-means clustering
kmeans = KMeans(featuresCol="scaled_features", predictionCol="cluster", k=5, seed=42)
model = kmeans.fit(customers_scaled)
# Make predictions
predictions = model.transform(customers_scaled)
# Evaluate clustering
evaluator = ClusteringEvaluator(featuresCol="scaled_features", predictionCol="cluster")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")
# Analyze clusters
cluster_summary = predictions.groupBy("cluster").agg(
count("*").alias("cluster_size"),
avg("age").alias("avg_age"),
avg("annual_income").alias("avg_income"),
avg("spending_score").alias("avg_spending_score"),
avg("total_orders").alias("avg_orders")
)
cluster_summary.show()
# Save results
predictions.select("customer_id", "cluster", *feature_cols).write.mode("overwrite").parquet("s3://my-bucket/results/customer_segments/")
# Save cluster centers
centers_df = spark.createDataFrame([(i, center.toArray().tolist()) for i, center in enumerate(model.clusterCenters())],
["cluster_id", "center_coordinates"])
centers_df.write.mode("overwrite").json("s3://my-bucket/results/cluster_centers/")
spark.stop()
"""
    cursor.execute(ml_code)

    print("ML pipeline started")
    print(f"Session: {cursor.session_id}")
    print(f"Calculation: {cursor.calculation_id}")

    # Monitor progress. NOTE(review): cursor.state does not appear to refresh
    # by itself; a real implementation would re-fetch calculation status each
    # iteration — confirm against the pyathena API.
    while cursor.state in ['CREATING', 'CREATED', 'QUEUED', 'RUNNING']:
        time.sleep(10)
        # In practice, you'd poll the status here
        print(f"Current state: {cursor.state}")
        if cursor.progress:
            print(f"Progress: {cursor.progress}")

    print(f"Final state: {cursor.state}")
    if cursor.state == 'COMPLETED':
        print("ML pipeline completed successfully!")
        print("Results saved to: s3://my-bucket/results/customer_segments/")
    else:
        print(f"Pipeline failed: {cursor.state_change_reason}")

    cursor.close()
    conn.close()

# Run ML pipeline
spark_ml_pipeline()

from pyathena import connect
from pyathena.spark.cursor import SparkCursor
class JupyterSparkNotebook:
    """Integration with Jupyter-style notebook execution.

    Wraps a connection, executing each "cell" of Spark code through one
    cursor and recording per-cell execution metadata in self.cell_results.
    """

    def __init__(self, connection):
        self.conn = connection
        self.cursor = connection.cursor()
        # One metadata dict per executed cell, in execution order.
        self.cell_results = []

    def execute_cell(self, cell_code, cell_name=None):
        """Execute a notebook cell and record the cursor's metadata for it."""
        print(f"Executing cell: {cell_name or 'Unnamed'}")
        result = self.cursor.execute(cell_code)
        cell_result = {
            'name': cell_name,
            'calculation_id': self.cursor.calculation_id,
            'state': self.cursor.state,
            'start_time': self.cursor.submission_date_time,
            'std_out': self.cursor.std_out_s3_uri,
            'std_error': self.cursor.std_error_s3_uri
        }
        self.cell_results.append(cell_result)
        return result

    def get_execution_summary(self):
        """Print a summary of all cell executions."""
        for result in self.cell_results:
            print(f"Cell: {result['name']}")
            print(f" State: {result['state']}")
            print(f" Calculation ID: {result['calculation_id']}")
            if result['std_out']:
                print(f" Output: {result['std_out']}")

    def close(self):
        """Close both the cursor and the underlying connection."""
        self.cursor.close()
        self.conn.close()
# Example notebook workflow
def notebook_example():
    """Run a three-cell notebook-style workflow against Athena Spark."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=SparkCursor,
        work_group="spark-workgroup"
    )
    notebook = JupyterSparkNotebook(conn)

    # Cell 1: Setup and data loading
    notebook.execute_cell("""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NotebookExample").getOrCreate()
# Load data
df = spark.read.parquet("s3://my-bucket/data/transactions/")
print(f"Loaded {df.count()} transactions")
df.printSchema()
""", "Data Loading")

    # Cell 2: Data exploration
    notebook.execute_cell("""
# Basic statistics
df.describe().show()
# Check for nulls
from pyspark.sql.functions import col, isnan, when, count
df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns]).show()
""", "Data Exploration")

    # Cell 3: Analysis
    notebook.execute_cell("""
from pyspark.sql.functions import sum, avg, max, min, count, date_format
# Monthly analysis
monthly_stats = df.groupBy(date_format("transaction_date", "yyyy-MM").alias("month")) \\
.agg(
sum("amount").alias("total_amount"),
avg("amount").alias("avg_amount"),
count("*").alias("transaction_count")
) \\
.orderBy("month")
monthly_stats.show()
# Save results
monthly_stats.write.mode("overwrite").parquet("s3://my-bucket/results/monthly_analysis/")
""", "Monthly Analysis")

    # Get execution summary
    notebook.get_execution_summary()
    notebook.close()

notebook_example()

import asyncio
import asyncio

from pyathena import connect
from pyathena.spark.async_cursor import AsyncSparkCursor

async def async_spark_operations():
    """Submit several Spark calculations concurrently and await their results."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncSparkCursor,
        work_group="spark-workgroup"
    )
    cursor = conn.cursor()

    # Execute multiple Spark jobs concurrently
    jobs = [
        ("data_quality", """
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when
spark = SparkSession.builder.appName("DataQuality").getOrCreate()
df = spark.read.parquet("s3://my-bucket/data/raw/")
quality_report = df.agg(*[count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in df.columns])
quality_report.write.mode("overwrite").json("s3://my-bucket/reports/data_quality/")
"""),
        ("aggregation", """
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum
spark = SparkSession.builder.appName("Aggregation").getOrCreate()
df = spark.read.parquet("s3://my-bucket/data/processed/")
summary = df.groupBy("category").agg(sum("amount"), count("*"))
summary.write.mode("overwrite").parquet("s3://my-bucket/results/category_summary/")
"""),
        ("feature_engineering", """
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()
df = spark.read.parquet("s3://my-bucket/data/customers/")
from pyspark.ml.feature import Bucketizer, VectorAssembler
# Add feature engineering logic here
features = VectorAssembler(inputCols=["age", "income"], outputCol="features").transform(df)
features.write.mode("overwrite").parquet("s3://my-bucket/features/customer_features/")
""")
    ]

    # Start all jobs
    running_jobs = {}
    for name, code in jobs:
        calc_id, future = cursor.execute(code)
        running_jobs[name] = {
            'calculation_id': calc_id,
            'future': future
        }
        print(f"Started {name} job (ID: {calc_id})")

    # Wait for all jobs to complete. AsyncSparkCursor.execute returns a
    # concurrent.futures.Future, which is not directly awaitable; wrap it
    # so it can be awaited inside the event loop.
    for name, job_info in running_jobs.items():
        try:
            result = await asyncio.wrap_future(job_info['future'])
            print(f"✓ {name} completed successfully")
            print(f" State: {result.state}")
            if result.dpu_execution_in_millis:
                print(f" Execution time: {result.dpu_execution_in_millis}ms")
        except Exception as e:
            print(f"✗ {name} failed: {e}")

    cursor.close()
    conn.close()

# Run async Spark operations
asyncio.run(async_spark_operations())

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

def optimized_spark_job():
    """Submit a performance-tuned Spark job (AQE, Kryo, broadcast join)."""
    # Connection with Spark-specific configuration
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=SparkCursor,
        work_group="spark-workgroup"
    )
    cursor = conn.cursor()

    # Optimized Spark configuration
    optimized_code = """
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Configure Spark for performance
spark = SparkSession.builder \\
.appName("OptimizedAnalytics") \\
.config("spark.sql.adaptive.enabled", "true") \\
.config("spark.sql.adaptive.coalescePartitions.enabled", "true") \\
.config("spark.sql.adaptive.skewJoin.enabled", "true") \\
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \\
.getOrCreate()
# Enable dynamic partition pruning
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")
# Read large dataset with partitioning
large_df = spark.read.option("mergeSchema", "true") \\
.parquet("s3://my-bucket/data/large_dataset/")
print(f"Dataset partitions: {large_df.rdd.getNumPartitions()}")
# Efficient aggregation with broadcast hint
lookup_df = spark.read.parquet("s3://my-bucket/data/lookup_table/")
broadcast_lookup = broadcast(lookup_df)
# Join with broadcast hint for small lookup table
result = large_df.join(broadcast_lookup, "key") \\
.groupBy("category", "region") \\
.agg(
sum("amount").alias("total_amount"),
count("*").alias("record_count"),
avg("amount").alias("avg_amount")
) \\
.cache() # Cache for multiple actions
# Write with optimal partitioning
result.coalesce(100) \\
.write \\
.mode("overwrite") \\
.option("compression", "snappy") \\
.partitionBy("region") \\
.parquet("s3://my-bucket/results/optimized_output/")
# Show execution plan
result.explain(True)
spark.stop()
"""
    cursor.execute(optimized_code)

    print("Optimized Spark job started")
    print(f"Session ID: {cursor.session_id}")
    print(f"Working directory: {cursor.working_directory}")

    cursor.close()
    conn.close()

optimized_spark_job()

# To use Spark integration with PyAthena:
Install with Tessl CLI
npx tessl i tessl/pypi-pyathena