# Pandas Integration

High-performance pandas integration that returns Athena query results directly as DataFrames, with support for chunked processing of large datasets. Ideal for data analysis workflows and scientific computing applications.

## Installation

```bash
pip install "PyAthena[Pandas]"
```

## Capabilities

### Pandas Cursor

Cursor that returns query results as pandas DataFrames instead of tuples or dictionaries, with full support for Athena's data types and automatic type conversion.

```python { .api }
class PandasCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    rownumber: Optional[int]
    query_id: Optional[str]
    result_set: Optional[AthenaPandasResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        keep_default_na: bool = False,
        na_values: Optional[Iterable[str]] = ("",),
        quoting: int = 1,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs
    ) -> PandasCursor:
        """
        Execute a SQL statement with DataFrame result processing.

        Parameters:
        - operation: SQL query string
        - parameters: Query parameters (dict or sequence)
        - work_group: Athena workgroup for execution
        - s3_staging_dir: S3 location for query results
        - cache_size: Query result cache size
        - cache_expiration_time: Cache expiration time in seconds
        - result_reuse_enable: Enable query result reuse
        - result_reuse_minutes: Result reuse duration in minutes
        - paramstyle: Parameter substitution style
        - keep_default_na: Keep pandas default NA values
        - na_values: Additional NA values to recognize
        - quoting: CSV quoting behavior (QUOTE_MINIMAL=0, QUOTE_ALL=1, etc.)
        - on_start_query_execution: Callback for query start
        - **kwargs: Additional arguments passed to pandas.read_csv

        Returns:
        Self for method chaining
        """

    def executemany(
        self,
        operation: str,
        seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]],
        **kwargs
    ) -> None:
        """Execute a SQL statement multiple times with different parameters."""

    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch the next row from the result set (standard DB API method).

        Returns:
        Single row as tuple/dict, or None if no more rows
        """

    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch multiple rows from the result set (standard DB API method).

        Parameters:
        - size: Number of rows to fetch (default: arraysize)

        Returns:
        List of rows as tuples/dicts
        """

    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
        """
        Fetch all remaining rows from the result set (standard DB API method).

        Returns:
        List of all remaining rows as tuples/dicts
        """

    def as_pandas(self) -> Union[DataFrame, DataFrameIterator]:
        """
        Return results as a pandas DataFrame, or an iterator for large datasets.

        Returns:
        DataFrame for small results or when chunksize=None,
        DataFrameIterator for chunked processing when chunksize is set
        """

    def iter_chunks(self) -> Generator[DataFrame, None, None]:
        """
        Iterate over the result set in DataFrame chunks for memory-efficient processing.

        Chunking behavior depends on cursor configuration:
        - If chunksize is explicitly set, uses that value
        - If auto_optimize_chunksize=True and chunksize=None, determines an optimal chunksize
        - If auto_optimize_chunksize=False and chunksize=None, yields the entire DataFrame

        Yields:
        DataFrame chunks when chunking is enabled, or the entire DataFrame as a single chunk
        """

    def cancel(self) -> None:
        """Cancel the currently executing query."""

    def close(self) -> None:
        """Close the cursor and free associated resources."""
```
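
For orientation, a minimal round trip with this cursor might look like the sketch below; the bucket, region, and trivial query are placeholders, and fuller examples follow under Usage Examples.

```python
# Minimal sketch: run a query and pull the result as a DataFrame.
# The bucket, region, and query are placeholders.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()

# execute() returns the cursor itself, so calls can be chained
df = cursor.execute("SELECT 1 AS one").as_pandas()
print(df)

cursor.close()
conn.close()
```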

### DataFrame Iterator

Memory-efficient iterator for processing large query results in chunks without loading the entire result set into memory.

```python { .api }
class DataFrameIterator:
    def __iter__(self) -> Iterator[DataFrame]:
        """Iterator protocol for DataFrame chunks."""

    def __next__(self) -> DataFrame:
        """Get the next DataFrame chunk."""

    def to_pandas(self) -> DataFrame:
        """
        Concatenate all chunks into a single DataFrame.

        Warning: May consume large amounts of memory for big datasets.

        Returns:
        Single DataFrame containing all data
        """
```
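
A short sketch of how the iterator is typically obtained and consumed, assuming `chunksize` can be passed when the cursor is created in your PyAthena version (see the `iter_chunks()` chunking rules above):

```python
# Sketch: chunked consumption via DataFrameIterator.
# Assumes chunksize is accepted as a cursor argument; adjust to your version.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor(chunksize=500_000)  # hypothetical chunk size
cursor.execute("SELECT * FROM large_table")

df_iter = cursor.as_pandas()  # DataFrameIterator when chunksize is set
for chunk in df_iter:
    print(f"chunk with {len(chunk)} rows")

# Alternatively, df_iter.to_pandas() concatenates every chunk into one
# DataFrame, which is convenient but can use a lot of memory.

cursor.close()
conn.close()
```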

### Pandas Result Set

Specialized result set class optimized for pandas DataFrame creation, with efficient type conversion and memory management.

```python { .api }
class AthenaPandasResultSet:
    def as_pandas(self) -> DataFrame:
        """Convert the result set to a pandas DataFrame."""

    def fetchone_pandas(self) -> Optional[DataFrame]:
        """Fetch a single row as a DataFrame."""

    def fetchmany_pandas(self, size: int) -> DataFrame:
        """Fetch multiple rows as a DataFrame."""

    def fetchall_pandas(self) -> DataFrame:
        """Fetch all rows as a DataFrame."""
```
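
The result set is usually consumed through the cursor, but it is also reachable via the cursor's `result_set` attribute (see the `PandasCursor` reference above). A minimal sketch, assuming a query has already been executed on `cursor`:

```python
# Sketch: using the result set behind an executed PandasCursor.
# Assumes `cursor` is a PandasCursor on which execute() has been called.
result_set = cursor.result_set
if result_set is not None:
    df = result_set.as_pandas()  # entire result as a DataFrame
    print(df.shape)

    # The *_pandas fetch methods return DataFrames instead of tuples, e.g.:
    # first_rows = result_set.fetchmany_pandas(1000)
```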

## Usage Examples

### Basic DataFrame Query

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

# Connect with the pandas cursor
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data WHERE year = 2023")

# Get results as a DataFrame (fetchall() returns plain rows; as_pandas() returns a DataFrame)
df = cursor.as_pandas()
print(df.head())
print(f"Shape: {df.shape}")
print(f"Columns: {df.columns.tolist()}")

cursor.close()
conn.close()
```

### Chunked Processing for Large Datasets

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

# Configure chunksize (or auto_optimize_chunksize) on the cursor so that
# iter_chunks() yields multiple chunks instead of one large DataFrame
cursor = conn.cursor()
cursor.execute("SELECT * FROM large_table")

# Process in chunks to manage memory
total_rows = 0
for chunk in cursor.iter_chunks():
    # Process each chunk
    print(f"Processing chunk with {len(chunk)} rows")

    # Example: calculate statistics per chunk
    chunk_stats = chunk.describe()
    print(chunk_stats)

    total_rows += len(chunk)

print(f"Total rows processed: {total_rows}")
cursor.close()
conn.close()
```

### Advanced DataFrame Operations

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

cursor = conn.cursor()

# Complex analytical query
query = """
SELECT
    customer_id,
    product_category,
    SUM(amount) as total_amount,
    COUNT(*) as transaction_count,
    AVG(amount) as avg_amount
FROM transactions
WHERE date_column >= DATE '2023-01-01'
GROUP BY customer_id, product_category
ORDER BY total_amount DESC
"""

cursor.execute(query)
df = cursor.as_pandas()

# Pandas operations on query results
# Pivot table
pivot_df = df.pivot_table(
    values='total_amount',
    index='customer_id',
    columns='product_category',
    fill_value=0
)

# Statistical analysis
correlation_matrix = df[['total_amount', 'transaction_count', 'avg_amount']].corr()
print("Correlation Matrix:")
print(correlation_matrix)

# Data visualization preparation
top_customers = df.nlargest(10, 'total_amount')
print("Top 10 Customers:")
print(top_customers[['customer_id', 'total_amount']])

cursor.close()
conn.close()
```

### Asynchronous DataFrame Processing

```python
import asyncio

from pyathena import connect
from pyathena.pandas.async_cursor import AsyncPandasCursor

async def async_query_example():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=AsyncPandasCursor
    )

    cursor = conn.cursor()

    # Execute multiple queries concurrently
    queries = [
        "SELECT COUNT(*) as count FROM table1",
        "SELECT AVG(amount) as avg_amount FROM table2",
        "SELECT MAX(date_column) as max_date FROM table3"
    ]

    # Start all queries; execute() returns (query_id, future) immediately.
    # The futures are concurrent.futures.Future objects, so wrap them for asyncio.
    futures = []
    for query in queries:
        query_id, future = cursor.execute(query)
        futures.append(asyncio.wrap_future(future))

    # Wait for all result sets, then convert each to a DataFrame
    result_sets = await asyncio.gather(*futures)

    for i, result_set in enumerate(result_sets):
        print(f"Query {i + 1} result:")
        print(result_set.as_pandas())

    cursor.close()
    conn.close()

# Run async example
asyncio.run(async_query_example())
```

### Memory-Efficient Large Dataset Processing

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

def process_large_dataset():
    conn = connect(
        s3_staging_dir="s3://my-bucket/athena-results/",
        region_name="us-west-2",
        cursor_class=PandasCursor
    )

    cursor = conn.cursor()
    cursor.execute("SELECT * FROM very_large_table")

    # Use the chunk iterator to avoid loading the entire dataset at once
    results_summary = {
        'total_rows': 0,
        'total_revenue': 0,
        'unique_customers': set()
    }

    # Process chunk by chunk
    for chunk_number, chunk_df in enumerate(cursor.iter_chunks()):
        results_summary['total_rows'] += len(chunk_df)
        results_summary['total_revenue'] += chunk_df['revenue'].sum()
        results_summary['unique_customers'].update(chunk_df['customer_id'].unique())

        # Optional: save intermediate results
        # chunk_df.to_parquet(f'chunk_{chunk_number}.parquet')

    # Final summary
    results_summary['unique_customers'] = len(results_summary['unique_customers'])

    print("Dataset Summary:")
    for key, value in results_summary.items():
        print(f"{key}: {value}")

    cursor.close()
    conn.close()

process_large_dataset()
```

### Integration with Data Science Workflow

```python
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split

# Data extraction for an ML pipeline
conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor
)

cursor = conn.cursor()

# Feature engineering query
feature_query = """
SELECT
    customer_id,
    age,
    income,
    account_balance,
    num_transactions_last_month,
    avg_transaction_amount,
    days_since_last_transaction,
    CASE WHEN churned = 'yes' THEN 1 ELSE 0 END as target
FROM customer_features
WHERE feature_date = CURRENT_DATE - INTERVAL '1' DAY
"""

cursor.execute(feature_query)
df = cursor.as_pandas()

# Prepare features for machine learning
feature_columns = [
    'age', 'income', 'account_balance',
    'num_transactions_last_month', 'avg_transaction_amount',
    'days_since_last_transaction'
]

X = df[feature_columns]
y = df['target']

# Preprocessing
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)

# Train/test split
X_train, X_test, y_train, y_test = train_test_split(
    X_scaled, y, test_size=0.2, random_state=42
)

print(f"Training set shape: {X_train.shape}")
print(f"Test set shape: {X_test.shape}")
print(f"Feature columns: {feature_columns}")

cursor.close()
conn.close()
```

## Type Conversion

PyAthena automatically converts Athena data types to appropriate pandas dtypes:

- `boolean` → `bool`
- `tinyint`, `smallint`, `integer`, `bigint` → `int64`
- `real`, `double`, `float` → `float64`
- `decimal` → `float64` or `object` (for high precision)
- `varchar`, `char` → `object` (string)
- `date` → `datetime64[ns]` (date only)
- `timestamp` → `datetime64[ns]`
- `array`, `map`, `row` → `object` (complex types as Python objects)
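
To check how a given result actually mapped onto pandas dtypes, inspecting `DataFrame.dtypes` on the returned frame is usually enough; a minimal sketch follows (the table name is a placeholder):

```python
# Sketch: inspect the dtypes PyAthena produced for a query result.
# The table name is a placeholder.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()
cursor.execute("SELECT * FROM sales_data LIMIT 100")
df = cursor.as_pandas()

print(df.dtypes)  # e.g. int64, float64, object, datetime64[ns] per column

cursor.close()
conn.close()
```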

## Performance Considerations

- Use chunked processing (`iter_chunks()`) for datasets that may not fit in memory
- Set an appropriate `arraysize` for batched fetches and an appropriate `chunksize` for DataFrame chunking
- Consider using async cursors for concurrent query execution
- Use parameterized queries to leverage Athena's query result caching (see the sketch below)
- For very large datasets, consider the PyArrow integration for better performance
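
As a closing sketch, the example below combines two of these points: a parameterized query using PyAthena's default `pyformat` placeholders (`%(name)s`), which keeps the SQL text stable across runs and so plays well with result caching/reuse, plus an explicit `arraysize` for batched fetches. Table, column, and bucket names are placeholders.

```python
# Sketch: parameterized query plus an explicit arraysize.
# Table, column, and bucket names are placeholders.
from pyathena import connect
from pyathena.pandas.cursor import PandasCursor

conn = connect(
    s3_staging_dir="s3://my-bucket/athena-results/",
    region_name="us-west-2",
    cursor_class=PandasCursor,
)
cursor = conn.cursor()
cursor.arraysize = 10_000  # batch size used by fetchmany()

# pyformat-style parameters (PyAthena's default paramstyle)
cursor.execute(
    "SELECT * FROM sales_data WHERE year = %(year)s AND region = %(region)s",
    {"year": 2023, "region": "us-west"},
)
df = cursor.as_pandas()
print(df.shape)

cursor.close()
conn.close()
```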