# PyArrow Integration

Native PyArrow Table support for columnar data processing. Query results are returned in Arrow's columnar format, which is well suited to analytical workloads and interoperates directly with the rest of the Arrow ecosystem for data interchange.

## Installation

```bash
pip install PyAthena[Arrow]
```

## Capabilities

### Arrow Cursor

Cursor that returns query results as PyArrow Tables, providing a columnar data format optimized for analytical operations and memory efficiency.

```python { .api }
class ArrowCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int

    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor:
        """
        Execute a SQL statement with Arrow Table result processing.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options

        Returns:
        Self for method chaining
        """

    def fetchone(self) -> Optional[Table]:
        """
        Fetch the next chunk as an Arrow Table.

        Returns:
        Arrow Table with single chunk or None if no more data
        """

    def fetchmany(self, size: Optional[int] = None) -> Table:
        """
        Fetch multiple rows as an Arrow Table.

        Parameters:
        - size: Number of rows to fetch (default: arraysize)

        Returns:
        Arrow Table containing the requested rows
        """

    def fetchall(self) -> Table:
        """
        Fetch all remaining rows as a single Arrow Table.

        Returns:
        Arrow Table containing all remaining rows
        """

    def as_arrow(self) -> Table:
        """
        Return results as an Arrow Table.

        Returns:
        PyArrow Table with all query results
        """

    def cancel(self) -> None:
        """Cancel the currently executing query."""

    def close(self) -> None:
        """Close the cursor and free resources."""
```

### Async Arrow Cursor

Asynchronous version of ArrowCursor for non-blocking operations with a Future-based API.

```python { .api }
class AsyncArrowCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[Table]]:
        """
        Execute a query asynchronously, returning the query ID and a Future.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters

        Returns:
        Tuple of (query_id, Future[Table])
        """

    def cancel(self, query_id: str) -> Future[None]:
        """Cancel query by ID asynchronously."""

    def close(self, wait: bool = False) -> None:
        """Close cursor, optionally waiting for running queries."""
```

### Arrow Result Set

Specialized result set class optimized for PyArrow Table creation with efficient columnar data processing.

```python { .api }
class AthenaArrowResultSet:
    def as_arrow(self) -> Table:
        """Convert result set to PyArrow Table."""

    def fetchone_arrow(self) -> Optional[Table]:
        """Fetch single chunk as Arrow Table."""

    def fetchmany_arrow(self, size: int) -> Table:
        """Fetch multiple rows as Arrow Table."""

    def fetchall_arrow(self) -> Table:
        """Fetch all rows as Arrow Table."""
```
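
You normally don't instantiate `AthenaArrowResultSet` yourself; the Arrow cursor's fetch methods are backed by it. Since `execute()` returns the cursor for chaining, the cursor-level `as_arrow()` shown above gives the same table in one call. A minimal sketch (connection parameters as in the examples below):

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
# execute() returns the cursor itself, so the call chains into as_arrow()
table = cursor.execute("SELECT 1 AS answer").as_arrow()
print(table)

cursor.close()
conn.close()
```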

## Usage Examples

### Basic Arrow Table Query

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc

# Connect with Arrow cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")

# Get results as Arrow Table
table = cursor.fetchall()
print(f"Table shape: {table.shape}")
print(f"Columns: {table.column_names}")
print(f"Schema: {table.schema}")

# Access column data
revenue_column = table.column('revenue')
print(f"Revenue column type: {revenue_column.type}")
print(f"Revenue sum: {pc.sum(revenue_column)}")

cursor.close()
conn.close()
```

### High-Performance Analytics

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()

# Complex analytical query
query = """
SELECT
    product_id,
    sale_date,
    quantity,
    unit_price,
    quantity * unit_price as total_amount,
    customer_segment
FROM sales_data
WHERE sale_date >= DATE '2023-01-01'
"""

cursor.execute(query)
table = cursor.fetchall()

# High-performance columnar operations
print("Performing columnar analytics...")

# Aggregations using Arrow compute functions
total_revenue = pc.sum(table.column('total_amount'))
avg_order_size = pc.mean(table.column('total_amount'))
max_quantity = pc.max(table.column('quantity'))

print(f"Total Revenue: ${total_revenue.as_py():,.2f}")
print(f"Average Order Size: ${avg_order_size.as_py():.2f}")
print(f"Max Quantity: {max_quantity.as_py()}")

# Group by operations
grouped = table.group_by('customer_segment').aggregate([
    ('total_amount', 'sum'),
    ('quantity', 'mean'),
    ('product_id', 'count')
])

print("\nRevenue by Customer Segment:")
for i in range(len(grouped)):
    segment = grouped.column('customer_segment')[i].as_py()
    revenue = grouped.column('total_amount_sum')[i].as_py()
    print(f"{segment}: ${revenue:,.2f}")

cursor.close()
conn.close()
```

### Columnar Data Processing Pipeline

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

def process_sales_data():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor
    )

    cursor = conn.cursor()

    # Extract data
    cursor.execute("""
        SELECT
            customer_id,
            product_category,
            sale_amount,
            sale_date,
            region
        FROM sales_transactions
        WHERE sale_date >= CURRENT_DATE - INTERVAL '30' DAY
    """)

    table = cursor.fetchall()

    # Data transformations using Arrow
    # Add a computed column at the end of the table
    table = table.append_column('month', pc.month(table.column('sale_date')))

    # Filter operations
    high_value_sales = table.filter(
        pc.greater(table.column('sale_amount'), pa.scalar(1000))
    )

    print(f"High value sales count: {len(high_value_sales)}")

    # Export to various formats
    # Parquet
    pq.write_table(high_value_sales, 'high_value_sales.parquet')

    # CSV
    high_value_sales.to_pandas().to_csv('high_value_sales.csv', index=False)

    # JSON
    with open('high_value_sales.json', 'w') as f:
        f.write(high_value_sales.to_pandas().to_json(orient='records'))

    cursor.close()
    conn.close()

    return high_value_sales

# Process data
result_table = process_sales_data()
print(f"Processed {len(result_table)} high-value sales records")
```

### Integration with Arrow Ecosystem

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc
import pyarrow.dataset as ds

# Data extraction from Athena
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM customer_analytics")
customer_table = cursor.fetchall()

# Create Arrow dataset for efficient querying
dataset = ds.InMemoryDataset(customer_table)

# Advanced filtering and projection using dataset expressions
filtered_data = dataset.to_table(
    filter=(ds.field('age') > 25) & (ds.field('active') == True),
    columns=['customer_id', 'age', 'total_spend', 'last_purchase_date']
)

print(f"Filtered customers: {len(filtered_data)}")

# Statistical analysis
stats = {
    'mean_age': pc.mean(filtered_data.column('age')).as_py(),
    'mean_spend': pc.mean(filtered_data.column('total_spend')).as_py(),
    'total_customers': len(filtered_data)
}

for metric, value in stats.items():
    print(f"{metric}: {value}")

cursor.close()
conn.close()
```

### Memory-Efficient Streaming

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc

def stream_process_large_dataset():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor
    )

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM large_transaction_table")

    # Stream processing for memory efficiency
    batch_size = 10000
    total_processed = 0
    running_sum = 0

    while True:
        batch = cursor.fetchmany(batch_size)
        if len(batch) == 0:
            break

        # Process batch
        batch_sum = pc.sum(batch.column('amount')).as_py()
        running_sum += batch_sum
        total_processed += len(batch)

        print(f"Processed {total_processed} rows, running sum: ${running_sum:,.2f}")

    print(f"Final total: ${running_sum:,.2f} from {total_processed} transactions")

    cursor.close()
    conn.close()

stream_process_large_dataset()
```

### Async Arrow Processing

```python
import asyncio
from pyathena import connect
from pyathena.arrow.async_cursor import AsyncArrowCursor
import pyarrow.compute as pc

async def concurrent_arrow_queries():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncArrowCursor
    )

    cursor = conn.cursor()

    # Multiple analytical queries
    queries = [
        ("daily_sales", "SELECT sale_date, SUM(amount) as daily_total FROM sales GROUP BY sale_date"),
        ("product_metrics", "SELECT product_id, COUNT(*) as sales_count FROM sales GROUP BY product_id"),
        ("customer_stats", "SELECT customer_segment, AVG(amount) as avg_spend FROM sales GROUP BY customer_segment")
    ]

    # Execute all queries concurrently
    futures = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        futures[name] = future

    # Collect results; the cursor returns concurrent.futures.Future objects,
    # so wrap them to make them awaitable inside the event loop
    results = {}
    for name, future in futures.items():
        table = await asyncio.wrap_future(future)
        results[name] = table
        print(f"{name}: {len(table)} rows")

    # Perform cross-table analytics
    total_daily_sales = pc.sum(results['daily_sales'].column('daily_total'))
    unique_products = len(results['product_metrics'])

    print(f"Total sales across all days: ${total_daily_sales.as_py():,.2f}")
    print(f"Unique products sold: {unique_products}")

    cursor.close()
    conn.close()

# Run async processing
asyncio.run(concurrent_arrow_queries())
```

## Type Conversion

PyAthena maps Athena data types to appropriate Arrow types; the sketch after this list shows how to inspect the resulting schema:

- `boolean` → `pa.bool_()`
- `tinyint` → `pa.int8()`
- `smallint` → `pa.int16()`
- `integer` → `pa.int32()`
- `bigint` → `pa.int64()`
- `real` → `pa.float32()`
- `double` → `pa.float64()`
- `decimal` → `pa.decimal128()` or `pa.decimal256()`
- `varchar`, `char` → `pa.string()`
- `date` → `pa.date32()`
- `timestamp` → `pa.timestamp('ns')`
- `array` → `pa.list_()`
- `map` → `pa.map_()`
- `row` → `pa.struct()`
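
To see the mapping in practice, inspect the schema of a returned table. A minimal sketch, assuming the connection setup from the earlier examples and a hypothetical `events` table:

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
# The column names here are hypothetical; any table will do
cursor.execute("SELECT event_id, event_ts, payload FROM events LIMIT 10")
table = cursor.fetchall()

# Each Athena column type appears as its Arrow counterpart
for field in table.schema:
    print(f"{field.name}: {field.type}")

cursor.close()
conn.close()
```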

## Performance Benefits

Arrow integration provides several performance advantages; a short illustration follows the list:

- **Columnar Storage**: Efficient memory layout for analytical operations
- **Zero-Copy Operations**: Minimal data copying during transformations
- **Vectorized Computing**: SIMD-optimized operations via Arrow compute functions
- **Memory Efficiency**: Compact representation of typed data
- **Interoperability**: Seamless integration with other Arrow-based tools
- **Parallel Processing**: Built-in support for parallel operations
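
A small illustration of the zero-copy and vectorized points above, using plain PyArrow with synthetic data (no Athena connection needed):

```python
import pyarrow as pa
import pyarrow.compute as pc

# Synthetic columnar data standing in for a query result
table = pa.table({'amount': [10.0, 250.0, 999.5, 1200.0]})

# slice() creates a view over the same buffers -- no data is copied
first_two = table.slice(0, 2)
print(first_two.num_rows)  # 2

# Compute kernels operate on whole columns at once (vectorized)
doubled = pc.multiply(table.column('amount'), pa.scalar(2.0))
print(doubled.to_pylist())  # [20.0, 500.0, 1999.0, 2400.0]
```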

## Arrow Ecosystem Integration

PyAthena's Arrow support enables integration with the tools below; see the handoff sketch after this list:

- **Apache Arrow**: Core columnar processing capabilities
- **PyArrow**: Python Arrow bindings and computation kernels
- **Arrow Flight**: High-performance data transport
- **Parquet**: Efficient columnar file format
- **Pandas**: Convert to/from DataFrames when needed
- **Polars**: High-performance DataFrame library
- **DuckDB**: In-process analytical database
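
For example, a table fetched with ArrowCursor can be handed to other Arrow-native tools without row-by-row conversion. A sketch, assuming pandas, polars, and duckdb are installed; the stand-in table below substitutes for a real `cursor.fetchall()` result:

```python
import duckdb
import polars as pl
import pyarrow as pa

# Stand-in for a PyArrow Table fetched via ArrowCursor
table = pa.table({'amount': [120.5, 980.0, 43.2]})

# pandas: materialize a DataFrame from the Arrow buffers
df = table.to_pandas()

# Polars: construct a DataFrame directly from Arrow data
pl_df = pl.from_arrow(table)

# DuckDB: query the PyArrow Table in place via replacement scans,
# which resolve `tbl` from the local Python scope
tbl = table
row_count = duckdb.sql("SELECT COUNT(*) AS n FROM tbl").fetchall()

print(df.shape, pl_df.shape, row_count)
```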