Python DB API 2.0 (PEP 249) client for Amazon Athena
—
Quality: Pending — does it follow best practices?
Impact: Pending — no eval scenarios have been run.
Full async/await support with Future-based API for non-blocking query execution, enabling concurrent query processing and integration with async frameworks. Ideal for applications requiring high concurrency and responsive user interfaces.
Asynchronous cursor providing non-blocking query execution with Future-based result handling, allowing multiple concurrent queries and integration with async/await patterns.
class AsyncCursor:
arraysize: int
max_workers: int
def execute(
self,
operation: str,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
work_group: Optional[str] = None,
s3_staging_dir: Optional[str] = None,
cache_size: Optional[int] = 0,
cache_expiration_time: Optional[int] = 0,
result_reuse_enable: Optional[bool] = None,
result_reuse_minutes: Optional[int] = None,
paramstyle: Optional[str] = None,
**kwargs
) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
"""
Execute a SQL statement asynchronously.
Parameters:
- operation: SQL query string
- parameters: Query parameters (dict or sequence)
- work_group: Athena workgroup for execution
- s3_staging_dir: S3 location for query results
- cache_size: Query result cache size
- cache_expiration_time: Cache expiration time in seconds
- result_reuse_enable: Enable query result reuse
- result_reuse_minutes: Result reuse duration in minutes
- paramstyle: Parameter substitution style
- **kwargs: Additional execution options
Returns:
Tuple of (query_id, Future[AthenaResultSet]) for result handling
"""
def executemany(
self,
operation: str,
seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
**kwargs
) -> None:
"""
Execute multiple statements (not supported).
Raises:
NotSupportedError: Always raised as async executemany is not supported
"""
def cancel(self, query_id: str) -> Future[None]:
"""
Cancel a running query by query ID.
Parameters:
- query_id: ID of query to cancel
Returns:
Future that completes when cancellation is processed
"""
def description(self, query_id: str) -> Future[Optional[List[Tuple[str, str, None, None, int, int, str]]]]:
"""
Get column description for a query asynchronously.
Parameters:
- query_id: Query execution ID
Returns:
Future containing column metadata as list of tuples with
(name, type_code, display_size, internal_size, precision, scale, null_ok)
"""
def query_execution(self, query_id: str) -> Future[AthenaQueryExecution]:
"""
Get query execution metadata asynchronously.
Parameters:
- query_id: Query execution ID
Returns:
Future[AthenaQueryExecution] with execution details
"""
def poll(self, query_id: str) -> Future[AthenaQueryExecution]:
"""
Poll query execution status asynchronously.
Parameters:
- query_id: Query execution ID
Returns:
Future[AthenaQueryExecution] with current status
"""
def close(self, wait: bool = False) -> None:
"""
Close cursor and shutdown thread pool executor.
Parameters:
- wait: If True, wait for all running queries to complete before shutdown
"""Asynchronous cursor variant that returns results as dictionaries with column names as keys.
class AsyncDictCursor(AsyncCursor):
dict_type: Type[Dict] = dict
def execute(
self,
operation: str,
parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
**kwargs
) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
"""
Execute query asynchronously returning dictionary results.
Parameters:
- operation: SQL query string
- parameters: Query parameters (dict or sequence)
- **kwargs: Additional execution options
Returns:
Tuple of (query_id, Future[AthenaDictResultSet])
"""PyAthena uses Python's concurrent.futures.Future for asynchronous result handling, providing standard async patterns.
class Future[T]:
    # Documentation stub mirroring concurrent.futures.Future, which PyAthena's
    # AsyncCursor returns from execute()/cancel()/description()/poll().
    def result(self, timeout: Optional[float] = None) -> T:
        """Get result, blocking until available or timeout."""
    def add_done_callback(self, fn: Callable[[Future[T]], None]) -> None:
        """Add callback function called when Future completes."""
    def cancel(self) -> bool:
        """Attempt to cancel the Future."""
    def cancelled(self) -> bool:
        """Return True if Future was cancelled."""
    def done(self) -> bool:
        """Return True if Future is done (completed or cancelled)."""
    # NOTE(review): concurrent.futures.Future.exception() is documented as
    # returning BaseException — annotation widened from Exception to match
    # the stdlib contract.
    def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
        """Return exception if Future failed, None if successful."""from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

