or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

arrow-integration.mdasync-operations.mdcore-database.mdindex.mdpandas-integration.mdspark-integration.mdsqlalchemy-integration.md

core-database.mddocs/

0

# Core Database Operations

1

2

Standard DB API 2.0 compliant database operations for connecting to Amazon Athena, executing SQL queries, and processing results. Provides both tuple and dictionary result formats with full cursor functionality.

3

4

## Capabilities

5

6

### Connection Management

7

8

Create and manage connections to Amazon Athena with comprehensive configuration options for AWS authentication, S3 staging, and query execution parameters.

9

10

```python { .api }

11

def connect(

12

s3_staging_dir: Optional[str] = None,

13

region_name: Optional[str] = None,

14

schema_name: Optional[str] = "default",

15

catalog_name: Optional[str] = "awsdatacatalog",

16

work_group: Optional[str] = None,

17

poll_interval: float = 1,

18

encryption_option: Optional[str] = None,

19

kms_key: Optional[str] = None,

20

profile_name: Optional[str] = None,

21

role_arn: Optional[str] = None,

22

role_session_name: str = "PyAthena-session-{timestamp}",

23

external_id: Optional[str] = None,

24

serial_number: Optional[str] = None,

25

duration_seconds: int = 3600,

26

converter: Optional[Converter] = None,

27

formatter: Optional[Formatter] = None,

28

retry_config: Optional[RetryConfig] = None,

29

cursor_class: Optional[Type[ConnectionCursor]] = None,

30

cursor_kwargs: Optional[Dict[str, Any]] = None,

31

**kwargs

32

) -> Connection[ConnectionCursor]:

33

"""

34

Create a connection to Amazon Athena.

35

36

Parameters:

37

- s3_staging_dir: S3 location for query results (required for most operations)

38

- region_name: AWS region name (e.g., 'us-west-2')

39

- schema_name: Default database/schema name (default: 'default')

40

- catalog_name: Data catalog name (default: 'awsdatacatalog')

41

- work_group: Athena workgroup name

42

- poll_interval: Query polling interval in seconds (default: 1)

43

- encryption_option: S3 encryption option ('SSE_S3', 'SSE_KMS', 'CSE_KMS')

44

- kms_key: KMS key ID for encryption

45

- profile_name: AWS profile name for credentials

46

- role_arn: IAM role ARN for assume role

47

- role_session_name: Session name for assume role (includes timestamp)

48

- external_id: External ID for assume role

49

- serial_number: MFA serial number for assume role

50

- duration_seconds: STS assume role duration in seconds (default: 3600)

51

- converter: Type converter instance for result processing

52

- formatter: Parameter formatter instance for query formatting

53

- retry_config: Configuration for API retry logic

54

- cursor_class: Cursor class to use for connections

55

- cursor_kwargs: Additional keyword arguments for cursor initialization

56

- **kwargs: Additional connection parameters (AWS credentials, etc.)

57

58

Returns:

59

Connection object with specified cursor type

60

"""

61

```

62

63

### Connection Class

64

65

DB API 2.0 compliant connection object with cursor creation, transaction management, and resource cleanup.

66

67

```python { .api }

68

class Connection[ConnectionCursor]:

69

session: Session

70

client: BaseClient

71

retry_config: RetryConfig

72

73

def cursor(self, cursor=None, **kwargs) -> ConnectionCursor:

74

"""

75

Create a new cursor object using the connection.

76

77

Parameters:

78

- cursor: Cursor instance to configure (optional)

79

- **kwargs: Additional cursor configuration options

80

81

Returns:

82

Cursor object of the connection's cursor type

83

"""

84

85

def close(self) -> None:

86

"""

87

Close the connection and release resources.

88

"""

89

90

def commit(self) -> None:

91

"""

92

Commit any pending transaction (no-op for Athena).

93

"""

94

95

def rollback(self) -> None:

96

"""

97

Rollback any pending transaction.

98

99

Raises:

100

NotSupportedError: Athena does not support transactions

101

"""

102

103

def __enter__(self) -> Connection[ConnectionCursor]:

104

"""Context manager entry."""

105

106

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

107

"""Context manager exit with automatic cleanup."""

108

```

109

110

### Standard Cursor

111

112

Standard cursor for executing queries and fetching results as tuples, providing full DB API 2.0 compliance.

113

114

