tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena


docs/arrow-integration.md

PyArrow Integration

Native PyArrow Table support for columnar data processing, providing optimal performance for analytical workloads and seamless integration with the Arrow ecosystem. Ideal for high-performance analytics and data interchange.

Installation

pip install PyAthena[Arrow]

Capabilities

Arrow Cursor

Cursor that returns query results as PyArrow Tables, providing columnar data format optimized for analytical operations and memory efficiency.

class ArrowCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    
    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor:
        """
        Execute a SQL statement with Arrow Table result processing.
        
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options
        
        Returns:
        Self for method chaining
        """
    
    def fetchone(self) -> Optional[Table]:
        """
        Fetch the next chunk as an Arrow Table.
        
        Returns:
        Arrow Table with single chunk or None if no more data
        """
    
    def fetchmany(self, size: Optional[int] = None) -> Table:
        """
        Fetch multiple rows as an Arrow Table.
        
        Parameters:
        - size: Number of rows to fetch (default: arraysize)
        
        Returns:
        Arrow Table containing the requested rows
        """
    
    def fetchall(self) -> Table:
        """
        Fetch all remaining rows as a single Arrow Table.
        
        Returns:
        Arrow Table containing all remaining rows
        """
    
    def as_arrow(self) -> Table:
        """
        Return results as Arrow Table.
        
        Returns:
        PyArrow Table with all query results
        """
    
    def cancel(self) -> None:
        """Cancel the currently executing query."""
    
    def close(self) -> None:
        """Close the cursor and free resources."""

Async Arrow Cursor

Asynchronous version of ArrowCursor for non-blocking operations with Future-based API.

class AsyncArrowCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[Table]]:
        """
        Execute query asynchronously returning query ID and Future.
        
        Parameters:
        - operation: SQL query string
        - parameters: Query parameters
        
        Returns:
        Tuple of (query_id, Future[Table])
        """
    
    def cancel(self, query_id: str) -> Future[None]:
        """Cancel query by ID asynchronously."""
    
    def close(self, wait: bool = False) -> None:
        """Close cursor, optionally waiting for running queries."""

Arrow Result Set

Specialized result set class optimized for PyArrow Table creation with efficient columnar data processing.

class AthenaArrowResultSet:
    def as_arrow(self) -> Table:
        """Convert result set to PyArrow Table."""
    
    def fetchone_arrow(self) -> Optional[Table]:
        """Fetch single chunk as Arrow Table."""
    
    def fetchmany_arrow(self, size: int) -> Table:
        """Fetch multiple rows as Arrow Table."""
    
    def fetchall_arrow(self) -> Table:
        """Fetch all rows as Arrow Table."""

Usage Examples

Basic Arrow Table Query

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute  # ensures pa.compute is available below

# Connect with Arrow cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")

# Get results as Arrow Table
table = cursor.fetchall()
print(f"Table shape: {table.shape}")
print(f"Columns: {table.column_names}")
print(f"Schema: {table.schema}")

# Access column data
revenue_column = table.column('revenue')
print(f"Revenue column type: {revenue_column.type}")
print(f"Revenue sum: {pa.compute.sum(revenue_column)}")

cursor.close()
conn.close()

High-Performance Analytics

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()

# Complex analytical query
query = """
SELECT 
    product_id,
    sale_date,
    quantity,
    unit_price,
    quantity * unit_price as total_amount,
    customer_segment
FROM sales_data
WHERE sale_date >= DATE '2023-01-01'
"""

cursor.execute(query)
table = cursor.fetchall()

# High-performance columnar operations
print("Performing columnar analytics...")

# Aggregations using Arrow compute functions
total_revenue = pc.sum(table.column('total_amount'))
avg_order_size = pc.mean(table.column('total_amount'))
max_quantity = pc.max(table.column('quantity'))

print(f"Total Revenue: ${total_revenue.as_py():,.2f}")
print(f"Average Order Size: ${avg_order_size.as_py():.2f}")
print(f"Max Quantity: {max_quantity.as_py()}")

# Group by operations
grouped = table.group_by('customer_segment').aggregate([
    ('total_amount', 'sum'),
    ('quantity', 'mean'),
    ('product_id', 'count')
])

print("\nRevenue by Customer Segment:")
for i in range(len(grouped)):
    segment = grouped.column('customer_segment')[i].as_py()
    revenue = grouped.column('total_amount_sum')[i].as_py()
    print(f"{segment}: ${revenue:,.2f}")

cursor.close()
conn.close()

Columnar Data Processing Pipeline

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

def process_sales_data():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor
    )
    
    cursor = conn.cursor()
    
    # Extract data
    cursor.execute("""
        SELECT 
            customer_id,
            product_category,
            sale_amount,
            sale_date,
            region
        FROM sales_transactions
        WHERE sale_date >= CURRENT_DATE - INTERVAL '30' DAY
    """)
    
    table = cursor.fetchall()
    
    # Data transformations using Arrow
    # Add computed columns
    table = table.append_column(
        'month',
        pc.month(table.column('sale_date'))
    )
    
    # Filter operations
    high_value_sales = table.filter(
        pc.greater(table.column('sale_amount'), pa.scalar(1000))
    )
    
    print(f"High value sales count: {len(high_value_sales)}")
    
    # Export to various formats
    # Parquet
    pq.write_table(high_value_sales, 'high_value_sales.parquet')
    
    # CSV
    high_value_sales.to_pandas().to_csv('high_value_sales.csv', index=False)
    
    # JSON
    with open('high_value_sales.json', 'w') as f:
        f.write(high_value_sales.to_pandas().to_json(orient='records'))
    
    cursor.close()
    conn.close()
    
    return high_value_sales

