
# SQL Parsing and Analysis

Parse SQL statements to extract data lineage, including table dependencies, column mappings, and database schema information. The SQL parsing functionality provides comprehensive analysis of SQL queries to automatically discover data flows and transformations.

## Capabilities

### SQL Parser Class

Main interface for parsing SQL statements and extracting lineage metadata.

```python { .api }

class SQLParser:
    """
    Main SQL parsing interface for extracting lineage from SQL statements.
    """

    def __init__(self, dialect: str | None = None, default_schema: str | None = None):
        """
        Initialize SQL parser with optional dialect and schema.

        Args:
            dialect: SQL dialect (e.g., 'postgresql', 'mysql', 'bigquery')
            default_schema: Default schema name for unqualified table references
        """

    def parse(self, sql: list[str] | str) -> SqlMeta | None:
        """
        Parse SQL statement(s) and return metadata.

        Args:
            sql: SQL statement(s) to parse

        Returns:
            SqlMeta: Parsed SQL metadata or None if parsing fails
        """

    def parse_table_schemas(
        self,
        hook,
        inputs: list[Dataset],
        outputs: list[Dataset]
    ) -> tuple[list[Dataset], list[Dataset]]:
        """
        Parse and enrich table schemas with column information.

        Args:
            hook: Database hook for schema queries
            inputs: Input datasets to enrich
            outputs: Output datasets to enrich

        Returns:
            tuple: Enriched (inputs, outputs) datasets with schema information
        """

    def get_metadata_from_parser(
        self,
        parse_result: SqlMeta,
        database: str | None,
        schema: str | None
    ) -> tuple[list[Dataset], list[Dataset]]:
        """
        Extract input/output datasets from parse results.

        Args:
            parse_result: Result from SQL parsing
            database: Database name
            schema: Schema name

        Returns:
            tuple: (input_datasets, output_datasets)
        """

    def attach_column_lineage(
        self,
        datasets: list[Dataset],
        database: str | None,
        parse_result: SqlMeta
    ):
        """
        Attach column-level lineage information to datasets.

        Args:
            datasets: Datasets to enrich with column lineage
            database: Database name
            parse_result: SQL parsing result with column mappings
        """

    def generate_openlineage_metadata_from_sql(
        self,
        operator_instance,
        task_instance,
        task_uuid: str
    ) -> OperatorLineage:
        """
        Generate complete OpenLineage metadata from SQL operations.

        Args:
            operator_instance: Airflow operator instance
            task_instance: Task instance context
            task_uuid: Unique task identifier

        Returns:
            OperatorLineage: Complete operator lineage with datasets and facets
        """

    @staticmethod
    def create_namespace(database_info: DatabaseInfo) -> str:
        """
        Create namespace string from database information.

        Args:
            database_info: Database configuration

        Returns:
            str: Formatted namespace string
        """

    @classmethod
    def normalize_sql(cls, sql: list[str] | str) -> str:
        """
        Normalize SQL statement(s) for consistent parsing.

        Args:
            sql: SQL statement(s) to normalize

        Returns:
            str: Normalized SQL string
        """

    @classmethod
    def split_sql_string(cls, sql: list[str] | str) -> list[str]:
        """
        Split SQL string into individual statements.

        Args:
            sql: SQL string or list to split

        Returns:
            list[str]: List of individual SQL statements
        """

    def create_information_schema_query(
        self,
        tables_hierarchy: dict,
        information_schema_columns: list[str],
        information_schema_table_name: str,
        is_cross_db: bool,
        use_flat_cross_db_query: bool,
        is_uppercase_names: bool
    ) -> str:
        """
        Create query for extracting schema information from information_schema.

        Args:
            tables_hierarchy: Nested dictionary of database/schema/table structure
            information_schema_columns: Columns to select from information schema
            information_schema_table_name: Name of information schema table
            is_cross_db: Whether query spans multiple databases
            use_flat_cross_db_query: Whether to use flat cross-database query
            is_uppercase_names: Whether to uppercase table/column names

        Returns:
            str: SQL query for schema information extraction
        """

```
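
For example, a schema-information query can be built directly from a tables hierarchy. The sketch below assumes the `create_information_schema_query` signature documented above and the default constants listed later on this page; the exact SQL text returned depends on the dialect:

```python
from airflow.providers.openlineage.sqlparser import (
    DEFAULT_INFORMATION_SCHEMA_COLUMNS,
    DEFAULT_INFORMATION_SCHEMA_TABLE_NAME,
    SQLParser,
)

parser = SQLParser(dialect='postgresql', default_schema='public')

# Sketch: build a SELECT against information_schema.columns covering two
# tables in the 'public' schema of the 'analytics' database
# (single-database case, so both cross-db flags stay False).
schema_query = parser.create_information_schema_query(
    tables_hierarchy={'analytics': {'public': ['orders', 'users']}},
    information_schema_columns=DEFAULT_INFORMATION_SCHEMA_COLUMNS,
    information_schema_table_name=DEFAULT_INFORMATION_SCHEMA_TABLE_NAME,
    is_cross_db=False,
    use_flat_cross_db_query=False,
    is_uppercase_names=False,
)
print(schema_query)
```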

### Database Information Configuration

Container for database-specific configuration and connection details.

```python { .api }

class DatabaseInfo:
    """
    Database configuration container with connection details and schema information.
    """

    scheme: str  # Database scheme (e.g., 'postgresql', 'mysql')
    authority: str | None  # Database authority/host information
    database: str | None  # Database name
    information_schema_columns: list[str]  # Columns available in information_schema
    information_schema_table_name: str  # Name of information schema table
    use_flat_cross_db_query: bool  # Whether to use flat cross-database queries
    is_information_schema_cross_db: bool  # Whether information_schema spans databases
    is_uppercase_names: bool  # Whether to uppercase identifiers
    normalize_name_method: Callable[[str], str]  # Method for normalizing names

```

### Type Definitions

Type definitions for SQL parsing operations and parameters.

```python { .api }

class GetTableSchemasParams(TypedDict):
    """
    Type definition for table schema extraction parameters.
    """

    hook: Any  # Database hook instance
    namespace: str  # OpenLineage namespace
    database: str | None  # Database name
    schema: str | None  # Schema name
    tables_hierarchy: dict  # Nested table structure
    information_schema_columns: list[str]  # Information schema columns
    information_schema_table_name: str  # Information schema table name
    is_cross_db: bool  # Cross-database query flag
    use_flat_cross_db_query: bool  # Flat query flag
    is_uppercase_names: bool  # Uppercase names flag

```
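
A minimal sketch of populating these parameters for a single-database lookup, assuming `GetTableSchemasParams` and the default constants are importable from the same module; the hook, namespace, and hierarchy values are illustrative placeholders:

```python
from airflow.providers.openlineage.sqlparser import (
    DEFAULT_INFORMATION_SCHEMA_COLUMNS,
    DEFAULT_INFORMATION_SCHEMA_TABLE_NAME,
    GetTableSchemasParams,
)
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Illustrative values only; the namespace string follows the format
# produced by SQLParser.create_namespace().
params: GetTableSchemasParams = {
    "hook": PostgresHook(postgres_conn_id="analytics_db"),
    "namespace": "postgres://localhost:5432",
    "database": "analytics",
    "schema": "public",
    "tables_hierarchy": {"analytics": {"public": ["orders", "users"]}},
    "information_schema_columns": DEFAULT_INFORMATION_SCHEMA_COLUMNS,
    "information_schema_table_name": DEFAULT_INFORMATION_SCHEMA_TABLE_NAME,
    "is_cross_db": False,
    "use_flat_cross_db_query": False,
    "is_uppercase_names": False,
}
```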

### Utility Functions

Helper functions for SQL parsing and lineage extraction.

```python { .api }

def default_normalize_name_method(name: str) -> str:
    """
    Default method for normalizing database object names.

    Args:
        name: Name to normalize

    Returns:
        str: Normalized name
    """

def from_table_meta(
    table_meta: DbTableMeta,
    database: str | None,
    namespace: str,
    is_uppercase: bool
) -> Dataset:
    """
    Convert table metadata to OpenLineage Dataset.

    Args:
        table_meta: Database table metadata
        database: Database name
        namespace: OpenLineage namespace
        is_uppercase: Whether names should be uppercase

    Returns:
        Dataset: OpenLineage dataset representation
    """

def get_openlineage_facets_with_sql(
    hook: DbApiHook,
    sql: str | list[str],
    conn_id: str,
    database: str | None
) -> OperatorLineage | None:
    """
    Extract OpenLineage facets from SQL operations using database hook.

    Args:
        hook: Database API hook for connection
        sql: SQL statement(s) to analyze
        conn_id: Airflow connection ID
        database: Database name

    Returns:
        OperatorLineage: Extracted lineage metadata or None if extraction fails
    """

```

### Constants

Default values and configuration constants for SQL parsing.

```python { .api }

DEFAULT_NAMESPACE: str = "default"
"""Default namespace for OpenLineage events when none specified."""

DEFAULT_INFORMATION_SCHEMA_COLUMNS: list[str] = [
    "table_schema",
    "table_name",
    "column_name",
    "ordinal_position",
    "udt_name",
]
"""Default columns to select from information_schema tables."""

DEFAULT_INFORMATION_SCHEMA_TABLE_NAME: str = "columns"
"""Default information_schema table name for column metadata."""

```
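
These defaults can be passed straight into `DatabaseInfo` instead of re-listing the columns by hand; a brief sketch, assuming the constants are importable from the same module as documented above:

```python
from airflow.providers.openlineage.sqlparser import (
    DEFAULT_INFORMATION_SCHEMA_COLUMNS,
    DEFAULT_INFORMATION_SCHEMA_TABLE_NAME,
    DatabaseInfo,
    default_normalize_name_method,
)

# Sketch: a DatabaseInfo that keeps the shipped information_schema defaults.
db_info = DatabaseInfo(
    scheme='postgresql',
    authority='localhost:5432',
    database='analytics',
    information_schema_columns=DEFAULT_INFORMATION_SCHEMA_COLUMNS,
    information_schema_table_name=DEFAULT_INFORMATION_SCHEMA_TABLE_NAME,
    use_flat_cross_db_query=False,
    is_information_schema_cross_db=False,
    is_uppercase_names=False,
    normalize_name_method=default_normalize_name_method,
)
```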

## Usage Examples

### Basic SQL Parsing

```python

from airflow.providers.openlineage.sqlparser import SQLParser

# Initialize parser
parser = SQLParser(dialect='postgresql', default_schema='public')

# Parse SQL statement
sql = "SELECT * FROM users u JOIN orders o ON u.id = o.user_id"
metadata = parser.parse(sql)

if metadata:
    print(f"Input tables: {metadata.in_tables}")
    print(f"Output tables: {metadata.out_tables}")

```

### Database Configuration

```python

from airflow.providers.openlineage.sqlparser import (
    DatabaseInfo,
    SQLParser,
    default_normalize_name_method,
)

# Configure database information
db_info = DatabaseInfo(
    scheme='postgresql',
    authority='localhost:5432',
    database='analytics',
    information_schema_columns=['table_schema', 'table_name', 'column_name'],
    information_schema_table_name='columns',
    use_flat_cross_db_query=False,
    is_information_schema_cross_db=False,
    is_uppercase_names=False,
    normalize_name_method=default_normalize_name_method
)

# Create namespace
namespace = SQLParser.create_namespace(db_info)
print(f"Namespace: {namespace}")

```

### Schema Analysis

```python

from airflow.providers.openlineage.sqlparser import SQLParser
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Setup
parser = SQLParser(dialect='postgresql')
hook = PostgresHook(postgres_conn_id='my_postgres')

# Parse SQL and get basic lineage
sql = """
INSERT INTO analytics.user_metrics
SELECT user_id, COUNT(*) as order_count
FROM orders
GROUP BY user_id
"""

metadata = parser.parse(sql)
inputs, outputs = parser.get_metadata_from_parser(metadata, 'analytics', 'public')

# Enrich with schema information
enriched_inputs, enriched_outputs = parser.parse_table_schemas(hook, inputs, outputs)

print(f"Enriched inputs: {enriched_inputs}")
print(f"Enriched outputs: {enriched_outputs}")

```
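
Column-level lineage can then be attached to the enriched datasets, continuing the example above with the `attach_column_lineage` signature documented earlier:

```python
# Sketch: attach column-level lineage from the parse result to the
# enriched output datasets (assuming, per the docstring above, that the
# method enriches the passed datasets rather than returning new ones).
parser.attach_column_lineage(enriched_outputs, 'analytics', metadata)
```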

### Complete Lineage Generation

```python

from airflow.providers.openlineage.sqlparser import get_openlineage_facets_with_sql
from airflow.providers.postgres.hooks.postgres import PostgresHook

# Extract complete lineage
hook = PostgresHook(postgres_conn_id='analytics_db')
sql = "INSERT INTO reports.daily_sales SELECT * FROM raw.sales WHERE date = CURRENT_DATE"

lineage = get_openlineage_facets_with_sql(
    hook=hook,
    sql=sql,
    conn_id='analytics_db',
    database='analytics'
)

if lineage:
    print(f"Input datasets: {lineage.inputs}")
    print(f"Output datasets: {lineage.outputs}")
    print(f"Run facets: {lineage.run_facets}")

```

### Custom SQL Normalization

```python

from airflow.providers.openlineage.sqlparser import SQLParser

# Normalize complex SQL
complex_sql = [
    "/* Comment */ SELECT * FROM table1;",
    "INSERT INTO table2 SELECT * FROM table1 WHERE active = true;",
    ""
]

normalized = SQLParser.normalize_sql(complex_sql)
statements = SQLParser.split_sql_string(normalized)

print(f"Normalized SQL: {normalized}")
print(f"Individual statements: {statements}")

```

## Integration with Operators

The SQL parser integrates automatically with SQL-based operators:

```python

from airflow.providers.postgres.operators.postgres import PostgresOperator

# The operator automatically uses SQLParser for lineage extraction
sql_task = PostgresOperator(
    task_id='analyze_sales',
    postgres_conn_id='analytics_db',
    sql="""
        INSERT INTO reports.monthly_sales
        SELECT
            DATE_TRUNC('month', order_date) as month,
            SUM(amount) as total_sales
        FROM sales.orders
        WHERE order_date >= '2023-01-01'
        GROUP BY DATE_TRUNC('month', order_date)
    """,
    dag=dag
)

```

## Supported SQL Dialects

The SQL parser supports various SQL dialects:

- PostgreSQL
- MySQL
- BigQuery
- Snowflake
- Redshift
- SQLite
- Generic SQL (limited features)

Dialect-specific features include appropriate query syntax, information schema handling, and identifier quoting conventions.
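
For example, the same statement can be parsed under several dialects to compare how tables are resolved; a sketch using only the `parse` API shown above:

```python
from airflow.providers.openlineage.sqlparser import SQLParser

sql = "INSERT INTO reports.daily_sales SELECT * FROM raw.sales"

# parse() returns None when a statement cannot be parsed for a given
# dialect, so guard before reading the table lists.
for dialect in ('postgresql', 'mysql', 'bigquery', 'snowflake'):
    metadata = SQLParser(dialect=dialect).parse(sql)
    if metadata:
        print(f"{dialect}: in={metadata.in_tables} out={metadata.out_tables}")
```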