# PyAthena

A Python DB API 2.0 (PEP 249) client for Amazon Athena that runs SQL queries against data stored in Amazon S3. PyAthena provides several cursor types suited to different data processing workflows, including pandas DataFrames, PyArrow Tables, and asynchronous execution, which makes it useful for data engineers and analysts working with AWS data lakes.

## Package Information

- **Package Name**: PyAthena
- **Language**: Python
- **Installation**: `pip install PyAthena`
- **Extra Packages**:
  - SQLAlchemy: `pip install PyAthena[SQLAlchemy]`
  - Pandas: `pip install PyAthena[Pandas]`
  - Arrow: `pip install PyAthena[Arrow]`
  - fastparquet: `pip install PyAthena[fastparquet]`

## Core Imports

```python
import pyathena
from pyathena import connect
```

For specialized cursors:

```python
from pyathena.pandas.cursor import PandasCursor
from pyathena.arrow.cursor import ArrowCursor
from pyathena.async_cursor import AsyncCursor
from pyathena.spark.cursor import SparkCursor
```

## Basic Usage

```python
from pyathena import connect

# Connect to Athena
conn = connect(
    s3_staging_dir="s3://your-bucket/results/",
    region_name="us-west-2"
)

# Execute query with standard cursor
cursor = conn.cursor()
cursor.execute("SELECT * FROM your_table LIMIT 10")

# Get column information
print(cursor.description)

# Fetch results
rows = cursor.fetchall()
for row in rows:
    print(row)

# Close connection
conn.close()
```

## Architecture

PyAthena is built around a flexible cursor architecture supporting multiple result formats:

- **Connection Management**: Single connection class with pluggable cursor types
- **Cursor Variants**: Standard, Dictionary, Pandas, Arrow, Async, and Spark cursors
- **Result Processing**: Memory-efficient streaming and chunked processing for large datasets
- **AWS Integration**: Native boto3 integration with retry logic and authentication
- **SQLAlchemy Support**: Complete dialect implementation with custom Athena types

This design lets PyAthena serve both traditional database applications and data science workflows while maintaining DB API 2.0 compliance.
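The "pluggable cursor types" idea above can be sketched with the two hooks that appear in the signatures in this document: `cursor_class` on `connect`, and the optional cursor type passed to `Connection.cursor`. The helper names `query_dicts` and `query_dataframe` are hypothetical, and the `pyathena.cursor` import path for `DictCursor` is an assumption not shown elsewhere in this document. The imports are deferred into the functions so the module loads even without PyAthena installed.

```python
from typing import Any, Dict, List


def query_dicts(sql: str, s3_staging_dir: str, region_name: str) -> List[Dict[str, Any]]:
    """Hypothetical helper: return rows as dictionaries via a per-cursor override."""
    # Deferred imports; the DictCursor module path is an assumption.
    from pyathena import connect
    from pyathena.cursor import DictCursor

    conn = connect(s3_staging_dir=s3_staging_dir, region_name=region_name)
    try:
        # Passing a cursor type here overrides the connection's default cursor.
        cursor = conn.cursor(DictCursor)
        cursor.execute(sql)
        return cursor.fetchall()
    finally:
        conn.close()


def query_dataframe(sql: str, s3_staging_dir: str, region_name: str) -> Any:
    """Hypothetical helper: return a pandas DataFrame via a connection-wide default."""
    from pyathena import connect
    from pyathena.pandas.cursor import PandasCursor

    # cursor_class sets the cursor type for every cursor created by this connection.
    conn = connect(
        s3_staging_dir=s3_staging_dir,
        region_name=region_name,
        cursor_class=PandasCursor,
    )
    try:
        return conn.cursor().execute(sql).as_pandas()
    finally:
        conn.close()
```

Calling either helper requires AWS credentials and a writable S3 staging location, for example `query_dicts("SELECT 1", "s3://your-bucket/results/", "us-west-2")`.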

## Capabilities

### Core Database Operations

Standard DB API 2.0 compliant database operations including connection management, query execution, and result fetching. The `commit` and `rollback` methods exist for DB API 2.0 compatibility; Athena itself has no transactions. Provides both tuple and dictionary result formats.

```python { .api }
def connect(
    s3_staging_dir: Optional[str] = None,
    region_name: Optional[str] = None,
    schema_name: Optional[str] = "default",
    catalog_name: Optional[str] = "awsdatacatalog",
    work_group: Optional[str] = None,
    poll_interval: float = 1,
    encryption_option: Optional[str] = None,
    kms_key: Optional[str] = None,
    profile_name: Optional[str] = None,
    role_arn: Optional[str] = None,
    role_session_name: str = "PyAthena-session-{timestamp}",
    external_id: Optional[str] = None,
    serial_number: Optional[str] = None,
    duration_seconds: int = 3600,
    converter: Optional[Converter] = None,
    formatter: Optional[Formatter] = None,
    retry_config: Optional[RetryConfig] = None,
    cursor_class: Optional[Type[ConnectionCursor]] = None,
    cursor_kwargs: Optional[Dict[str, Any]] = None,
    kill_on_interrupt: bool = True,
    session: Optional[Session] = None,
    config: Optional[Config] = None,
    result_reuse_enable: bool = False,
    result_reuse_minutes: int = 1440,
    on_start_query_execution: Optional[Callable[[str], None]] = None,
    **kwargs
) -> Connection[ConnectionCursor]: ...

class Connection[ConnectionCursor]:
    session: Session
    client: BaseClient
    retry_config: RetryConfig
    s3_staging_dir: Optional[str]
    region_name: Optional[str]
    schema_name: Optional[str]
    catalog_name: Optional[str]
    work_group: Optional[str]
    poll_interval: float
    encryption_option: Optional[str]
    kms_key: Optional[str]

    def cursor(self, cursor: Optional[Type[FunctionalCursor]] = None, **kwargs) -> Union[FunctionalCursor, ConnectionCursor]: ...
    def close(self) -> None: ...
    def commit(self) -> None: ...
    def rollback(self) -> None: ...
    def __enter__(self) -> Connection[ConnectionCursor]: ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

class Cursor:
    arraysize: int
    description: Optional[List[Tuple[str, str, None, None, int, int, str]]]
    rowcount: int
    rownumber: Optional[int]
    query_id: Optional[str]
    result_set: Optional[AthenaResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: int = 0,
        cache_expiration_time: int = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs
    ) -> Cursor: ...
    def executemany(self, operation: str, seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], **kwargs) -> None: ...
    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def cancel(self) -> None: ...
    def close(self) -> None: ...
    def __iter__(self) -> Iterator[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def __next__(self) -> Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]: ...

class DictCursor(Cursor):
    """Cursor that returns results as dictionaries with column names as keys."""
    dict_type: Type[Dict] = dict
```

[Core Database Operations](./core-database.md)
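Because the cursor follows DB API 2.0, helpers written against the generic `description` and `fetchmany` interface should work with it unchanged. The sketch below shows two such hypothetical helpers, `column_names` and `iter_batches`. It uses the standard library's `sqlite3` as a stand-in connection so it runs without AWS access; note that `sqlite3` uses `?` placeholders, whereas this document lists `pyformat` as PyAthena's parameter style.

```python
import sqlite3
from typing import Any, Iterator, List, Sequence


def column_names(cursor: Any) -> List[str]:
    """Return column names from a DB API cursor; the name is item 0 of each description entry."""
    return [column[0] for column in (cursor.description or [])]


def iter_batches(cursor: Any, size: int = 1000) -> Iterator[List[Sequence[Any]]]:
    """Yield lists of rows via fetchmany until the result set is exhausted."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            break
        yield batch


# sqlite3 stands in for an Athena connection so the sketch runs locally.
conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute("CREATE TABLE t (id INTEGER, name TEXT)")
cur.executemany("INSERT INTO t VALUES (?, ?)", [(i, f"n{i}") for i in range(5)])
cur.execute("SELECT id, name FROM t ORDER BY id")

names = column_names(cur)
batches = list(iter_batches(cur, size=2))
conn.close()
```

Fetching in batches keeps only `size` rows in memory at a time, which matters more for large Athena result sets than for this five-row table.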

### Pandas Integration

Integration with pandas DataFrames, enabling direct processing of query results as DataFrames with support for chunked processing of large datasets.

```python { .api }
class PandasCursor:
    arraysize: int
    description: Optional[List[Tuple]]
    rowcount: int
    query_id: Optional[str]
    result_set: Optional[AthenaPandasResultSet]

    def execute(
        self,
        operation: str,
        parameters: Optional[Union[Dict[str, Any], List[str]]] = None,
        work_group: Optional[str] = None,
        s3_staging_dir: Optional[str] = None,
        cache_size: Optional[int] = 0,
        cache_expiration_time: Optional[int] = 0,
        result_reuse_enable: Optional[bool] = None,
        result_reuse_minutes: Optional[int] = None,
        paramstyle: Optional[str] = None,
        keep_default_na: bool = False,
        na_values: Optional[Iterable[str]] = ("",),
        quoting: int = 1,
        on_start_query_execution: Optional[Callable[[str], None]] = None,
        **kwargs
    ) -> PandasCursor: ...
    def executemany(self, operation: str, seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], **kwargs) -> None: ...
    def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...
    def as_pandas(self) -> Union[DataFrame, DataFrameIterator]: ...
    def iter_chunks(self) -> Generator[DataFrame, None, None]: ...
    def cancel(self) -> None: ...
    def close(self) -> None: ...
```

[Pandas Integration](./pandas-integration.md)

### PyArrow Integration

Native PyArrow Table support for columnar data processing, suited to analytical workloads and integration with the Arrow ecosystem.

```python { .api }
class ArrowCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor: ...
    def fetchall(self) -> Table: ...
    def as_arrow(self) -> Table: ...
```

[PyArrow Integration](./arrow-integration.md)

### Asynchronous Operations

Future-based API for non-blocking query execution. `execute` returns the query ID together with a `Future` for the result set instead of blocking, which enables concurrent query processing. The futures are `concurrent.futures` objects rather than `asyncio` awaitables, so they are resolved with `Future.result()` and not with `await`.

```python { .api }
class AsyncCursor:
    def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[AthenaResultSet]]: ...
    def cancel(self, query_id: str) -> Future[None]: ...
    def poll(self, query_id: str) -> Future[AthenaQueryExecution]: ...
    def close(self, wait: bool = False) -> None: ...
```

[Asynchronous Operations](./async-operations.md)
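One way to consume the `(query_id, Future)` pairs that `AsyncCursor.execute` returns is to gather them as they complete. The sketch below assumes the futures are `concurrent.futures.Future` objects; `collect_results` and `fake_execute` are hypothetical names, and `fake_execute` stands in for `AsyncCursor.execute` so the example runs without AWS access.

```python
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from typing import Any, Dict, Iterable, Tuple


def collect_results(submissions: Iterable[Tuple[str, "Future[Any]"]]) -> Dict[str, Any]:
    """Map each query ID to its result, gathering futures in completion order."""
    by_future = {future: query_id for query_id, future in submissions}
    results: Dict[str, Any] = {}
    for future in as_completed(by_future):
        # result() re-raises any exception raised while the task was running.
        results[by_future[future]] = future.result()
    return results


def fake_execute(executor: ThreadPoolExecutor, query_id: str, rows: Any) -> Tuple[str, "Future[Any]"]:
    """Stand-in for AsyncCursor.execute: returns (query_id, future) immediately."""
    return query_id, executor.submit(lambda: rows)


with ThreadPoolExecutor(max_workers=2) as executor:
    submissions = [
        fake_execute(executor, "q1", [(1,)]),
        fake_execute(executor, "q2", [(2,)]),
    ]
    results = collect_results(submissions)
```

With a real `AsyncCursor`, each future would resolve to an `AthenaResultSet` per the signature above, from which rows can then be fetched.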

### Spark Integration

Integration with Athena's Spark execution engine for distributed processing, Jupyter notebook compatibility, and advanced analytics workloads.

```python { .api }
class SparkCursor:
    def execute(self, code: str, **kwargs) -> SparkCursor: ...
    @property
    def session_id(self) -> str: ...
    @property
    def calculation_id(self) -> Optional[str]: ...
    @property
    def state(self) -> Optional[str]: ...
```

[Spark Integration](./spark-integration.md)

### SQLAlchemy Integration

Complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications.

```python { .api }
class AthenaDialect: ...
class AthenaRestDialect: ...
class AthenaPandasDialect: ...
class AthenaArrowDialect: ...

# Custom Athena types
class TINYINT: ...
class STRUCT: ...
class MAP: ...
class ARRAY: ...
```

[SQLAlchemy Integration](./sqlalchemy-integration.md)
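Using the dialect typically starts with a connection URL. The sketch below builds one with a hypothetical helper, `athena_url`. The `awsathena+rest://` scheme and the URL layout are assumptions not stated in this document; they follow the form commonly shown for PyAthena's REST dialect and should be checked against the dialect documentation. Values are URL-encoded because S3 paths and secret keys often contain reserved characters.

```python
from urllib.parse import quote_plus


def athena_url(
    region_name: str,
    schema_name: str,
    s3_staging_dir: str,
    aws_access_key_id: str = "",
    aws_secret_access_key: str = "",
) -> str:
    """Build a SQLAlchemy URL for Athena (layout is an assumption, see lead-in)."""
    # Empty credentials leave ":@" in the URL so the default AWS credential chain is used.
    return (
        f"awsathena+rest://{quote_plus(aws_access_key_id)}:{quote_plus(aws_secret_access_key)}"
        f"@athena.{region_name}.amazonaws.com:443/{schema_name}"
        f"?s3_staging_dir={quote_plus(s3_staging_dir)}"
    )


url = athena_url("us-west-2", "default", "s3://your-bucket/results/")
```

The resulting string would then be passed to `sqlalchemy.create_engine(url)`, which requires the SQLAlchemy extra to be installed.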

## Types

### DB API Constants

```python { .api }
apilevel: str = "2.0"
threadsafety: int = 2
paramstyle: str = "pyformat"

# Type objects for DB API 2.0
STRING: DBAPITypeObject
BINARY: DBAPITypeObject
BOOLEAN: DBAPITypeObject
NUMBER: DBAPITypeObject
DATE: DBAPITypeObject
TIME: DBAPITypeObject
DATETIME: DBAPITypeObject
JSON: DBAPITypeObject
```

### Core Connection Types

```python { .api }
ConnectionCursor = TypeVar("ConnectionCursor", bound=BaseCursor)
FunctionalCursor = TypeVar("FunctionalCursor", bound=BaseCursor)

class Connection[ConnectionCursor]:
    session: Session
    client: BaseClient
    retry_config: RetryConfig

class Converter:
    """Base converter class for data type conversion."""

class Formatter:
    """Base formatter class for parameter formatting."""

class DefaultParameterFormatter(Formatter):
    """Default parameter formatter implementation."""

class RetryConfig:
    """Configuration for API retry logic."""
    def __init__(
        self,
        exceptions: Iterable[str] = ("ThrottlingException", "TooManyRequestsException"),
        attempt: int = 5,
        multiplier: int = 1,
        max_delay: int = 100,
        exponential_base: int = 2
    ): ...

class BaseCursor:
    """Base cursor class with common functionality."""

class AthenaResultSet:
    """Result set wrapper for Athena query results."""
```
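To make the `RetryConfig` parameters more concrete, the sketch below computes a capped exponential backoff schedule from them. The formula `multiplier * exponential_base ** n`, capped at `max_delay`, is an assumption about how such parameters are usually combined, and `backoff_delays` is a hypothetical helper that is not part of PyAthena. The library's actual wait times may differ.

```python
from typing import List


def backoff_delays(
    attempt: int = 5,
    multiplier: int = 1,
    max_delay: int = 100,
    exponential_base: int = 2,
) -> List[int]:
    """Return the assumed wait (in seconds) before each retry."""
    # One wait sits between each pair of consecutive attempts: attempt - 1 waits.
    return [
        min(multiplier * exponential_base ** n, max_delay)
        for n in range(attempt - 1)
    ]


default_schedule = backoff_delays()
capped_schedule = backoff_delays(attempt=6, max_delay=10)
```

Under these assumptions the cap only matters once the exponential term exceeds `max_delay`, which is why `capped_schedule` flattens out at its last entry.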

### Exception Hierarchy

```python { .api }
class Error(Exception): ...
class Warning(Exception): ...
class InterfaceError(Error): ...
class DatabaseError(Error): ...
class InternalError(DatabaseError): ...
class OperationalError(DatabaseError): ...
class ProgrammingError(DatabaseError): ...
class DataError(DatabaseError): ...
class NotSupportedError(DatabaseError): ...
```
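The hierarchy determines how `except` clauses should be ordered: a handler for `DatabaseError` also catches `OperationalError` and `ProgrammingError`, so the more specific classes must come first. The sketch below uses local stand-in classes that mirror part of the hierarchy above so it runs without PyAthena installed; `describe_failure` and `fail_with` are hypothetical helpers. In real code the classes would be imported from the library instead of redefined.

```python
from typing import Callable


# Local stand-ins mirroring part of the hierarchy listed above.
class Error(Exception): ...
class DatabaseError(Error): ...
class OperationalError(DatabaseError): ...
class ProgrammingError(DatabaseError): ...


def describe_failure(run: Callable[[], None]) -> str:
    """Call run() and report which level of the hierarchy handled any failure."""
    try:
        run()
    except ProgrammingError:
        return "programming"
    except OperationalError:
        return "operational"
    except DatabaseError:
        # Reached only for DatabaseError subclasses not handled above.
        return "database"
    except Error:
        return "driver"
    return "ok"


def fail_with(exc: Exception) -> Callable[[], None]:
    """Return a callable that raises the given exception."""
    def _run() -> None:
        raise exc
    return _run
```

For example, `describe_failure(fail_with(OperationalError("throttled")))` is handled by the `OperationalError` clause, while a bare `DatabaseError` falls through to the broader handler.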

### Query Execution Models

```python { .api }
class AthenaQueryExecution:
    # State constants
    STATE_QUEUED: str = "QUEUED"
    STATE_RUNNING: str = "RUNNING"
    STATE_SUCCEEDED: str = "SUCCEEDED"
    STATE_FAILED: str = "FAILED"
    STATE_CANCELLED: str = "CANCELLED"
```
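The state constants above lend themselves to a simple polling loop: `QUEUED` and `RUNNING` mean "keep waiting", and the other three are terminal. The sketch below is a hypothetical helper, `wait_for_terminal_state`, written against a caller-supplied `get_state` function instead of a real Athena client so it runs locally. The `max_polls` limit is an added safeguard and not a PyAthena parameter.

```python
import time
from typing import Callable

# Terminal states, using the string values listed above.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}


def wait_for_terminal_state(
    get_state: Callable[[str], str],
    query_id: str,
    poll_interval: float = 1.0,
    max_polls: int = 100,
) -> str:
    """Poll get_state(query_id) until a terminal state is seen, then return it."""
    for _ in range(max_polls):
        state = get_state(query_id)
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_interval)
    raise TimeoutError(f"query {query_id} still not finished after {max_polls} polls")


# Simulated state sequence standing in for repeated status lookups.
_states = iter(["QUEUED", "RUNNING", "RUNNING", "SUCCEEDED"])
final_state = wait_for_terminal_state(
    lambda query_id: next(_states), "example-query-id", poll_interval=0
)
```

The `poll_interval` argument plays the same role as the `poll_interval` setting on `connect`: a shorter interval detects completion sooner at the cost of more status requests.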