or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/pypi-pyathena

Python DB API 2.0 (PEP 249) client for Amazon Athena

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypipkg:pypi/pyathena@3.17.x

To install, run

npx @tessl/cli install tessl/pypi-pyathena@3.17.0

0

# PyAthena

1

2

A comprehensive Python DB API 2.0 (PEP 249) client for Amazon Athena that provides high-performance SQL query execution against data stored in Amazon S3. PyAthena offers multiple cursor types optimized for different data processing workflows, including pandas DataFrames, PyArrow Tables, and asynchronous operations, making it an essential tool for data engineers and analysts working with AWS data lakes.

3

4

## Package Information

5

6

- **Package Name**: PyAthena

7

- **Language**: Python

8

- **Installation**: `pip install PyAthena`

9

- **Extra Packages**:

10

- SQLAlchemy: `pip install PyAthena[SQLAlchemy]`

11

- Pandas: `pip install PyAthena[Pandas]`

12

- Arrow: `pip install PyAthena[Arrow]`

13

- fastparquet: `pip install PyAthena[fastparquet]`

14

15

## Core Imports

16

17

```python

18

import pyathena

19

from pyathena import connect

20

```

21

22

For specialized cursors:

23

24

```python

25

from pyathena.pandas.cursor import PandasCursor

26

from pyathena.arrow.cursor import ArrowCursor

27

from pyathena.async_cursor import AsyncCursor

28

from pyathena.spark.cursor import SparkCursor

29

```

30

31

## Basic Usage

32

33

```python

34

from pyathena import connect

35

36

# Connect to Athena

37

conn = connect(

38

s3_staging_dir="s3://your-bucket/results/",

39

region_name="us-west-2"

40

)

41

42

# Execute query with standard cursor

43

cursor = conn.cursor()

44

cursor.execute("SELECT * FROM your_table LIMIT 10")

45

46

# Get column information

47

print(cursor.description)

48

49

# Fetch results

50

rows = cursor.fetchall()

51

for row in rows:

52

print(row)

53

54

# Close connection

55

conn.close()

56

```

57

58

## Architecture

59

60

PyAthena is built around a flexible cursor architecture supporting multiple result formats:

61

62

- **Connection Management**: Single connection class with pluggable cursor types

63

- **Cursor Variants**: Standard, Dictionary, Pandas, Arrow, Async, and Spark cursors

64

- **Result Processing**: Memory-efficient streaming and chunked processing for large datasets

65

- **AWS Integration**: Native boto3 integration with retry logic and authentication

66

- **SQLAlchemy Support**: Complete dialect implementation with custom Athena types

67

68

This design enables PyAthena to serve both traditional database applications and modern data science workflows while maintaining DB API 2.0 compliance.

69

70

## Capabilities

71

72

### Core Database Operations

73

74

Standard DB API 2.0 compliant database operations including connection management, query execution, result fetching, and transaction handling. Provides both tuple and dictionary result formats.

75

76