# Connect with async cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncCursor,
)
cursor = conn.cursor()
try:
    # execute() returns immediately with the query ID and a Future.
    query_id, future = cursor.execute("SELECT COUNT(*) FROM large_table")
    print(f"Query started with ID: {query_id}")

    # Do other work while query runs
    print("Doing other work while query executes...")
    time.sleep(2)

    # The original duplicated the fetch logic in both branches of a
    # done()/not-done() check; result() already returns immediately when the
    # query has finished and blocks otherwise, so one path suffices.
    if not future.done():
        print("Query still running...")
    result_set = future.result()  # Wait for completion (no-op if already done)
    print("Query completed!")
    print(result_set.fetchall())
finally:
    # Always release the cursor's thread pool and the connection,
    # even if the query raised.
    cursor.close()
    conn.close()
from pyathena.async_cursor import AsyncCursor
from concurrent.futures import as_completed
import time

def run_concurrent_queries():
    """Start several independent queries and report each one as it finishes."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()
    # Multiple queries to run concurrently
    queries = [
        ("user_count", "SELECT COUNT(DISTINCT user_id) FROM users"),
        ("daily_revenue", "SELECT DATE(order_date), SUM(amount) FROM orders GROUP BY DATE(order_date)"),
        ("top_products", "SELECT product_id, COUNT(*) as sales FROM orders GROUP BY product_id ORDER BY sales DESC LIMIT 10"),
        ("monthly_stats", "SELECT MONTH(order_date), AVG(amount), COUNT(*) FROM orders GROUP BY MONTH(order_date)")
    ]
    # Start all queries, mapping each Future directly back to its query name.
    # (The original scanned all futures with an identity comparison to find
    # the completed one; a future->name dict is the idiomatic as_completed
    # pattern and avoids the O(n) lookup per completion.)
    future_to_name = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        future_to_name[future] = name
        print(f"Started {name} query (ID: {query_id})")
    # Process results as they complete
    for future in as_completed(future_to_name):
        completed_name = future_to_name[future]
        try:
            result_set = future.result()
            results = result_set.fetchall()
            print(f"\n{completed_name} completed with {len(results)} rows:")
            for row in results[:5]:  # Show first 5 rows
                print(f" {row}")
        except Exception as e:
            print(f"{completed_name} failed: {e}")
    cursor.close()
    conn.close()

run_concurrent_queries()
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

def query_callback(future):
    """Report a finished query's rows, or the error that ended it."""
    try:
        rows = future.result().fetchall()
        print(f"Query completed with {len(rows)} rows")
        for row in rows:
            print(f" {row}")
    except Exception as e:
        print(f"Query failed: {e}")

def async_with_callback():
    """Start a query, attach a completion callback, and work meanwhile."""
    connection = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cur = connection.cursor()
    # Attach the callback; it fires when the Future resolves.
    query_id, future = cur.execute("SELECT * FROM products LIMIT 5")
    future.add_done_callback(query_callback)
    print(f"Query {query_id} started, callback will be called when complete")
    # Simulate unrelated work while the query runs.
    for i in range(5):
        print(f"Doing other work... {i+1}")
        time.sleep(1)
    # Block for the result if the callback has not fired yet.
    if not future.done():
        print("Waiting for query to complete...")
        future.result()  # Wait for completion
    cur.close()
    connection.close()

async_with_callback()
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

