
# SQLAlchemy Integration

Complete SQLAlchemy dialect implementation with custom Athena types, enabling ORM support and integration with existing SQLAlchemy-based applications through a standard database abstraction layer over Athena.

## Installation

```bash
pip install "PyAthena[SQLAlchemy]"
```

## Capabilities

### Athena Dialects

Multiple dialect implementations optimized for different use cases and result formats.

```python { .api }
class AthenaDialect:
    """Base Athena dialect for standard SQLAlchemy operations."""
    name: str = "awsathena"

class AthenaRestDialect(AthenaDialect):
    """REST API-based dialect for standard operations."""
    name: str = "awsathena+rest"

class AthenaPandasDialect(AthenaDialect):
    """Pandas-optimized dialect for DataFrame operations."""
    name: str = "awsathena+pandas"

class AthenaArrowDialect(AthenaDialect):
    """Arrow-optimized dialect for columnar operations."""
    name: str = "awsathena+arrow"
```
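The driver portion of the URL scheme selects which dialect class SQLAlchemy loads. A minimal sketch, assuming placeholder region and staging bucket:

```python
from sqlalchemy import create_engine

# The "+driver" suffix in the scheme picks the dialect; region and
# staging bucket here are placeholders.
engine = create_engine(
    "awsathena+arrow://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)
print(type(engine.dialect).__name__)  # e.g. AthenaArrowDialect
```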


### Custom Athena Types

SQLAlchemy type implementations for Athena-specific data types.

```python { .api }

39

class TINYINT(sqltypes.Integer):

40

"""Athena TINYINT type (8-bit integer)."""

41

42

class STRUCT(TypeEngine[Dict]):

43

"""Athena STRUCT type for nested objects."""

44

45

class MAP(TypeEngine[Dict]):

46

"""Athena MAP type for key-value pairs."""

47

48

class ARRAY(TypeEngine[List]):

49

"""Athena ARRAY type for ordered collections."""

50

51

class AthenaTimestamp(TypeEngine[datetime]):

52

"""Athena TIMESTAMP type with timezone support."""

53

54

class AthenaDate(TypeEngine[date]):

55

"""Athena DATE type."""

56

```
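A quick sketch of these types in a table definition; the import path matches the ORM example later in this document, and the rendered DDL shown in the comment is illustrative:

```python
from sqlalchemy import Table, Column, Integer, String, MetaData, create_engine
from sqlalchemy.schema import CreateTable
from pyathena.sqlalchemy.types import TINYINT

# Placeholder connection URL; no connection is opened just to compile DDL
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# Athena-specific types participate in DDL generation like built-in types
events = Table(
    "events", MetaData(),
    Column("event_id", Integer),
    Column("severity", TINYINT),  # should render as TINYINT in the DDL
    Column("source", String(100)),
)
print(CreateTable(events).compile(engine))
```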


### SQL Compilation

Custom compilers for translating SQLAlchemy constructs to Athena SQL.

```python { .api }
class AthenaStatementCompiler(SQLCompiler):
    """Compiles SQLAlchemy statements to Athena SQL."""

class AthenaDDLCompiler(DDLCompiler):
    """Compiles DDL statements for Athena."""

class AthenaTypeCompiler(GenericTypeCompiler):
    """Compiles SQLAlchemy types to Athena SQL types."""
```
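To see what these compilers emit, a Core statement can be compiled against the engine's dialect without executing it. A minimal sketch with a placeholder URL:

```python
from sqlalchemy import create_engine, select, column, table

engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# Compiling without executing shows the SQL the statement compiler generates
stmt = select(column("name"), column("total_spent")).select_from(table("customers")).limit(10)
print(stmt.compile(engine))
```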


### Identifier Preparation

Classes for properly formatting identifiers in different SQL contexts.

```python { .api }
class AthenaDMLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DML statements."""

class AthenaDDLIdentifierPreparer(IdentifierPreparer):
    """Prepares identifiers for DDL statements."""
```
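A small sketch of the preparer in action, using the standard SQLAlchemy accessor; the quoting behavior noted in the comments is illustrative:

```python
from sqlalchemy import create_engine

engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# The dialect exposes its DML identifier preparer; reserved words get quoted
preparer = engine.dialect.identifier_preparer
print(preparer.quote("order"))              # quoted only when necessary
print(preparer.quote_identifier("select"))  # always quoted
```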


## Usage Examples


### Basic SQLAlchemy Connection


```python
from sqlalchemy import create_engine, text

# Create engine with Athena dialect
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1 as test_column"))
    print(result.fetchone())
```


### Advanced Connection Configuration


```python
from sqlalchemy import create_engine
from urllib.parse import quote_plus

# Connection string with all parameters
connection_params = {
    "aws_access_key_id": "YOUR_ACCESS_KEY",
    "aws_secret_access_key": quote_plus("YOUR_SECRET_KEY"),
    "region_name": "us-west-2",
    "schema_name": "default",
    "s3_staging_dir": quote_plus("s3://my-bucket/athena-results/"),
    "work_group": "primary",
    "catalog_name": "AwsDataCatalog"
}

# Build connection string
connection_string = (
    f"awsathena+rest://{connection_params['aws_access_key_id']}:"
    f"{connection_params['aws_secret_access_key']}@"
    f"athena.{connection_params['region_name']}.amazonaws.com:443/"
    f"{connection_params['schema_name']}?"
    f"s3_staging_dir={connection_params['s3_staging_dir']}&"
    f"work_group={connection_params['work_group']}&"
    f"catalog_name={connection_params['catalog_name']}"
)

engine = create_engine(connection_string)
```


### ORM Model Definition


```python
from sqlalchemy import (
    create_engine, Column, Integer, String, DateTime, Numeric, Boolean,
)
from sqlalchemy.orm import declarative_base, sessionmaker
from pyathena.sqlalchemy.types import TINYINT, STRUCT, ARRAY, MAP

Base = declarative_base()

class Customer(Base):
    __tablename__ = 'customers'

    customer_id = Column(Integer, primary_key=True)
    name = Column(String(100), nullable=False)
    email = Column(String(255), unique=True)
    age = Column(TINYINT)  # Athena-specific type
    is_active = Column(Boolean, default=True)
    created_at = Column(DateTime)
    total_spent = Column(Numeric(10, 2))

    # Complex types
    preferences = Column(MAP(String, String))  # Key-value preferences
    order_history = Column(ARRAY(Integer))     # Array of order IDs
    profile = Column(STRUCT([                  # Nested structure
        ('address', String),
        ('phone', String),
        ('preferences', MAP(String, String))
    ]))

class Order(Base):
    __tablename__ = 'orders'

    order_id = Column(Integer, primary_key=True)
    customer_id = Column(Integer, nullable=False)
    order_date = Column(DateTime)
    amount = Column(Numeric(10, 2))
    status = Column(String(20))
    items = Column(ARRAY(STRUCT([  # Array of structured items
        ('product_id', Integer),
        ('quantity', Integer),
        ('price', Numeric(8, 2))
    ])))

# Create engine and session
engine = create_engine("awsathena+rest://...")
Session = sessionmaker(bind=engine)
session = Session()
```


### ORM Queries


```python
from sqlalchemy import func, and_
from datetime import datetime, timedelta

# Basic queries
active_customers = session.query(Customer).filter(Customer.is_active == True).all()

# Complex filtering
high_value_customers = session.query(Customer).filter(
    and_(
        Customer.total_spent > 1000,
        Customer.is_active == True,
        Customer.created_at > datetime.now() - timedelta(days=365)
    )
).all()

# Aggregation queries
customer_stats = session.query(
    func.count(Customer.customer_id).label('total_customers'),
    func.avg(Customer.total_spent).label('avg_spent'),
    func.max(Customer.total_spent).label('max_spent'),
    func.min(Customer.age).label('min_age'),
    func.max(Customer.age).label('max_age')
).first()

print(f"Total customers: {customer_stats.total_customers}")
print(f"Average spent: ${customer_stats.avg_spent:.2f}")

# Join queries
recent_orders = session.query(Customer, Order).join(
    Order, Customer.customer_id == Order.customer_id
).filter(
    Order.order_date > datetime.now() - timedelta(days=30)
).all()

# Group by queries
monthly_revenue = session.query(
    func.date_format(Order.order_date, '%Y-%m').label('month'),
    func.sum(Order.amount).label('total_revenue'),
    func.count(Order.order_id).label('order_count')
).group_by(
    func.date_format(Order.order_date, '%Y-%m')
).order_by('month').all()

for row in monthly_revenue:
    print(f"Month: {row.month}, Revenue: ${row.total_revenue:.2f}, Orders: {row.order_count}")
```


### Working with Complex Types


```python
from sqlalchemy import text

# Query with complex type operations
complex_query = text("""
    SELECT
        customer_id,
        name,
        preferences['newsletter'] as newsletter_pref,
        cardinality(order_history) as total_orders,
        profile.address as customer_address
    FROM customers
    WHERE preferences['vip'] = 'true'
      AND cardinality(order_history) > 5
""")

results = session.execute(complex_query).fetchall()
for row in results:
    print(f"Customer: {row.name}, Address: {row.customer_address}, Orders: {row.total_orders}")

# Insert with complex types
new_customer = Customer(
    customer_id=12345,
    name="John Doe",
    email="john@example.com",
    age=35,
    preferences={
        'newsletter': 'true',
        'vip': 'false',
        'language': 'en'
    },
    order_history=[1001, 1002, 1003],
    profile={
        'address': '123 Main St, Anytown, USA',
        'phone': '+1-555-0123',
        'preferences': {
            'contact_method': 'email',
            'timezone': 'EST'
        }
    }
)

session.add(new_customer)
session.commit()
```


### Pandas Integration with SQLAlchemy


```python
from sqlalchemy import create_engine
import pandas as pd

# Use pandas dialect for DataFrame operations
engine = create_engine("awsathena+pandas://...")

# Read query results directly into DataFrame
df = pd.read_sql_query("""
    SELECT
        customer_id,
        name,
        total_spent,
        age,
        is_active
    FROM customers
    WHERE total_spent > 500
""", engine)

print(df.head())
print(f"DataFrame shape: {df.shape}")

# Use DataFrame for analysis
customer_analysis = df.groupby('is_active').agg({
    'total_spent': ['mean', 'sum', 'count'],
    'age': ['mean', 'min', 'max']
}).round(2)

print("Customer Analysis by Status:")
print(customer_analysis)

# Write DataFrame back to Athena (via S3)
# Note: This requires additional S3 write permissions
df_to_write = df[df['total_spent'] > 1000]
df_to_write.to_sql(
    'high_value_customers',
    engine,
    if_exists='replace',
    index=False,
    method='multi'  # Batch insert for better performance
)
```


### Arrow Integration with SQLAlchemy


```python
from sqlalchemy import create_engine, text
import pandas as pd
import pyarrow as pa
import pyarrow.compute as pc

# Use Arrow dialect for columnar operations
engine = create_engine("awsathena+arrow://...")

# Execute query and fetch results
with engine.connect() as conn:
    result = conn.execute(text("""
        SELECT
            product_category,
            COUNT(*) as order_count,
            SUM(amount) as total_revenue,
            AVG(amount) as avg_order_value
        FROM orders
        GROUP BY product_category
        ORDER BY total_revenue DESC
    """))

    # Build an Arrow Table from the fetched rows
    df = pd.DataFrame(result.fetchall(), columns=result.keys())
    arrow_table = pa.Table.from_pandas(df)

# High-performance columnar operations
total_revenue = pc.sum(arrow_table.column('total_revenue'))
print(f"Total revenue across all categories: ${total_revenue.as_py():,.2f}")
```


### DDL Operations


```python
from sqlalchemy import create_engine, text, MetaData, Table, Column, Integer, String, DateTime
from sqlalchemy.schema import CreateTable

engine = create_engine("awsathena+rest://...")
metadata = MetaData()

# Define table structure
analytics_table = Table(
    'user_analytics',
    metadata,
    Column('user_id', Integer, primary_key=True),
    Column('session_date', DateTime),
    Column('page_views', Integer),
    Column('session_duration', Integer),
    Column('user_agent', String(500)),
    Column('referrer', String(200))
)

# Generate CREATE TABLE statement
create_stmt = CreateTable(analytics_table)
print(str(create_stmt.compile(engine)))

# Create table in Athena
with engine.connect() as conn:
    metadata.create_all(conn)
    print("Table created successfully")

# Create external table pointing to S3 data
external_table_ddl = text("""
    CREATE EXTERNAL TABLE IF NOT EXISTS web_logs (
        timestamp string,
        ip_address string,
        user_agent string,
        request_url string,
        response_code int,
        bytes_sent bigint
    )
    STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
    OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION 's3://my-bucket/web-logs/'
    TBLPROPERTIES ('has_encrypted_data'='false')
""")

with engine.connect() as conn:
    conn.execute(external_table_ddl)
    print("External table created")
```


### Advanced Query Patterns


```python
from datetime import datetime
from sqlalchemy import create_engine, text, bindparam, Integer, DateTime, Boolean

engine = create_engine("awsathena+rest://...")

# Parameterized queries with SQLAlchemy
parameterized_query = text("""
    SELECT
        customer_id,
        name,
        total_spent,
        CASE
            WHEN total_spent > :high_threshold THEN 'High Value'
            WHEN total_spent > :medium_threshold THEN 'Medium Value'
            ELSE 'Low Value'
        END as customer_segment
    FROM customers
    WHERE created_at >= :start_date
      AND is_active = :active_status
    ORDER BY total_spent DESC
    LIMIT :limit_count
""").bindparams(
    bindparam('high_threshold', type_=Integer),
    bindparam('medium_threshold', type_=Integer),
    bindparam('start_date', type_=DateTime),
    bindparam('active_status', type_=Boolean),
    bindparam('limit_count', type_=Integer)
)

# Execute with parameters
with engine.connect() as conn:
    result = conn.execute(parameterized_query, {
        'high_threshold': 1000,
        'medium_threshold': 500,
        'start_date': datetime(2023, 1, 1),
        'active_status': True,
        'limit_count': 100
    })

    customers = result.fetchall()
    for customer in customers:
        print(f"{customer.name}: ${customer.total_spent:.2f} ({customer.customer_segment})")

# Window functions
window_query = text("""
    SELECT
        customer_id,
        order_date,
        amount,
        ROW_NUMBER() OVER (PARTITION BY customer_id ORDER BY order_date) as order_sequence,
        SUM(amount) OVER (PARTITION BY customer_id ORDER BY order_date
                          ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) as running_total,
        LAG(amount, 1) OVER (PARTITION BY customer_id ORDER BY order_date) as prev_order_amount
    FROM orders
    WHERE order_date >= DATE '2023-01-01'
    ORDER BY customer_id, order_date
""")

with engine.connect() as conn:
    result = conn.execute(window_query)
    for row in result:
        print(f"Customer {row.customer_id}: Order {row.order_sequence}, "
              f"Amount: ${row.amount:.2f}, Running Total: ${row.running_total:.2f}")
```


### Connection Pooling and Performance


```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    "awsathena+rest://...",
    poolclass=QueuePool,
    pool_size=5,          # Number of connections to maintain
    max_overflow=10,      # Additional connections allowed
    pool_pre_ping=True,   # Verify connections before use
    pool_recycle=3600,    # Recycle connections after 1 hour
    connect_args={
        'poll_interval': 1,
        'kill_on_interrupt': True
    }
)

# Monitor connection pool
def check_pool_status():
    pool = engine.pool
    print(f"Pool size: {pool.size()}")
    print(f"Checked out connections: {pool.checkedout()}")
    print(f"Overflow connections: {pool.overflow()}")

# Use context managers for proper connection handling
with engine.connect() as conn:
    result = conn.execute(text("SELECT COUNT(*) FROM large_table"))
    count = result.scalar()
    print(f"Table has {count} rows")

check_pool_status()
```


### Transaction Handling


```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError

engine = create_engine("awsathena+rest://...")

# Note: Athena doesn't support traditional transactions,
# but SQLAlchemy can handle connection-level operations

def safe_bulk_operation():
    with engine.connect() as conn:
        try:
            # Multiple related operations
            conn.execute(text("CREATE TABLE temp_results AS SELECT * FROM source_table WHERE condition = 1"))

            conn.execute(text("""
                INSERT INTO final_table
                SELECT customer_id, SUM(amount) as total
                FROM temp_results
                GROUP BY customer_id
            """))

            conn.execute(text("DROP TABLE temp_results"))

            print("Bulk operation completed successfully")

        except SQLAlchemyError as e:
            print(f"Operation failed: {e}")
            # Cleanup if needed
            try:
                conn.execute(text("DROP TABLE IF EXISTS temp_results"))
            except SQLAlchemyError:
                pass
            raise

safe_bulk_operation()
```


## Connection String Formats


### Basic Connection String

```
awsathena+rest://aws_access_key_id:aws_secret_access_key@athena.region.amazonaws.com:443/schema_name?s3_staging_dir=s3://bucket/path/
```


### With All Parameters

```
awsathena+rest://access_key:secret_key@athena.us-west-2.amazonaws.com:443/default?s3_staging_dir=s3://my-bucket/results/&work_group=primary&catalog_name=AwsDataCatalog&region_name=us-west-2
```


### Using Environment Variables

```python
import os
from sqlalchemy import create_engine

# Set environment variables
os.environ['AWS_ACCESS_KEY_ID'] = 'your_access_key'
os.environ['AWS_SECRET_ACCESS_KEY'] = 'your_secret_key'
os.environ['AWS_DEFAULT_REGION'] = 'us-west-2'

# Simplified connection string
engine = create_engine(
    "awsathena+rest://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)
```


## Dialect-Specific Features

- **Base Dialect**: Standard SQLAlchemy operations with tuple results
- **REST Dialect**: Optimized for general-purpose queries
- **Pandas Dialect**: Automatic DataFrame conversion for analytical queries
- **Arrow Dialect**: Columnar processing for high-performance analytics

Each dialect supports the same SQL operations but optimizes result processing for different use cases, as the sketch below illustrates.
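As a rough sketch of how that choice plays out, assuming the placeholder URL and tables from the examples above:

```python
from sqlalchemy import create_engine, text
import pandas as pd

url_suffix = (
    "://:@athena.us-west-2.amazonaws.com:443/default?"
    "s3_staging_dir=s3://my-bucket/athena-results/"
)

# REST dialect: general-purpose queries consumed as row tuples
rest_engine = create_engine("awsathena+rest" + url_suffix)
with rest_engine.connect() as conn:
    rows = conn.execute(text("SELECT customer_id, name FROM customers LIMIT 5")).fetchall()

# Pandas dialect: analytical queries consumed as DataFrames
pandas_engine = create_engine("awsathena+pandas" + url_suffix)
df = pd.read_sql_query("SELECT customer_id, total_spent FROM customers", pandas_engine)
```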