# SQLAlchemy Integration

This is a partial SQLAlchemy dialect implementation. It supports textual SQL execution and basic database operations, so Phoenix can be used within SQLAlchemy applications while Phoenix-specific functionality stays available.

## Capabilities

### Phoenix Dialect

SQLAlchemy dialect for Phoenix database connectivity through phoenixdb.

```python { .api }
class PhoenixDialect(DefaultDialect):
    """
    Phoenix dialect for SQLAlchemy.

    Provides basic SQLAlchemy integration for Phoenix databases with support for
    textual SQL execution and connection management.
    """

    name = "phoenix"
    """Dialect name for SQLAlchemy registration."""

    driver = "phoenixdb"
    """Driver name used by SQLAlchemy."""
```

### Engine Creation

```python { .api }
def create_engine(url, **kwargs):
    """
    Creates SQLAlchemy engine for Phoenix connections.

    URL format: phoenix://host:port[/path][?parameters]

    Parameters:
    - url (str): Connection URL in Phoenix format
    - connect_args (dict): Additional phoenixdb.connect() parameters
    - **kwargs: Standard SQLAlchemy engine parameters

    Returns:
    Engine: SQLAlchemy engine instance
    """
```
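
The URL format above can be related to the `http://host:port/` form that `phoenixdb.connect()` takes elsewhere on this page. The following is a minimal sketch of that mapping using only the standard library. The helper name `phoenix_url_to_http` and its `tls` flag are hypothetical and exist only for illustration; the dialect's own translation logic may differ.

```python
from urllib.parse import urlsplit, urlunsplit


def phoenix_url_to_http(url, tls=False):
    """Map a phoenix:// URL to an HTTP(S) URL (illustrative helper only)."""
    parts = urlsplit(url)
    if parts.scheme != "phoenix":
        raise ValueError(f"expected a phoenix:// URL, got {url!r}")
    host = parts.hostname or "localhost"
    port = parts.port or 8765  # 8765 is the port used in this page's examples
    scheme = "https" if tls else "http"
    path = parts.path or "/"
    # Keep any query string; drop the fragment.
    return urlunsplit((scheme, f"{host}:{port}", path, parts.query, ""))


http_url = phoenix_url_to_http("phoenix://localhost:8765")
print(http_url)
```

A helper like this is mainly useful when the same configuration value has to feed both a SQLAlchemy engine and a direct phoenixdb connection.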

### Execution Context

```python { .api }
class PhoenixExecutionContext(DefaultExecutionContext):
    """
    Phoenix-specific execution context for SQLAlchemy operations.
    """

    def should_autocommit_text(self, statement):
        """
        Determines if statement should be autocommitted.

        Parameters:
        - statement (str): SQL statement text

        Returns:
        bool: True if statement requires autocommit (DDL/DML operations)
        """
```
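
To make the autocommit decision concrete, here is a rough sketch of how a statement could be classified by its leading keyword. The keyword list and the regular expression are assumptions chosen for illustration, not the pattern the dialect actually uses.

```python
import re

# Hypothetical pattern: statements that change data or schema.
# The real dialect may recognise a different set of keywords.
AUTOCOMMIT_RE = re.compile(
    r"\s*(?:UPSERT|CREATE|ALTER|DROP|DELETE)\b", re.IGNORECASE
)


def should_autocommit_text(statement):
    """Return True when the statement starts with a DDL/DML keyword."""
    return AUTOCOMMIT_RE.match(statement) is not None


for sql in ("UPSERT INTO users VALUES (1, 'admin')", "SELECT 1"):
    print(sql, "->", should_autocommit_text(sql))
```

Matching only the first keyword keeps the check cheap, at the cost of missing statements that begin with a comment or a hint.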

### DDL Compiler

```python { .api }
class PhoenixDDLCompiler(DDLCompiler):
    """
    DDL compiler for Phoenix-specific SQL generation.
    """

    def visit_primary_key_constraint(self, constraint):
        """
        Compiles primary key constraints.

        Parameters:
        - constraint: SQLAlchemy PrimaryKeyConstraint

        Returns:
        str: Phoenix-compatible PRIMARY KEY clause

        Raises:
        CompileError: If constraint has no name (required by Phoenix)
        """
```
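
The naming requirement described above can be sketched without SQLAlchemy. The function `render_primary_key` and the local `CompileError` class are hypothetical stand-ins used for illustration; they are not the compiler's real code.

```python
class CompileError(Exception):
    """Stand-in for sqlalchemy.exc.CompileError so the sketch has no dependencies."""


def render_primary_key(name, columns):
    """Render a table-level PRIMARY KEY clause, insisting on a constraint name."""
    if not name:
        raise CompileError("Phoenix requires a named PRIMARY KEY constraint")
    if not columns:
        raise CompileError("PRIMARY KEY constraint needs at least one column")
    return f"CONSTRAINT {name} PRIMARY KEY ({', '.join(columns)})"


clause = render_primary_key("pk", ["id", "created"])
print(clause)
```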

## Usage Examples

### Basic Engine Setup

```python
from sqlalchemy import create_engine, text

# Basic connection
engine = create_engine('phoenix://localhost:8765')

# With connection arguments
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'autocommit': True,
        'authentication': 'BASIC',
        'avatica_user': 'username',
        'avatica_password': 'password'
    }
)

# Test connection
with engine.connect() as conn:
    result = conn.execute(text("SELECT 1"))
    print(result.fetchone())
```

### URL Configuration

```python
from sqlalchemy import create_engine

# Basic URL
engine = create_engine('phoenix://localhost:8765')

# With HTTPS
engine = create_engine('phoenix://secure-host:8765',
                       connect_args={'verify': '/path/to/cert.pem'})

# Additional options passed through connect_args
url = 'phoenix://localhost:8765'
engine = create_engine(url, connect_args={
    'authentication': 'SPNEGO',
    'truststore': '/path/to/truststore.pem'
})
```

### Textual SQL Execution

The Phoenix dialect primarily supports textual SQL execution:

```python
from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

with engine.connect() as conn:
    # Create table
    conn.execute(text("""
        CREATE TABLE users (
            id INTEGER PRIMARY KEY,
            username VARCHAR,
            email VARCHAR
        )
    """))

    # Insert data; text() takes named parameters (:name) bound from a dict
    conn.execute(text("UPSERT INTO users VALUES (:id, :username, :email)"),
                 {'id': 1, 'username': 'admin', 'email': 'admin@example.com'})

    # Query data
    result = conn.execute(text("SELECT * FROM users WHERE id = :id"), {'id': 1})
    user = result.fetchone()
    print(f"User: {user}")

    # Bulk operations: a list of dicts runs the statement once per row
    users_data = [
        {'id': 2, 'username': 'john', 'email': 'john@example.com'},
        {'id': 3, 'username': 'jane', 'email': 'jane@example.com'}
    ]
    conn.execute(text("UPSERT INTO users VALUES (:id, :username, :email)"), users_data)

    # Query all users; rows are read by position because Phoenix upper-cases
    # unquoted identifiers, which affects result column names
    result = conn.execute(text("SELECT id, username, email FROM users ORDER BY id"))
    for row in result:
        print(f"ID: {row[0]}, Username: {row[1]}, Email: {row[2]}")
```
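
SQLAlchemy's `text()` construct is written with named `:name` parameters, while phoenixdb itself takes `?` placeholders. The sketch below shows roughly how the two forms correspond. The helper `to_qmark` is hypothetical and deliberately naive (it does not skip colons inside string literals); SQLAlchemy performs this translation internally with a more robust parser.

```python
import re

# Matches :name but not ::name, so PostgreSQL-style casts are left alone.
_PARAM_RE = re.compile(r"(?<!:):([A-Za-z_][A-Za-z0-9_]*)")


def to_qmark(sql, params):
    """Rewrite :name placeholders to ? and return (sql, ordered_values)."""
    values = []

    def _substitute(match):
        # Placeholders are visited left to right, which fixes the value order.
        values.append(params[match.group(1)])
        return "?"

    return _PARAM_RE.sub(_substitute, sql), tuple(values)


sql, values = to_qmark(
    "UPSERT INTO users VALUES (:id, :username, :email)",
    {"id": 1, "username": "admin", "email": "admin@example.com"},
)
print(sql, values)
```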

### Connection Pool Configuration

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

# Configure connection pooling
engine = create_engine(
    'phoenix://localhost:8765',
    poolclass=QueuePool,
    pool_size=5,
    max_overflow=10,
    pool_timeout=30,
    connect_args={
        'autocommit': True,
        'max_retries': 3
    }
)

# Use the pooled connections
def execute_query(sql, params=None):
    with engine.connect() as conn:
        if params:
            result = conn.execute(text(sql), params)
        else:
            result = conn.execute(text(sql))
        # Fetch rows before the connection is returned to the pool
        return result.fetchall()

# Multiple operations will reuse pooled connections
rows1 = execute_query("SELECT COUNT(*) FROM users")
rows2 = execute_query("SELECT * FROM users WHERE id = :id", {'id': 1})
```

### Transaction Management

```python
from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

# Automatic transaction management
with engine.begin() as conn:
    # All operations in single transaction
    # (Phoenix writes rows with UPSERT; it has no INSERT statement)
    conn.execute(text("UPSERT INTO audit_log VALUES (:id, :event, :event_date)"),
                 {'id': 1, 'event': 'user_created', 'event_date': '2023-01-01'})

    conn.execute(text("UPSERT INTO users VALUES (:id, :username)"),
                 {'id': 1, 'username': 'new_user'})

    # Automatic commit on successful completion
    # Automatic rollback on exception

# Manual transaction control
with engine.connect() as conn:
    trans = conn.begin()
    try:
        conn.execute(text("DELETE FROM temp_table"))
        conn.execute(text("UPSERT INTO temp_table SELECT * FROM source_table"))
        trans.commit()
    except Exception as e:
        print(f"Error: {e}")
        trans.rollback()
```
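
The commit-on-success, rollback-on-exception behaviour of `engine.begin()` can be illustrated on any DB-API connection. The sketch below uses `sqlite3` purely as a stand-in because it ships with Python; the `transaction` helper is hypothetical, and the SQL is SQLite's (`INSERT`), not Phoenix's (`UPSERT`).

```python
import sqlite3
from contextlib import contextmanager


@contextmanager
def transaction(dbapi_conn):
    """Commit if the block succeeds, roll back and re-raise if it fails."""
    try:
        yield dbapi_conn
    except Exception:
        dbapi_conn.rollback()
        raise
    else:
        dbapi_conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.commit()

# Succeeds, so the row is committed.
with transaction(conn):
    conn.execute("INSERT INTO users VALUES (1, 'admin')")

# Fails part-way, so the second row is rolled back.
try:
    with transaction(conn):
        conn.execute("INSERT INTO users VALUES (2, 'john')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

committed_ids = [row[0] for row in conn.execute("SELECT id FROM users ORDER BY id")]
print(committed_ids)
```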

### Metadata Inspection

```python
from sqlalchemy import create_engine, inspect

engine = create_engine('phoenix://localhost:8765')
inspector = inspect(engine)

# Note: Limited metadata support in Phoenix dialect
try:
    # Get table names
    table_names = inspector.get_table_names()
    print(f"Tables: {table_names}")

    # Get schema names
    schema_names = inspector.get_schema_names()
    print(f"Schemas: {schema_names}")

except NotImplementedError:
    print("Metadata introspection not fully supported")

    # Use direct phoenixdb metadata instead
    raw_conn = engine.raw_connection()
    phoenixdb_conn = raw_conn.connection  # Get underlying phoenixdb connection
    meta = phoenixdb_conn.meta()

    tables = meta.get_tables()
    for table in tables:
        print(f"Table: {table['TABLE_SCHEM']}.{table['TABLE_NAME']}")
```
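
When printing table names from metadata rows, the schema part may be missing for tables in the default schema. The following sketch formats names defensively. It assumes the rows are dict-like with `TABLE_SCHEM` and `TABLE_NAME` keys, as in the example above; the helper name and the sample rows are made up for illustration.

```python
def qualified_table_names(rows):
    """Build 'SCHEMA.TABLE' names, omitting the schema when it is empty."""
    names = []
    for row in rows:
        schema = row.get("TABLE_SCHEM")
        table = row["TABLE_NAME"]
        names.append(f"{schema}.{table}" if schema else table)
    return names


# Made-up sample rows standing in for the result of meta.get_tables().
sample_rows = [
    {"TABLE_SCHEM": "SYSTEM", "TABLE_NAME": "CATALOG"},
    {"TABLE_SCHEM": None, "TABLE_NAME": "USERS"},
]
print(qualified_table_names(sample_rows))
```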

### Authentication Configuration

```python
from sqlalchemy import create_engine

# Basic authentication
engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'authentication': 'BASIC',
        'avatica_user': 'username',
        'avatica_password': 'password'
    }
)

# SPNEGO/Kerberos authentication
engine = create_engine(
    'phoenix://secure-host:8765',
    connect_args={
        'authentication': 'SPNEGO',
        'verify': '/path/to/truststore.pem'
    }
)

# Custom authentication using requests.auth
from requests.auth import HTTPBasicAuth

engine = create_engine(
    'phoenix://localhost:8765',
    connect_args={
        'auth': HTTPBasicAuth('username', 'password')
    }
)
```
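
Hard-coded credentials, as in the examples above, are fine for demonstration but are usually read from the environment in practice. Here is a minimal sketch of building the BASIC-authentication `connect_args` that way. The helper name and the environment variable names `PHOENIX_USER` and `PHOENIX_PASSWORD` are assumptions, not something the dialect defines.

```python
import os


def connect_args_from_env(environ=None):
    """Build BASIC-auth connect_args from (hypothetical) environment variables."""
    env = os.environ if environ is None else environ
    user = env.get("PHOENIX_USER")
    password = env.get("PHOENIX_PASSWORD")
    if not user or not password:
        raise RuntimeError("PHOENIX_USER and PHOENIX_PASSWORD must be set")
    return {
        "authentication": "BASIC",
        "avatica_user": user,
        "avatica_password": password,
    }


# A plain dict stands in for os.environ so the sketch runs anywhere.
args = connect_args_from_env({"PHOENIX_USER": "username", "PHOENIX_PASSWORD": "password"})
print(sorted(args))
```

The resulting dictionary can then be passed as `connect_args=...` to `create_engine`, in the same way as the literal dictionaries shown earlier.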

### Error Handling

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import phoenixdb

engine = create_engine('phoenix://localhost:8765')

try:
    with engine.connect() as conn:
        result = conn.execute(text("SELECT * FROM nonexistent_table"))

except SQLAlchemyError as e:
    print(f"SQLAlchemy error: {e}")

    # Access underlying phoenixdb exception; only DBAPI-level errors carry .orig
    orig = getattr(e, 'orig', None)
    if hasattr(orig, 'message'):
        print(f"Phoenix error: {orig.message}")
    if hasattr(orig, 'sqlstate'):
        print(f"SQL state: {orig.sqlstate}")

except phoenixdb.Error as e:
    print(f"phoenixdb error: {e.message}")
```
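
The attribute checks above can be folded into one helper that works whether it receives a wrapping exception or the driver exception itself. This is a sketch under assumptions: `describe_error` is a hypothetical name, and the two `Fake...` classes only imitate the `orig`, `message`, and `sqlstate` attributes so the example runs without a database. The error text and SQLSTATE are sample values.

```python
def describe_error(exc):
    """Summarise an exception, unwrapping .orig when it is present."""
    orig = getattr(exc, "orig", None) or exc
    return {
        "type": type(orig).__name__,
        "message": getattr(orig, "message", str(orig)),
        "sqlstate": getattr(orig, "sqlstate", None),
    }


class FakeDriverError(Exception):
    """Imitates a driver error exposing message and sqlstate."""

    def __init__(self, message, sqlstate=None):
        super().__init__(message)
        self.message = message
        self.sqlstate = sqlstate


class FakeWrappedError(Exception):
    """Imitates a wrapper exception exposing the driver error as .orig."""

    def __init__(self, orig):
        super().__init__(str(orig))
        self.orig = orig


info = describe_error(FakeWrappedError(FakeDriverError("Table undefined", "42M03")))
print(info)
```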

### Phoenix-Specific Features

```python
from sqlalchemy import create_engine, text

engine = create_engine('phoenix://localhost:8765')

with engine.connect() as conn:
    # Phoenix UPSERT operations
    conn.execute(text("""
        UPSERT INTO users (id, username)
        VALUES (1, 'updated_user')
    """))

    # Array columns
    conn.execute(text("""
        CREATE TABLE test_arrays (
            id INTEGER PRIMARY KEY,
            numbers INTEGER ARRAY
        )
    """))

    conn.execute(text("UPSERT INTO test_arrays VALUES (:id, :numbers)"),
                 {'id': 1, 'numbers': [1, 2, 3, 4, 5]})

    # Phoenix functions and operators (ANY tests array membership)
    result = conn.execute(text("""
        SELECT id, ARRAY_LENGTH(numbers) as array_len
        FROM test_arrays
        WHERE 3 = ANY(numbers)
    """))

    for row in result:
        print(f"ID: {row[0]}, Array Length: {row[1]}")

    # Phoenix-specific data types
    conn.execute(text("""
        CREATE TABLE phoenix_types (
            id UNSIGNED_INT PRIMARY KEY,
            big_num UNSIGNED_LONG,
            precise_decimal DECIMAL(20,10)
        )
    """))
```

## Limitations

The Phoenix SQLAlchemy dialect is incomplete and primarily supports:

- **Textual SQL execution** via the `text()` construct
- **Basic connection management** and pooling
- **Transaction support** for Phoenix ACID operations
- **Parameter binding** for prepared statements

**Not supported:**

- SQLAlchemy ORM (Object-Relational Mapping)
- Core Table/Column abstraction
- Schema migration tools (Alembic)
- Advanced SQLAlchemy features (joins, subqueries via Core)

For functionality the dialect does not cover, use phoenixdb directly alongside SQLAlchemy.

```python
# Recommended approach for complex applications
from sqlalchemy import create_engine, text
import phoenixdb

# Use SQLAlchemy for basic operations
engine = create_engine('phoenix://localhost:8765')

# Use phoenixdb directly for advanced features
phoenixdb_conn = phoenixdb.connect('http://localhost:8765/')
meta = phoenixdb_conn.meta()
tables = meta.get_tables()

# Combine both as needed
with engine.connect() as sqlalchemy_conn:
    # Basic SQL via SQLAlchemy
    result = sqlalchemy_conn.execute(text("SELECT COUNT(*) FROM users"))
    count = result.scalar()

    # Advanced operations via phoenixdb
    cursor = phoenixdb_conn.cursor(cursor_factory=phoenixdb.cursor.DictCursor)
    cursor.execute("SELECT * FROM users")
    users = cursor.fetchall()
```