def monitor_query_execution():
    """Start a long query and report server-side status while it runs."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()
    # Start a long-running query
    query_id, future = cursor.execute("""
        SELECT
            customer_id,
            COUNT(*) as order_count,
            SUM(amount) as total_spent,
            AVG(amount) as avg_order_value
        FROM orders
        GROUP BY customer_id
        HAVING COUNT(*) > 10
        ORDER BY total_spent DESC
    """)
    print(f"Started query {query_id}")
    # Poll server-side status until the query reaches a terminal state or the
    # client-side Future resolves.
    while not future.done():
        execution = cursor.poll(query_id).result()
        print(f"Query state: {execution.state}")
        stats = getattr(execution, 'statistics', None)
        if stats and hasattr(stats, 'data_scanned_in_bytes'):
            data_scanned = stats.data_scanned_in_bytes
            print(f"Data scanned: {data_scanned / 1024 / 1024:.2f} MB")
        if execution.state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break
        time.sleep(2)  # Poll every 2 seconds
    # BUG FIX: the original fetched results only `if future.done()`, but the
    # loop can exit via the terminal-state break before the client Future has
    # resolved, silently discarding the results. result() waits either way and
    # re-raises the query's failure, which the except branch reports.
    try:
        result_set = future.result()
        results = result_set.fetchall()
        print(f"Query completed successfully with {len(results)} rows")
    except Exception as e:
        print(f"Query failed: {e}")
    cursor.close()
    conn.close()

monitor_query_execution()
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from pyathena.error import OperationalError, ProgrammingError
from concurrent.futures import TimeoutError

def async_error_handling():
    """Exercise the distinct error classes an async query can raise."""
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()
    # Queries chosen to trigger different failure modes.
    test_queries = [
        ("valid_query", "SELECT COUNT(*) FROM users"),
        ("syntax_error", "SELCT COUNT(*) FROM users"),  # Intentional typo
        ("missing_table", "SELECT * FROM nonexistent_table"),
        ("timeout_query", "SELECT * FROM very_large_table")  # May timeout
    ]
    for name, sql in test_queries:
        try:
            print(f"\nExecuting {name}...")
            query_id, future = cursor.execute(sql)
            # Bound the wait so a slow query surfaces as TimeoutError.
            rows = future.result(timeout=30).fetchall()
            print(f"✓ {name} succeeded with {len(rows)} rows")
        except ProgrammingError as e:
            print(f"✗ {name} failed with syntax error: {e}")
        except OperationalError as e:
            print(f"✗ {name} failed with operational error: {e}")
        except TimeoutError:
            print(f"✗ {name} timed out")
            # Best effort: stop the query that is still running server-side.
            try:
                cursor.cancel(query_id).result(timeout=10)
                print(f" Query {query_id} cancelled")
            except Exception as cancel_error:
                print(f" Failed to cancel query: {cancel_error}")
        except Exception as e:
            print(f"✗ {name} failed with unexpected error: {e}")
    cursor.close()
    conn.close()

async_error_handling()
import asyncio
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from concurrent.futures import ThreadPoolExecutor

async def athena_with_asyncio():
    """Example integrating PyAthena async cursor with asyncio."""
    # get_running_loop() is the supported call inside a coroutine;
    # get_event_loop() here has been deprecated since Python 3.10.
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        # Connection setup is synchronous, so run it off the event loop.
        conn = await loop.run_in_executor(
            executor,
            lambda: connect(
                s3_staging_dir="s3://my-bucket/athena-results/",
                region_name="us-west-2",
                cursor_class=AsyncCursor,
            ),
        )
        cursor = conn.cursor()

        async def execute_query(name, query):
            query_id, future = cursor.execute(query)
            print(f"Started {name} (Query ID: {query_id})")
            # wrap_future bridges the concurrent.futures.Future into an
            # awaitable directly, instead of burning an executor thread
            # on a blocking future.result() call as the original did.
            result_set = await asyncio.wrap_future(future)
            results = result_set.fetchall()
            print(f"Completed {name}: {len(results)} rows")
            return name, results

        # Define queries
        queries = [
            ("user_stats", "SELECT COUNT(*) as user_count FROM users"),
            ("order_stats", "SELECT COUNT(*) as order_count FROM orders"),
            ("product_stats", "SELECT COUNT(*) as product_count FROM products")
        ]
        # Execute all queries concurrently
        tasks = [execute_query(name, query) for name, query in queries]
        results = await asyncio.gather(*tasks)
        # Process results
        for name, data in results:
            print(f"{name}: {data}")
        # Cleanup
        cursor.close()
        conn.close()

# Run with asyncio
asyncio.run(athena_with_asyncio())
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time
class AsyncQueryPipeline:
    """Pipeline for managing multiple dependent async queries.

    Stages are tracked by name; a stage may declare dependencies on earlier
    stages and will not start until those stages' futures have completed.
    """

    def __init__(self, connection):
        self.conn = connection
        self.cursor = connection.cursor()
        # stage name -> {'query_id', 'future', 'start_time'}
        self.results = {}

    def execute_stage(self, stage_name, query, dependencies=None):
        """Execute a query stage, optionally waiting for dependencies.

        Args:
            stage_name: Unique name used to track and retrieve this stage.
            query: SQL text to execute.
            dependencies: Names of previously started stages to wait for.

        Returns:
            Tuple of ``(query_id, future)`` for the started stage.

        Raises:
            ValueError: If a declared dependency was never started.
        """
        if dependencies:
            for dep in dependencies:
                # BUG FIX: the original silently skipped unknown dependency
                # names (`if dep in self.results`), so a typo meant no waiting
                # at all. Fail fast on an unknown stage instead.
                if dep not in self.results:
                    raise ValueError(f"Unknown dependency stage: {dep}")
                dep_future = self.results[dep]['future']
                if not dep_future.done():
                    print(f"Waiting for dependency {dep}...")
                    dep_future.result()
        print(f"Starting stage: {stage_name}")
        query_id, future = self.cursor.execute(query)
        self.results[stage_name] = {
            'query_id': query_id,
            'future': future,
            'start_time': time.time(),
        }
        return query_id, future

    def wait_for_all(self):
        """Wait for all stages to complete, reporting each stage's duration."""
        for stage_name, stage_info in self.results.items():
            future = stage_info['future']
            if not future.done():
                print(f"Waiting for {stage_name}...")
                future.result()
            duration = time.time() - stage_info['start_time']
            print(f"{stage_name} completed in {duration:.2f} seconds")

    def get_results(self, stage_name):
        """Return all rows for a stage, blocking until it completes."""
        future = self.results[stage_name]['future']
        result_set = future.result()
        return result_set.fetchall()

    def close(self):
        """Close the cursor and connection."""
        self.cursor.close()
        self.conn.close()
