# PyArrow Integration

Native PyArrow Table support for columnar data processing, providing optimal performance for analytical workloads and seamless integration with the Arrow ecosystem. Ideal for high-performance analytics and data interchange.

## Installation

```bash
pip install PyAthena[Arrow]
```

## Capabilities

### Arrow Cursor

Cursor that returns query results as PyArrow Tables, providing a columnar data format optimized for analytical operations and memory efficiency.

```python { .api }
class ArrowCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int

    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor:
        """
        Execute a SQL statement with Arrow Table result processing.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - **kwargs: Additional execution options

        Returns:
        Self for method chaining
        """

    def fetchone(self) -> Optional[Table]:
        """
        Fetch the next chunk as an Arrow Table.

        Returns:
        Arrow Table with a single chunk, or None if no more data
        """

    def fetchmany(self, size: Optional[int] = None) -> Table:
        """
        Fetch multiple rows as an Arrow Table.

        Parameters:
        - size: Number of rows to fetch (default: arraysize)

        Returns:
        Arrow Table containing the requested rows
        """

    def fetchall(self) -> Table:
        """
        Fetch all remaining rows as a single Arrow Table.

        Returns:
        Arrow Table containing all remaining rows
        """

    def as_arrow(self) -> Table:
        """
        Return results as an Arrow Table.

        Returns:
        PyArrow Table with all query results
        """

    def cancel(self) -> None:
        """Cancel the currently executing query."""

    def close(self) -> None:
        """Close the cursor and free resources."""
```
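
Because `execute()` returns the cursor itself and `as_arrow()` returns the full result, a query can be run and materialized in a single chained call. A minimal sketch based on the API above (bucket, region, and query are illustrative):

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor,
)

# execute() returns the cursor, so the call can be chained with as_arrow()
table = conn.cursor().execute("SELECT 1 AS one").as_arrow()
print(table)

conn.close()
```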

### Async Arrow Cursor

Asynchronous version of ArrowCursor for non-blocking operations with a Future-based API.

```python { .api }
class AsyncArrowCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[Table]]:
        """
        Execute a query asynchronously, returning the query ID and a Future.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters

        Returns:
        Tuple of (query_id, Future[Table])
        """

    def cancel(self, query_id: str) -> Future[None]:
        """Cancel a query by ID asynchronously."""

    def close(self, wait: bool = False) -> None:
        """Close the cursor, optionally waiting for running queries."""
```

### Arrow Result Set

Specialized result set class optimized for PyArrow Table creation with efficient columnar data processing.

```python { .api }
class AthenaArrowResultSet:
    def as_arrow(self) -> Table:
        """Convert the result set to a PyArrow Table."""

    def fetchone_arrow(self) -> Optional[Table]:
        """Fetch a single chunk as an Arrow Table."""

    def fetchmany_arrow(self, size: int) -> Table:
        """Fetch multiple rows as an Arrow Table."""

    def fetchall_arrow(self) -> Table:
        """Fetch all rows as an Arrow Table."""
```

## Usage Examples

### Basic Arrow Table Query

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc

# Connect with Arrow cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")

# Get results as an Arrow Table
table = cursor.fetchall()
print(f"Table shape: {table.shape}")
print(f"Columns: {table.column_names}")
print(f"Schema: {table.schema}")

# Access column data
revenue_column = table.column('revenue')
print(f"Revenue column type: {revenue_column.type}")
print(f"Revenue sum: {pc.sum(revenue_column)}")

cursor.close()
conn.close()
```

### High-Performance Analytics

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()

# Complex analytical query
query = """
SELECT
    product_id,
    sale_date,
    quantity,
    unit_price,
    quantity * unit_price as total_amount,
    customer_segment
FROM sales_data
WHERE sale_date >= DATE '2023-01-01'
"""

cursor.execute(query)
table = cursor.fetchall()

# High-performance columnar operations
print("Performing columnar analytics...")

# Aggregations using Arrow compute functions
total_revenue = pc.sum(table.column('total_amount'))
avg_order_size = pc.mean(table.column('total_amount'))
max_quantity = pc.max(table.column('quantity'))

print(f"Total Revenue: ${total_revenue.as_py():,.2f}")
print(f"Average Order Size: ${avg_order_size.as_py():.2f}")
print(f"Max Quantity: {max_quantity.as_py()}")

# Group by operations
grouped = table.group_by('customer_segment').aggregate([
    ('total_amount', 'sum'),
    ('quantity', 'mean'),
    ('product_id', 'count')
])

print("\nRevenue by Customer Segment:")
for i in range(len(grouped)):
    segment = grouped.column('customer_segment')[i].as_py()
    revenue = grouped.column('total_amount_sum')[i].as_py()
    print(f"{segment}: ${revenue:,.2f}")

cursor.close()
conn.close()
```

### Columnar Data Processing Pipeline

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa
import pyarrow.compute as pc
import pyarrow.parquet as pq

def process_sales_data():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor
    )

    cursor = conn.cursor()

    # Extract data
    cursor.execute("""
        SELECT
            customer_id,
            product_category,
            sale_amount,
            sale_date,
            region
        FROM sales_transactions
        WHERE sale_date >= CURRENT_DATE - INTERVAL '30' DAY
    """)

    table = cursor.fetchall()

    # Data transformations using Arrow:
    # append a computed month column derived from the sale date
    table = table.append_column('month', pc.month(table.column('sale_date')))

    # Filter operations
    high_value_sales = table.filter(
        pc.greater(table.column('sale_amount'), pa.scalar(1000))
    )

    print(f"High value sales count: {len(high_value_sales)}")

    # Export to various formats
    # Parquet
    pq.write_table(high_value_sales, 'high_value_sales.parquet')

    # CSV
    high_value_sales.to_pandas().to_csv('high_value_sales.csv', index=False)

    # JSON
    with open('high_value_sales.json', 'w') as f:
        f.write(high_value_sales.to_pandas().to_json(orient='records'))

    cursor.close()
    conn.close()

    return high_value_sales

# Process data
result_table = process_sales_data()
print(f"Processed {len(result_table)} high-value sales records")
```

### Integration with Arrow Ecosystem

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc
import pyarrow.dataset as ds

# Data extraction from Athena
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM customer_analytics")
customer_table = cursor.fetchall()

# Create an Arrow dataset for efficient querying
dataset = ds.InMemoryDataset(customer_table)

# Advanced filtering and projection using dataset expressions
filtered_data = dataset.to_table(
    filter=(ds.field('age') > 25) & (ds.field('active') == True),
    columns=['customer_id', 'age', 'total_spend', 'last_purchase_date']
)

print(f"Filtered customers: {len(filtered_data)}")

# Statistical analysis
stats = {
    'mean_age': pc.mean(filtered_data.column('age')).as_py(),
    'mean_spend': pc.mean(filtered_data.column('total_spend')).as_py(),
    'total_customers': len(filtered_data)
}

for metric, value in stats.items():
    print(f"{metric}: {value}")

cursor.close()
conn.close()
```

### Memory-Efficient Streaming

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow.compute as pc

def stream_process_large_dataset():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=ArrowCursor
    )

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM large_transaction_table")

    # Stream processing for memory efficiency
    batch_size = 10000
    total_processed = 0
    running_sum = 0

    while True:
        batch = cursor.fetchmany(batch_size)
        if len(batch) == 0:
            break

        # Process batch
        batch_sum = pc.sum(batch.column('amount')).as_py()
        running_sum += batch_sum
        total_processed += len(batch)

        print(f"Processed {total_processed} rows, running sum: ${running_sum:,.2f}")

    print(f"Final total: ${running_sum:,.2f} from {total_processed} transactions")

    cursor.close()
    conn.close()

stream_process_large_dataset()
```

### Async Arrow Processing

```python
import asyncio

from pyathena import connect
from pyathena.arrow.async_cursor import AsyncArrowCursor
import pyarrow.compute as pc

async def concurrent_arrow_queries():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncArrowCursor
    )

    cursor = conn.cursor()

    # Multiple analytical queries
    queries = [
        ("daily_sales", "SELECT sale_date, SUM(amount) as daily_total FROM sales GROUP BY sale_date"),
        ("product_metrics", "SELECT product_id, COUNT(*) as sales_count FROM sales GROUP BY product_id"),
        ("customer_stats", "SELECT customer_segment, AVG(amount) as avg_spend FROM sales GROUP BY customer_segment")
    ]

    # Execute all queries concurrently
    futures = {}
    for name, query in queries:
        query_id, future = cursor.execute(query)
        futures[name] = future

    # Collect results; wrap each concurrent.futures.Future so it can be awaited
    results = {}
    for name, future in futures.items():
        table = await asyncio.wrap_future(future)
        results[name] = table
        print(f"{name}: {len(table)} rows")

    # Perform cross-table analytics
    total_daily_sales = pc.sum(results['daily_sales'].column('daily_total'))
    unique_products = len(results['product_metrics'])

    print(f"Total sales across all days: ${total_daily_sales.as_py():,.2f}")
    print(f"Unique products sold: {unique_products}")

    cursor.close()
    conn.close()

# Run async processing
asyncio.run(concurrent_arrow_queries())
```

## Type Conversion

PyAthena maps Athena data types to appropriate Arrow types:

- `boolean` → `pa.bool_()`
- `tinyint` → `pa.int8()`
- `smallint` → `pa.int16()`
- `integer` → `pa.int32()`
- `bigint` → `pa.int64()`
- `real` → `pa.float32()`
- `double` → `pa.float64()`
- `decimal` → `pa.decimal128()` or `pa.decimal256()`
- `varchar`, `char` → `pa.string()`
- `date` → `pa.date32()`
- `timestamp` → `pa.timestamp('ns')`
- `array` → `pa.list_()`
- `map` → `pa.map_()`
- `row` → `pa.struct()`
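
The mapping is visible on the schema of any returned Table. A small sketch, assuming a hypothetical `events` table whose columns are declared with a few of these Athena types:

```python
from pyathena import connect
from pyathena.arrow.cursor import ArrowCursor
import pyarrow as pa

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=ArrowCursor,
)
cursor = conn.cursor()

# Hypothetical table and column names, used only to illustrate the type mapping
cursor.execute("SELECT is_active, user_id, user_name, signup_date FROM events LIMIT 10")
table = cursor.fetchall()

# The Arrow schema reflects the mapping above,
# e.g. boolean -> bool, bigint -> int64, varchar -> string, date -> date32
print(table.schema)

# Assuming user_id is declared BIGINT and user_name VARCHAR in Athena
assert table.schema.field('user_id').type == pa.int64()
assert table.schema.field('user_name').type == pa.string()

cursor.close()
conn.close()
```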

## Performance Benefits

Arrow integration provides several performance advantages:

- **Columnar Storage**: Efficient memory layout for analytical operations
- **Zero-Copy Operations**: Minimal data copying during transformations
- **Vectorized Computing**: SIMD-optimized operations via Arrow compute functions (see the sketch after this list)
- **Memory Efficiency**: Compact representation of typed data
- **Interoperability**: Seamless integration with other Arrow-based tools
- **Parallel Processing**: Built-in support for parallel operations
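
A small illustration of the zero-copy and vectorized points, using a synthetic in-memory table rather than a live Athena query:

```python
import pyarrow as pa
import pyarrow.compute as pc

# Illustrative only: a tiny table standing in for an Athena result
table = pa.table({
    "amount": pa.array([10.0, 250.0, 1300.0, 75.5], type=pa.float64()),
    "region": pa.array(["us", "eu", "us", "apac"]),
})

# Zero-copy: slicing produces a view over the same underlying buffers
recent = table.slice(0, 2)

# Vectorized compute kernels aggregate whole columns without Python loops
amounts = table.column("amount")
print(pc.sum(amounts).as_py())   # 1635.5
print(pc.mean(amounts).as_py())  # 408.875

# Compact, typed memory footprint
print(table.nbytes, recent.nbytes)
```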

## Arrow Ecosystem Integration

PyAthena's Arrow support enables integration with the following (see the hand-off sketch after this list):

- **Apache Arrow**: Core columnar processing capabilities
- **PyArrow**: Python Arrow bindings and computation kernels
- **Arrow Flight**: High-performance data transport
- **Parquet**: Efficient columnar file format
- **Pandas**: Convert to/from DataFrames when needed
- **Polars**: High-performance DataFrame library
- **DuckDB**: In-process analytical database
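
A hedged sketch of a few of these hand-offs, using a small synthetic table in place of an Athena result and assuming `polars` and `duckdb` are installed:

```python
import pyarrow as pa
import pyarrow.parquet as pq

# Illustrative only: a small in-memory table standing in for an Athena query result
arrow_table = pa.table({"customer_id": [1, 2, 3], "total_spend": [120.0, 89.5, 430.25]})

# Parquet: persist the columnar data directly
pq.write_table(arrow_table, "customers.parquet")

# Pandas: convert when row-oriented tooling is needed
df = arrow_table.to_pandas()

# Polars: build a DataFrame from Arrow data
import polars as pl
pl_df = pl.from_arrow(arrow_table)

# DuckDB: query the Arrow table in-process with SQL (local Python variables
# such as arrow_table are picked up via DuckDB's replacement scans)
import duckdb
top_spenders = duckdb.sql("SELECT * FROM arrow_table WHERE total_spend > 100").arrow()
print(top_spenders)
```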