or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-interface.md · configuration.md · data-models.md · index.md · lineage-runner.md · metadata-providers.md · visualization-export.md

docs/metadata-providers.md

0

# Metadata Providers

1

2

Pluggable interfaces for supplying schema and table metadata to lineage analysis. A metadata provider returns the column list for a given table, which enables more accurate column-level lineage extraction and lets wildcards such as `SELECT *` be expanded into concrete columns.

3

4

## Capabilities

5

6

### Base MetaDataProvider

7

8

Abstract base class defining the interface for metadata providers. Custom providers extend this class and implement `_get_table_columns` to integrate with a different metadata source.

9

10

```python { .api }

11

class MetaDataProvider:

12

def __init__(self) -> None:

13

"""Initialize the metadata provider"""

14

15

def get_table_columns(self, table: Table, **kwargs) -> List[Column]:

16

"""

17

Get columns for a specific table.

18

19

Parameters:

20

- table: Table object to get columns for

21

- **kwargs: additional provider-specific arguments

22

23

Returns:

24

List of Column objects representing the table's columns

25

"""

26

27

def register_session_metadata(self, table: Table, columns: List[Column]) -> None:

28

"""

29

Register table metadata for the current session.

30

31

Parameters:

32

- table: Table object

33

- columns: List of Column objects for the table

34

"""

35

36

def deregister_session_metadata(self) -> None:

37

"""Clear all session metadata"""

38

39

def session(self) -> "MetaDataSession":

40

"""

41

Get a metadata session context manager for temporary metadata.

42

43

Returns:

44

MetaDataSession context manager

45

"""

46

47

def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:

48

"""

49

Abstract method for provider-specific column retrieval.

50

51

Parameters:

52

- schema: schema name

53

- table: table name

54

- **kwargs: provider-specific arguments

55

56

Returns:

57

List of column names as strings

58

"""

59

```

60

61

### DummyMetaDataProvider

62

63

Simple dictionary-based metadata provider, intended for testing and for scenarios where schema information is known in advance.

64

65

```python { .api }

66

class DummyMetaDataProvider(MetaDataProvider):

67

def __init__(self, metadata: Optional[Dict[str, List[str]]] = None):

68

"""

69

Initialize with optional metadata dictionary.

70

71

Parameters:

72

- metadata: dictionary mapping table names to column lists

73

Keys can be "table" or "schema.table" format

74

"""

75

76

@property

77

def metadata(self) -> Dict[str, List[str]]:

78

"""Get the metadata dictionary mapping tables to column lists"""

79

```

80

81

### SQLAlchemyMetaDataProvider

82

83

Database-backed metadata provider using SQLAlchemy for schema introspection. Supports any database that SQLAlchemy can connect to.

84

85

```python { .api }

86

class SQLAlchemyMetaDataProvider(MetaDataProvider):

87

def __init__(self, url: str, engine_kwargs: Optional[Dict[str, Any]] = None):

88

"""

89

Initialize with database connection details.

90

91

Parameters:

92

- url: SQLAlchemy database URL

93

- engine_kwargs: additional arguments for SQLAlchemy engine creation

94

"""

95

96

@property

97

def engine(self) -> "sqlalchemy.Engine":

98

"""Get the SQLAlchemy engine instance"""

99

100

@property

101

def metadata_obj(self) -> "sqlalchemy.MetaData":

102

"""Get the SQLAlchemy MetaData object"""

103

104

class MetaDataSession:

105

def __init__(self, metadata_provider: MetaDataProvider):

106

"""

107

Create a metadata session for managing temporary metadata.

108

109

Parameters:

110

- metadata_provider: the provider to create a session for

111

"""

112

113

def __enter__(self):

114

"""Enter context manager"""

115

116

def __exit__(self, exc_type, exc_val, exc_tb):

117

"""Exit context manager and clean up session metadata"""

118

119

def register_session_metadata(self, table: Table, columns: List[Column]) -> None:

120

"""Register session-level metadata for temporary tables or views"""

121

```

122

123

### MetaDataSession

124

125

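Context manager for temporary metadata registration during analysis. Obtain one by calling `session()` on a provider; temporary metadata is cleared when the `with` block exits.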

126

127

```python { .api }

128

class MetaDataSession:

129

def __enter__(self) -> "MetaDataSession":

130

"""Enter the metadata session context"""

131

132

def __exit__(self, exc_type, exc_val, exc_tb) -> None:

133

"""Exit the metadata session context and clear temporary metadata"""

134

```

135

136

## Usage Examples

137

138

### DummyMetaDataProvider

139

140

```python

141

from sqllineage.core.metadata.dummy import DummyMetaDataProvider

142

from sqllineage.runner import LineageRunner

143

144

# Define table schemas

145

metadata = {

146

"customers": ["customer_id", "name", "email", "created_date"],

147

"orders": ["order_id", "customer_id", "total", "order_date"],

148

"analytics.customer_summary": ["customer_id", "total_orders", "total_spent"]

149

}

150

151

# Create provider and use with LineageRunner

152

provider = DummyMetaDataProvider(metadata)

153

runner = LineageRunner(sql, metadata_provider=provider)

154

155

# Now column lineage will be more accurate

156

for src_col, tgt_col in runner.get_column_lineage():

157

print(f"{src_col} -> {tgt_col}")

158

```

159

160

### SQLAlchemyMetaDataProvider with PostgreSQL

161

162

```python

163

from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider

164

from sqllineage.runner import LineageRunner

165

166

# Connect to PostgreSQL database

167

db_url = "postgresql://user:password@localhost:5432/analytics_db"

168

provider = SQLAlchemyMetaDataProvider(db_url)

169

170

sql = """

171

INSERT INTO reporting.daily_sales

172

SELECT

173

date_trunc('day', order_timestamp) as sale_date,

174

sum(amount) as total_sales

175

FROM raw.transactions

176

GROUP BY date_trunc('day', order_timestamp)

177

"""

178

179

# Provider will automatically introspect schema from database

180

runner = LineageRunner(sql, metadata_provider=provider)

181

print("Column lineage with database schema:")

182

runner.print_column_lineage()

183

```

184

185

### SQLAlchemyMetaDataProvider with Snowflake

186

187

```python

188

# Snowflake connection with additional engine options

189

snowflake_url = "snowflake://user:password@account/database/schema"

190

engine_options = {

191

"connect_args": {

192

"warehouse": "COMPUTE_WH",

193

"role": "ANALYST_ROLE"

194

}

195

}

196

197

provider = SQLAlchemyMetaDataProvider(snowflake_url, engine_kwargs=engine_options)

198

runner = LineageRunner(snowflake_sql, dialect="snowflake", metadata_provider=provider)

199

```

200

201

### Custom Metadata Provider

202

203

```python

204

class JSONMetaDataProvider(MetaDataProvider):

205

def __init__(self, json_file_path: str):

206

super().__init__()

207

import json

208

with open(json_file_path, 'r') as f:

209

self.schema_data = json.load(f)

210

211

def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:

212

table_key = f"{schema}.{table}" if schema else table

213

return self.schema_data.get(table_key, [])

214

215

# Use custom provider

216

custom_provider = JSONMetaDataProvider("schemas.json")

217

runner = LineageRunner(sql, metadata_provider=custom_provider)

218

```

219

220

### Session Metadata

221

222

```python

223

from sqllineage.core.metadata.dummy import DummyMetaDataProvider

224

from sqllineage.core.models import Table, Column

225

226

provider = DummyMetaDataProvider()

227

228

# Temporarily register metadata for a specific analysis

229

temp_table = Table("temp_analysis_table")

230

temp_columns = [Column("id"), Column("value"), Column("timestamp")]

231

232

with provider.session():

233

provider.register_session_metadata(temp_table, temp_columns)

234

235

# Run analysis with temporary metadata

236

runner = LineageRunner(sql_with_temp_table, metadata_provider=provider)

237

runner.print_column_lineage()

238

239

# Session metadata is automatically cleared

240

```

241

242

### Metadata for Complex SQL

243

244

```python

245

# Metadata for SQL with CTEs and subqueries

246

metadata = {

247

# Base tables

248

"raw.events": ["event_id", "user_id", "event_type", "timestamp", "properties"],

249

"raw.users": ["user_id", "email", "signup_date", "country"],

250

251

# View or materialized view

252

"analytics.user_events": ["user_id", "event_count", "first_event", "last_event"]

253

}

254

255

provider = DummyMetaDataProvider(metadata)

256

257

complex_sql = """

258

WITH user_activity AS (

259

SELECT

260

user_id,

261

COUNT(*) as event_count,

262

MIN(timestamp) as first_event,

263

MAX(timestamp) as last_event

264

FROM raw.events

265

WHERE event_type = 'page_view'

266

GROUP BY user_id

267

),

268

enriched_activity AS (

269

SELECT

270

ua.user_id,

271

u.email,

272

u.country,

273

ua.event_count,

274

ua.first_event,

275

ua.last_event

276

FROM user_activity ua

277

JOIN raw.users u ON ua.user_id = u.user_id

278

)

279

INSERT INTO analytics.user_events

280

SELECT user_id, event_count, first_event, last_event

281

FROM enriched_activity

282

"""

283

284

runner = LineageRunner(complex_sql, metadata_provider=provider)

285

print("CTE and JOIN column lineage:")

286

runner.print_column_lineage()

287

```

288

289

### Error Handling

290

291

```python

292

from sqllineage.exceptions import MetaDataProviderException

293

294

try:

295

# Invalid database URL

296

provider = SQLAlchemyMetaDataProvider("invalid://connection/string")

297

runner = LineageRunner(sql, metadata_provider=provider)

298

except MetaDataProviderException as e:

299

print(f"Metadata provider error: {e}")

300

# Fallback to dummy provider

301

fallback_provider = DummyMetaDataProvider()

302

runner = LineageRunner(sql, metadata_provider=fallback_provider)

303

```