# Example usage
def run_async_pipeline():
conn = connect(
s3_staging_dir="s3://my-bucket/athena-results/",
region_name="us-west-2",
cursor_class=AsyncCursor
)
pipeline = AsyncQueryPipeline(conn)
# Stage 1: Data preparation
pipeline.execute_stage(
'data_prep',
"""
CREATE TABLE temp_user_summary AS
SELECT
user_id,
COUNT(*) as order_count,
SUM(amount) as total_spent
FROM orders
GROUP BY user_id
"""
)
# Stage 2: Analysis (depends on data_prep)
pipeline.execute_stage(
'user_analysis',
"""
SELECT
CASE
WHEN total_spent > 1000 THEN 'High Value'
WHEN total_spent > 500 THEN 'Medium Value'
ELSE 'Low Value'
END as customer_segment,
COUNT(*) as customer_count,
AVG(total_spent) as avg_spent
FROM temp_user_summary
GROUP BY customer_segment
""",
dependencies=['data_prep']
)
# Stage 3: Additional metrics (depends on data_prep)
pipeline.execute_stage(
'retention_metrics',
"""
SELECT
order_count,
COUNT(*) as customers_with_count,
AVG(total_spent) as avg_spent_for_count
FROM temp_user_summary
GROUP BY order_count
ORDER BY order_count
""",
dependencies=['data_prep']
)
# Wait for all stages to complete
pipeline.wait_for_all()
# Process results
user_analysis = pipeline.get_results('user_analysis')
retention_metrics = pipeline.get_results('retention_metrics')
print("\nUser Analysis Results:")
for row in user_analysis:
print(f" {row}")
print("\nRetention Metrics:")
for row in retention_metrics:
print(f" {row}")
pipeline.close()
run_async_pipeline()Install with Tessl CLI
npx tessl i tessl/pypi-pyathena