
# Asynchronous Operations

Non-blocking query execution through a Future-based API, enabling concurrent query processing and integration with async/await frameworks. Ideal for applications that require high concurrency and responsive user interfaces.

## Capabilities


### Async Cursor


Asynchronous cursor providing non-blocking query execution with Future-based result handling, allowing multiple concurrent queries and integration with async/await patterns.


```python { .api }
class AsyncCursor:
    arraysize: int
    max_workers: int

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        **kwargs
    ) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
        """
        Execute a SQL statement asynchronously.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - work_group: Athena workgroup for execution
        - s3_staging_dir: S3 location for query results
        - cache_size: Query result cache size
        - cache_expiration_time: Cache expiration time in seconds
        - result_reuse_enable: Enable query result reuse
        - result_reuse_minutes: Result reuse duration in minutes
        - paramstyle: Parameter substitution style
        - **kwargs: Additional execution options

        Returns:
        Tuple of (query_id, Future[AthenaResultSet]) for result handling
        """

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs
    ) -> None:
        """
        Execute multiple statements (not supported).

        Raises:
        NotSupportedError: Always raised as async executemany is not supported
        """

    def cancel(self, query_id: str) -> Future[None]:
        """
        Cancel a running query by query ID.

        Parameters:
        - query_id: ID of query to cancel

        Returns:
        Future that completes when cancellation is processed
        """

    def description(self, query_id: str) -> Future[Optional[List[Tuple[str, str, None, None, int, int, str]]]]:
        """
        Get column description for a query asynchronously.

        Parameters:
        - query_id: Query execution ID

        Returns:
        Future containing column metadata as list of tuples with
        (name, type_code, display_size, internal_size, precision, scale, null_ok)
        """

    def query_execution(self, query_id: str) -> Future[AthenaQueryExecution]:
        """
        Get query execution metadata asynchronously.

        Parameters:
        - query_id: Query execution ID

        Returns:
        Future[AthenaQueryExecution] with execution details
        """

    def poll(self, query_id: str) -> Future[AthenaQueryExecution]:
        """
        Poll query execution status asynchronously.

        Parameters:
        - query_id: Query execution ID

        Returns:
        Future[AthenaQueryExecution] with current status
        """

    def close(self, wait: bool = False) -> None:
        """
        Close cursor and shutdown thread pool executor.

        Parameters:
        - wait: If True, wait for all running queries to complete before shutdown
        """
```


### Async Dict Cursor


Asynchronous cursor variant that returns results as dictionaries with column names as keys.


```python { .api }
class AsyncDictCursor(AsyncCursor):
    dict_type: Type[Dict] = dict

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        **kwargs
    ) -> Tuple[str, Future[Union[AthenaResultSet, Any]]]:
        """
        Execute query asynchronously returning dictionary results.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options

        Returns:
        Tuple of (query_id, Future[AthenaDictResultSet])
        """
```
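
A minimal usage sketch (the staging bucket, region, and `users` table are placeholders, as in the examples below); rows come back as dictionaries keyed by column name:

```python
from pyathena import connect
from pyathena.async_cursor import AsyncDictCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncDictCursor
)
cursor = conn.cursor()

# execute() returns (query_id, Future), just like AsyncCursor
query_id, future = cursor.execute("SELECT user_id, name FROM users LIMIT 3")
result_set = future.result()  # Block until the query finishes

# Each row is a dict keyed by column name, e.g. {"user_id": 1, "name": "alice"}
for row in result_set.fetchall():
    print(row["user_id"], row["name"])

cursor.close()
conn.close()
```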


### Future-based Result Handling


PyAthena uses Python's `concurrent.futures.Future` for asynchronous result handling, so results can be consumed with the standard patterns: blocking `result()` calls, completion callbacks, `as_completed`, and timeouts.


```python { .api }
class Future[T]:
    def result(self, timeout: Optional[float] = None) -> T:
        """Get result, blocking until available or timeout."""

    def add_done_callback(self, fn: Callable[[Future[T]], None]) -> None:
        """Add callback function called when Future completes."""

    def cancel(self) -> bool:
        """Attempt to cancel the Future."""

    def cancelled(self) -> bool:
        """Return True if Future was cancelled."""

    def done(self) -> bool:
        """Return True if Future is done (completed or cancelled)."""

    def exception(self, timeout: Optional[float] = None) -> Optional[Exception]:
        """Return exception if Future failed, None if successful."""
```
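
Because these are standard `concurrent.futures` objects, a query's outcome can also be inspected without wrapping `result()` in try/except. A minimal sketch, assuming a `cursor` created as in the usage examples below:

```python
# Assumes `cursor` is an AsyncCursor, as in the usage examples below
query_id, future = cursor.execute("SELECT 1")

# exception() blocks up to the timeout and returns the error, or None on success
# (it raises concurrent.futures.TimeoutError if the query is still running)
error = future.exception(timeout=60)
if error is None:
    rows = future.result().fetchall()  # Future already resolved; returns immediately
    print(f"Query {query_id} returned {len(rows)} rows")
else:
    print(f"Query {query_id} failed: {error}")
```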


## Usage Examples


### Basic Async Query Execution


```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

# Connect with async cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncCursor
)

cursor = conn.cursor()

# Start query asynchronously
query_id, future = cursor.execute("SELECT COUNT(*) FROM large_table")
print(f"Query started with ID: {query_id}")

# Do other work while query runs
print("Doing other work while query executes...")
time.sleep(2)

# Check if query is complete
if future.done():
    result_set = future.result()
    print("Query completed!")
    print(result_set.fetchall())
else:
    print("Query still running...")
    result_set = future.result()  # Wait for completion
    print("Query completed!")
    print(result_set.fetchall())

cursor.close()
conn.close()
```


### Concurrent Query Execution


```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from concurrent.futures import as_completed

def run_concurrent_queries():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )

    cursor = conn.cursor()

    # Multiple queries to run concurrently
    queries = [
        ("user_count", "SELECT COUNT(DISTINCT user_id) FROM users"),
        ("daily_revenue", "SELECT DATE(order_date), SUM(amount) FROM orders GROUP BY DATE(order_date)"),
        ("top_products", "SELECT product_id, COUNT(*) as sales FROM orders GROUP BY product_id ORDER BY sales DESC LIMIT 10"),
        ("monthly_stats", "SELECT MONTH(order_date), AVG(amount), COUNT(*) FROM orders GROUP BY MONTH(order_date)")
    ]

    # Start all queries
    running_queries = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        running_queries[name] = (query_id, future)
        print(f"Started {name} query (ID: {query_id})")

    # Process results as they complete (map each Future back to its query name)
    futures = {future: name for name, (_, future) in running_queries.items()}

    for future in as_completed(futures):
        completed_name = futures[future]
        try:
            result_set = future.result()
            results = result_set.fetchall()
            print(f"\n{completed_name} completed with {len(results)} rows:")
            for row in results[:5]:  # Show first 5 rows
                print(f"  {row}")
        except Exception as e:
            print(f"{completed_name} failed: {e}")

    cursor.close()
    conn.close()

run_concurrent_queries()
```


### Async Query with Callbacks


```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

def query_callback(future):
    """Callback function called when query completes."""
    try:
        result_set = future.result()
        results = result_set.fetchall()
        print(f"Query completed with {len(results)} rows")
        for row in results:
            print(f"  {row}")
    except Exception as e:
        print(f"Query failed: {e}")

def async_with_callback():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )

    cursor = conn.cursor()

    # Execute query with callback
    query_id, future = cursor.execute("SELECT * FROM products LIMIT 5")
    future.add_done_callback(query_callback)

    print(f"Query {query_id} started, callback will be called when complete")

    # Do other work
    for i in range(5):
        print(f"Doing other work... {i+1}")
        time.sleep(1)

    # Ensure query is complete before closing
    if not future.done():
        print("Waiting for query to complete...")
        future.result()  # Wait for completion

    cursor.close()
    conn.close()

async_with_callback()
```


### Query Status Monitoring


```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

def monitor_query_execution():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )

    cursor = conn.cursor()

    # Start a long-running query
    query_id, future = cursor.execute("""
        SELECT
            customer_id,
            COUNT(*) as order_count,
            SUM(amount) as total_spent,
            AVG(amount) as avg_order_value
        FROM orders
        GROUP BY customer_id
        HAVING COUNT(*) > 10
        ORDER BY total_spent DESC
    """)

    print(f"Started query {query_id}")

    # Monitor execution status
    while not future.done():
        # Get current execution status
        status_future = cursor.poll(query_id)
        execution = status_future.result()

        print(f"Query state: {execution.state}")
        # Scan statistics are exposed on the execution metadata when available
        data_scanned = getattr(execution, "data_scanned_in_bytes", None)
        if data_scanned is not None:
            print(f"Data scanned: {data_scanned / 1024 / 1024:.2f} MB")

        if execution.state in ['SUCCEEDED', 'FAILED', 'CANCELLED']:
            break

        time.sleep(2)  # Poll every 2 seconds

    # Get final results
    if future.done():
        try:
            result_set = future.result()
            results = result_set.fetchall()
            print(f"Query completed successfully with {len(results)} rows")
        except Exception as e:
            print(f"Query failed: {e}")

    cursor.close()
    conn.close()

monitor_query_execution()
```


### Async Error Handling


```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from pyathena.error import OperationalError, ProgrammingError
from concurrent.futures import TimeoutError

def async_error_handling():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )

    cursor = conn.cursor()

    # Test queries with different error conditions
    test_queries = [
        ("valid_query", "SELECT COUNT(*) FROM users"),
        ("syntax_error", "SELCT COUNT(*) FROM users"),  # Intentional typo
        ("missing_table", "SELECT * FROM nonexistent_table"),
        ("timeout_query", "SELECT * FROM very_large_table")  # May timeout
    ]

    for name, query in test_queries:
        try:
            print(f"\nExecuting {name}...")
            query_id, future = cursor.execute(query)

            # Set timeout for demonstration
            result_set = future.result(timeout=30)  # 30 second timeout
            results = result_set.fetchall()
            print(f"✓ {name} succeeded with {len(results)} rows")

        except ProgrammingError as e:
            print(f"✗ {name} failed with syntax error: {e}")
        except OperationalError as e:
            print(f"✗ {name} failed with operational error: {e}")
        except TimeoutError:
            print(f"✗ {name} timed out")
            # Cancel the timed-out query
            try:
                cancel_future = cursor.cancel(query_id)
                cancel_future.result(timeout=10)
                print(f"  Query {query_id} cancelled")
            except Exception as cancel_error:
                print(f"  Failed to cancel query: {cancel_error}")
        except Exception as e:
            print(f"✗ {name} failed with unexpected error: {e}")

    cursor.close()
    conn.close()

async_error_handling()
```


### Integration with asyncio


```python
import asyncio
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
from concurrent.futures import ThreadPoolExecutor

async def athena_with_asyncio():
    """Example integrating PyAthena async cursor with asyncio."""

    # Create connection in thread pool (connection setup is synchronous)
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor() as executor:
        conn = await loop.run_in_executor(
            executor,
            lambda: connect(
                s3_staging_dir="s3://my-bucket/athena-results/",
                region_name="us-west-2",
                cursor_class=AsyncCursor
            )
        )

        cursor = conn.cursor()

        # Execute multiple queries concurrently using asyncio
        async def execute_query(name, query):
            query_id, future = cursor.execute(query)
            print(f"Started {name} (Query ID: {query_id})")

            # Convert the concurrent.futures.Future into an asyncio-compatible await
            result_set = await loop.run_in_executor(None, future.result)
            results = result_set.fetchall()

            print(f"Completed {name}: {len(results)} rows")
            return name, results

        # Define queries
        queries = [
            ("user_stats", "SELECT COUNT(*) as user_count FROM users"),
            ("order_stats", "SELECT COUNT(*) as order_count FROM orders"),
            ("product_stats", "SELECT COUNT(*) as product_count FROM products")
        ]

        # Execute all queries concurrently
        tasks = [execute_query(name, query) for name, query in queries]
        results = await asyncio.gather(*tasks)

        # Process results
        for name, data in results:
            print(f"{name}: {data}")

        # Cleanup
        cursor.close()
        conn.close()

# Run with asyncio
asyncio.run(athena_with_asyncio())
```


### Async Query Pipeline


```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor
import time

class AsyncQueryPipeline:
    """Pipeline for managing multiple dependent async queries."""

    def __init__(self, connection):
        self.conn = connection
        self.cursor = connection.cursor()
        self.results = {}

    def execute_stage(self, stage_name, query, dependencies=None):
        """Execute a query stage, optionally waiting for dependencies."""
        # Wait for dependencies if specified
        if dependencies:
            for dep in dependencies:
                if dep in self.results:
                    future = self.results[dep]['future']
                    if not future.done():
                        print(f"Waiting for dependency {dep}...")
                        future.result()

        print(f"Starting stage: {stage_name}")
        query_id, future = self.cursor.execute(query)

        self.results[stage_name] = {
            'query_id': query_id,
            'future': future,
            'start_time': time.time()
        }

        return query_id, future

    def wait_for_all(self):
        """Wait for all stages to complete."""
        for stage_name, stage_info in self.results.items():
            future = stage_info['future']
            if not future.done():
                print(f"Waiting for {stage_name}...")
                future.result()

            duration = time.time() - stage_info['start_time']
            print(f"{stage_name} completed in {duration:.2f} seconds")

    def get_results(self, stage_name):
        """Get results for a specific stage."""
        future = self.results[stage_name]['future']
        result_set = future.result()
        return result_set.fetchall()

    def close(self):
        """Close the cursor and connection."""
        self.cursor.close()
        self.conn.close()

# Example usage
def run_async_pipeline():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncCursor
    )

    pipeline = AsyncQueryPipeline(conn)

    # Stage 1: Data preparation
    pipeline.execute_stage(
        'data_prep',
        """
        CREATE TABLE temp_user_summary AS
        SELECT
            user_id,
            COUNT(*) as order_count,
            SUM(amount) as total_spent
        FROM orders
        GROUP BY user_id
        """
    )

    # Stage 2: Analysis (depends on data_prep)
    pipeline.execute_stage(
        'user_analysis',
        """
        SELECT
            CASE
                WHEN total_spent > 1000 THEN 'High Value'
                WHEN total_spent > 500 THEN 'Medium Value'
                ELSE 'Low Value'
            END as customer_segment,
            COUNT(*) as customer_count,
            AVG(total_spent) as avg_spent
        FROM temp_user_summary
        GROUP BY customer_segment
        """,
        dependencies=['data_prep']
    )

    # Stage 3: Additional metrics (depends on data_prep)
    pipeline.execute_stage(
        'retention_metrics',
        """
        SELECT
            order_count,
            COUNT(*) as customers_with_count,
            AVG(total_spent) as avg_spent_for_count
        FROM temp_user_summary
        GROUP BY order_count
        ORDER BY order_count
        """,
        dependencies=['data_prep']
    )

    # Wait for all stages to complete
    pipeline.wait_for_all()

    # Process results
    user_analysis = pipeline.get_results('user_analysis')
    retention_metrics = pipeline.get_results('retention_metrics')

    print("\nUser Analysis Results:")
    for row in user_analysis:
        print(f"  {row}")

    print("\nRetention Metrics:")
    for row in retention_metrics:
        print(f"  {row}")

    pipeline.close()

run_async_pipeline()
```


## Performance Considerations

- Use async cursors when you need to execute multiple queries concurrently (see the `max_workers` sketch after this list)
- Async operations are particularly beneficial for I/O-bound workloads
- Consider using connection pooling for high-concurrency applications
- Monitor query execution status to handle long-running queries appropriately
- Use appropriate timeouts to prevent hanging operations
- Async cursors work well with web frameworks like FastAPI, aiohttp, and Tornado
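
A minimal sketch of bounding concurrency via the cursor's thread pool. It uses the same placeholder bucket and region as the other examples and assumes `conn.cursor()` forwards keyword arguments such as `max_workers` to the cursor class:

```python
from pyathena import connect
from pyathena.async_cursor import AsyncCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncCursor
)

# Assumption: max_workers is forwarded to AsyncCursor and sizes its executor,
# so at most four queries are submitted and polled at a time.
cursor = conn.cursor(max_workers=4)

# Extra submissions simply queue in the executor instead of all running at once
futures = [cursor.execute(f"SELECT {i}")[1] for i in range(8)]
for f in futures:
    print(f.result().fetchall())

cursor.close(wait=True)  # Let in-flight queries finish before shutting down the pool
conn.close()
```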


## Best Practices


1. **Resource Management**: Always close cursors and connections properly (see the cleanup sketch after this list)
2. **Error Handling**: Use try/except blocks with specific exception types
3. **Timeout Handling**: Set appropriate timeouts for query execution
4. **Concurrent Limits**: Don't exceed Athena's concurrent query limits
5. **Query Monitoring**: Monitor long-running queries and provide user feedback
6. **Memory Management**: Be mindful of memory usage with large result sets
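
For resource management specifically, `contextlib.closing` (or an equivalent try/finally) ensures the cursor's thread pool and the connection are released even when a query raises. A minimal sketch with the same placeholder settings as the examples above:

```python
from contextlib import closing

from pyathena import connect
from pyathena.async_cursor import AsyncCursor

# closing() guarantees close() is called even if execute() or result() raises
with closing(connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=AsyncCursor
)) as conn, closing(conn.cursor()) as cursor:
    query_id, future = cursor.execute("SELECT COUNT(*) FROM users")
    print(future.result(timeout=300).fetchall())
```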