# SQL Utilities

Specialized utilities for SQL-based lineage extraction, covering schema analysis, table discovery, information_schema querying, and database metadata extraction.

## Capabilities

### Table Schema Classes

Classes for representing and working with database table schemas.

```python { .api }
class ColumnIndex(Enum):
    """
    Enumeration for information schema column indices.

    Defines standard column positions in information_schema query results
    for consistent schema extraction across different database systems.
    """
    SCHEMA = 0            # Table schema/database name
    TABLE_NAME = 1        # Table name
    COLUMN_NAME = 2       # Column name
    ORDINAL_POSITION = 3  # Column position in table
    UDT_NAME = 4          # User-defined type name


class TableSchema:
    """
    Table schema container with dataset conversion methods.

    Represents a database table's schema information including
    columns, types, and metadata for lineage extraction.
    """

    def to_dataset(
        self,
        namespace: str,
        database: str | None = None,
        schema: str | None = None
    ) -> Dataset:
        """
        Convert table schema to OpenLineage Dataset.

        Args:
            namespace: OpenLineage namespace
            database: Database name
            schema: Schema name

        Returns:
            Dataset: OpenLineage dataset with schema facets
        """
```
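
For orientation, here is a minimal sketch of how `ColumnIndex` maps onto a raw information_schema result row; the row tuple is illustrative sample data, not real query output:

```python
from airflow.providers.openlineage.utils.sql import ColumnIndex

# Illustrative sample row, in information_schema column order
row = ('public', 'users', 'id', 1, 'int4')

schema_name = row[ColumnIndex.SCHEMA.value]         # 'public'
table_name = row[ColumnIndex.TABLE_NAME.value]      # 'users'
column_name = row[ColumnIndex.COLUMN_NAME.value]    # 'id'
position = row[ColumnIndex.ORDINAL_POSITION.value]  # 1
udt_name = row[ColumnIndex.UDT_NAME.value]          # 'int4'

print(f"{schema_name}.{table_name}.{column_name}: {udt_name} (position {position})")
```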

### Type Definitions

Type aliases for complex data structures used in SQL utilities.

```python { .api }
TablesHierarchy = dict[str | None, dict[str | None, list[str]]]
"""
Type alias for nested table hierarchy dictionary.

Structure: {database: {schema: [table_names]}}
Represents the hierarchical organization of tables across
databases and schemas for comprehensive schema analysis.
"""
```
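
Since both outer key types are `str | None`, the hierarchy can also omit the database level, e.g. for engines that only expose schema-qualified names. A minimal sketch of both shapes:

```python
from airflow.providers.openlineage.utils.sql import TablesHierarchy

# Fully qualified: {database: {schema: [table_names]}}
three_level: TablesHierarchy = {
    'analytics': {'public': ['users', 'orders']}
}

# Database level omitted via a None key, for schema.table-only naming
two_level: TablesHierarchy = {
    None: {'public': ['users', 'orders']}
}
```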

### Schema Extraction Functions

Functions for extracting table schemas and metadata from databases.

```python { .api }
def get_table_schemas(
    hook,
    namespace: str,
    database: str | None,
    schema: str | None,
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> tuple[list[Dataset], list[Dataset]]:
    """
    Get table schemas from database using information_schema queries.

    Args:
        hook: Database hook for executing queries
        namespace: OpenLineage namespace
        database: Target database name
        schema: Target schema name
        tables_hierarchy: Nested table structure dictionary
        information_schema_columns: Columns to select from information_schema
        information_schema_table_name: Name of information schema table
        is_cross_db: Whether query spans multiple databases
        use_flat_cross_db_query: Whether to use flat cross-database query
        is_uppercase_names: Whether to uppercase table/column names

    Returns:
        tuple: (input_datasets, output_datasets) with schema information
    """


def parse_query_result(cursor) -> list[TableSchema]:
    """
    Parse database query results into TableSchema objects.

    Args:
        cursor: Database cursor with query results

    Returns:
        list[TableSchema]: Parsed table schema objects
    """
```

### Query Generation Functions

Functions for generating SQL queries for schema discovery and analysis.

```python { .api }
def create_information_schema_query(
    tables_hierarchy: TablesHierarchy,
    information_schema_columns: list[str],
    information_schema_table_name: str,
    is_cross_db: bool,
    use_flat_cross_db_query: bool,
    is_uppercase_names: bool
) -> str:
    """
    Create SQL query for extracting schema information from information_schema.

    Args:
        tables_hierarchy: Nested dictionary of database/schema/table structure
        information_schema_columns: Columns to select from information schema
        information_schema_table_name: Name of information schema table
        is_cross_db: Whether query spans multiple databases
        use_flat_cross_db_query: Whether to use flat cross-database query
        is_uppercase_names: Whether to uppercase table/column names

    Returns:
        str: SQL query for schema information extraction
    """


def create_filter_clauses(
    tables_hierarchy: TablesHierarchy,
    is_uppercase_names: bool
) -> ClauseElement:
    """
    Create SQL filter clauses for table hierarchy filtering.

    Args:
        tables_hierarchy: Nested table structure dictionary
        is_uppercase_names: Whether to uppercase identifiers

    Returns:
        ClauseElement: SQLAlchemy filter clause element
    """
```

## Usage Examples

### Basic Schema Extraction

```python
from airflow.providers.openlineage.utils.sql import TablesHierarchy, get_table_schemas
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Set up the database connection
hook = PostgresHook(postgres_conn_id='analytics_db')

# Define the table hierarchy
tables_hierarchy: TablesHierarchy = {
    'analytics': {
        'public': ['users', 'orders', 'products'],
        'staging': ['raw_users', 'raw_orders']
    },
    'reporting': {
        'public': ['daily_reports', 'monthly_summaries']
    }
}

# Extract schemas
input_datasets, output_datasets = get_table_schemas(
    hook=hook,
    namespace='production',
    database='analytics',
    schema='public',
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)

print(f"Input datasets: {len(input_datasets)}")
print(f"Output datasets: {len(output_datasets)}")
```

### Custom Information Schema Query

```python
from airflow.providers.openlineage.utils.sql import create_information_schema_query

# Define complex table hierarchy
tables_hierarchy = {
    'warehouse': {
        'dim': ['dim_users', 'dim_products', 'dim_time'],
        'fact': ['fact_sales', 'fact_inventory'],
        'staging': ['stg_users', 'stg_products', 'stg_sales']
    },
    'analytics': {
        'reports': ['daily_sales', 'monthly_trends'],
        'ml': ['user_features', 'product_embeddings']
    }
}

# Generate information schema query
query = create_information_schema_query(
    tables_hierarchy=tables_hierarchy,
    information_schema_columns=[
        'table_catalog',
        'table_schema',
        'table_name',
        'column_name',
        'ordinal_position',
        'data_type',
        'is_nullable'
    ],
    information_schema_table_name='columns',
    is_cross_db=True,
    use_flat_cross_db_query=False,
    is_uppercase_names=False
)

print("Generated query:")
print(query)
```

### Query Result Processing

```python
from airflow.providers.openlineage.utils.sql import (
    TablesHierarchy,
    create_information_schema_query,
    parse_query_result,
)
from airflow.providers.postgres.hooks.postgres import PostgresHook

def extract_table_metadata(connection_id: str, table_hierarchy: TablesHierarchy):
    """Extract and process table metadata from database."""

    hook = PostgresHook(postgres_conn_id=connection_id)

    # Build the information schema query
    query = create_information_schema_query(
        tables_hierarchy=table_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=False,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )

    # Get cursor and execute query
    cursor = hook.get_cursor()
    cursor.execute(query)

    # Parse results
    table_schemas = parse_query_result(cursor)

    # Process schemas
    for schema in table_schemas:
        print(f"Table: {schema.schema_name}.{schema.table_name}")
        print(f"Columns: {len(schema.columns)}")

        # Convert to OpenLineage dataset
        dataset = schema.to_dataset(
            namespace='production',
            database='analytics',
            schema=schema.schema_name
        )

        print(f"Dataset: {dataset.namespace}/{dataset.name}")
        print(f"Schema facet: {dataset.facets.get('schema', 'None')}")

# Usage
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders']
    }
}

extract_table_metadata('analytics_db', table_hierarchy)
```

### TableSchema Usage

```python
from airflow.providers.openlineage.utils.sql import ColumnIndex, TableSchema

def create_table_schema_from_metadata(metadata_rows):
    """Create TableSchema objects from raw metadata rows."""

    # Group rows by table
    tables = {}
    for row in metadata_rows:
        schema_name = row[ColumnIndex.SCHEMA.value]
        table_name = row[ColumnIndex.TABLE_NAME.value]
        column_name = row[ColumnIndex.COLUMN_NAME.value]
        column_position = row[ColumnIndex.ORDINAL_POSITION.value]
        data_type = row[ColumnIndex.UDT_NAME.value]

        table_key = f"{schema_name}.{table_name}"
        if table_key not in tables:
            tables[table_key] = TableSchema()
            tables[table_key].schema_name = schema_name
            tables[table_key].table_name = table_name
            tables[table_key].columns = []

        tables[table_key].columns.append({
            'name': column_name,
            'position': column_position,
            'type': data_type
        })

    return list(tables.values())

def convert_schemas_to_datasets(table_schemas: list[TableSchema], namespace: str):
    """Convert table schemas to OpenLineage datasets."""

    datasets = []
    for schema in table_schemas:
        dataset = schema.to_dataset(
            namespace=namespace,
            database='analytics',
            schema=schema.schema_name
        )
        datasets.append(dataset)

    return datasets

# Example usage
sample_metadata = [
    ('public', 'users', 'id', 1, 'integer'),
    ('public', 'users', 'name', 2, 'varchar'),
    ('public', 'users', 'email', 3, 'varchar'),
    ('public', 'orders', 'id', 1, 'integer'),
    ('public', 'orders', 'user_id', 2, 'integer'),
    ('public', 'orders', 'amount', 3, 'decimal')
]

schemas = create_table_schema_from_metadata(sample_metadata)
datasets = convert_schemas_to_datasets(schemas, 'production')

for dataset in datasets:
    print(f"Dataset: {dataset.name}")
    print(f"Schema columns: {len(dataset.facets.get('schema', {}).get('fields', []))}")
```

### Cross-Database Schema Analysis

```python
from airflow.providers.openlineage.utils.sql import get_table_schemas
from airflow.providers.postgres.hooks.postgres import PostgresHook

def analyze_cross_database_schemas(hook, databases: list[str]):
    """Analyze schemas across multiple databases."""

    # Build comprehensive table hierarchy
    tables_hierarchy = {}

    for db in databases:
        # Query each database for table information
        db_tables = get_database_tables(hook, db)
        tables_hierarchy[db] = db_tables

    # Extract schemas with cross-database support
    all_inputs, all_outputs = get_table_schemas(
        hook=hook,
        namespace='multi_db',
        database=None,  # Cross-database query
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=[
            'table_catalog',
            'table_schema',
            'table_name',
            'column_name',
            'ordinal_position',
            'data_type'
        ],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=True,
        is_uppercase_names=False
    )

    return all_inputs, all_outputs

def get_database_tables(hook, database: str) -> dict:
    """Get table hierarchy for a specific database."""

    query = f"""
        SELECT DISTINCT table_schema, table_name
        FROM {database}.information_schema.tables
        WHERE table_type = 'BASE TABLE'
        ORDER BY table_schema, table_name
    """

    result = hook.get_records(query)

    schema_tables = {}
    for schema, table in result:
        if schema not in schema_tables:
            schema_tables[schema] = []
        schema_tables[schema].append(table)

    return schema_tables

# Usage
hook = PostgresHook(postgres_conn_id='multi_db_connection')
databases = ['analytics', 'warehouse', 'reporting']

inputs, outputs = analyze_cross_database_schemas(hook, databases)
print(f"Total datasets analyzed: {len(inputs) + len(outputs)}")
```

### Filter Clause Generation

```python
from airflow.providers.openlineage.utils.sql import TablesHierarchy, create_filter_clauses

def build_custom_schema_query(tables_hierarchy: TablesHierarchy):
    """Build custom query with generated filter clauses."""

    # Generate filter clauses
    filter_clause = create_filter_clauses(
        tables_hierarchy=tables_hierarchy,
        is_uppercase_names=False
    )

    # Base query
    base_query = """
        SELECT
            table_schema,
            table_name,
            column_name,
            ordinal_position,
            data_type,
            is_nullable
        FROM information_schema.columns
    """

    # Combine with filter
    if filter_clause is not None:
        full_query = f"{base_query} WHERE {filter_clause}"
    else:
        full_query = base_query

    return full_query

# Usage
table_hierarchy = {
    'analytics': {
        'public': ['users', 'orders'],
        'staging': ['raw_data']
    },
    'warehouse': {
        'dim': ['dim_users'],
        'fact': ['fact_sales']
    }
}

query = build_custom_schema_query(table_hierarchy)
print("Generated query with filters:")
print(query)
```

### Integration with SQL Parser

```python
from airflow.providers.openlineage.sqlparser import SQLParser
from airflow.providers.openlineage.utils.sql import get_table_schemas
from airflow.providers.postgres.hooks.postgres import PostgresHook

def enhanced_sql_parsing_with_schema(hook, sql_statements: list[str]):
    """Enhanced SQL parsing with schema information."""

    # Initialize SQL parser
    parser = SQLParser(dialect='postgresql')

    # Parse SQL to identify tables
    all_tables = set()
    for sql in sql_statements:
        metadata = parser.parse(sql)
        if metadata:
            all_tables.update(metadata.in_tables or [])
            all_tables.update(metadata.out_tables or [])

    # Build table hierarchy from parsed tables
    tables_hierarchy = {}
    for table in all_tables:
        # Parse table name (assuming format: schema.table or database.schema.table)
        parts = table.split('.')

        if len(parts) >= 2:
            if len(parts) == 2:
                schema, table_name = parts
                database = None
            else:
                database, schema, table_name = parts

            if database not in tables_hierarchy:
                tables_hierarchy[database] = {}
            if schema not in tables_hierarchy[database]:
                tables_hierarchy[database][schema] = []

            tables_hierarchy[database][schema].append(table_name)

    # Get schema information
    input_datasets, output_datasets = get_table_schemas(
        hook=hook,
        namespace='sql_parsing',
        database=None,
        schema=None,
        tables_hierarchy=tables_hierarchy,
        information_schema_columns=['table_schema', 'table_name', 'column_name', 'data_type'],
        information_schema_table_name='columns',
        is_cross_db=True,
        use_flat_cross_db_query=False,
        is_uppercase_names=False
    )

    return {
        'parsed_tables': all_tables,
        'input_datasets': input_datasets,
        'output_datasets': output_datasets,
        'tables_hierarchy': tables_hierarchy
    }

# Usage
sql_statements = [
    "SELECT * FROM analytics.public.users u JOIN analytics.public.orders o ON u.id = o.user_id",
    "INSERT INTO warehouse.fact.fact_sales SELECT * FROM analytics.staging.raw_sales",
    "CREATE TABLE reporting.public.daily_summary AS SELECT date, SUM(amount) FROM warehouse.fact.fact_sales GROUP BY date"
]

hook = PostgresHook(postgres_conn_id='analytics_db')
result = enhanced_sql_parsing_with_schema(hook, sql_statements)

print(f"Parsed tables: {result['parsed_tables']}")
print(f"Input datasets: {len(result['input_datasets'])}")
print(f"Output datasets: {len(result['output_datasets'])}")
print(f"Table hierarchy: {result['tables_hierarchy']}")
```

## Database System Support

The SQL utilities support various database systems with appropriate adaptations:

### PostgreSQL

```python
# PostgreSQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'udt_name']
information_schema_table_name = 'columns'
```

### MySQL

```python
# MySQL-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'
```

### BigQuery

```python
# BigQuery-specific configuration (uses INFORMATION_SCHEMA views)
information_schema_columns = ['table_catalog', 'table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'COLUMN_FIELD_PATHS'
```

### Snowflake

```python
# Snowflake-specific configuration
information_schema_columns = ['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type']
information_schema_table_name = 'columns'
is_uppercase_names = True  # Snowflake uses uppercase identifiers
```
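
As a closing sketch, these per-engine settings slot directly into the extraction functions above. The Snowflake wiring below is illustrative: the connection id, namespace, and table hierarchy are assumptions, not values mandated by the API:

```python
from airflow.providers.openlineage.utils.sql import get_table_schemas
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook

# Illustrative connection id
hook = SnowflakeHook(snowflake_conn_id='snowflake_default')

inputs, outputs = get_table_schemas(
    hook=hook,
    namespace='snowflake://my_account',  # illustrative namespace
    database='ANALYTICS',
    schema='PUBLIC',
    tables_hierarchy={'ANALYTICS': {'PUBLIC': ['USERS', 'ORDERS']}},
    information_schema_columns=['table_schema', 'table_name', 'column_name', 'ordinal_position', 'data_type'],
    information_schema_table_name='columns',
    is_cross_db=False,
    use_flat_cross_db_query=False,
    is_uppercase_names=True,  # Snowflake stores unquoted identifiers uppercase
)

print(f"{len(inputs)} input datasets, {len(outputs)} output datasets")
```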