# Pandas Integration

High-performance integration with pandas, enabling query results to be processed directly as DataFrames, with support for chunked processing of large datasets. Ideal for data analysis workflows and scientific computing applications.

## Installation

```bash
pip install PyAthena[Pandas]
```

## Capabilities

### Pandas Cursor

Cursor that returns query results as pandas DataFrames instead of tuples or dictionaries, with full support for Athena's data types and automatic type conversion.

```python { .api }
class PandasCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    rownumber: Optional[int]
    query_id: Optional[str]
    result_set: Optional[AthenaPandasResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        keep_default_na: bool = False,
        na_values: Optional[Iterable[str]] = ("",),
        quoting: int = 1,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs
    ) -> PandasCursor:
        """
        Execute a SQL statement with DataFrame result processing.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - work_group: Athena workgroup for execution
        - s3_staging_dir: S3 location for query results
        - cache_size: Query result cache size
        - cache_expiration_time: Cache expiration time in seconds
        - result_reuse_enable: Enable query result reuse
        - result_reuse_minutes: Result reuse duration in minutes
        - paramstyle: Parameter substitution style
        - keep_default_na: Keep pandas default NA values
        - na_values: Additional NA values to recognize
        - quoting: CSV quoting behavior (QUOTE_MINIMAL=0, QUOTE_ALL=1, etc.)
        - on_start_query_execution: Callback invoked when query execution starts
        - **kwargs: Additional arguments passed to pandas.read_csv

        Returns:
        Self for method chaining
        """

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs
    ) -> None:
        """Execute a SQL statement multiple times with different parameters."""

    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch the next row from the result set (standard DB API method).

        Returns:
        Single row as tuple/dict, or None if no more rows
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch multiple rows from the result set (standard DB API method).

        Parameters:
        - size: Number of rows to fetch (default: arraysize)

        Returns:
        List of rows as tuples/dicts
        """

    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch all remaining rows from the result set (standard DB API method).

        Returns:
        List of all remaining rows as tuples/dicts
        """

    def as_pandas(self) -> Union[DataFrame, DataFrameIterator]:
        """
        Return results as a pandas DataFrame, or as an iterator for large datasets.

        Returns:
        DataFrame when chunksize is None,
        DataFrameIterator for chunked processing when chunksize is set
        """

    def iter_chunks(self) -> Generator[DataFrame, None, None]:
        """
        Iterate over the result set in DataFrame chunks for memory-efficient processing.

        Chunking behavior depends on cursor configuration:
        - If chunksize is explicitly set, uses that value
        - If auto_optimize_chunksize=True and chunksize=None, determines an optimal chunksize
        - If auto_optimize_chunksize=False and chunksize=None, yields the entire DataFrame

        Yields:
        DataFrame chunks when chunking is enabled, or the entire DataFrame as a single chunk
        """

    def cancel(self) -> None:
        """Cancel the currently executing query."""

    def close(self) -> None:
        """Close the cursor and free associated resources."""
```
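
The NA-handling options above (`keep_default_na`, `na_values`, `quoting`, `**kwargs`) are forwarded to `pandas.read_csv` when the result is parsed. A minimal sketch of passing them through `execute()`; the bucket and table names are placeholders:

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

# Placeholder staging location and table name
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()

# keep_default_na / na_values are forwarded to pandas.read_csv, so empty
# strings and a custom sentinel are parsed as NaN in the resulting DataFrame.
cursor.execute(
    "SELECT * FROM events",
    keep_default_na=False,
    na_values=("", "N/A"),
)
df = cursor.as_pandas()
print(df.isna().sum())

cursor.close()
conn.close()
```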

### DataFrame Iterator

Memory-efficient iterator for processing large query results in chunks, without loading the entire result set into memory.

```python { .api }
class DataFrameIterator:
    def __iter__(self) -> Iterator[DataFrame]:
        """Iterator protocol for DataFrame chunks."""

    def __next__(self) -> DataFrame:
        """Get the next DataFrame chunk."""

    def to_pandas(self) -> DataFrame:
        """
        Concatenate all chunks into a single DataFrame.

        Warning: May consume large amounts of memory for big datasets.

        Returns:
        Single DataFrame containing all data
        """
```
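
A rough sketch of how the iterator is typically obtained, assuming (per the `iter_chunks()` notes above) that `chunksize` is accepted as cursor configuration; the bucket and table names are placeholders:

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
# Assumption: chunksize is passed as cursor configuration (see iter_chunks above)
cursor = conn.cursor(chunksize=100_000)

cursor.execute("SELECT * FROM large_table")
df_iter = cursor.as_pandas()  # DataFrameIterator when chunksize is set

# Stream the chunks without materializing the full result
row_count = 0
for chunk in df_iter:
    row_count += len(chunk)
print(f"Rows streamed: {row_count}")

# to_pandas() would concatenate everything; only safe if it fits in memory
# full_df = df_iter.to_pandas()

cursor.close()
conn.close()
```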

### Pandas Result Set

Specialized result set class optimized for pandas DataFrame creation, with efficient type conversion and memory management.

```python { .api }
class AthenaPandasResultSet:
    def as_pandas(self) -> DataFrame:
        """Convert result set to pandas DataFrame."""

    def fetchone_pandas(self) -> Optional[DataFrame]:
        """Fetch single row as DataFrame."""

    def fetchmany_pandas(self, size: int) -> DataFrame:
        """Fetch multiple rows as DataFrame."""

    def fetchall_pandas(self) -> DataFrame:
        """Fetch all rows as DataFrame."""
```
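
The result set is normally consumed through the cursor, but it is also reachable through the cursor's `result_set` attribute documented above. A minimal sketch; the table name is a placeholder:

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT id, amount FROM sales_data LIMIT 100")

# After execute(), the cursor exposes the underlying AthenaPandasResultSet
result_set = cursor.result_set
if result_set is not None:
    df = result_set.as_pandas()
    print(df.head())

cursor.close()
conn.close()
```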

## Usage Examples

### Basic DataFrame Query

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd

# Connect with pandas cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")

# Get results as a DataFrame
df = cursor.as_pandas()
print(df.head())
print(f"Shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

cursor.close()
conn.close()
```

### Chunked Processing for Large Datasets

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

# Chunking follows the cursor configuration described for iter_chunks():
# set chunksize explicitly or enable auto_optimize_chunksize, otherwise the
# whole result is yielded as a single chunk.
cursor = conn.cursor()
cursor.execute("SELECT * FROM large_table")

# Process in chunks to manage memory
total_rows = 0
for chunk in cursor.iter_chunks():
    # Process each chunk
    print(f"Processing chunk with {len(chunk)} rows")

    # Example: calculate statistics per chunk
    chunk_stats = chunk.describe()
    print(chunk_stats)

    total_rows += len(chunk)

print(f"Total rows processed: {total_rows}")
cursor.close()
conn.close()
```

### Advanced DataFrame Operations

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

cursor = conn.cursor()

# Complex analytical query
query = """
SELECT
    customer_id,
    product_category,
    SUM(amount) as total_amount,
    COUNT(*) as transaction_count,
    AVG(amount) as avg_amount
FROM transactions
WHERE date_column >= DATE '2023-01-01'
GROUP BY customer_id, product_category
ORDER BY total_amount DESC
"""

cursor.execute(query)
df = cursor.as_pandas()

# Pandas operations on query results
# Pivot table
pivot_df = df.pivot_table(
    values='total_amount',
    index='customer_id',
    columns='product_category',
    fill_value=0
)

# Statistical analysis
correlation_matrix = df[['total_amount', 'transaction_count', 'avg_amount']].corr()
print("Correlation Matrix:")
print(correlation_matrix)

# Data visualization preparation
top_customers = df.nlargest(10, 'total_amount')
print("Top 10 Customers:")
print(top_customers[['customer_id', 'total_amount']])

cursor.close()
conn.close()
```

### Asynchronous DataFrame Processing

```python
import asyncio
from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor

async def async_query_example():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncPandasCursor
    )

    cursor = conn.cursor()

    # Execute multiple queries concurrently
    queries = [
        "SELECT COUNT(*) as count FROM table1",
        "SELECT AVG(amount) as avg_amount FROM table2",
        "SELECT MAX(date_column) as max_date FROM table3"
    ]

    # Start all queries; execute() returns (query_id, concurrent.futures.Future)
    futures = []
    for query in queries:
        query_id, future = cursor.execute(query)
        # Wrap the concurrent future so it can be awaited with asyncio
        futures.append(asyncio.wrap_future(future))

    # Wait for all result sets, then convert each to a DataFrame
    result_sets = await asyncio.gather(*futures)

    for i, result_set in enumerate(result_sets):
        print(f"Query {i+1} result:")
        print(result_set.as_pandas())

    cursor.close()
    conn.close()

# Run async example
asyncio.run(async_query_example())
```

### Memory-Efficient Large Dataset Processing

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd

def process_large_dataset():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=PandasCursor
    )

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM very_large_table")

    # Use the iterator to avoid loading the entire dataset
    results_summary = {
        'total_rows': 0,
        'total_revenue': 0,
        'unique_customers': set()
    }

    # Process chunk by chunk
    for chunk_number, chunk_df in enumerate(cursor.iter_chunks()):
        results_summary['total_rows'] += len(chunk_df)
        results_summary['total_revenue'] += chunk_df['revenue'].sum()
        results_summary['unique_customers'].update(chunk_df['customer_id'].unique())

        # Optional: save intermediate results
        # chunk_df.to_parquet(f'chunk_{chunk_number}.parquet')

    # Final summary
    results_summary['unique_customers'] = len(results_summary['unique_customers'])

    print("Dataset Summary:")
    for key, value in results_summary.items():
        print(f"{key}: {value}")

    cursor.close()
    conn.close()

process_large_dataset()
```

### Integration with Data Science Workflow

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
import pandas as pd
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Data extraction for ML pipeline
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

cursor = conn.cursor()

# Feature engineering query
feature_query = """
SELECT
    customer_id,
    age,
    income,
    account_balance,
    num_transactions_last_month,
    avg_transaction_amount,
    days_since_last_transaction,
    CASE WHEN churned = 'yes' THEN 1 ELSE 0 END as target
FROM customer_features
WHERE feature_date = CURRENT_DATE - INTERVAL '1' DAY
"""

cursor.execute(feature_query)
df = cursor.as_pandas()

# Prepare features for machine learning
feature_columns = [
    'age', 'income', 'account_balance',
    'num_transactions_last_month', 'avg_transaction_amount',
    'days_since_last_transaction'
]

X = df[feature_columns]
y = df['target']

# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42
)

print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
print(f"Feature columns: {feature_columns}")

cursor.close()
conn.close()
```

## Type Conversion

PyAthena automatically converts Athena data types to appropriate pandas dtypes:

- `boolean` → `bool`
- `tinyint`, `smallint`, `integer`, `bigint` → `int64`
- `real`, `double`, `float` → `float64`
- `decimal` → `float64` or `object` (for high precision)
- `varchar`, `char` → `object` (string)
- `date` → `datetime64[ns]` (date only)
- `timestamp` → `datetime64[ns]`
- `array`, `map`, `row` → `object` (complex types as Python objects)
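
A quick way to see how these conversions land on a real table is to inspect the resulting dtypes. A minimal sketch; the table name is a placeholder:

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data LIMIT 10")
df = cursor.as_pandas()

# Shows the pandas dtype chosen for each Athena column
print(df.dtypes)

cursor.close()
conn.close()
```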

## Performance Considerations

- Use chunked processing (`iter_chunks()`) for datasets that may not fit in memory
- Set an appropriate `arraysize` to control batch sizes for `fetchmany()`
- Consider using async cursors for concurrent query execution
- Keep query text identical across runs (e.g., via parameterized queries) so result caching and result reuse can take effect; see the sketch below
- For very large datasets, consider using PyArrow integration for better performance
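
As a sketch of the caching-related options exposed by `execute()` above (`cache_size`, `result_reuse_enable`, `result_reuse_minutes`); the bucket and table names are placeholders, and result reuse must be supported by the Athena workgroup/engine in use:

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()

# Look back over recent executions for an identical query before re-running it,
# and ask Athena to reuse results computed within the last 60 minutes.
cursor.execute(
    "SELECT COUNT(*) AS cnt FROM sales_data",
    cache_size=100,
    result_reuse_enable=True,
    result_reuse_minutes=60,
)
print(cursor.as_pandas())

cursor.close()
conn.close()
```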