CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/spark-integration.md

Spark Integration

Integration with Athena's Spark execution engine for distributed processing, Jupyter notebook compatibility, and advanced analytics workloads. Enables running Spark code directly on Athena's serverless Spark engine.

Capabilities

Spark Cursor

Cursor for executing Spark calculations on Athena's Spark engine, providing distributed processing capabilities and integration with Jupyter notebooks.

class SparkCursor:
    """Cursor for executing Spark calculations on Athena's Spark engine.

    The attributes below expose metadata about the Spark session and the
    most recent calculation; Optional fields may be None until a
    calculation has been submitted via execute().
    """

    # Athena Spark session this cursor is bound to.
    session_id: str
    # ID of the most recently submitted calculation.
    calculation_id: Optional[str]
    description: Optional[str]
    # Working directory used by the Spark session (an S3 URI, per the examples below).
    working_directory: Optional[str]
    # Calculation lifecycle state (see AthenaCalculationExecution.STATE_* constants).
    state: Optional[str]
    state_change_reason: Optional[str]
    submission_date_time: Optional[datetime]
    completion_date_time: Optional[datetime]
    # DPU time consumed by the calculation, in milliseconds.
    dpu_execution_in_millis: Optional[int]
    progress: Optional[str]
    # S3 URIs for the calculation's stdout, stderr, and result artifacts.
    std_out_s3_uri: Optional[str]
    std_error_s3_uri: Optional[str]
    result_s3_uri: Optional[str]
    result_type: Optional[str]

    def execute(self, code: str, **kwargs) -> SparkCursor:
        """
        Execute Spark code on Athena's Spark engine.

        Parameters:
        - code: Spark code to execute (Python, Scala, or SQL)
        - **kwargs: Additional execution options

        Returns:
        Self for method chaining
        """

    def cancel(self) -> None:
        """Cancel the currently executing Spark calculation."""

    def close(self) -> None:
        """Close the cursor and clean up resources."""

    @property
    def calculation_execution(self) -> Optional[AthenaCalculationExecution]:
        """Get the current calculation execution metadata."""

Async Spark Cursor

Asynchronous version of SparkCursor for non-blocking Spark calculations.

class AsyncSparkCursor:
    """Asynchronous counterpart of SparkCursor for non-blocking Spark calculations.

    execute() returns immediately with a calculation ID and a Future that
    resolves to the calculation's execution metadata.
    """

    def execute(self, code: str, **kwargs) -> Tuple[str, Future[AthenaCalculationExecution]]:
        """
        Execute Spark code asynchronously.

        Parameters:
        - code: Spark code to execute
        - **kwargs: Additional execution options

        Returns:
        Tuple of (calculation_id, Future[AthenaCalculationExecution])
        """

    def cancel(self, calculation_id: str) -> Future[None]:
        """Cancel Spark calculation by ID asynchronously."""

    def close(self, wait: bool = False) -> None:
        """Close cursor, optionally waiting for running calculations."""

Spark Calculation Models

Models representing Spark calculation execution and status information.

class AthenaCalculationExecution:
    """Model of a single Spark calculation execution on Athena.

    Carries the calculation's identity, lifecycle state, timing, and the
    S3 locations of its logs and results.
    """

    # Calculation states (terminal states: CANCELLED, COMPLETED, FAILED).
    STATE_CREATING: str = "CREATING"
    STATE_CREATED: str = "CREATED"
    STATE_QUEUED: str = "QUEUED"
    STATE_RUNNING: str = "RUNNING"
    STATE_CANCELLING: str = "CANCELLING"
    STATE_CANCELLED: str = "CANCELLED"
    STATE_COMPLETED: str = "COMPLETED"
    STATE_FAILED: str = "FAILED"

    calculation_execution_id: Optional[str]
    session_id: Optional[str]
    description: Optional[str]
    working_directory: Optional[str]
    # One of the STATE_* constants above.
    state: Optional[str]
    state_change_reason: Optional[str]
    submission_date_time: Optional[datetime]
    completion_date_time: Optional[datetime]
    # DPU time consumed, in milliseconds.
    dpu_execution_in_millis: Optional[int]
    progress: Optional[str]
    # S3 URIs for stdout, stderr, and the result artifacts.
    std_out_s3_uri: Optional[str]
    std_error_s3_uri: Optional[str]
    result_s3_uri: Optional[str]
    result_type: Optional[str]

