0
# Metadata Providers
1
2
Metadata providers are pluggable interfaces that supply schema and table metadata to lineage analysis. By reporting the columns of each table, they enable more accurate column-level lineage extraction and the expansion of wildcards (for example, `SELECT *`) into concrete columns.
3
4
## Capabilities
5
6
### Base MetaDataProvider
7
8
Abstract base class defining the interface for metadata providers. Custom providers extend this class and implement `_get_table_columns` to integrate with other metadata sources.
9
10
```python { .api }
11
class MetaDataProvider:
12
def __init__(self) -> None:
13
"""Initialize the metadata provider"""
14
15
def get_table_columns(self, table: Table, **kwargs) -> List[Column]:
16
"""
17
Get columns for a specific table.
18
19
Parameters:
20
- table: Table object to get columns for
21
- **kwargs: additional provider-specific arguments
22
23
Returns:
24
List of Column objects representing the table's columns
25
"""
26
27
def register_session_metadata(self, table: Table, columns: List[Column]) -> None:
28
"""
29
Register table metadata for the current session.
30
31
Parameters:
32
- table: Table object
33
- columns: List of Column objects for the table
34
"""
35
36
def deregister_session_metadata(self) -> None:
37
"""Clear all session metadata"""
38
39
def session(self) -> "MetaDataSession":
40
"""
41
Get a metadata session context manager for temporary metadata.
42
43
Returns:
44
MetaDataSession context manager
45
"""
46
47
def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
48
"""
49
Abstract method for provider-specific column retrieval.
50
51
Parameters:
52
- schema: schema name
53
- table: table name
54
- **kwargs: provider-specific arguments
55
56
Returns:
57
List of column names as strings
58
"""
59
```
60
61
### DummyMetaDataProvider
62
63
A simple dictionary-based metadata provider, intended for testing and for scenarios where schema information is known in advance.
64
65
```python { .api }
66
class DummyMetaDataProvider(MetaDataProvider):
67
def __init__(self, metadata: Optional[Dict[str, List[str]]] = None):
68
"""
69
Initialize with optional metadata dictionary.
70
71
Parameters:
72
- metadata: dictionary mapping table names to column lists
73
Keys can be "table" or "schema.table" format
74
"""
75
76
@property
77
def metadata(self) -> Dict[str, List[str]]:
78
"""Get the metadata dictionary mapping tables to column lists"""
79
```
80
81
### SQLAlchemyMetaDataProvider
82
83
Database-backed metadata provider using SQLAlchemy for schema introspection. Supports any database that SQLAlchemy can connect to.
84
85
```python { .api }
86
class SQLAlchemyMetaDataProvider(MetaDataProvider):
87
def __init__(self, url: str, engine_kwargs: Optional[Dict[str, Any]] = None):
88
"""
89
Initialize with database connection details.
90
91
Parameters:
92
- url: SQLAlchemy database URL
93
- engine_kwargs: additional arguments for SQLAlchemy engine creation
94
"""
95
96
@property
97
def engine(self) -> "sqlalchemy.Engine":
98
"""Get the SQLAlchemy engine instance"""
99
100
@property
101
def metadata_obj(self) -> "sqlalchemy.MetaData":
102
"""Get the SQLAlchemy MetaData object"""
103
104
class MetaDataSession:
105
def __init__(self, metadata_provider: MetaDataProvider):
106
"""
107
Create a metadata session for managing temporary metadata.
108
109
Parameters:
110
- metadata_provider: the provider to create a session for
111
"""
112
113
def __enter__(self):
114
"""Enter context manager"""
115
116
def __exit__(self, exc_type, exc_val, exc_tb):
117
"""Exit context manager and clean up session metadata"""
118
119
def register_session_metadata(self, table: Table, columns: List[Column]) -> None:
120
"""Register session-level metadata for temporary tables or views"""
121
```
122
123
### MetaDataSession
124
125
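Context manager, obtained from `MetaDataProvider.session()`, for registering temporary metadata during analysis. Metadata registered in the session is cleared when the context exits. The listing below shows only the context-manager methods; the fuller signature, including `__init__` and `register_session_metadata`, appears at the end of the SQLAlchemyMetaDataProvider listing above.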
126
127
```python { .api }
128
class MetaDataSession:
129
def __enter__(self) -> "MetaDataSession":
130
"""Enter the metadata session context"""
131
132
def __exit__(self, exc_type, exc_val, exc_tb) -> None:
133
"""Exit the metadata session context and clear temporary metadata"""
134
```
135
136
## Usage Examples
137
138
### DummyMetaDataProvider
139
140
```python
141
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
142
from sqllineage.runner import LineageRunner
143
144
# Define table schemas
145
metadata = {
146
"customers": ["customer_id", "name", "email", "created_date"],
147
"orders": ["order_id", "customer_id", "total", "order_date"],
148
"analytics.customer_summary": ["customer_id", "total_orders", "total_spent"]
149
}
150
151
# Create provider and use with LineageRunner
152
provider = DummyMetaDataProvider(metadata)
153
runner = LineageRunner(sql, metadata_provider=provider)
154
155
# Now column lineage will be more accurate
156
for src_col, tgt_col in runner.get_column_lineage():
157
print(f"{src_col} -> {tgt_col}")
158
```
159
160
### SQLAlchemyMetaDataProvider with PostgreSQL
161
162
```python
163
from sqllineage.core.metadata.sqlalchemy import SQLAlchemyMetaDataProvider
164
from sqllineage.runner import LineageRunner
165
166
# Connect to PostgreSQL database
167
db_url = "postgresql://user:password@localhost:5432/analytics_db"
168
provider = SQLAlchemyMetaDataProvider(db_url)
169
170
sql = """
171
INSERT INTO reporting.daily_sales
172
SELECT
173
date_trunc('day', order_timestamp) as sale_date,
174
sum(amount) as total_sales
175
FROM raw.transactions
176
GROUP BY date_trunc('day', order_timestamp)
177
"""
178
179
# Provider will automatically introspect schema from database
180
runner = LineageRunner(sql, metadata_provider=provider)
181
print("Column lineage with database schema:")
182
runner.print_column_lineage()
183
```
184
185
### SQLAlchemyMetaDataProvider with Snowflake
186
187
```python
188
# Snowflake connection with additional engine options
189
snowflake_url = "snowflake://user:password@account/database/schema"
190
engine_options = {
191
"connect_args": {
192
"warehouse": "COMPUTE_WH",
193
"role": "ANALYST_ROLE"
194
}
195
}
196
197
provider = SQLAlchemyMetaDataProvider(snowflake_url, engine_kwargs=engine_options)
198
runner = LineageRunner(snowflake_sql, dialect="snowflake", metadata_provider=provider)
199
```
200
201
### Custom Metadata Provider
202
203
```python
204
class JSONMetaDataProvider(MetaDataProvider):
205
def __init__(self, json_file_path: str):
206
super().__init__()
207
import json
208
with open(json_file_path, 'r') as f:
209
self.schema_data = json.load(f)
210
211
def _get_table_columns(self, schema: str, table: str, **kwargs) -> List[str]:
212
table_key = f"{schema}.{table}" if schema else table
213
return self.schema_data.get(table_key, [])
214
215
# Use custom provider
216
custom_provider = JSONMetaDataProvider("schemas.json")
217
runner = LineageRunner(sql, metadata_provider=custom_provider)
218
```
219
220
### Session Metadata
221
222
```python
223
from sqllineage.core.metadata.dummy import DummyMetaDataProvider
224
from sqllineage.core.models import Table, Column
225
226
provider = DummyMetaDataProvider()
227
228
# Temporarily register metadata for a specific analysis
229
temp_table = Table("temp_analysis_table")
230
temp_columns = [Column("id"), Column("value"), Column("timestamp")]
231
232
with provider.session():
233
provider.register_session_metadata(temp_table, temp_columns)
234
235
# Run analysis with temporary metadata
236
runner = LineageRunner(sql_with_temp_table, metadata_provider=provider)
237
runner.print_column_lineage()
238
239
# Session metadata is cleared automatically when the `with` block exits
240
```
241
242
### Metadata for Complex SQL
243
244
```python
245
# Metadata for SQL with CTEs and subqueries
246
metadata = {
247
# Base tables
248
"raw.events": ["event_id", "user_id", "event_type", "timestamp", "properties"],
249
"raw.users": ["user_id", "email", "signup_date", "country"],
250
251
# Target table written by the INSERT statement below
252
"analytics.user_events": ["user_id", "event_count", "first_event", "last_event"]
253
}
254
255
provider = DummyMetaDataProvider(metadata)
256
257
complex_sql = """
258
WITH user_activity AS (
259
SELECT
260
user_id,
261
COUNT(*) as event_count,
262
MIN(timestamp) as first_event,
263
MAX(timestamp) as last_event
264
FROM raw.events
265
WHERE event_type = 'page_view'
266
GROUP BY user_id
267
),
268
enriched_activity AS (
269
SELECT
270
ua.user_id,
271
u.email,
272
u.country,
273
ua.event_count,
274
ua.first_event,
275
ua.last_event
276
FROM user_activity ua
277
JOIN raw.users u ON ua.user_id = u.user_id
278
)
279
INSERT INTO analytics.user_events
280
SELECT user_id, event_count, first_event, last_event
281
FROM enriched_activity
282
"""
283
284
runner = LineageRunner(complex_sql, metadata_provider=provider)
285
print("CTE and JOIN column lineage:")
286
runner.print_column_lineage()
287
```
288
289
### Error Handling
290
291
```python
292
from sqllineage.exceptions import MetaDataProviderException
293
294
try:
295
# Invalid database URL
296
provider = SQLAlchemyMetaDataProvider("invalid://connection/string")
297
runner = LineageRunner(sql, metadata_provider=provider)
298
except MetaDataProviderException as e:
299
print(f"Metadata provider error: {e}")
300
# Fallback to dummy provider
301
fallback_provider = DummyMetaDataProvider()
302
runner = LineageRunner(sql, metadata_provider=fallback_provider)
303
```