# Process data
result_table = process_sales_data()
print(f"Processed {len(result_table)} high-value sales records")

Integration with Arrow Ecosystem

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.dataset as ds

# Data extraction from Athena
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM customer_analytics")
customer_table = cursor.fetchall()

# Create Arrow dataset for efficient querying
dataset = ds.InMemoryDataset(customer_table)

# Advanced filtering and projection
filtered_data = dataset.to_table(
    filter=(ds.field('age') > 25) & (ds.field('active') == True),
    columns=['customer_id', 'age', 'total_spend', 'last_purchase_date']
)

print(f"Filtered customers: {len(filtered_data)}")

# Statistical analysis
stats = {
    'mean_age': pc.mean(filtered_data.column('age')).as_py(),
    'mean_spend': pc.mean(filtered_data.column('total_spend')).as_py(),
    'total_customers': len(filtered_data)
}

for metric, value in stats.items():
    print(f"{metric}: {value}")

cursor.close()
conn.close()

Memory-Efficient Streaming

from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute  # ensures pa.compute is available below

def stream_process_large_dataset():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor
    )
    
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM large_transaction_table")
    
    # Stream processing for memory efficiency
    batch_size = 10000
    total_processed = 0
    running_sum = 0
    
    while True:
        batch = cursor.fetchmany(batch_size)
        if len(batch) == 0:
            break
        
        # Process batch
        batch_sum = pa.compute.sum(batch.column('amount')).as_py()
        running_sum += batch_sum
        total_processed += len(batch)
        
        print(f"Processed {total_processed} rows, running sum: ${running_sum:,.2f}")
    
    print(f"Final total: ${running_sum:,.2f} from {total_processed} transactions")
    
    cursor.close()
    conn.close()

stream_process_large_dataset()

Async Arrow Processing

import asyncio
from pyathena import connect
from pyathena.arrow.async_cursor import AsyncArrowCursor
import pyarrow as pa
import pyarrow.compute as pc

async def concurrent_arrow_queries():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncArrowCursor
    )
    
    cursor = conn.cursor()
    
    # Multiple analytical queries
    queries = [
        ("daily_sales", "SELECT sale_date, SUM(amount) as daily_total FROM sales GROUP BY sale_date"),
        ("product_metrics", "SELECT product_id, COUNT(*) as sales_count FROM sales GROUP BY product_id"),
        ("customer_stats", "SELECT customer_segment, AVG(amount) as avg_spend FROM sales GROUP BY customer_segment")
    ]
    
    # Execute all queries concurrently
    futures = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        futures[name] = future
    
    # Collect results; the cursor returns concurrent.futures.Future
    # objects, so wrap them before awaiting inside the coroutine
    results = {}
    for name, future in futures.items():
        table = await asyncio.wrap_future(future)
        results[name] = table
        print(f"{name}: {len(table)} rows")
    
    # Perform cross-table analytics
    total_daily_sales = pc.sum(results['daily_sales'].column('daily_total'))
    unique_products = len(results['product_metrics'])
    
    print(f"Total sales across all days: ${total_daily_sales.as_py():,.2f}")
    print(f"Unique products sold: {unique_products}")
    
    cursor.close()
    conn.close()

# Run async processing
asyncio.run(concurrent_arrow_queries())

Type Conversion

PyAthena maps Athena data types to appropriate Arrow types:

  • boolean → pa.bool_()
  • tinyint → pa.int8()
  • smallint → pa.int16()
  • integer → pa.int32()
  • bigint → pa.int64()
  • real → pa.float32()
  • double → pa.float64()
  • decimal → pa.decimal128() or pa.decimal256()
  • varchar, char → pa.string()
  • date → pa.date32()
  • timestamp → pa.timestamp('ns')
  • array → pa.list_()
  • map → pa.map_()
  • row → pa.struct()

Performance Benefits

Arrow integration provides several performance advantages:

  • Columnar Storage: Efficient memory layout for analytical operations
  • Zero-Copy Operations: Minimal data copying during transformations
  • Vectorized Computing: SIMD-optimized operations via Arrow compute functions
  • Memory Efficiency: Compact representation of typed data
  • Interoperability: Seamless integration with other Arrow-based tools
  • Parallel Processing: Built-in support for parallel operations

Arrow Ecosystem Integration

PyAthena's Arrow support enables integration with:

  • Apache Arrow: Core columnar processing capabilities
  • PyArrow: Python Arrow bindings and computation kernels
  • Arrow Flight: High-performance data transport
  • Parquet: Efficient columnar file format
  • Pandas: Convert to/from DataFrames when needed
  • Polars: High-performance DataFrame library
  • DuckDB: In-process analytical database

Install with Tessl CLI

npx tessl i tessl/pypi-pyathena