class AthenaSessionStatus:
    """Model of an Athena Spark session's status: identity, timing,
    notebook version, and configuration/status payloads.
    """

    session_id: Optional[str]
    description: Optional[str]
    working_directory: Optional[str]
    idle_since_date_time: Optional[datetime]
    last_modified_date_time: Optional[datetime]
    termination_date_time: Optional[datetime]
    notebook_version: Optional[str]
    # Raw session configuration as returned by the service -- structure not
    # specified here; verify against the Athena API before relying on keys.
    session_configuration: Optional[Dict]
    # Raw status payload; same caveat as session_configuration.
    status: Optional[Dict]

Usage Examples

Basic Spark Code Execution

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

# Connect with Spark cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=SparkCursor,
    work_group="spark-workgroup"  # Must be configured for Spark
)

cursor = conn.cursor()

# Execute Spark Python code; the snippet below runs remotely on Athena's
# Spark engine, not in this process.
spark_code = """
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("AthenaSparkExample").getOrCreate()

# Read data from S3
df = spark.read.parquet("s3://my-bucket/data/sales/")

# Perform transformations
result = df.groupBy("product_category") \\
    .agg({"amount": "sum", "quantity": "count"}) \\
    .orderBy("sum(amount)", ascending=False)

result.show()

# Save results
result.write.mode("overwrite").parquet("s3://my-bucket/results/category_summary/")
"""

cursor.execute(spark_code)

# Check execution status via the metadata attributes the cursor exposes
print(f"Session ID: {cursor.session_id}")
print(f"Calculation ID: {cursor.calculation_id}")
print(f"State: {cursor.state}")
print(f"Progress: {cursor.progress}")

# Get output and error logs (S3 URIs, when available)
if cursor.std_out_s3_uri:
    print(f"Output logs: {cursor.std_out_s3_uri}")
if cursor.std_error_s3_uri:
    print(f"Error logs: {cursor.std_error_s3_uri}")

cursor.close()
conn.close()

Spark SQL Execution

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=SparkCursor,
    work_group="spark-workgroup"
)

cursor = conn.cursor()

# Execute Spark SQL; the statements run remotely on Athena's Spark engine.
spark_sql = """
-- Create temporary view from S3 data
CREATE OR REPLACE TEMPORARY VIEW sales 
USING PARQUET
OPTIONS (
  path "s3://my-bucket/data/sales/"
);

-- Perform complex analytics
WITH monthly_metrics AS (
  SELECT 
    DATE_FORMAT(sale_date, 'yyyy-MM') as sale_month,
    product_category,
    SUM(amount) as total_revenue,
    COUNT(*) as transaction_count,
    COUNT(DISTINCT customer_id) as unique_customers
  FROM sales
  WHERE sale_date >= '2023-01-01'
  GROUP BY DATE_FORMAT(sale_date, 'yyyy-MM'), product_category
),
category_trends AS (
  SELECT 
    product_category,
    sale_month,
    total_revenue,
    LAG(total_revenue) OVER (
      PARTITION BY product_category 
      ORDER BY sale_month
    ) as prev_month_revenue,
    total_revenue - LAG(total_revenue) OVER (
      PARTITION BY product_category 
      ORDER BY sale_month
    ) as revenue_change
  FROM monthly_metrics
)
SELECT 
  product_category,
  sale_month,
  total_revenue,
  revenue_change,
  CASE 
    WHEN revenue_change > 0 THEN 'Growth'
    WHEN revenue_change < 0 THEN 'Decline'
    ELSE 'Stable'
  END as trend
FROM category_trends
WHERE prev_month_revenue IS NOT NULL
ORDER BY product_category, sale_month;
"""

cursor.execute(spark_sql)

# Report submission metadata from the cursor
print("Spark SQL execution started")
print(f"Calculation ID: {cursor.calculation_id}")
print(f"Working directory: {cursor.working_directory}")

cursor.close()
conn.close()

Advanced Spark Analytics

import time

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

