
# Configuration Management

Configuration functions for accessing and controlling OpenLineage settings, including transport configuration, selective enabling, custom extractors, debugging options, and execution parameters.

## Capabilities

### Core Configuration Functions

Access fundamental OpenLineage configuration settings that control the behavior of lineage collection and emission.

```python { .api }
from typing import Any


def is_disabled() -> bool:
    """
    Check if OpenLineage is completely disabled.

    Returns:
        bool: True if OpenLineage events should not be collected or emitted
    """


def namespace() -> str:
    """
    Get the OpenLineage namespace for lineage events.

    Returns:
        str: Namespace string that groups lineage events logically
    """


def transport() -> dict[str, Any]:
    """
    Get the transport configuration for sending OpenLineage events.

    Returns:
        dict: Transport configuration including type and connection details
    """
```
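
`transport()` returns a dictionary, but the `[openlineage] transport` option is stored as a JSON string. The sketch below shows that conversion in isolation. `parse_transport_option` is a hypothetical helper written for illustration. Two behaviors are assumptions here, not a copy of the provider's code: an empty value is treated as `{}`, and anything other than a JSON object is rejected.

```python
import json
from typing import Any


def parse_transport_option(raw: str) -> dict[str, Any]:
    """Hypothetical helper: turn a raw `transport` option string into a dict."""
    # Assumption: an unset/empty option means "no transport configured".
    if not raw.strip():
        return {}
    parsed = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on bad JSON
    # Assumption: only a JSON object is a usable transport configuration.
    if not isinstance(parsed, dict):
        raise ValueError("transport option must be a JSON object")
    return parsed


config = parse_transport_option('{"type": "http", "url": "http://localhost:5000"}')
print(config["type"])  # http
```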

### Selective Lineage Configuration

Control selective lineage collection, allowing fine-grained control over which operators and DAGs emit lineage events.

```python { .api }
def selective_enable() -> bool:
    """
    Check if selective lineage mode is enabled.

    When enabled, lineage is only collected for DAGs/tasks explicitly marked
    with enable_lineage().

    Returns:
        bool: True if selective enable mode is active
    """


def disabled_operators() -> set[str]:
    """
    Get the set of operator class names that are disabled for lineage collection.

    Returns:
        set[str]: Set of fully qualified operator class names to exclude
    """
```
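
The two settings above combine into a single decision for each task. The sketch below models that decision with plain values so it runs without Airflow. `parse_operator_list` and `should_emit_lineage` are hypothetical names. The rule that a disabled operator is skipped even in selective mode is an assumption based on the descriptions in this section.

```python
def parse_operator_list(raw: str) -> set[str]:
    """Hypothetical helper: split a semicolon-separated option into a set of class paths."""
    return {item.strip() for item in raw.split(";") if item.strip()}


def should_emit_lineage(
    operator_path: str,
    disabled: set[str],
    selective: bool,
    marked_with_enable_lineage: bool,
) -> bool:
    """Hypothetical decision rule combining disabled_operators() and selective_enable()."""
    # Operators listed in disabled_for_operators never emit lineage.
    if operator_path in disabled:
        return False
    # Outside selective mode, every remaining task emits lineage.
    if not selective:
        return True
    # In selective mode, only DAGs/tasks marked with enable_lineage() emit lineage.
    return marked_with_enable_lineage


disabled = parse_operator_list(
    "airflow.operators.bash.BashOperator; airflow.operators.python.PythonOperator"
)
print(should_emit_lineage("airflow.operators.bash.BashOperator", disabled, False, True))  # False
```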

### Custom Component Configuration

Access configuration for custom extractors and facets that extend OpenLineage functionality.

```python { .api }
def custom_extractors() -> set[str]:
    """
    Get the set of custom extractor class paths registered for lineage extraction.

    Returns:
        set[str]: Set of fully qualified class paths for custom extractors
    """


def custom_run_facets() -> set[str]:
    """
    Get the set of custom run facet function paths for metadata enrichment.

    Returns:
        set[str]: Set of fully qualified function paths for custom facets
    """
```
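
Both options hold dotted import paths, which must be resolved to a class or function at runtime. Below is a minimal sketch of that resolution with `importlib`. It uses standard-library paths as stand-ins for entries like `my_package.CustomExtractor`. `resolve_dotted_path` is a hypothetical helper, and the provider may rely on its own import utility instead.

```python
import importlib
from typing import Any


def resolve_dotted_path(path: str) -> Any:
    """Hypothetical helper: import 'package.module.attr' and return attr."""
    module_path, _, attr_name = path.rpartition(".")
    if not module_path:
        raise ImportError(f"{path!r} is not a dotted path")
    module = importlib.import_module(module_path)  # ModuleNotFoundError is an ImportError
    try:
        return getattr(module, attr_name)
    except AttributeError as exc:
        raise ImportError(f"{module_path!r} has no attribute {attr_name!r}") from exc


# Resolve each entry; a failed import is reported instead of aborting the loop.
for entry in ("collections.OrderedDict", "os.path.join", "no_such_pkg.Thing"):
    try:
        print(entry, "->", resolve_dotted_path(entry))
    except ImportError as exc:
        print(entry, "-> failed:", exc)
```

Reporting a failure per entry, instead of raising, keeps one broken path from hiding the others when you check a configured list.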

### Source Code and Documentation Configuration

Control whether source code and additional task information are included in lineage events.

```python { .api }
def is_source_enabled() -> bool:
    """
    Check if source code inclusion is enabled for lineage events.

    When enabled, operators like PythonOperator and BashOperator include
    their source code in the lineage events.

    Returns:
        bool: True if source code should be included in events
    """


def include_full_task_info() -> bool:
    """
    Check if full task information should be included in lineage events.

    When enabled, events include comprehensive task metadata which may
    contain large fields.

    Returns:
        bool: True if full task info should be included
    """
```
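
One way to picture `is_source_enabled()` is as a gate in front of the OpenLineage `sourceCode` job facet, which carries `language` and `sourceCode` fields. The sketch below models only that gate. `build_job_facets` is hypothetical, and it does not show how the provider actually obtains an operator's source.

```python
from typing import Any, Optional


def build_job_facets(source_enabled: bool, language: str, source: Optional[str]) -> dict[str, Any]:
    """Hypothetical sketch: attach a sourceCode facet only when source inclusion is enabled."""
    facets: dict[str, Any] = {}
    # Skip the facet when inclusion is disabled or no source text is available.
    if source_enabled and source:
        facets["sourceCode"] = {"language": language, "sourceCode": source}
    return facets


print(build_job_facets(True, "bash", "echo hello"))
```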

### Spark Integration Configuration

Configuration specific to Spark application integration and property injection.

```python { .api }
def spark_inject_parent_job_info() -> bool:
    """
    Check if parent job information should be injected into Spark applications.

    When enabled, automatically injects OpenLineage parent job details
    (namespace, job name, run id) into Spark application properties.

    Returns:
        bool: True if parent job info injection is enabled
    """


def spark_inject_transport_info() -> bool:
    """
    Check if transport information should be injected into Spark applications.

    When enabled, automatically injects OpenLineage transport configuration
    into Spark application properties for lineage emission.

    Returns:
        bool: True if transport info injection is enabled
    """
```
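
Parent job injection amounts to adding a few entries to the Spark application's configuration. The sketch below makes several assumptions:

- The property names are `spark.openlineage.parentJobNamespace`, `spark.openlineage.parentJobName` and `spark.openlineage.parentRunId`, as used by the OpenLineage Spark integration.
- Values the user has already set are left untouched.
- The `dag_id.task_id` job name is only an example value.

`inject_parent_job_info` is a hypothetical helper, not the provider's function.

```python
import uuid


def inject_parent_job_info(
    properties: dict[str, str], namespace: str, job_name: str, run_id: str
) -> dict[str, str]:
    """Hypothetical sketch: return a copy of Spark properties with parent job info added."""
    # Assumption: if the user already set any parent-job property, do not override it.
    if any(key.startswith("spark.openlineage.parent") for key in properties):
        return dict(properties)
    return {
        **properties,
        "spark.openlineage.parentJobNamespace": namespace,
        "spark.openlineage.parentJobName": job_name,
        "spark.openlineage.parentRunId": run_id,
    }


spark_conf = inject_parent_job_info(
    {"spark.executor.memory": "2g"},
    namespace="my_airflow_instance",
    job_name="my_dag.spark_task",
    run_id=str(uuid.uuid4()),
)
print(sorted(spark_conf))
```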

### Performance and Debugging Configuration

Configuration options for performance tuning, debugging, and operational control.

```python { .api }
def debug_mode() -> bool:
    """
    Check if debug mode is enabled for OpenLineage events.

    When enabled, events include debugging information such as installed
    packages and their versions, potentially creating large events.

    Returns:
        bool: True if debug mode is active
    """


def execution_timeout() -> int:
    """
    Get the maximum time allowed for OpenLineage metadata extraction.

    Returns:
        int: Timeout in seconds for metadata extraction operations
    """


def dag_state_change_process_pool_size() -> int:
    """
    Get the number of processes for handling DAG state changes asynchronously.

    Returns:
        int: Process pool size for async DAG state change processing
    """
```
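
A timeout such as `execution_timeout()` only helps if extraction is actually bounded by it. The sketch below bounds a call with a worker thread. It is a generic illustration using the hypothetical helper `run_with_timeout`, not the mechanism the provider uses, which may run extraction in a separate process instead.

```python
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError as FutureTimeout
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def run_with_timeout(func: Callable[[], T], timeout_s: float) -> Optional[T]:
    """Hypothetical sketch: return func()'s result, or None if it exceeds timeout_s."""
    executor = ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func)
    try:
        return future.result(timeout=timeout_s)
    except FutureTimeout:
        # A running thread cannot be killed; the result is simply discarded.
        return None
    finally:
        # Do not block on a worker that is still running.
        executor.shutdown(wait=False)


def slow_extraction() -> str:
    time.sleep(0.5)
    return "late result"


print(run_with_timeout(lambda: {"inputs": [], "outputs": []}, timeout_s=10))
print(run_with_timeout(slow_extraction, timeout_s=0.1))  # None
```

In a real setup the timeout argument would come from `execution_timeout()`. Because threads cannot be interrupted, a process-based approach is the usual choice when a slow extraction must actually be stopped.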

### Configuration File Access

Access to the configuration file path and legacy environment variable handling.

```python { .api }
def config_path(check_legacy_env_var: bool = True) -> str:
    """
    Get the path to the OpenLineage configuration file.

    Args:
        check_legacy_env_var: Whether to fall back to legacy environment variables
            when no path is configured

    Returns:
        str: Path to the OpenLineage configuration file, or an empty string if none is configured
    """
```
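
The `check_legacy_env_var` flag describes a fallback order. The `[openlineage] config_path` option is used when it is set; otherwise an older environment variable may be consulted. The sketch below models that order. `resolve_config_path` is hypothetical, and the variable name `OPENLINEAGE_CONFIG` is an assumption about which legacy variable is meant.

```python
import os
from typing import Mapping, Optional


def resolve_config_path(
    option_value: str,
    check_legacy_env_var: bool = True,
    environ: Optional[Mapping[str, str]] = None,
) -> str:
    """Hypothetical sketch of config-path resolution with a legacy env var fallback."""
    env = os.environ if environ is None else environ
    # The explicit [openlineage] config_path option always wins.
    if option_value:
        return option_value
    if check_legacy_env_var:
        # Assumed legacy variable name; an empty string means "not configured".
        return env.get("OPENLINEAGE_CONFIG", "")
    return ""


print(resolve_config_path("", environ={"OPENLINEAGE_CONFIG": "/etc/openlineage.yml"}))
```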

## Usage Examples

### Basic Configuration Check

```python
from airflow.providers.openlineage.conf import is_disabled, namespace, transport

# Check if OpenLineage is enabled
if not is_disabled():
    print(f"OpenLineage namespace: {namespace()}")
    print(f"Transport config: {transport()}")
```

### Selective Enable Configuration

```python
from airflow.providers.openlineage.conf import disabled_operators, selective_enable

if selective_enable():
    print("Selective lineage mode is active: only DAGs/tasks marked with enable_lineage() emit events")

# Operators listed in disabled_for_operators are excluded in either mode
disabled = disabled_operators()
if disabled:
    print(f"Disabled operators: {disabled}")
```

### Debug and Performance Settings

```python
from airflow.providers.openlineage.conf import debug_mode, execution_timeout, include_full_task_info

print(f"Debug mode: {debug_mode()}")
print(f"Execution timeout: {execution_timeout()}s")
print(f"Include full task info: {include_full_task_info()}")
```

### Custom Components Check

```python
from airflow.providers.openlineage.conf import custom_extractors, custom_run_facets

extractors = custom_extractors()
facets = custom_run_facets()

print(f"Custom extractors: {extractors}")
print(f"Custom facets: {facets}")
```

## Configuration Examples

### Transport Configuration

```python
# Each dict describes a value for the `[openlineage] transport` option,
# which Airflow reads as a JSON string.

# HTTP transport
transport_config = {
    "type": "http",
    "url": "http://localhost:5000",
    "endpoint": "api/v1/lineage",
    "timeout": 5,
    "verify": False,
}

# Kafka transport ("config" is passed to the Kafka producer)
transport_config = {
    "type": "kafka",
    "config": {
        "bootstrap.servers": "localhost:9092",
        "acks": "all",
        "retries": 3,
    },
    "topic": "openlineage.events",
}

# File transport (writes events to files derived from log_file_path)
transport_config = {
    "type": "file",
    "log_file_path": "/tmp/openlineage/events.json",
}
```
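
These dictionaries show the shape of the transport configuration. In `airflow.cfg`, or through Airflow's `AIRFLOW__{SECTION}__{KEY}` environment variables, the same value is written as a JSON string. Below is a small sketch of that conversion; `to_airflow_env` is a hypothetical helper.

```python
import json
from typing import Any


def to_airflow_env(section: str, key: str, value: dict[str, Any]) -> tuple[str, str]:
    """Hypothetical helper: build an Airflow env var name and a JSON-encoded value."""
    name = f"AIRFLOW__{section.upper()}__{key.upper()}"
    return name, json.dumps(value)


http_transport = {
    "type": "http",
    "url": "http://localhost:5000",
    "endpoint": "api/v1/lineage",
    "timeout": 5,
    "verify": False,
}
name, value = to_airflow_env("openlineage", "transport", http_transport)
print(f"export {name}='{value}'")
```

Note that `json.dumps` turns Python's `False` into JSON `false`. Writing the dict's `repr` into the config file would produce invalid JSON.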

### Airflow Configuration

Configuration options can be set in the `[openlineage]` section of `airflow.cfg`:

```ini
[openlineage]
disabled = False
namespace = my_airflow_instance
transport = {"type": "http", "url": "http://localhost:5000"}
selective_enable = False
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator
extractors = my_package.CustomExtractor;my_package.AnotherExtractor
custom_run_facets = my_package.custom_facet_function
debug_mode = False
execution_timeout = 10
include_full_task_info = False
```
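
In this section, `transport` must be valid JSON and the list-valued options are semicolon-separated, so a quick pre-deployment check can catch typos. The sketch below runs such a check with `configparser` on a trimmed copy of the section. `check_openlineage_section` is hypothetical and checks only a few keys. Its fallback values are illustrative assumptions, not authoritative provider defaults.

```python
import configparser
import json
from typing import Any

CFG_TEXT = """
[openlineage]
disabled = False
namespace = my_airflow_instance
transport = {"type": "http", "url": "http://localhost:5000"}
disabled_for_operators = airflow.operators.bash.BashOperator;airflow.operators.python.PythonOperator
execution_timeout = 10
"""


def check_openlineage_section(text: str) -> dict[str, Any]:
    """Hypothetical sketch: parse and type-check a few [openlineage] options."""
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    section = parser["openlineage"]
    # json.loads raises a ValueError subclass on malformed JSON.
    transport = json.loads(section.get("transport", "{}"))
    if not isinstance(transport, dict):
        raise ValueError("transport must be a JSON object")
    operators = section.get("disabled_for_operators", "")
    return {
        "disabled": section.getboolean("disabled", fallback=False),
        "namespace": section.get("namespace", "default"),  # assumed fallback
        "transport": transport,
        "disabled_for_operators": {op.strip() for op in operators.split(";") if op.strip()},
        "execution_timeout": section.getint("execution_timeout", fallback=10),  # assumed fallback
    }


print(check_openlineage_section(CFG_TEXT))
```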