
# Database Operations

Core database functionality for connecting to Trino clusters and executing SQL. `TrinoHook` manages connections, supports several authentication methods (basic, JWT, certificate, and Kerberos), and can return query results either as tuples or as DataFrames.

## Capabilities

### Connection Management

Establishes and manages connections to Trino clusters with comprehensive authentication support and configuration options.

```python { .api }
class TrinoHook(DbApiHook):
    """
    Interact with Trino through the trino package.

    Attributes:
    - conn_name_attr: str = "trino_conn_id"
    - default_conn_name: str = "trino_default"
    - conn_type: str = "trino"
    - hook_name: str = "Trino"
    - strip_semicolon: bool = True
    - query_id: str = ""
    """

    def __init__(self, *args, **kwargs):
        """Initialize the TrinoHook."""
        pass

    def get_conn(self) -> Connection:
        """
        Return a connection object with proper authentication.

        Supports multiple authentication methods:
        - Basic authentication (username/password)
        - JWT authentication (token or file)
        - Certificate authentication (client certs)
        - Kerberos authentication

        Returns:
            Connection object configured with the specified authentication
        """
        pass

    @classmethod
    def get_ui_field_behaviour(cls) -> dict[str, Any]:
        """
        Return custom field behaviour for the Airflow UI.

        Returns:
            Dict with UI field configuration for the connection form
        """
        pass
```
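
The class attributes `conn_name_attr` and `default_conn_name` decide which Airflow connection the hook reads. The toy class below is a minimal sketch of that resolution. It assumes the usual `DbApiHook` convention: a single positional argument or a `trino_conn_id` keyword overrides the `trino_default` fallback. `ToyTrinoHook` is hypothetical, mimics only this lookup, and opens no connection.

```python
class ToyTrinoHook:
    """Hypothetical stand-in that mimics only the connection-ID resolution."""

    conn_name_attr = "trino_conn_id"
    default_conn_name = "trino_default"

    def __init__(self, *args, **kwargs):
        if len(args) == 1:
            # Assumption: a single positional argument is taken as the connection ID.
            conn_id = args[0]
        else:
            # Otherwise use the keyword named by conn_name_attr, or the default name.
            conn_id = kwargs.get(self.conn_name_attr, self.default_conn_name)
        setattr(self, self.conn_name_attr, conn_id)


print(ToyTrinoHook().trino_conn_id)                               # trino_default
print(ToyTrinoHook(trino_conn_id="my_trino_conn").trino_conn_id)  # my_trino_conn
```

In practice, passing `trino_conn_id` explicitly keeps a DAG independent of whatever happens to be stored under `trino_default`.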

### Query Execution

Execute SQL queries against Trino with various result formats and parameter binding support.

```python { .api }
def get_records(
    self,
    sql: str,
    parameters=None
) -> list:
    """
    Execute query and return all records.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding

    Returns:
        List of tuples containing query results
    """
    pass

def get_first(
    self,
    sql: str,
    parameters=None
) -> Any:
    """
    Execute query and return the first record.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding

    Returns:
        First record as a tuple, or None if there are no results
    """
    pass

@deprecated(
    reason="Replaced by function `get_df`.",
    category=AirflowProviderDeprecationWarning,
    action="ignore",
)
def get_pandas_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame:
    """
    Execute query and return a pandas DataFrame.

    DEPRECATED: Use get_df() instead.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional pandas read options

    Returns:
        pandas DataFrame with query results
    """
    pass

def get_df(
    self,
    sql: str = "",
    parameters=None,
    *,
    df_type: str = "pandas",
    **kwargs
) -> pandas.DataFrame | polars.DataFrame:
    """
    Execute query and return a pandas or polars DataFrame, selected by df_type.

    Modern replacement for get_pandas_df() with support for both pandas and polars.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - df_type: "pandas" (default) or "polars"
    - **kwargs: Additional DataFrame read options

    Returns:
        pandas.DataFrame or polars.DataFrame with query results
    """
    pass

def _get_pandas_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> pandas.DataFrame:
    """
    Internal method to get a pandas DataFrame.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional pandas read options

    Returns:
        pandas DataFrame with query results
    """
    pass

def _get_polars_df(
    self,
    sql: str = "",
    parameters=None,
    **kwargs
) -> polars.DataFrame:
    """
    Internal method to get a polars DataFrame.

    Parameters:
    - sql: SQL query string
    - parameters: Query parameters for binding
    - **kwargs: Additional polars read options

    Returns:
        polars DataFrame with query results
    """
    pass
```
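
`get_records()` and `get_first()` follow the standard Python DB-API pattern: execute on a cursor, then call `fetchall()` or `fetchone()`. The sketch below shows that pattern using the standard library's `sqlite3` as a stand-in driver. Like the `trino` client, `sqlite3` uses `?` (qmark) placeholders. The two functions are hypothetical helpers. They omit what the hook adds on top, such as connection lookup, SQL stripping, and error wrapping.

```python
import sqlite3


def get_records(conn, sql, parameters=None):
    """Run a query and return every row as a list of tuples."""
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters or ())
        return cur.fetchall()
    finally:
        cur.close()


def get_first(conn, sql, parameters=None):
    """Run a query and return the first row, or None if there are no rows."""
    cur = conn.cursor()
    try:
        cur.execute(sql, parameters or ())
        return cur.fetchone()
    finally:
        cur.close()


# In-memory SQLite database standing in for a Trino cluster.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (id INTEGER, status TEXT)")
conn.executemany("INSERT INTO events VALUES (?, ?)", [(1, "active"), (2, "done"), (3, "active")])

print(get_records(conn, "SELECT id FROM events WHERE status = ? ORDER BY id", ["active"]))  # [(1,), (3,)]
print(get_first(conn, "SELECT id FROM events WHERE status = ?", ["missing"]))  # None
```

Binding values through placeholders, rather than formatting them into the SQL string, leaves quoting to the driver and avoids SQL injection.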

### Data Insertion

Insert data into Trino tables with batch processing and transaction management.

```python { .api }
def insert_rows(
    self,
    table: str,
    rows: Iterable[tuple],
    target_fields: Iterable[str] | None = None,
    commit_every: int = 0,
    replace: bool = False,
    **kwargs
) -> None:
    """
    Insert rows into a Trino table.

    Parameters:
    - table: Target table name
    - rows: Iterable of tuples containing row data
    - target_fields: Names of columns to fill in the table
    - commit_every: Maximum rows to insert in one transaction (0 = all rows)
    - replace: Whether to replace instead of insert
    - **kwargs: Additional keyword arguments
    """
    pass
```
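
The `commit_every` semantics can be illustrated with a small sketch, again using `sqlite3` as a stand-in for Trino. It splits the rows into batches of `commit_every` and commits after each batch; `0` puts every row in a single batch. The function is hypothetical and does not reproduce the SQL the hook actually generates. It interpolates table and column names directly, so those must come from trusted code, never from user input.

```python
import sqlite3
from itertools import islice


def insert_rows(conn, table, rows, target_fields=None, commit_every=0):
    """Insert rows in batches, committing after each batch; return the batch sizes.

    Hypothetical sketch: commit_every=0 puts all rows in a single batch.
    """
    row_iter = iter(rows)
    columns = f" ({', '.join(target_fields)})" if target_fields else ""
    # islice(..., None) consumes the whole iterator, which models commit_every=0.
    batch_size = commit_every if commit_every > 0 else None
    batch_sizes = []
    while True:
        batch = list(islice(row_iter, batch_size))
        if not batch:
            break
        placeholders = ", ".join("?" for _ in batch[0])
        # Table and column names are interpolated, so they must come from trusted code.
        conn.executemany(f"INSERT INTO {table}{columns} VALUES ({placeholders})", batch)
        conn.commit()  # one transaction per batch
        batch_sizes.append(len(batch))
    return batch_sizes


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target_table (id INTEGER, name TEXT, value REAL)")
rows = [(1, "Alice", 100.5), (2, "Bob", 200.0), (3, "Charlie", 150.75)]
print(insert_rows(conn, "target_table", rows, ["id", "name", "value"], commit_every=2))  # [2, 1]
```

Smaller batches limit how much work a failure rolls back, at the cost of more commits.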

### Transaction Management

Manage database transactions and isolation levels for consistent data operations.

```python { .api }
def get_isolation_level(self) -> Any:
    """
    Get current transaction isolation level.

    Returns:
        Current isolation level setting
    """
    pass
```
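
The sketch below shows how an isolation level could be resolved from connection settings. It assumes, beyond what the reference above states, that the level is read from an `isolation_level` key in the connection extras and defaults to autocommit. The enum mirrors the level names of the `trino` client's `IsolationLevel`, but it is a local stand-in, and `resolve_isolation_level` is hypothetical.

```python
from enum import Enum


class IsolationLevel(Enum):
    """Local stand-in mirroring the trino client's isolation level names."""

    AUTOCOMMIT = 0
    READ_UNCOMMITTED = 1
    READ_COMMITTED = 2
    REPEATABLE_READ = 3
    SERIALIZABLE = 4


def resolve_isolation_level(extra):
    """Map an assumed `isolation_level` extra to an enum member (default AUTOCOMMIT)."""
    name = str(extra.get("isolation_level", "AUTOCOMMIT")).upper()
    # Unknown names fall back to AUTOCOMMIT instead of raising.
    return IsolationLevel.__members__.get(name, IsolationLevel.AUTOCOMMIT)


print(resolve_isolation_level({"isolation_level": "read_committed"}))  # IsolationLevel.READ_COMMITTED
print(resolve_isolation_level({}))  # IsolationLevel.AUTOCOMMIT
```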

### OpenLineage Integration

Support for data lineage tracking through OpenLineage integration.

```python { .api }
def get_openlineage_database_info(self, connection):
    """
    Get database information for OpenLineage tracking.

    Parameters:
    - connection: Database connection object

    Returns:
        Database info dict for lineage tracking
    """
    pass

def get_openlineage_database_dialect(self, _):
    """
    Get database dialect for OpenLineage.

    Returns:
        Database dialect identifier
    """
    pass

def get_openlineage_default_schema(self):
    """
    Get default schema for OpenLineage tracking.

    Returns:
        Default schema name
    """
    pass
```
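
Lineage tools identify a dataset by a namespace and a name. The sketch below assumes the OpenLineage naming convention for Trino: namespace `trino://{host}:{port}` and name `{catalog}.{schema}.{table}`. It also shows how a default catalog and schema, such as the one `get_openlineage_default_schema()` reports, fill in partially qualified table names. Both functions are hypothetical and are not the provider's implementation.

```python
def qualify_table(table, default_catalog, default_schema):
    """Expand 'table', 'schema.table' or 'catalog.schema.table' to a 3-tuple.

    Simplified sketch: quoted identifiers containing dots are not handled.
    """
    parts = table.split(".")
    if len(parts) == 1:
        return default_catalog, default_schema, parts[0]
    if len(parts) == 2:
        return default_catalog, parts[0], parts[1]
    if len(parts) == 3:
        return parts[0], parts[1], parts[2]
    raise ValueError(f"cannot parse table name: {table!r}")


def openlineage_dataset_id(host, port, table, default_catalog, default_schema):
    """Return an assumed (namespace, name) pair for a Trino table."""
    catalog, schema, name = qualify_table(table, default_catalog, default_schema)
    return f"trino://{host}:{port}", f"{catalog}.{schema}.{name}"


print(openlineage_dataset_id("trino.example.com", 8080, "orders", "hive", "sales"))
# ('trino://trino.example.com:8080', 'hive.sales.orders')
```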

### Connection URI

Generate connection URIs for external integrations and debugging.

```python { .api }
def get_uri(self) -> str:
    """
    Get connection URI string.

    Returns:
        Connection URI for the current Trino connection
    """
    pass

@staticmethod
def _serialize_cell(cell: Any, conn: Connection | None = None) -> Any:
    """
    Serialize cell value for database insertion.

    Trino will adapt all execute() args internally, hence we return cell without any conversion.

    Parameters:
    - cell: The cell value to insert into the table
    - conn: The database connection (optional)

    Returns:
        The unmodified cell value
    """
    pass
```
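
Credentials embedded in a URI must be percent-encoded. Otherwise characters such as `@`, `:` or `/` in a password corrupt the URI. The sketch below shows this with `urllib.parse.quote_plus`. The function name and the `/catalog/schema` path layout are assumptions for illustration; `get_uri()` may format its result differently. Because such URIs contain secrets, avoid logging them.

```python
from urllib.parse import quote_plus


def build_trino_uri(host, port=None, login=None, password=None, catalog=None, schema=None):
    """Hypothetical builder for trino://user:password@host:port/catalog/schema."""
    auth = ""
    if login:
        auth = quote_plus(login)
        if password:
            # Percent-encode so '@', ':' and '/' in the password stay unambiguous.
            auth += ":" + quote_plus(password)
        auth += "@"
    netloc = host if port is None else f"{host}:{port}"
    path = ""
    if catalog:
        path = "/" + quote_plus(catalog)
        if schema:
            path += "/" + quote_plus(schema)
    return f"trino://{auth}{netloc}{path}"


print(build_trino_uri("trino.example.com", 8080, "alice", "p@ss", "hive", "sales"))
# trino://alice:p%40ss@trino.example.com:8080/hive/sales
```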

## Authentication Configuration

The hook supports several authentication methods. Basic authentication uses the connection's login and password fields; the other methods are configured through the connection extras:

### Basic Authentication

Set the connection's login and password fields:

```python
# Connection configuration
login = "username"
password = "password"
```
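
One way to supply these fields without the Airflow UI is an environment variable. The sketch below assumes Airflow 2.3 or later, which accepts JSON-serialized connections in `AIRFLOW_CONN_<CONN_ID>` variables. The host, port, and credentials are placeholders. In production, a secrets backend is preferable to plain-text passwords in the environment.

```python
import json
import os

# Placeholder connection details; replace with your cluster's values.
conn_id = "my_trino_conn"
connection = {
    "conn_type": "trino",
    "host": "trino.example.com",
    "port": 8080,
    "login": "username",
    "password": "password",
    # Authentication extras (auth, jwt__token, certs__..., kerberos__...) go here.
    "extra": {},
}

# Airflow looks up AIRFLOW_CONN_<CONN_ID> with the connection ID upper-cased.
env_var = f"AIRFLOW_CONN_{conn_id.upper()}"
os.environ[env_var] = json.dumps(connection)
print(env_var)  # AIRFLOW_CONN_MY_TRINO_CONN
```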

### JWT Authentication

Configure the JWT in the connection extras, either as a token string or as a path to a token file:

```python
# Via token string
extra = {"auth": "jwt", "jwt__token": "your-jwt-token"}

# Via token file
extra = {"auth": "jwt", "jwt__file": "/path/to/token.jwt"}
```

### Certificate Authentication

Configure client certificates in connection extras:

```python
extra = {
    "auth": "certs",
    "certs__client_cert_path": "/path/to/client.crt",
    "certs__client_key_path": "/path/to/client.key"
}
```

### Kerberos Authentication

Configure Kerberos settings in connection extras:

```python
extra = {
    "auth": "kerberos",
    "kerberos__config": "/path/to/krb5.conf",
    "kerberos__service_name": "trino",
    "kerberos__mutual_authentication": True,
    "kerberos__force_preemptive": False,
    "kerberos__hostname_override": "trino.example.com",
    "kerberos__principal": "user@REALM",
    "kerberos__delegate": False,
    "kerberos__ca_bundle": "/path/to/ca-bundle.crt"
}
```
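
The method-selection logic implied by these settings can be summarized in a short sketch. `select_auth_method` is a hypothetical helper, not the provider's code. It assumes that a password selects basic authentication and is rejected for certificate and Kerberos authentication. It also assumes that JWT requires exactly one of `jwt__token` or `jwt__file`.

```python
def select_auth_method(password, extra):
    """Return the authentication method a connection's fields select (hypothetical)."""
    auth = extra.get("auth")
    # Assumption: a password cannot be combined with certificate or Kerberos auth.
    if password and auth in ("certs", "kerberos"):
        raise ValueError(f"auth={auth!r} does not accept a password")
    if password:
        return "basic"
    if auth == "jwt":
        has_token = "jwt__token" in extra
        has_file = "jwt__file" in extra
        # Assumption: exactly one JWT source must be configured.
        if has_token == has_file:
            raise ValueError("set exactly one of jwt__token or jwt__file")
        return "jwt (token)" if has_token else "jwt (file)"
    if auth in ("certs", "kerberos"):
        return auth
    return "none"


print(select_auth_method("password", {}))                                    # basic
print(select_auth_method(None, {"auth": "jwt", "jwt__file": "/path/to/token.jwt"}))  # jwt (file)
```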

## Usage Examples

### Basic Query Execution

```python
from airflow.providers.trino.hooks.trino import TrinoHook

# Initialize hook
hook = TrinoHook(trino_conn_id='my_trino_conn')

# Execute simple query
sql = "SELECT count(*) FROM catalog.schema.table"
result = hook.get_records(sql)
print(f"Row count: {result[0][0]}")

# Get first result
first_row = hook.get_first("SELECT * FROM catalog.schema.table LIMIT 1")
print(f"First row: {first_row}")
```

### Working with DataFrames

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

sql = "SELECT id, name, value FROM catalog.schema.table LIMIT 100"

# Preferred approach: get_df() returns a pandas DataFrame by default
df = hook.get_df(sql)

# Request a polars DataFrame instead (polars must be installed).
# Internally this is served by the _get_polars_df() helper.
df_polars = hook.get_df(sql, df_type="polars")

# Legacy approach: get_pandas_df() is deprecated in favour of get_df().
# Its decorator uses action="ignore", so no warning is shown at runtime.
df_legacy = hook.get_pandas_df(sql)

# Process DataFrame (shape and describe() exist on both pandas and polars)
print(f"DataFrame shape: {df.shape}")
print(df.describe())
```

### Parameterized Queries

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Execute a parameterized query; the trino client uses ? (qmark) placeholders
sql = "SELECT * FROM catalog.schema.table WHERE date >= ? AND status = ?"
params = ['2023-01-01', 'active']
results = hook.get_records(sql, parameters=params)
```

### Data Insertion

```python
from airflow.providers.trino.hooks.trino import TrinoHook

hook = TrinoHook(trino_conn_id='my_trino_conn')

# Prepare data rows
rows = [
    (1, 'Alice', 100.5),
    (2, 'Bob', 200.0),
    (3, 'Charlie', 150.75)
]

# Insert data
hook.insert_rows(
    table='catalog.schema.target_table',
    rows=rows,
    target_fields=['id', 'name', 'value'],
    commit_every=1000
)
```

## Helper Functions

### Client Information Generation

```python { .api }
def generate_trino_client_info() -> str:
    """
    Return JSON string with DAG context information.

    Includes dag_id, task_id, logical_date/execution_date, try_number,
    dag_run_id, and dag_owner from Airflow context.

    Returns:
        JSON string with task execution context
    """
    pass
```
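
The shape of the resulting string can be sketched as follows. The context here is a plain dictionary with made-up values, and `build_client_info` is hypothetical. The real function reads these values from Airflow's task context, and the exact keys (for example `logical_date` versus `execution_date`) depend on the Airflow version.

```python
import json

# Fields listed in the docstring above; logical_date is used here instead of execution_date.
CLIENT_INFO_KEYS = ("dag_id", "task_id", "logical_date", "try_number", "dag_run_id", "dag_owner")


def build_client_info(context):
    """Serialize selected task-context fields to JSON; missing fields become null."""
    return json.dumps({key: context.get(key) for key in CLIENT_INFO_KEYS})


info = build_client_info(
    {
        "dag_id": "daily_sales",
        "task_id": "load_orders",
        "logical_date": "2024-01-01T00:00:00+00:00",
        "try_number": 1,
        "dag_run_id": "scheduled__2024-01-01T00:00:00+00:00",
        "dag_owner": "airflow",
    }
)
print(info)
```

Tagging queries with this context makes it easier to trace a query seen in Trino back to the Airflow task that issued it.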

## Exception Handling

```python { .api }
class TrinoException(Exception):
    """
    Custom exception for Trino-related errors.

    Raised for Trino-specific issues and error conditions.
    """
    pass
```

Common error scenarios:
- Connection authentication failures
- Invalid SQL syntax
- Missing tables or schemas
- Permission errors
- Network connectivity issues
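
A common pattern is to catch the driver's DB-API errors and re-raise them as a single domain exception, chaining the original so its message and traceback survive. The sketch below uses `sqlite3` as a stand-in driver and a local `TrinoException` class. It illustrates the pattern only, not the provider's exact code.

```python
import sqlite3


class TrinoException(Exception):
    """Local stand-in for the provider's exception class."""


def run_query(conn, sql):
    """Run a query, re-raising driver errors as TrinoException (pattern sketch)."""
    try:
        return conn.execute(sql).fetchall()
    except sqlite3.DatabaseError as err:
        # Chain the original error so the driver's details are preserved.
        raise TrinoException(str(err)) from err


conn = sqlite3.connect(":memory:")
try:
    run_query(conn, "SELECT * FROM missing_table")
except TrinoException as exc:
    print(f"Query failed: {exc}")  # Query failed: no such table: missing_table
```

Catching one exception type lets a task separate query failures from unrelated errors, for example to decide whether a retry makes sense.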