def spark_ml_pipeline():
    """Run a customer-segmentation pipeline (KMeans via Spark MLlib) on
    Athena's Spark engine and monitor the calculation until it finishes
    or a one-hour deadline elapses.
    """
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=SparkCursor,
        work_group="spark-workgroup"
    )

    cursor = conn.cursor()

    # Machine learning pipeline with Spark MLlib. Note: `avg` must be
    # imported alongside `count` -- the snippet runs standalone on the
    # Spark engine and uses it in the cluster summary.
    ml_code = """
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
from pyspark.sql.functions import avg, col, count, isnan, when

spark = SparkSession.builder.appName("CustomerSegmentation").getOrCreate()

# Load customer data
customers = spark.read.parquet("s3://my-bucket/data/customers/")

# Data preprocessing
print("Original dataset shape:", customers.count(), len(customers.columns))

# Handle missing values
customers_clean = customers.dropna()
print("After removing nulls:", customers_clean.count())

# Feature engineering
feature_cols = ["age", "annual_income", "spending_score", "total_orders", "avg_order_value"]

# Create feature vector
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
customers_features = assembler.transform(customers_clean)

# Scale features
scaler = StandardScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(customers_features)
customers_scaled = scaler_model.transform(customers_features)

# K-means clustering
kmeans = KMeans(featuresCol="scaled_features", predictionCol="cluster", k=5, seed=42)
model = kmeans.fit(customers_scaled)

# Make predictions
predictions = model.transform(customers_scaled)

# Evaluate clustering
evaluator = ClusteringEvaluator(featuresCol="scaled_features", predictionCol="cluster")
silhouette = evaluator.evaluate(predictions)
print(f"Silhouette Score: {silhouette}")

# Analyze clusters
cluster_summary = predictions.groupBy("cluster").agg(
    count("*").alias("cluster_size"),
    avg("age").alias("avg_age"),
    avg("annual_income").alias("avg_income"),
    avg("spending_score").alias("avg_spending_score"),
    avg("total_orders").alias("avg_orders")
)

cluster_summary.show()

# Save results
predictions.select("customer_id", "cluster", *feature_cols).write.mode("overwrite").parquet("s3://my-bucket/results/customer_segments/")

# Save cluster centers
centers_df = spark.createDataFrame([(i, center.toArray().tolist()) for i, center in enumerate(model.clusterCenters())], 
                                 ["cluster_id", "center_coordinates"])
centers_df.write.mode("overwrite").json("s3://my-bucket/results/cluster_centers/")

spark.stop()
"""

    cursor.execute(ml_code)

    print("ML pipeline started")
    print(f"Session: {cursor.session_id}")
    print(f"Calculation: {cursor.calculation_id}")

    # Poll until the calculation leaves its in-flight states. The deadline
    # prevents looping forever if `cursor.state` is never refreshed; in
    # practice you would re-fetch status each iteration (e.g. via
    # cursor.calculation_execution).
    deadline = time.monotonic() + 3600
    while cursor.state in ('CREATING', 'CREATED', 'QUEUED', 'RUNNING'):
        if time.monotonic() > deadline:
            print("Timed out waiting for the calculation to finish")
            break
        time.sleep(10)
        print(f"Current state: {cursor.state}")
        if cursor.progress:
            print(f"Progress: {cursor.progress}")

    print(f"Final state: {cursor.state}")
    if cursor.state == 'COMPLETED':
        print("ML pipeline completed successfully!")
        print("Results saved to: s3://my-bucket/results/customer_segments/")
    else:
        print(f"Pipeline failed: {cursor.state_change_reason}")

    cursor.close()
    conn.close()

# Run ML pipeline
spark_ml_pipeline()

Jupyter Notebook Integration

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