```python { .api }

77

def connect(

78

s3_staging_dir: Optional[str] = None,

79

region_name: Optional[str] = None,

80

schema_name: Optional[str] = "default",

81

catalog_name: Optional[str] = "awsdatacatalog",

82

work_group: Optional[str] = None,

83

poll_interval: float = 1,

84

encryption_option: Optional[str] = None,

85

kms_key: Optional[str] = None,

86

profile_name: Optional[str] = None,

87

role_arn: Optional[str] = None,

88

role_session_name: str = "PyAthena-session-{timestamp}",

89

external_id: Optional[str] = None,

90

serial_number: Optional[str] = None,

91

duration_seconds: int = 3600,

92

converter: Optional[Converter] = None,

93

formatter: Optional[Formatter] = None,

94

retry_config: Optional[RetryConfig] = None,

95

cursor_class: Optional[Type[ConnectionCursor]] = None,

96

cursor_kwargs: Optional[Dict[str, Any]] = None,

97

kill_on_interrupt: bool = True,

98

session: Optional[Session] = None,

99

config: Optional[Config] = None,

100

result_reuse_enable: bool = False,

101

result_reuse_minutes: int = 1440,

102

on_start_query_execution: Optional[Callable[[str], None]] = None,

103

**kwargs

104

) -> Connection[ConnectionCursor]: ...

105

106

class Connection[ConnectionCursor]:

107

session: Session

108

client: BaseClient

109

retry_config: RetryConfig

110

s3_staging_dir: Optional[str]

111

region_name: Optional[str]

112

schema_name: Optional[str]

113

catalog_name: Optional[str]

114

work_group: Optional[str]

115

poll_interval: float

116

encryption_option: Optional[str]

117

kms_key: Optional[str]

118

119

def cursor(self, cursor: Optional[Type[FunctionalCursor]] = None, **kwargs) -> Union[FunctionalCursor, ConnectionCursor]: ...

120

def close(self) -> None: ...

121

def commit(self) -> None: ...

122

def rollback(self) -> None: ...

123

def __enter__(self) -> Connection[ConnectionCursor]: ...

124

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

125

126

class Cursor:

127

arraysize: int

128

description: Optional[List[Tuple[str, str, None, None, int, int, str]]]

129

rowcount: int

130

rownumber: Optional[int]

131

query_id: Optional[str]

132

result_set: Optional[AthenaResultSet]

133

134

def execute(self, operation: str, parameters: Optional[Union[Dict[str, Any], List[str]]] = None, work_group: Optional[str] = None, s3_staging_dir: Optional[str] = None, cache_size: int = 0, cache_expiration_time: int = 0, result_reuse_enable: Optional[bool] = None, result_reuse_minutes: Optional[int] = None, paramstyle: Optional[str] = None, on_start_query_execution: Optional[Callable[[str], None]] = None, **kwargs) -> Cursor: ...

135

def executemany(self, operation: str, seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], **kwargs) -> None: ...

136

def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

137

def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

138

def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

139

def cancel(self) -> None: ...

140

def close(self) -> None: ...

141

def __iter__(self) -> Iterator[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

142

def __next__(self) -> Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]: ...

143

144

class DictCursor(Cursor):

145

"""Cursor that returns results as dictionaries with column names as keys."""

146

dict_type: Type[Dict] = dict

147

```

148

149

[Core Database Operations](./core-database.md)

150

151

### Pandas Integration

152

153

High-performance integration with pandas DataFrames, enabling direct query result processing as DataFrames with support for chunked processing of large datasets.

154

155

```python { .api }

156

class PandasCursor:

157

arraysize: int

158

description: Optional[List[Tuple]]

159

rowcount: int

160

query_id: Optional[str]

161

result_set: Optional[AthenaPandasResultSet]

162

163

def execute(self, operation: str, parameters: Optional[Union[Dict[str, Any], List[str]]] = None, work_group: Optional[str] = None, s3_staging_dir: Optional[str] = None, cache_size: Optional[int] = 0, cache_expiration_time: Optional[int] = 0, result_reuse_enable: Optional[bool] = None, result_reuse_minutes: Optional[int] = None, paramstyle: Optional[str] = None, keep_default_na: bool = False, na_values: Optional[Iterable[str]] = ("",), quoting: int = 1, on_start_query_execution: Optional[Callable[[str], None]] = None, **kwargs) -> PandasCursor: ...

164

def executemany(self, operation: str, seq_of_parameters: List[Optional[Union[Dict[str, Any], List[str]]]], **kwargs) -> None: ...

165

def fetchone(self) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

166

def fetchmany(self, size: Optional[int] = None) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

167

def fetchall(self) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]: ...

168

def as_pandas(self) -> Union[DataFrame, DataFrameIterator]: ...

169

def iter_chunks(self) -> Generator[DataFrame, None, None]: ...

170

def cancel(self) -> None: ...

171

def close(self) -> None: ...

172

```

173

174

[Pandas Integration](./pandas-integration.md)

175

176

### PyArrow Integration

177

178

Native PyArrow Table support for columnar data processing, providing optimal performance for analytical workloads and seamless integration with the Arrow ecosystem.

179

180

```python { .api }

181

class ArrowCursor:

182

def execute(self, operation: str, parameters=None, **kwargs) -> ArrowCursor: ...

183

def fetchall(self) -> Table: ...

184

def as_arrow(self) -> Table: ...

185

```

186

187

[PyArrow Integration](./arrow-integration.md)

188

189

### Asynchronous Operations

190

191

Full async/await support with Future-based API for non-blocking query execution, enabling concurrent query processing and integration with async frameworks.

192

193

