0
# Core Database Operations
1
2
Standard DB API 2.0 compliant database operations for connecting to Amazon Athena, executing SQL queries, and processing results. Provides both tuple and dictionary result formats with full cursor functionality.
3
4
## Capabilities
5
6
### Connection Management
7
8
Create and manage connections to Amazon Athena with comprehensive configuration options for AWS authentication, S3 staging, and query execution parameters.
9
10
```python { .api }
11
def connect(
12
s3_staging_dir: Optional[str] = None,
13
region_name: Optional[str] = None,
14
schema_name: Optional[str] = "default",
15
catalog_name: Optional[str] = "awsdatacatalog",
16
work_group: Optional[str] = None,
17
poll_interval: float = 1,
18
encryption_option: Optional[str] = None,
19
kms_key: Optional[str] = None,
20
profile_name: Optional[str] = None,
21
role_arn: Optional[str] = None,
22
role_session_name: str = "PyAthena-session-{timestamp}",
23
external_id: Optional[str] = None,
24
serial_number: Optional[str] = None,
25
duration_seconds: int = 3600,
26
converter: Optional[Converter] = None,
27
formatter: Optional[Formatter] = None,
28
retry_config: Optional[RetryConfig] = None,
29
cursor_class: Optional[Type[ConnectionCursor]] = None,
30
cursor_kwargs: Optional[Dict[str, Any]] = None,
31
**kwargs
32
) -> Connection[ConnectionCursor]:
33
"""
34
Create a connection to Amazon Athena.
35
36
Parameters:
37
- s3_staging_dir: S3 location for query results (required for most operations)
38
- region_name: AWS region name (e.g., 'us-west-2')
39
- schema_name: Default database/schema name (default: 'default')
40
- catalog_name: Data catalog name (default: 'awsdatacatalog')
41
- work_group: Athena workgroup name
42
- poll_interval: Query polling interval in seconds (default: 1)
43
- encryption_option: S3 encryption option ('SSE_S3', 'SSE_KMS', 'CSE_KMS')
44
- kms_key: KMS key ID for encryption
45
- profile_name: AWS profile name for credentials
46
- role_arn: IAM role ARN for assume role
47
- role_session_name: Session name for assume role (includes timestamp)
48
- external_id: External ID for assume role
49
- serial_number: MFA serial number for assume role
50
- duration_seconds: STS assume role duration in seconds (default: 3600)
51
- converter: Type converter instance for result processing
52
- formatter: Parameter formatter instance for query formatting
53
- retry_config: Configuration for API retry logic
54
- cursor_class: Cursor class to use for connections
55
- cursor_kwargs: Additional keyword arguments for cursor initialization
56
- **kwargs: Additional connection parameters (AWS credentials, etc.)
57
58
Returns:
59
Connection object with specified cursor type
60
"""
61
```
62
63
### Connection Class
64
65
DB API 2.0 compliant connection object with cursor creation, transaction management, and resource cleanup.
66
67
```python { .api }
68
class Connection[ConnectionCursor]:
69
session: Session
70
client: BaseClient
71
retry_config: RetryConfig
72
73
def cursor(self, cursor=None, **kwargs) -> ConnectionCursor:
74
"""
75
Create a new cursor object using the connection.
76
77
Parameters:
78
- cursor: Cursor instance to configure (optional)
79
- **kwargs: Additional cursor configuration options
80
81
Returns:
82
Cursor object of the connection's cursor type
83
"""
84
85
def close(self) -> None:
86
"""
87
Close the connection and release resources.
88
"""
89
90
def commit(self) -> None:
91
"""
92
Commit any pending transaction (no-op for Athena).
93
"""
94
95
def rollback(self) -> None:
96
"""
97
Rollback any pending transaction.
98
99
Raises:
100
NotSupportedError: Athena does not support transactions
101
"""
102
103
def __enter__(self) -> Connection[ConnectionCursor]:
104
"""Context manager entry."""
105
106
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
107
"""Context manager exit with automatic cleanup."""
108
```
109
110
### Standard Cursor
111
112
Standard cursor for executing queries and fetching results as tuples, providing full DB API 2.0 compliance.
113
114
```python { .api }
115
class Cursor:
116
arraysize: int
117
description: Optional[List[Tuple]]
118
rowcount: int
119
rownumber: Optional[int]
120
121
def execute(self, operation: str, parameters=None, **kwargs) -> Cursor:
122
"""
123
Execute a SQL statement.
124
125
Parameters:
126
- operation: SQL query string
127
- parameters: Query parameters (dict or sequence)
128
- **kwargs: Additional execution options
129
130
Returns:
131
Self for method chaining
132
"""
133
134
def executemany(self, operation: str, seq_of_parameters, **kwargs) -> None:
135
"""
136
Execute a SQL statement multiple times with different parameters.
137
138
Parameters:
139
- operation: SQL query string
140
- seq_of_parameters: Sequence of parameter sets
141
- **kwargs: Additional execution options
142
"""
143
144
def fetchone(self) -> Optional[Tuple]:
145
"""
146
Fetch the next row of a query result set.
147
148
Returns:
149
Single row as tuple or None if no more rows
150
"""
151
152
def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:
153
"""
154
Fetch the next set of rows of a query result set.
155
156
Parameters:
157
- size: Number of rows to fetch (default: arraysize)
158
159
Returns:
160
List of rows as tuples
161
"""
162
163
def fetchall(self) -> List[Tuple]:
164
"""
165
Fetch all remaining rows of a query result set.
166
167
Returns:
168
List of all remaining rows as tuples
169
"""
170
171
def cancel(self) -> None:
172
"""
173
Cancel the currently executing query.
174
"""
175
176
def close(self) -> None:
177
"""
178
Close the cursor and free associated resources.
179
"""
180
181
def __iter__(self) -> Iterator[Tuple]:
182
"""Iterator protocol support for row-by-row processing."""
183
184
def __next__(self) -> Tuple:
185
"""Iterator protocol implementation."""
186
```
187
188
### Dictionary Cursor
189
190
Cursor variant that returns query results as dictionaries with column names as keys.
191
192
```python { .api }
193
class DictCursor(Cursor):
194
def fetchone(self) -> Optional[Dict[str, Any]]:
195
"""
196
Fetch the next row as a dictionary.
197
198
Returns:
199
Single row as dict with column names as keys, or None
200
"""
201
202
def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:
203
"""
204
Fetch multiple rows as dictionaries.
205
206
Parameters:
207
- size: Number of rows to fetch
208
209
Returns:
210
List of rows as dictionaries
211
"""
212
213
def fetchall(self) -> List[Dict[str, Any]]:
214
"""
215
Fetch all remaining rows as dictionaries.
216
217
Returns:
218
List of all remaining rows as dictionaries
219
"""
220
```
221
222
### Usage Examples
223
224
#### Basic Connection and Query
225
226
```python
227
from pyathena import connect
228
229
# Create connection
230
conn = connect(
231
s3_staging_dir="s3://my-bucket/athena-results/",
232
region_name="us-west-2",
233
schema_name="default"
234
)
235
236
# Execute query
237
cursor = conn.cursor()
238
cursor.execute("SELECT COUNT(*) as row_count FROM my_table")
239
240
# Get results
241
result = cursor.fetchone()
242
print(f"Row count: {result[0]}")
243
244
cursor.close()
245
conn.close()
246
```
247
248
#### Using Dictionary Cursor
249
250
```python
251
from pyathena import connect
252
from pyathena.cursor import DictCursor
253
254
conn = connect(
255
s3_staging_dir="s3://my-bucket/athena-results/",
256
region_name="us-west-2",
257
cursor_class=DictCursor
258
)
259
260
cursor = conn.cursor()
261
cursor.execute("SELECT name, age, city FROM users LIMIT 5")
262
263
# Results as dictionaries
264
for row in cursor.fetchall():
265
print(f"Name: {row['name']}, Age: {row['age']}, City: {row['city']}")
266
267
cursor.close()
268
conn.close()
269
```
270
271
#### Parameterized Queries
272
273
```python
274
from pyathena import connect
275
276
conn = connect(
277
s3_staging_dir="s3://my-bucket/athena-results/",
278
region_name="us-west-2"
279
)
280
281
cursor = conn.cursor()
282
283
# Using named parameters (recommended)
284
cursor.execute(
285
"SELECT * FROM users WHERE age > %(min_age)s AND city = %(city)s",
286
parameters={'min_age': 25, 'city': 'San Francisco'}
287
)
288
289
results = cursor.fetchall()
290
print(f"Found {len(results)} users")
291
292
cursor.close()
293
conn.close()
294
```
295
296
#### Context Manager Usage
297
298
```python
299
from pyathena import connect
300
301
# Automatic resource cleanup
302
with connect(
303
s3_staging_dir="s3://my-bucket/athena-results/",
304
region_name="us-west-2"
305
) as conn:
306
with conn.cursor() as cursor:
307
cursor.execute("SELECT * FROM my_table LIMIT 10")
308
for row in cursor: # Iterator support
309
print(row)
310
```
311
312
## Error Handling
313
314
PyAthena raises standard DB API 2.0 exceptions for different error conditions:
315
316
```python
317
from pyathena import connect
318
from pyathena.error import OperationalError, ProgrammingError
319
320
try:
321
conn = connect(s3_staging_dir="s3://my-bucket/results/")
322
cursor = conn.cursor()
323
cursor.execute("SELECT * FROM nonexistent_table")
324
except ProgrammingError as e:
325
print(f"SQL Error: {e}")
326
except OperationalError as e:
327
print(f"Execution Error: {e}")
328
```