class JupyterSparkNotebook:
    """Runs Spark code cell-by-cell, Jupyter style, recording per-cell metadata."""

    def __init__(self, connection):
        self.cell_results = []
        self.conn = connection
        self.cursor = connection.cursor()

    def execute_cell(self, cell_code, cell_name=None):
        """Submit one cell of Spark code and record its execution metadata."""
        label = cell_name or 'Unnamed'
        print(f"Executing cell: {label}")

        outcome = self.cursor.execute(cell_code)

        self.cell_results.append({
            'name': cell_name,
            'calculation_id': self.cursor.calculation_id,
            'state': self.cursor.state,
            'start_time': self.cursor.submission_date_time,
            'std_out': self.cursor.std_out_s3_uri,
            'std_error': self.cursor.std_error_s3_uri,
        })
        return outcome

    def get_execution_summary(self):
        """Print state, calculation ID, and output URI for every executed cell."""
        for entry in self.cell_results:
            print(f"Cell: {entry['name']}")
            print(f"  State: {entry['state']}")
            print(f"  Calculation ID: {entry['calculation_id']}")
            if entry['std_out']:
                print(f"  Output: {entry['std_out']}")

    def close(self):
        """Release the cursor first, then the underlying connection."""
        self.cursor.close()
        self.conn.close()

# Example notebook workflow
def notebook_example():
    """Example workflow: run three notebook-style cells (load, explore,
    analyze) through JupyterSparkNotebook, then print an execution summary.
    """
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=SparkCursor,
        work_group="spark-workgroup"
    )
    
    notebook = JupyterSparkNotebook(conn)
    
    # Cell 1: Setup and data loading
    notebook.execute_cell("""
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("NotebookExample").getOrCreate()

# Load data
df = spark.read.parquet("s3://my-bucket/data/transactions/")
print(f"Loaded {df.count()} transactions")
df.printSchema()
""", "Data Loading")
    
    # Cell 2: Data exploration
    notebook.execute_cell("""
# Basic statistics
df.describe().show()

# Check for nulls
from pyspark.sql.functions import col, isnan, when, count
df.select([count(when(col(c).isNull() | isnan(col(c)), c)).alias(c) for c in df.columns]).show()
""", "Data Exploration")
    
    # Cell 3: Analysis
    notebook.execute_cell("""
from pyspark.sql.functions import sum, avg, max, min, count, date_format

# Monthly analysis
monthly_stats = df.groupBy(date_format("transaction_date", "yyyy-MM").alias("month")) \\
    .agg(
        sum("amount").alias("total_amount"),
        avg("amount").alias("avg_amount"),
        count("*").alias("transaction_count")
    ) \\
    .orderBy("month")

monthly_stats.show()

# Save results
monthly_stats.write.mode("overwrite").parquet("s3://my-bucket/results/monthly_analysis/")
""", "Monthly Analysis")
    
    # Get execution summary
    notebook.get_execution_summary()
    notebook.close()

notebook_example()

Async Spark Operations

import asyncio
from pyathena import connect
from pyathena.spark.async_cursor import AsyncSparkCursor

async def async_spark_operations():
    """Submit several Spark calculations concurrently and await their results.

    AsyncSparkCursor.execute() returns (calculation_id, Future). The Future is
    a concurrent.futures.Future, which is not directly awaitable, so it is
    bridged onto the event loop with asyncio.wrap_future().
    """
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncSparkCursor,
        work_group="spark-workgroup"
    )

    cursor = conn.cursor()

    # Execute multiple Spark jobs concurrently. Each snippet imports what it
    # uses, since it runs standalone on the Spark engine.
    jobs = [
        ("data_quality", """
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when

spark = SparkSession.builder.appName("DataQuality").getOrCreate()
df = spark.read.parquet("s3://my-bucket/data/raw/")
quality_report = df.agg(*[count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in df.columns])
quality_report.write.mode("overwrite").json("s3://my-bucket/reports/data_quality/")
"""),
        ("aggregation", """
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, sum

spark = SparkSession.builder.appName("Aggregation").getOrCreate()
df = spark.read.parquet("s3://my-bucket/data/processed/")
summary = df.groupBy("category").agg(sum("amount"), count("*"))
summary.write.mode("overwrite").parquet("s3://my-bucket/results/category_summary/")
"""),
        ("feature_engineering", """
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler

spark = SparkSession.builder.appName("FeatureEngineering").getOrCreate()
df = spark.read.parquet("s3://my-bucket/data/customers/")
# Add feature engineering logic here
features = VectorAssembler(inputCols=["age", "income"], outputCol="features").transform(df)
features.write.mode("overwrite").parquet("s3://my-bucket/features/customer_features/")
""")
    ]

    # Start all jobs; each execute() returns immediately.
    running_jobs = {}
    for name, code in jobs:
        calc_id, future = cursor.execute(code)
        running_jobs[name] = {
            'calculation_id': calc_id,
            'future': future
        }
        print(f"Started {name} job (ID: {calc_id})")

    # Wait for all jobs to complete.
    for name, job_info in running_jobs.items():
        try:
            result = await asyncio.wrap_future(job_info['future'])
            print(f"✓ {name} completed successfully")
            print(f"  State: {result.state}")
            if result.dpu_execution_in_millis:
                print(f"  Execution time: {result.dpu_execution_in_millis}ms")
        except Exception as e:
            print(f"✗ {name} failed: {e}")

    cursor.close()
    conn.close()