```python { .api }

194

class AsyncCursor:

195

def execute(self, operation: str, parameters=None, **kwargs) -> Tuple[str, Future[AthenaResultSet]]: ...

196

def cancel(self, query_id: str) -> Future[None]: ...

197

def poll(self, query_id: str) -> Future[AthenaQueryExecution]: ...

198

def close(self, wait: bool = False) -> None: ...

199

```

200

201

[Asynchronous Operations](./async-operations.md)

202

203

### Spark Integration

204

205

Integration with Athena's Spark execution engine for distributed processing, Jupyter notebook compatibility, and advanced analytics workloads.

206

207

```python { .api }

208

class SparkCursor:

209

def execute(self, code: str, **kwargs) -> SparkCursor: ...

210

@property

211

def session_id(self) -> str: ...

212

@property

213

def calculation_id(self) -> Optional[str]: ...

214

@property

215

def state(self) -> Optional[str]: ...

216

```

217

218

[Spark Integration](./spark-integration.md)

219

220

### SQLAlchemy Integration

221

222

Complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications.

223

224

```python { .api }

225

class AthenaDialect: ...

226

class AthenaRestDialect: ...

227

class AthenaPandasDialect: ...

228

class AthenaArrowDialect: ...

229

230

# Custom Athena types

231

class TINYINT: ...

232

class STRUCT: ...

233

class MAP: ...

234

class ARRAY: ...

235

```

236

237

[SQLAlchemy Integration](./sqlalchemy-integration.md)

238

239

## Types

240

241

### DB API Constants

242

243

```python { .api }

244

apilevel: str = "2.0"

245

threadsafety: int = 2

246

paramstyle: str = "pyformat"

247

248

# Type objects for DB API 2.0

249

STRING: DBAPITypeObject

250

BINARY: DBAPITypeObject

251

BOOLEAN: DBAPITypeObject

252

NUMBER: DBAPITypeObject

253

DATE: DBAPITypeObject

254

TIME: DBAPITypeObject

255

DATETIME: DBAPITypeObject

256

JSON: DBAPITypeObject

257

```

258

259

### Core Connection Types

260

261

```python { .api }

262

ConnectionCursor = TypeVar("ConnectionCursor", bound=BaseCursor)

263

FunctionalCursor = TypeVar("FunctionalCursor", bound=BaseCursor)

264

265

class Connection[ConnectionCursor]:

266

session: Session

267

client: BaseClient

268

retry_config: RetryConfig

269

270

class Converter:

271

"""Base converter class for data type conversion."""

272

273

class Formatter:

274

"""Base formatter class for parameter formatting."""

275

276

class DefaultParameterFormatter(Formatter):

277

"""Default parameter formatter implementation."""

278

279

class RetryConfig:

280

"""Configuration for API retry logic."""

281

def __init__(

282

self,

283

exceptions: Iterable[str] = ("ThrottlingException", "TooManyRequestsException"),

284

attempt: int = 5,

285

multiplier: int = 1,

286

max_delay: int = 100,

287

exponential_base: int = 2

288

): ...

289

290

class BaseCursor:

291

"""Base cursor class with common functionality."""

292

293

class AthenaResultSet:

294

"""Result set wrapper for Athena query results."""

295

```

296

297

### Exception Hierarchy

298

299

```python { .api }

300

class Error(Exception): ...

301

class Warning(Exception): ...

302

class InterfaceError(Error): ...

303

class DatabaseError(Error): ...

304

class InternalError(DatabaseError): ...

305

class OperationalError(DatabaseError): ...

306

class ProgrammingError(DatabaseError): ...

307

class DataError(DatabaseError): ...

308

class NotSupportedError(DatabaseError): ...

309

```

310

311

### Query Execution Models

312

313

```python { .api }

314

class AthenaQueryExecution:

315

# State constants

316

STATE_QUEUED: str = "QUEUED"

317

STATE_RUNNING: str = "RUNNING"

318

STATE_SUCCEEDED: str = "SUCCEEDED"

319

STATE_FAILED: str = "FAILED"

320

STATE_CANCELLED: str = "CANCELLED"

321

322

class RetryConfig:

323

def __init__(

324

self,

325

attempts: int = 3,

326

delay: float = 1.0,

327

max_delay: float = 60.0,

328

exponential_base: float = 2.0

329

): ...

330

```