# Asynchronous Operations

Non-blocking query execution through a `concurrent.futures.Future`-based API. Queries run concurrently on Athena while your code keeps working, and the futures can be bridged into async frameworks such as `asyncio`. This suits applications that need high concurrency or must stay responsive while queries run.

## Capabilities

### Async Cursor

Asynchronous cursor for non-blocking query execution. `execute()` returns the query ID and a `Future` for the result set without waiting for the query to finish. Multiple queries can therefore run concurrently, and results can be consumed through the standard `Future` API or bridged into async/await code.

```python { .api }
class AsyncCursor:
    arraysize: int
    max_workers: int

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        **kwargs
    ) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
        """
        Execute a SQL statement asynchronously.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - work_group: Athena workgroup for execution
        - s3_staging_dir: S3 location for query results
        - cache_size: Number of recent query executions to check for a reusable result
        - cache_expiration_time: Cache expiration time in seconds
        - result_reuse_enable: Enable query result reuse
        - result_reuse_minutes: Result reuse duration in minutes
        - paramstyle: Parameter substitution style
        - **kwargs: Additional execution options

        Returns:
        Tuple of (query_id, Future[AthenaResultSet]) for result handling
        """

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs
    ) -> None:
        """
        Execute multiple statements (not supported).

        Raises:
        NotSupportedError: Always raised as async executemany is not supported
        """

    def cancel(self, query_id: str) -> Future[None]:
        """
        Cancel a running query by query ID.

        Parameters:
        - query_id: ID of query to cancel

        Returns:
        Future that completes when cancellation is processed
        """

    def description(self, query_id: str) -> Future[Optional[List[Tuple[str, str, None, None, int, int, str]]]]:
        """
        Get column description for a query asynchronously.

        Parameters:
        - query_id: Query execution ID

        Returns:
        Future containing column metadata as list of tuples with
        (name, type_code, display_size, internal_size, precision, scale, null_ok)
        """

    def query_execution(self, query_id: str) -> Future[AthenaQueryExecution]:
        """
        Get query execution metadata asynchronously.

        Parameters:
        - query_id: Query execution ID

        Returns:
        Future[AthenaQueryExecution] with execution details
        """

    def poll(self, query_id: str) -> Future[AthenaQueryExecution]:
        """
        Poll query execution status asynchronously.

        Parameters:
        - query_id: Query execution ID

        Returns:
        Future[AthenaQueryExecution] with current status
        """

    def close(self, wait: bool = False) -> None:
        """
        Close cursor and shutdown thread pool executor.

        Parameters:
        - wait: If True, wait for all running queries to complete before shutdown
        """
```
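
To make the `(query_id, future)` contract concrete without AWS credentials, here is a minimal sketch built on a hypothetical `FakeAsyncCursor`; it is not PyAthena's implementation. It assumes only the shape documented above: a query ID is available as soon as `execute()` returns, result collection runs on a `ThreadPoolExecutor` sized by `max_workers`, and `close(wait=...)` shuts that pool down. `fake_run_query` is a made-up stand-in for waiting on Athena.

```python
import time
import uuid
from concurrent.futures import Future, ThreadPoolExecutor
from typing import List, Tuple


def fake_run_query(operation: str) -> List[Tuple[str]]:
    """Hypothetical stand-in for waiting on Athena and fetching rows."""
    time.sleep(0.05)  # simulate query latency
    return [(operation.upper(),)]


class FakeAsyncCursor:
    """Hypothetical cursor that mimics the (query_id, future) return shape."""

    def __init__(self, max_workers: int = 4) -> None:
        self.max_workers = max_workers
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def execute(self, operation: str) -> Tuple[str, "Future[List[Tuple[str]]]"]:
        # The ID is available immediately; only result collection is deferred
        query_id = str(uuid.uuid4())
        future = self._executor.submit(fake_run_query, operation)
        return query_id, future

    def close(self, wait: bool = False) -> None:
        # Like close(wait=...) above: optionally wait for pending work first
        self._executor.shutdown(wait=wait)


cursor = FakeAsyncCursor(max_workers=2)
query_id, future = cursor.execute("select 1")
print(query_id, future.result())
cursor.close(wait=True)
```

Swapping the stand-in for a real `AsyncCursor` should leave this calling code unchanged, since both hand back a query ID and a standard `Future`.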

### Async Dict Cursor

Asynchronous cursor variant that returns results as dictionaries with column names as keys.

```python { .api }
class AsyncDictCursor(AsyncCursor):
    dict_type: Type[Dict] = dict

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        **kwargs
    ) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
        """
        Execute query asynchronously returning dictionary results.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options

        Returns:
        Tuple of (query_id, Future[AthenaDictResultSet])
        """
```
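
A dict cursor essentially pairs each row with the column names from the DB-API `description`, whose first element per column is the name. The hypothetical `rows_to_dicts` helper below illustrates that mapping under this assumption; it is not PyAthena's code, and the sample description tuples and rows are made up. The `dict_type` argument mirrors the class attribute above, so another mapping type such as `collections.OrderedDict` can be used.

```python
from collections import OrderedDict
from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple

# One entry per column: (name, type_code, display_size, internal_size, precision, scale, null_ok)
Description = Sequence[Tuple[Any, ...]]


def rows_to_dicts(
    description: Optional[Description],
    rows: List[Tuple[Any, ...]],
    dict_type: Callable[..., Dict[str, Any]] = dict,
) -> List[Dict[str, Any]]:
    """Hypothetical helper: turn tuple rows into mappings keyed by column name."""
    if not description:
        return []
    names = [column[0] for column in description]
    return [dict_type(zip(names, row)) for row in rows]


# Made-up sample data in the description format listed above
description = [
    ("product_id", "varchar", None, None, 0, 0, "UNKNOWN"),
    ("sales", "bigint", None, None, 19, 0, "UNKNOWN"),
]
rows = [("p-1", 42), ("p-2", 7)]

print(rows_to_dicts(description, rows))
# [{'product_id': 'p-1', 'sales': 42}, {'product_id': 'p-2', 'sales': 7}]
print(rows_to_dicts(description, rows, dict_type=OrderedDict)[0])
```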

### Future-based Result Handling

The futures returned by PyAthena's async cursors are standard `concurrent.futures.Future` objects, so results can be consumed with the usual blocking, polling, or callback patterns. The key methods are:

```python { .api }
class Future[T]:
    def result(self, timeout: Optional[float] = None) -> T:
        """Return the result, blocking until it is available or the timeout expires.
        Re-raises the task's exception; raises TimeoutError on timeout and
        CancelledError if the future was cancelled."""

    def add_done_callback(self, fn: Callable[[Future[T]], None]) -> None:
        """Register fn to be called with the future when it finishes or is cancelled."""

    def cancel(self) -> bool:
        """Attempt to cancel; returns False if the task is already running or done."""

    def cancelled(self) -> bool:
        """Return True if the future was cancelled."""

    def done(self) -> bool:
        """Return True if the future finished running or was cancelled."""

    def exception(self, timeout: Optional[float] = None) -> Optional[BaseException]:
        """Return the task's exception, or None if it completed successfully."""
```
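
Because these are ordinary `concurrent.futures.Future` objects, the methods above can be tried out with a plain `ThreadPoolExecutor`. In this sketch the hypothetical `slow_square` and `always_fails` functions stand in for query work: a single worker is held busy so a second task stays queued and can be cancelled, a callback fires on completion, and `exception()` returns a worker's error instead of raising it.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

gate = threading.Event()  # keeps the single worker busy so the next task stays queued


def slow_square(x: int) -> int:
    gate.wait(timeout=5)
    return x * x


def always_fails() -> None:
    raise ValueError("simulated query failure")


completed = []
with ThreadPoolExecutor(max_workers=1) as pool:
    running = pool.submit(slow_square, 3)
    queued = pool.submit(slow_square, 4)
    running.add_done_callback(lambda f: completed.append(f.result()))

    was_cancelled = queued.cancel()  # True: the queued task never started
    gate.set()
    value = running.result(timeout=5)

    failing = pool.submit(always_fails)
    error = failing.exception(timeout=5)  # returned, not raised
# Leaving the with-block joins the worker, so the callback has already run

print(value, was_cancelled, queued.cancelled(), type(error).__name__, completed)
# 9 True True ValueError [9]
```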

## Usage Examples

### Basic Async Query Execution

```python
import time

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

# Connect with the async cursor class
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncCursor,
)
cursor = conn.cursor()

try:
    # execute() returns the query ID and a Future without waiting for the query
    query_id, future = cursor.execute("SELECT COUNT(*) FROM large_table")
    print(f"Query started with ID: {query_id}")

    # Do other work while the query runs
    print("Doing other work while query executes...")
    time.sleep(2)

    # Check whether the query has finished yet
    if not future.done():
        print("Query still running...")

    # result() blocks until the AthenaResultSet is available
    result_set = future.result()
    print("Query completed!")
    print(result_set.fetchall())
finally:
    cursor.close()
    conn.close()
```

### Concurrent Query Execution

```python
from concurrent.futures import as_completed

from pyathena import connect
from pyathena.async_cursor import AsyncCursor


def run_concurrent_queries():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()

    # Multiple queries to run concurrently
    queries = [
        ("user_count", "SELECT COUNT(DISTINCT user_id) FROM users"),
        ("daily_revenue", "SELECT DATE(order_date), SUM(amount) FROM orders GROUP BY DATE(order_date)"),
        ("top_products", "SELECT product_id, COUNT(*) AS sales FROM orders GROUP BY product_id ORDER BY sales DESC LIMIT 10"),
        ("monthly_stats", "SELECT MONTH(order_date), AVG(amount), COUNT(*) FROM orders GROUP BY MONTH(order_date)"),
    ]

    try:
        # Start all queries, mapping each Future back to its query name
        future_to_name = {}
        for name, query in queries:
            query_id, future = cursor.execute(query)
            future_to_name[future] = name
            print(f"Started {name} query (ID: {query_id})")

        # Process results in completion order rather than submission order
        for future in as_completed(future_to_name):
            name = future_to_name[future]
            try:
                result_set = future.result()
                results = result_set.fetchall()
                print(f"\n{name} completed with {len(results)} rows:")
                for row in results[:5]:  # Show the first 5 rows
                    print(f"  {row}")
            except Exception as e:
                print(f"{name} failed: {e}")
    finally:
        cursor.close()
        conn.close()


run_concurrent_queries()
```

### Async Query with Callbacks

```python
import threading
import time

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

# Set by the callback once it has finished processing the results
callback_done = threading.Event()


def query_callback(future):
    """Called when the query's Future completes (usually in a worker thread)."""
    try:
        result_set = future.result()
        results = result_set.fetchall()
        print(f"Query completed with {len(results)} rows")
        for row in results:
            print(f"  {row}")
    except Exception as e:
        print(f"Query failed: {e}")
    finally:
        callback_done.set()


def async_with_callback():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()

    try:
        # Execute the query and register the callback
        query_id, future = cursor.execute("SELECT * FROM products LIMIT 5")
        future.add_done_callback(query_callback)
        print(f"Query {query_id} started, callback will be called when complete")

        # Do other work in the meantime
        for i in range(5):
            print(f"Doing other work... {i + 1}")
            time.sleep(1)

        # future.result() can return before a callback has finished running,
        # so wait on the event before closing the cursor and connection
        if not callback_done.is_set():
            print("Waiting for the callback to finish...")
        callback_done.wait()
    finally:
        cursor.close()
        conn.close()


async_with_callback()
```
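
Callbacks run on different threads depending on timing. In CPython, a callback registered before the future completes runs in the thread that completes it (a pool worker here), and the `concurrent.futures` documentation states that a callback added to an already-finished future is called immediately, which means in the calling thread. The sketch below shows both cases with a plain `ThreadPoolExecutor`; `record_thread` is a hypothetical helper. This is a reason to keep callbacks short and thread-safe.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

callback_threads = {}
release = threading.Event()


def record_thread(label):
    """Hypothetical helper: build a callback that records which thread ran it."""
    def callback(future):
        callback_threads[label] = threading.current_thread().name
    return callback


with ThreadPoolExecutor(max_workers=1, thread_name_prefix="worker") as pool:
    future = pool.submit(release.wait, 5)  # blocks until release is set
    future.add_done_callback(record_thread("added_before_completion"))
    release.set()
    future.result()
# Leaving the with-block joins the worker, so the first callback has finished

# The future is already done, so this callback runs right here
future.add_done_callback(record_thread("added_after_completion"))
print(callback_threads)
```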

### Query Status Monitoring

```python
import time

from pyathena import connect
from pyathena.async_cursor import AsyncCursor


def monitor_query_execution():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()

    try:
        # Start a long-running query
        query_id, future = cursor.execute("""
            SELECT
                customer_id,
                COUNT(*) AS order_count,
                SUM(amount) AS total_spent,
                AVG(amount) AS avg_order_value
            FROM orders
            GROUP BY customer_id
            HAVING COUNT(*) > 10
            ORDER BY total_spent DESC
        """)
        print(f"Started query {query_id}")

        # Report progress until the result Future completes
        while not future.done():
            # query_execution() fetches one snapshot of the execution metadata
            execution = cursor.query_execution(query_id).result()
            print(f"Query state: {execution.state}")
            if execution.data_scanned_in_bytes is not None:
                print(f"Data scanned: {execution.data_scanned_in_bytes / 1024 / 1024:.2f} MB")
            time.sleep(2)  # Poll every 2 seconds

        # The Future is done, so this returns without blocking
        result_set = future.result()
        if result_set.state == "SUCCEEDED":
            results = result_set.fetchall()
            print(f"Query completed successfully with {len(results)} rows")
        else:
            print(f"Query ended in state {result_set.state}: {result_set.state_change_reason}")
    except Exception as e:
        print(f"Query failed: {e}")
    finally:
        cursor.close()
        conn.close()


monitor_query_execution()
```
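
The monitoring loop can be factored into a reusable helper with an overall deadline. `wait_with_progress` is hypothetical: it takes any `Future` plus a zero-argument `get_status` callable (with PyAthena, something like `lambda: cursor.query_execution(query_id).result().state`, an assumption to verify against your version) and raises `TimeoutError` if the future is still pending at the deadline. Waiting on the future itself with `concurrent.futures.wait()` means completion ends the loop without sleeping a full interval. The demo substitutes a sleeping thread-pool task for a query so it runs locally.

```python
import time
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, List


def wait_with_progress(
    future: Future,
    get_status: Callable[[], str],
    interval: float = 2.0,
    timeout: float = 600.0,
    report: Callable[[str], None] = print,
) -> Any:
    """Hypothetical helper: report status until the future finishes or time runs out."""
    deadline = time.monotonic() + timeout
    while not future.done():
        report(get_status())
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"still running after {timeout} seconds")
        # Returns early as soon as the future completes
        wait([future], timeout=min(interval, remaining))
    return future.result()


statuses: List[str] = []
with ThreadPoolExecutor(max_workers=1) as pool:
    job = pool.submit(time.sleep, 0.3)  # stand-in for a running query
    wait_with_progress(job, lambda: "RUNNING", interval=0.1, timeout=5, report=statuses.append)

print(len(statuses) >= 1, job.done())
```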

### Async Error Handling

```python
from concurrent.futures import TimeoutError

from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from pyathena.error import OperationalError, ProgrammingError


def async_error_handling():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()

    # Test queries with different error conditions
    test_queries = [
        ("valid_query", "SELECT COUNT(*) FROM users"),
        ("syntax_error", "SELCT COUNT(*) FROM users"),  # Intentional typo
        ("missing_table", "SELECT * FROM nonexistent_table"),
        ("timeout_query", "SELECT * FROM very_large_table"),  # May time out
    ]

    try:
        for name, query in test_queries:
            query_id = None
            try:
                print(f"\nExecuting {name}...")
                query_id, future = cursor.execute(query)

                # Wait at most 30 seconds for this demonstration
                result_set = future.result(timeout=30)

                # A query that fails on Athena can complete with a non-SUCCEEDED state
                if result_set.state != "SUCCEEDED":
                    print(f"✗ {name} ended in state {result_set.state}: {result_set.state_change_reason}")
                    continue

                results = result_set.fetchall()
                print(f"✓ {name} succeeded with {len(results)} rows")

            except ProgrammingError as e:
                print(f"✗ {name} failed with programming error: {e}")
            except OperationalError as e:
                print(f"✗ {name} failed with operational error: {e}")
            except TimeoutError:
                # The timeout only stops waiting; the query keeps running on Athena
                print(f"✗ {name} timed out")
                try:
                    cursor.cancel(query_id).result(timeout=10)
                    print(f"  Query {query_id} cancelled")
                except Exception as cancel_error:
                    print(f"  Failed to cancel query: {cancel_error}")
            except Exception as e:
                print(f"✗ {name} failed with unexpected error: {e}")
    finally:
        cursor.close()
        conn.close()


async_error_handling()
```
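
The cancel step in the example matters because a timeout on `future.result()` only stops the caller from waiting. The stdlib sketch below, using a hypothetical `long_task` in place of a slow query, shows that the task keeps running after the timeout and that `Future.cancel()` returns `False` once a task has started. That is why stopping the Athena query itself goes through `cursor.cancel(query_id)`.

```python
import threading
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import TimeoutError as FutureTimeout

started = threading.Event()
stop = threading.Event()


def long_task() -> str:
    """Hypothetical stand-in for a slow query: runs until told to stop."""
    started.set()
    stop.wait(timeout=5)
    return "finished"


timed_out = False
with ThreadPoolExecutor(max_workers=1) as pool:
    future = pool.submit(long_task)
    started.wait(timeout=5)  # make sure the worker has picked the task up
    try:
        future.result(timeout=0.1)
    except FutureTimeout:
        timed_out = True
    still_running = not future.done()
    cancel_accepted = future.cancel()  # False: the task is already running
    stop.set()  # the real-world analogue is cursor.cancel(query_id)
    outcome = future.result()

print(timed_out, still_running, cancel_accepted, outcome)
# True True False finished
```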

### Integration with asyncio

```python
import asyncio

from pyathena import connect
from pyathena.async_cursor import AsyncCursor


async def athena_with_asyncio():
    """Example integrating the PyAthena async cursor with asyncio."""

    # connect() is synchronous, so run it in a worker thread (Python 3.9+)
    conn = await asyncio.to_thread(
        connect,
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    cursor = conn.cursor()

    async def execute_query(name, query):
        # execute() makes a short call to start the query, then returns a Future
        query_id, future = cursor.execute(query)
        print(f"Started {name} (Query ID: {query_id})")

        # wrap_future() makes the Future awaitable without blocking the event loop
        result_set = await asyncio.wrap_future(future)
        # fetchall() may make further API calls for large results, so keep it off the loop
        results = await asyncio.to_thread(result_set.fetchall)

        print(f"Completed {name}: {len(results)} rows")
        return name, results

    # Define queries
    queries = [
        ("user_stats", "SELECT COUNT(*) AS user_count FROM users"),
        ("order_stats", "SELECT COUNT(*) AS order_count FROM orders"),
        ("product_stats", "SELECT COUNT(*) AS product_count FROM products"),
    ]

    try:
        # Execute all queries concurrently
        results = await asyncio.gather(*(execute_query(name, query) for name, query in queries))

        # Process results
        for name, data in results:
            print(f"{name}: {data}")
    finally:
        cursor.close()
        conn.close()


# Run with asyncio
asyncio.run(athena_with_asyncio())
```
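
`asyncio.wrap_future()` turns a `concurrent.futures.Future` into an awaitable without dedicating a thread to each pending query, which is what the example relies on. The sketch below isolates the pattern so it can run locally: a `ThreadPoolExecutor` and a hypothetical `fake_query` function stand in for the cursor's executor and `cursor.execute(...)`.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
from typing import List, Tuple


def fake_query(name: str, delay_ms: int) -> Tuple[str, int]:
    """Hypothetical stand-in for a query whose result is collected in a worker."""
    time.sleep(delay_ms / 1000)
    return name, delay_ms


async def gather_queries() -> List[Tuple[str, int]]:
    with ThreadPoolExecutor(max_workers=3) as pool:
        # Each submit() returns a concurrent.futures.Future, like AsyncCursor.execute()
        futures = [
            pool.submit(fake_query, "users", 200),
            pool.submit(fake_query, "orders", 100),
            pool.submit(fake_query, "products", 50),
        ]
        # wrap_future() makes each one awaitable on the running event loop
        return await asyncio.gather(*(asyncio.wrap_future(f) for f in futures))


results = asyncio.run(gather_queries())
print(results)
# [('users', 200), ('orders', 100), ('products', 50)]
```

`asyncio.gather()` returns results in the order the awaitables were passed, regardless of which query finishes first.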

### Async Query Pipeline

```python
import time

from pyathena import connect
from pyathena.async_cursor import AsyncCursor


class AsyncQueryPipeline:
    """Pipeline for managing multiple dependent async queries."""

    def __init__(self, connection):
        self.conn = connection
        self.cursor = connection.cursor()
        self.results = {}

    def execute_stage(self, stage_name, query, dependencies=None):
        """Execute a query stage once all of its dependencies have succeeded."""
        for dep in dependencies or []:
            if dep not in self.results:
                raise ValueError(f"Unknown dependency: {dep}")
            future = self.results[dep]['future']
            if not future.done():
                print(f"Waiting for dependency {dep}...")
            # Block until the dependency finishes, then make sure it succeeded
            result_set = future.result()
            if result_set.state != 'SUCCEEDED':
                raise RuntimeError(f"Dependency {dep} ended in state {result_set.state}")

        print(f"Starting stage: {stage_name}")
        query_id, future = self.cursor.execute(query)
        self.results[stage_name] = {
            'query_id': query_id,
            'future': future,
            'start_time': time.time(),
        }
        return query_id, future

    def wait_for_all(self):
        """Wait for all stages to complete."""
        for stage_name, stage_info in self.results.items():
            future = stage_info['future']
            if not future.done():
                print(f"Waiting for {stage_name}...")
            future.result()
            # Measured when this loop reaches the stage, so early finishers may read long
            elapsed = time.time() - stage_info['start_time']
            print(f"{stage_name} done ({elapsed:.2f} seconds since it started)")

    def get_results(self, stage_name):
        """Get results for a specific stage."""
        result_set = self.results[stage_name]['future'].result()
        return result_set.fetchall()

    def close(self):
        """Close the cursor and connection."""
        self.cursor.close()
        self.conn.close()


# Example usage
def run_async_pipeline():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor,
    )
    pipeline = AsyncQueryPipeline(conn)

    try:
        # Stage 1: Data preparation (CREATE TABLE AS fails if the table already exists)
        pipeline.execute_stage(
            'data_prep',
            """
            CREATE TABLE temp_user_summary AS
            SELECT
                user_id,
                COUNT(*) AS order_count,
                SUM(amount) AS total_spent
            FROM orders
            GROUP BY user_id
            """
        )

        # Stage 2: Analysis (depends on data_prep)
        pipeline.execute_stage(
            'user_analysis',
            """
            SELECT
                CASE
                    WHEN total_spent > 1000 THEN 'High Value'
                    WHEN total_spent > 500 THEN 'Medium Value'
                    ELSE 'Low Value'
                END AS customer_segment,
                COUNT(*) AS customer_count,
                AVG(total_spent) AS avg_spent
            FROM temp_user_summary
            GROUP BY 1  -- the first output column (customer_segment)
            """,
            dependencies=['data_prep']
        )

        # Stage 3: Additional metrics (depends on data_prep)
        pipeline.execute_stage(
            'retention_metrics',
            """
            SELECT
                order_count,
                COUNT(*) AS customers_with_count,
                AVG(total_spent) AS avg_spent_for_count
            FROM temp_user_summary
            GROUP BY order_count
            ORDER BY order_count
            """,
            dependencies=['data_prep']
        )

        # Wait for all stages to complete
        pipeline.wait_for_all()

        # Process results
        user_analysis = pipeline.get_results('user_analysis')
        retention_metrics = pipeline.get_results('retention_metrics')

        print("\nUser Analysis Results:")
        for row in user_analysis:
            print(f"  {row}")

        print("\nRetention Metrics:")
        for row in retention_metrics:
            print(f"  {row}")
    finally:
        pipeline.close()


run_async_pipeline()
```
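
## Performance Considerations

- Use async cursors when you need to run several queries concurrently
- The work is I/O-bound (most of the time is spent waiting on Athena), which suits the cursor's thread-pool-backed futures
- In high-concurrency applications, reuse connections (for example, through a pool) instead of opening one per request; a single `AsyncCursor` can run many queries, and its thread pool size is set by `max_workers`
- Monitor query execution status so long-running queries can be reported on or cancelled
- Pass timeouts to `future.result()` to prevent hanging operations, and cancel queries that exceed them with `cursor.cancel(query_id)`
- Async cursors can be used from asyncio-based web frameworks such as FastAPI, aiohttp, and Tornado by wrapping their futures with `asyncio.wrap_future()`
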
## Best Practices

1. **Resource Management**: Always close cursors and connections, ideally in a `finally` block
2. **Error Handling**: Catch specific exception types, and check `result_set.state` before fetching rows
3. **Timeout Handling**: Set appropriate timeouts for query execution
4. **Concurrent Limits**: Stay within your account's Athena quotas for concurrently running queries
5. **Query Monitoring**: Monitor long-running queries and give users feedback on progress
6. **Memory Management**: Be mindful of memory use with large result sets; `fetchall()` loads every row into memory
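
For the concurrency-limit practice, one option is to gate submissions yourself so that no more than a fixed number of queries are in flight at once. `ThrottledSubmitter` below is a hypothetical sketch built on `threading.BoundedSemaphore`: a slot is taken before each submission and returned by a done callback. With PyAthena, the wrapped callable might be something like `lambda q: cursor.execute(q)[1]` (an assumption), and the limit should come from your account's Athena quotas rather than the value used here. A local thread pool stands in for Athena so the code runs as-is.

```python
import threading
import time
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class ThrottledSubmitter:
    """Hypothetical helper: block new submissions while `limit` tasks are in flight."""

    def __init__(self, submit: Callable[..., Future], limit: int) -> None:
        self._submit = submit
        self._slots = threading.BoundedSemaphore(limit)

    def submit(self, *args: Any, **kwargs: Any) -> Future:
        self._slots.acquire()  # waits here once the limit is reached
        try:
            future = self._submit(*args, **kwargs)
        except Exception:
            self._slots.release()  # submission failed, so give the slot back
            raise
        # Free the slot when the task finishes, whether it succeeded or failed
        future.add_done_callback(lambda _: self._slots.release())
        return future


lock = threading.Lock()
active = 0
peak = 0


def fake_query(duration: float) -> None:
    """Stand-in workload that tracks how many copies run at the same time."""
    global active, peak
    with lock:
        active += 1
        peak = max(peak, active)
    time.sleep(duration)
    with lock:
        active -= 1


with ThreadPoolExecutor(max_workers=8) as pool:
    throttled = ThrottledSubmitter(lambda d: pool.submit(fake_query, d), limit=2)
    futures = [throttled.submit(0.05) for _ in range(6)]
    for f in futures:
        f.result()

print(peak)  # at most 2, even though the pool has 8 workers
```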