# OpenLineage Integration

OpenLineage facets, utilities, and compatibility functions for data lineage tracking, including helpers for SQL and Spark integration. This module provides version-compatible OpenLineage integration with graceful fallbacks when the OpenLineage dependencies are not available.

## Capabilities

### Base OpenLineage Classes

Core OpenLineage classes for data lineage representation.

```python { .api }
class BaseFacet:
    """Base class for OpenLineage facets."""

class Dataset:
    """OpenLineage dataset representation."""

class DatasetFacet:
    """Base class for dataset-specific facets."""

class InputDataset:
    """Input dataset in OpenLineage events."""

class OutputDataset:
    """Output dataset in OpenLineage events."""

class RunFacet:
    """Base class for run-specific facets."""
```

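For orientation, a minimal sketch of composing these classes into the two sides of a lineage edge (constructor arguments follow OpenLineage's namespace/name/facets conventions; exact signatures vary by OpenLineage version):

```python
from airflow.providers.common.compat.openlineage.facet import InputDataset, OutputDataset

# Input and output sides of a lineage edge; the facets dicts are filled
# with the facet classes documented below.
source = InputDataset(
    namespace="postgresql://localhost:5432",
    name="analytics.users",
    facets={},
)
target = OutputDataset(
    namespace="postgresql://localhost:5432",
    name="analytics.processed_users",
    facets={},
)
```
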
### Lineage Facets

Specific facet classes for detailed lineage tracking.

```python { .api }
class ColumnLineageDatasetFacet:
    """Facet for tracking column-level lineage."""

class DocumentationDatasetFacet:
    """Facet for dataset documentation."""

class SchemaDatasetFacet:
    """Facet for dataset schema information."""

class LifecycleStateChangeDatasetFacet:
    """Facet for dataset lifecycle state changes."""

class OutputStatisticsOutputDatasetFacet:
    """Facet for output dataset statistics."""

class SymlinksDatasetFacet:
    """Facet for dataset symlink information."""
```

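A minimal sketch of attaching lineage facets to a dataset (the `"schema"` and `"documentation"` keys follow OpenLineage facet-naming conventions; field signatures may differ across versions):

```python
from airflow.providers.common.compat.openlineage.facet import (
    Dataset,
    DocumentationDatasetFacet,
    SchemaDatasetFacet,
    SchemaDatasetFacetFields,
)

users = Dataset(
    namespace="postgresql://localhost:5432",
    name="analytics.users",
    facets={
        # Column names and types for the dataset
        "schema": SchemaDatasetFacet(
            fields=[
                SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
                SchemaDatasetFacetFields(name="email", type="VARCHAR(255)"),
            ]
        ),
        # Human-readable description of the dataset
        "documentation": DocumentationDatasetFacet(description="Registered users."),
    },
)
```
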
### Run and Job Facets

Facets for tracking job execution and run information.

```python { .api }
class ErrorMessageRunFacet:
    """Facet for capturing error messages in runs."""

class ExternalQueryRunFacet:
    """Facet for external query execution information."""

class ExtractionErrorRunFacet:
    """Facet for lineage extraction errors."""

class SQLJobFacet:
    """Facet for SQL job information."""
```

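A minimal sketch of how these facets are typically grouped into job-level and run-level facet dicts (the key names follow OpenLineage conventions, and the `ErrorMessageRunFacet` fields are assumed from the OpenLineage spec):

```python
from airflow.providers.common.compat.openlineage.facet import ErrorMessageRunFacet, SQLJobFacet

# Job facets describe the definition of the work (here, the SQL text)
job_facets = {"sql": SQLJobFacet(query="SELECT user_id, email FROM users")}

# Run facets describe a single execution, e.g. a failure message
run_facets = {
    "errorMessage": ErrorMessageRunFacet(
        message='relation "users" does not exist',
        programmingLanguage="python",
    )
}
```
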
### Supporting Classes

Helper classes for OpenLineage data structures.

```python { .api }
class Fields:
    """Field definitions for schema facets."""

class InputField:
    """Input field definition for column lineage."""

class Error:
    """Error representation in OpenLineage events."""

class LifecycleStateChange:
    """Lifecycle state change representation."""

class PreviousIdentifier:
    """Previous identifier for renamed datasets."""

class Identifier:
    """Dataset identifier representation."""

class SchemaDatasetFacetFields:
    """Schema field definitions."""
```

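For example, a dataset rename can be recorded by combining `LifecycleStateChangeDatasetFacet` with its supporting classes (a hedged sketch; `LifecycleStateChange` members such as `RENAME` follow the OpenLineage spec):

```python
from airflow.providers.common.compat.openlineage.facet import (
    LifecycleStateChange,
    LifecycleStateChangeDatasetFacet,
    PreviousIdentifier,
)

# Record that analytics.users_old was renamed to the current dataset name
rename_facet = LifecycleStateChangeDatasetFacet(
    lifecycleStateChange=LifecycleStateChange.RENAME,
    previousIdentifier=PreviousIdentifier(
        namespace="postgresql://localhost:5432",
        name="analytics.users_old",
    ),
)
```
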
### No-Op Fallback

Function that creates no-op implementations when OpenLineage is not available.

```python { .api }
def create_no_op(*_, **__) -> None:
    """
    Create a no-op placeholder when the OpenLineage client is not available.

    Returns:
        None: Always returns None as a no-op implementation
    """
```

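A sketch of the guarded-import pattern this enables (the `openlineage.client` import path is an assumption and differs across client versions):

```python
from airflow.providers.common.compat.openlineage.facet import create_no_op

try:
    from openlineage.client.event_v2 import Dataset  # assumed client import path
except ImportError:
    # Without the OpenLineage client, Dataset(...) becomes a harmless
    # call that always returns None.
    Dataset = create_no_op
```
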
### SQL Integration

Utilities for extracting OpenLineage facets from SQL operations.

```python { .api }
def get_openlineage_facets_with_sql(
    hook,
    sql: str | list[str],
    conn_id: str,
    database: str | None
):
    """
    Get OpenLineage facets from SQL queries.

    Args:
        hook: Database hook instance
        sql (str | list[str]): SQL query or queries to analyze
        conn_id (str): Connection ID for the database
        database (str | None): Database name

    Returns:
        OpenLineage facets extracted from the SQL operation
    """
```

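A hedged sketch of the typical call site: a provider operator delegating lineage extraction to this helper from its OpenLineage method (the operator and its attributes are hypothetical; the `get_openlineage_facets_on_complete` method name follows the OpenLineage provider convention):

```python
from airflow.models import BaseOperator
from airflow.providers.common.compat.openlineage.utils.sql import get_openlineage_facets_with_sql

class MySqlQueryOperator(BaseOperator):  # hypothetical operator
    def __init__(self, *, sql: str, conn_id: str, database: str | None = None, **kwargs):
        super().__init__(**kwargs)
        self.sql = sql
        self.conn_id = conn_id
        self.database = database

    def get_openlineage_facets_on_complete(self, task_instance):
        # self.hook is assumed to be the DB-API hook the operator executed with
        return get_openlineage_facets_with_sql(
            hook=self.hook,
            sql=self.sql,
            conn_id=self.conn_id,
            database=self.database,
        )
```
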
### Spark Integration

Utilities for injecting OpenLineage information into Spark properties.

```python { .api }
def inject_parent_job_information_into_spark_properties(
    properties: dict,
    context: Context
) -> dict:
    """
    Inject OpenLineage parent job information into Spark properties.

    Args:
        properties (dict): Spark configuration properties
        context (Context): Airflow task context

    Returns:
        dict: Updated properties with OpenLineage parent job information
    """

def inject_transport_information_into_spark_properties(
    properties: dict,
    context: Context
) -> dict:
    """
    Inject OpenLineage transport information into Spark properties.

    Args:
        properties (dict): Spark configuration properties
        context (Context): Airflow task context

    Returns:
        dict: Updated properties with OpenLineage transport configuration
    """
```

### Asset Translation

Utility for translating Airflow assets for OpenLineage compatibility.

```python { .api }
def translate_airflow_asset(*args, **kwargs):
    """
    Translate Airflow assets for OpenLineage compatibility.

    Handles asset/dataset compatibility between Airflow versions.
    Maps to translate_airflow_asset in newer versions or translate_airflow_dataset
    with parameter renaming in older versions.

    Note: This function is conditionally imported and may have different signatures
    depending on the OpenLineage provider version and Airflow version.
    """
```

## Usage Examples

```python
from airflow.providers.common.compat.openlineage.facet import create_no_op
from airflow.providers.common.compat.openlineage.utils.sql import get_openlineage_facets_with_sql
from airflow.providers.common.compat.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties
)
from airflow.providers.common.compat.openlineage.check import require_openlineage_version

# Import facets, falling back to no-ops when OpenLineage is not available
try:
    from airflow.providers.common.compat.openlineage.facet import (
        Dataset, RunFacet, SchemaDatasetFacet, SchemaDatasetFacetFields, SQLJobFacet
    )

    # Create OpenLineage dataset
    input_dataset = Dataset(
        namespace="postgresql://localhost:5432",
        name="analytics.users"
    )

    # Create schema facet
    schema_facet = SchemaDatasetFacet(
        fields=[
            SchemaDatasetFacetFields(name="user_id", type="INTEGER"),
            SchemaDatasetFacetFields(name="email", type="VARCHAR(255)")
        ]
    )

    # Create SQL job facet
    sql_facet = SQLJobFacet(query="SELECT * FROM users WHERE active = true")

except ImportError:
    # Fall back to no-ops when OpenLineage is not available
    input_dataset = create_no_op()
    schema_facet = create_no_op()
    sql_facet = create_no_op()

# SQL integration example
@require_openlineage_version(provider_min_version="1.0.0")
def extract_sql_lineage(**context):
    from airflow.providers.postgres.hooks.postgres import PostgresHook

    hook = PostgresHook(postgres_conn_id="postgres_default")
    sql = "SELECT user_id, email FROM users WHERE created_date = '{{ ds }}'"

    # Get OpenLineage facets from SQL
    facets = get_openlineage_facets_with_sql(
        hook=hook,
        sql=sql,
        conn_id="postgres_default",
        database="analytics"
    )

    return facets

# Spark integration example
@require_openlineage_version(provider_min_version="1.2.0")
def configure_spark_with_openlineage(**context):
    spark_properties = {
        "spark.app.name": "data-processing",
        "spark.sql.adaptive.enabled": "true"
    }

    # Inject OpenLineage parent job information
    spark_properties = inject_parent_job_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )

    # Inject OpenLineage transport configuration
    spark_properties = inject_transport_information_into_spark_properties(
        properties=spark_properties,
        context=context
    )

    return spark_properties

# Use in operators
from airflow.providers.postgres.operators.postgres import PostgresOperator

sql_task = PostgresOperator(
    task_id="process_users",
    sql="""
        INSERT INTO processed_users
        SELECT user_id, UPPER(email) AS email
        FROM users
        WHERE created_date = '{{ ds }}'
    """,
    postgres_conn_id="postgres_default",
    # OpenLineage will automatically extract lineage information
)
```