```python { .api }

115

class Cursor:

116

arraysize: int

117

description: Optional[List[Tuple]]

118

rowcount: int

119

rownumber: Optional[int]

120

121

def execute(self, operation: str, parameters=None, **kwargs) -> Cursor:

122

"""

123

Execute a SQL statement.

124

125

Parameters:

126

- operation: SQL query string

127

- parameters: Query parameters (dict or sequence)

128

- **kwargs: Additional execution options

129

130

Returns:

131

Self for method chaining

132

"""

133

134

def executemany(self, operation: str, seq_of_parameters, **kwargs) -> None:

135

"""

136

Execute a SQL statement multiple times with different parameters.

137

138

Parameters:

139

- operation: SQL query string

140

- seq_of_parameters: Sequence of parameter sets

141

- **kwargs: Additional execution options

142

"""

143

144

def fetchone(self) -> Optional[Tuple]:

145

"""

146

Fetch the next row of a query result set.

147

148

Returns:

149

Single row as tuple or None if no more rows

150

"""

151

152

def fetchmany(self, size: Optional[int] = None) -> List[Tuple]:

153

"""

154

Fetch the next set of rows of a query result set.

155

156

Parameters:

157

- size: Number of rows to fetch (default: arraysize)

158

159

Returns:

160

List of rows as tuples

161

"""

162

163

def fetchall(self) -> List[Tuple]:

164

"""

165

Fetch all remaining rows of a query result set.

166

167

Returns:

168

List of all remaining rows as tuples

169

"""

170

171

def cancel(self) -> None:

172

"""

173

Cancel the currently executing query.

174

"""

175

176

def close(self) -> None:

177

"""

178

Close the cursor and free associated resources.

179

"""

180

181

def __iter__(self) -> Iterator[Tuple]:

182

"""Iterator protocol support for row-by-row processing."""

183

184

def __next__(self) -> Tuple:

185

"""Iterator protocol implementation."""

186

```

187

188

### Dictionary Cursor

189

190

Cursor variant that returns query results as dictionaries with column names as keys.

191

192

```python { .api }

193

class DictCursor(Cursor):

194

def fetchone(self) -> Optional[Dict[str, Any]]:

195

"""

196

Fetch the next row as a dictionary.

197

198

Returns:

199

Single row as dict with column names as keys, or None

200

"""

201

202

def fetchmany(self, size: Optional[int] = None) -> List[Dict[str, Any]]:

203

"""

204

Fetch multiple rows as dictionaries.

205

206

Parameters:

207

- size: Number of rows to fetch

208

209

Returns:

210

List of rows as dictionaries

211

"""

212

213

def fetchall(self) -> List[Dict[str, Any]]:

214

"""

215

Fetch all remaining rows as dictionaries.

216

217

Returns:

218

List of all remaining rows as dictionaries

219

"""

220

```

221

222

### Usage Examples

223

224

#### Basic Connection and Query

225

226

```python

227

from pyathena import connect

228

229

# Create connection

230

conn = connect(

231

s3_staging_dir="s3://my-bucket/athena-results/",

232

region_name="us-west-2",

233

schema_name="default"

234

)

235

236

# Execute query

237

cursor = conn.cursor()

238

cursor.execute("SELECT COUNT(*) as row_count FROM my_table")

239

240

# Get results

241

result = cursor.fetchone()

242

print(f"Row count: {result[0]}")

243

244

cursor.close()

245

conn.close()

246

```

247

248

#### Using Dictionary Cursor

249

250

```python

251

from pyathena import connect

252

from pyathena.cursor import DictCursor

253

254

conn = connect(

255

s3_staging_dir="s3://my-bucket/athena-results/",

256

region_name="us-west-2",

257

cursor_class=DictCursor

258

)

259

260

cursor = conn.cursor()

261

cursor.execute("SELECT name, age, city FROM users LIMIT 5")

262

263

# Results as dictionaries

264

for row in cursor.fetchall():

265

print(f"Name: {row['name']}, Age: {row['age']}, City: {row['city']}")

266

267

cursor.close()

268

conn.close()

269

```

270

271

#### Parameterized Queries

272

273

```python

274

from pyathena import connect

275

276

conn = connect(

277

s3_staging_dir="s3://my-bucket/athena-results/",

278

region_name="us-west-2"

279

)

280

281

cursor = conn.cursor()

282

283

# Using named parameters (recommended)

284

cursor.execute(

285

"SELECT * FROM users WHERE age > %(min_age)s AND city = %(city)s",

286

parameters={'min_age': 25, 'city': 'San Francisco'}

287

)

288

289

results = cursor.fetchall()

290

print(f"Found {len(results)} users")

291

292

cursor.close()

293

conn.close()

294

```

295

296

#### Context Manager Usage

297

298

```python

299

from pyathena import connect

300

301

# Automatic resource cleanup

302

with connect(

303

s3_staging_dir="s3://my-bucket/athena-results/",

304

region_name="us-west-2"

305

) as conn:

306

with conn.cursor() as cursor:

307

cursor.execute("SELECT * FROM my_table LIMIT 10")

308

for row in cursor: # Iterator support

309

print(row)

310

```

311

312

## Error Handling

313

314

PyAthena raises standard DB API 2.0 exceptions for different error conditions:

315

316

```python

317

from pyathena import connect

318

from pyathena.error import OperationalError, ProgrammingError

319

320

try:

321

conn = connect(s3_staging_dir="s3://my-bucket/results/")

322

cursor = conn.cursor()

323

cursor.execute("SELECT * FROM nonexistent_table")

324

except ProgrammingError as e:

325

print(f"SQL Error: {e}")

326

except OperationalError as e:

327

print(f"Execution Error: {e}")

328

```