# Run async Spark operations
asyncio.run(async_spark_operations())

Spark Configuration and Optimization

from pyathena import connect
from pyathena.spark.cursor import SparkCursor

def optimized_spark_job():
    """Submit a performance-tuned Spark job: adaptive query execution,
    Kryo serialization, a broadcast join against a lookup table, and
    partitioned, compressed Parquet output.
    """
    # Connection with Spark-specific configuration
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=SparkCursor,
        work_group="spark-workgroup"
    )
    
    cursor = conn.cursor()
    
    # Optimized Spark configuration; this snippet runs remotely on the
    # Spark engine.
    optimized_code = """
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# Configure Spark for performance
spark = SparkSession.builder \\
    .appName("OptimizedAnalytics") \\
    .config("spark.sql.adaptive.enabled", "true") \\
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \\
    .config("spark.sql.adaptive.skewJoin.enabled", "true") \\
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \\
    .getOrCreate()

# Enable dynamic partition pruning
spark.conf.set("spark.sql.optimizer.dynamicPartitionPruning.enabled", "true")

# Read large dataset with partitioning
large_df = spark.read.option("mergeSchema", "true") \\
    .parquet("s3://my-bucket/data/large_dataset/")

print(f"Dataset partitions: {large_df.rdd.getNumPartitions()}")

# Efficient aggregation with broadcast hint
lookup_df = spark.read.parquet("s3://my-bucket/data/lookup_table/")
broadcast_lookup = broadcast(lookup_df)

# Join with broadcast hint for small lookup table
result = large_df.join(broadcast_lookup, "key") \\
    .groupBy("category", "region") \\
    .agg(
        sum("amount").alias("total_amount"),
        count("*").alias("record_count"),
        avg("amount").alias("avg_amount")
    ) \\
    .cache()  # Cache for multiple actions

# Write with optimal partitioning
result.coalesce(100) \\
    .write \\
    .mode("overwrite") \\
    .option("compression", "snappy") \\
    .partitionBy("region") \\
    .parquet("s3://my-bucket/results/optimized_output/")

# Show execution plan
result.explain(True)

spark.stop()
"""
    
    cursor.execute(optimized_code)
    
    print("Optimized Spark job started")
    print(f"Session ID: {cursor.session_id}")
    print(f"Working directory: {cursor.working_directory}")
    
    cursor.close()
    conn.close()

optimized_spark_job()

Configuration Requirements

To use Spark integration with PyAthena:

  1. Workgroup Configuration: Your Athena workgroup must be configured for Spark
  2. IAM Permissions: Required permissions for Spark execution and S3 access
  3. S3 Working Directory: Configured S3 location for Spark session files
  4. Engine Version: Compatible Athena engine version with Spark support

Supported Spark Features

  • PySpark: Python API for Spark
  • Spark SQL: SQL interface for Spark
  • MLlib: Machine learning library
  • Structured Streaming: Stream processing (limited support)
  • DataFrame API: High-level DataFrame operations
  • RDD API: Low-level resilient distributed datasets

Performance Considerations

  • Use appropriate cluster sizing for your workgroup
  • Leverage Spark's adaptive query execution
  • Use broadcast joins for small lookup tables
  • Partition large datasets appropriately
  • Cache intermediate results when accessed multiple times
  • Use columnar formats (Parquet) for better performance

Install with Tessl CLI

npx tessl i tessl/pypi-pyathena

docs

arrow-integration.md

async-operations.md

core-database.md

index.md

pandas-integration.md

spark-integration.md

sqlalchemy-integration.md

tile.json