Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management.
npx @tessl/cli install tessl/pypi-vdk-data-sources@0.1.00
# VDK Data Sources

Enables Versatile Data Kit (VDK) to integrate with various data sources by providing a unified interface for data ingestion and management. This plugin simplifies data pipeline construction by offering consistent APIs for connecting to databases, REST APIs, message brokers, and other data sources.

## Package Information

- **Package Name**: vdk-data-sources
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install vdk-data-sources`

## Core Imports
```python
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition,
    DestinationDefinition,
    DataFlowMappingDefinition
)
```

For implementing custom data sources:

```python
from vdk.plugin.data_sources.data_source import (
    IDataSource,
    IDataSourceStream,
    IDataSourceConfiguration,
    DataSourcePayload
)
from vdk.plugin.data_sources.factory import data_source, SingletonDataSourceFactory
from vdk.plugin.data_sources.config import config_class, config_field
from vdk.plugin.data_sources.state import IDataSourceState
```

For TOML configuration parsing:

```python
from vdk.plugin.data_sources.mapping import toml_parser
```

For CLI utilities:

```python
from vdk.plugin.data_sources.sources_command import list_data_sources, list_config_options
```
## Basic Usage
### Simple Data Flow

```python
from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping.definitions import (
    SourceDefinition,
    DestinationDefinition,
    DataFlowMappingDefinition
)

def run(job_input: IJobInput):
    # Define source and destination
    source = SourceDefinition(
        id="my-source",
        name="auto-generated-data",
        config={"num_records": 100}
    )
    destination = DestinationDefinition(
        id="my-dest",
        method="memory"
    )

    # Execute data flow
    with DataFlowInput(job_input) as flow_input:
        flow_input.start(DataFlowMappingDefinition(source, destination))
```
### TOML Configuration-Based Data Flow

```python
from vdk.api.job_input import IJobInput
from vdk.plugin.data_sources.mapping.data_flow import DataFlowInput
from vdk.plugin.data_sources.mapping import toml_parser

def run(job_input: IJobInput):
    # Load configuration from TOML file
    definitions = toml_parser.load_config("config.toml")

    # Execute all configured flows
    with DataFlowInput(job_input) as flow_input:
        flow_input.start_flows(definitions)
```

Example `config.toml`:

```toml
[sources.auto]
name = "auto-generated-data"
config = { num_records = 50 }

[destinations.auto-dest]
method = "memory"

[[flows]]
from = "auto"
to = "auto-dest"
```
## Architecture

The vdk-data-sources plugin follows a layered architecture with clear separation of concerns:

- **Data Sources**: Manage connections and expose data streams
- **Data Streams**: Abstract individual data channels within sources
- **Data Flow Engine**: Orchestrates data movement from sources to destinations
- **Configuration System**: Provides typed configuration management
- **State Management**: Handles incremental ingestion and resume capabilities
- **Plugin Integration**: Integrates with VDK's plugin system
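The interplay of these layers can be sketched with plain Python stand-ins. Everything below (`Payload`, `NumbersStream`, `NumbersSource`, `run_flow`) is illustrative only, not part of the plugin's API:

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Iterable, List


@dataclass
class Payload:
    """Stand-in for DataSourcePayload: one unit of ingested data."""
    data: Dict[str, Any]
    state: Dict[str, Any] = field(default_factory=dict)


class NumbersStream:
    """Stand-in for a data stream: a named channel yielding payloads."""
    def name(self) -> str:
        return "numbers"

    def read(self) -> Iterable[Payload]:
        for i in range(3):
            yield Payload(data={"value": i}, state={"last": i})


class NumbersSource:
    """Stand-in for a data source: owns the connection and exposes streams."""
    def streams(self) -> List[NumbersStream]:
        return [NumbersStream()]


def run_flow(source, destination: List[Dict[str, Any]]) -> None:
    """Stand-in for the flow engine: move every payload to the destination."""
    for stream in source.streams():
        for payload in stream.read():
            destination.append(payload.data)


sink: List[Dict[str, Any]] = []
run_flow(NumbersSource(), sink)
print(sink)  # three records moved from source to destination
```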
## Capabilities

### Data Source Implementation

Core interfaces and utilities for creating custom data sources that integrate with the VDK data ingestion framework.

```python { .api }
@data_source(name: str, config_class: Type[IDataSourceConfiguration])
def data_source_decorator(data_source_class: Type[IDataSource]): ...

class IDataSource:
    def configure(self, config: IDataSourceConfiguration): ...
    def connect(self, state: Optional[IDataSourceState]): ...
    def disconnect(self): ...
    def streams(self) -> List[IDataSourceStream]: ...

class IDataSourceStream:
    def name(self) -> str: ...
    def read(self) -> Iterable[DataSourcePayload]: ...
```

[Data Sources](./data-sources.md)
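A minimal sketch of the stream contract, using local stand-ins rather than the real vdk classes. Only `StopDataSourceStream` mirrors an actual exception (see Types); `Record`, `UsersStream`, and `drain` are hypothetical names for illustration:

```python
from dataclasses import dataclass
from typing import Any, Dict, Iterator, Optional


class StopDataSourceStream(Exception):
    """Local stand-in for vdk's StopDataSourceStream exception."""


@dataclass(frozen=True)
class Record:
    """Stand-in for DataSourcePayload."""
    data: Dict[str, Any]
    destination_table: Optional[str] = None


class UsersStream:
    """Sketch of the IDataSourceStream contract: name() plus an iterable read()."""

    def __init__(self, pages):
        self._pages = pages

    def name(self) -> str:
        return "users"

    def read(self) -> Iterator[Record]:
        for page in self._pages:
            if not page:  # an empty page signals exhaustion in this sketch
                raise StopDataSourceStream()
            for row in page:
                yield Record(data=row, destination_table="users")


def drain(stream) -> list:
    """Consume a stream until StopDataSourceStream, as the framework would."""
    out = []
    try:
        for record in stream.read():
            out.append(record.data)
    except StopDataSourceStream:
        pass
    return out


rows = drain(UsersStream([[{"id": 1}, {"id": 2}], []]))
print(rows)
```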
### Data Flow Management

High-level orchestration system for managing data flows from sources to destinations with transformation support.

```python { .api }
class DataFlowInput:
    def __init__(self, job_input: IJobInput): ...
    def start(self, flow_definition: DataFlowMappingDefinition): ...
    def start_flows(self, definitions: Definitions): ...
    def close(self): ...

class DataFlowMappingDefinition:
    from_source: SourceDefinition
    to_destination: DestinationDefinition
    map_function: Optional[Callable[[DataSourcePayload], Optional[DataSourcePayload]]]
```

[Data Flow](./data-flow.md)
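The `map_function` hook takes a payload and returns a transformed payload; given its `Optional` return type, returning `None` presumably drops the payload from the flow. A sketch with a stand-in payload type (the real hook receives a `DataSourcePayload`):

```python
from dataclasses import dataclass, replace
from typing import Any, Dict, Optional


@dataclass(frozen=True)
class Payload:
    """Stand-in for DataSourcePayload."""
    data: Dict[str, Any]
    destination_table: Optional[str] = None


def rename_and_filter(payload: Payload) -> Optional[Payload]:
    """A map_function-style hook: drop inactive rows, rename a column,
    and route the result to a specific destination table."""
    if not payload.data.get("active"):
        return None  # None is assumed to drop the payload from the flow
    data = {("user_id" if k == "id" else k): v for k, v in payload.data.items()}
    return replace(payload, data=data, destination_table="active_users")


kept = rename_and_filter(Payload(data={"id": 7, "active": True}))
dropped = rename_and_filter(Payload(data={"id": 8, "active": False}))
print(kept.data, kept.destination_table, dropped)
```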
### Configuration System

Typed configuration management system with decorators for defining data source configuration schemas.

```python { .api }
@config_class(name: str, description: str, **kwargs)
def config_class_decorator(cls): ...

def config_field(
    description: str,
    is_sensitive: bool = False,
    default=MISSING,
    **kwargs
): ...
```

[Configuration](./configuration.md)
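The shape of a configuration class can be illustrated with plain dataclasses. The `config_field` below is a local stand-in that mimics the documented signature using `dataclasses.field` metadata; it is not vdk's implementation, and `HttpSourceConfig` is a hypothetical example:

```python
from dataclasses import dataclass, field, fields


def config_field(description: str, is_sensitive: bool = False, default=None):
    """Stand-in for vdk's config_field: attach metadata to a dataclass field."""
    return field(default=default, metadata={
        "description": description,
        "is_sensitive": is_sensitive,
    })


@dataclass
class HttpSourceConfig:
    """Sketch of a typed configuration class; vdk's @config_class plays a
    similar role for real data sources."""
    api_url: str = config_field(description="Base URL of the HTTP API", default="")
    api_key: str = config_field(description="Auth token", is_sensitive=True, default="")


cfg = HttpSourceConfig(api_url="https://example.com", api_key="secret")
sensitive = [f.name for f in fields(HttpSourceConfig) if f.metadata["is_sensitive"]]
print(sensitive)
```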
### State Management

Persistent state management for data sources, enabling incremental ingestion and resume capabilities.

```python { .api }
class IDataSourceState:
    def read_stream(self, stream_name: str) -> Dict[str, Any]: ...
    def update_stream(self, stream_name: str, state: Dict[str, Any]): ...
    def read_others(self, key: str) -> Dict[str, Any]: ...
    def update_others(self, key: str, state: Dict[str, Any]): ...
```

[State Management](./state-management.md)
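A typical use of per-stream state is to persist a cursor so reruns only ingest new records. A sketch with a dict-backed stand-in for `IDataSourceState` (VDK persists the real state between job runs; `ingest` and the `last_id` key are illustrative):

```python
from typing import Any, Dict


class InMemoryState:
    """Dict-backed stand-in for the IDataSourceState stream-state methods."""

    def __init__(self):
        self._streams: Dict[str, Dict[str, Any]] = {}

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        return self._streams.get(stream_name, {})

    def update_stream(self, stream_name: str, state: Dict[str, Any]) -> None:
        self._streams[stream_name] = state


def ingest(rows, state) -> list:
    """Resume after the last ingested id recorded in the stream state."""
    cursor = state.read_stream("users").get("last_id", 0)
    new = [r for r in rows if r["id"] > cursor]
    if new:
        state.update_stream("users", {"last_id": new[-1]["id"]})
    return new


rows = [{"id": 1}, {"id": 2}, {"id": 3}]
state = InMemoryState()
first = ingest(rows, state)   # first run ingests everything
second = ingest(rows, state)  # second run finds nothing new
print(len(first), len(second))
```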
### Data Metrics Analysis

Interfaces for implementing metrics collection and analysis on data ingested from data sources.

```python { .api }
class IDataMetricsBatchAnalyzer:
    def analyze_batch(self, payload: DataSourcePayload): ...

class IDataMetricsFullAnalyzer:
    def get_data_store_target(self) -> str: ...
    def get_data_store_method(self) -> str: ...
    def analyze_at_the_end(self): ...
```
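A batch analyzer in the spirit of `IDataMetricsBatchAnalyzer` inspects each payload as it flows through and accumulates metrics. A stand-in sketch that counts rows and null ids (plain dicts replace `DataSourcePayload` for illustration):

```python
from typing import Any, Dict


class RowCountAnalyzer:
    """Stand-in batch analyzer: accumulates simple metrics per payload."""

    def __init__(self):
        self.rows = 0
        self.null_ids = 0

    def analyze_batch(self, payload: Dict[str, Any]) -> None:
        self.rows += 1
        if payload.get("id") is None:
            self.null_ids += 1


analyzer = RowCountAnalyzer()
for p in [{"id": 1}, {"id": None}, {"id": 3}]:
    analyzer.analyze_batch(p)
print(analyzer.rows, analyzer.null_ids)
```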
### CLI Utilities

Functions for programmatically accessing data source registry information and exploring available data sources.

```python { .api }
def list_data_sources() -> List[Dict]:
    """
    List all registered data sources.

    Returns:
        List of dictionaries containing data source information
    """

def list_config_options(data_source_name: str) -> List[Dict]:
    """
    List configuration options for a specific data source.

    Args:
        data_source_name: Name of the data source

    Returns:
        List of dictionaries containing configuration options
    """
```
## Types

```python { .api }
@dataclass(frozen=True)
class DataSourcePayload:
    data: Optional[Dict[str, Any]]
    metadata: Optional[Dict[str, Union[int, str, bool, float, datetime]]]
    state: Optional[Dict[str, Any]] = field(default_factory=dict)
    destination_table: Optional[str] = None

@dataclass
class SourceDefinition:
    id: str
    name: str
    config: Dict[str, Any] = field(default_factory=dict)

@dataclass
class DestinationDefinition:
    id: str
    method: str
    target: Optional[str] = None

@dataclass
class Definitions:
    sources: Dict[str, SourceDefinition]
    destinations: Dict[str, DestinationDefinition]
    flows: List[DataFlowMappingDefinition]

class StopDataSourceStream(Exception):
    """Signals that a stream has ended and there is no more data."""

class RetryDataSourceStream(Exception):
    """Signals that ingestion of the stream should be retried."""

class DataSourcesAggregatedException(Exception):
    """Aggregates failures from multiple data sources."""
    def __init__(self, data_streams_exceptions: Dict[str, Exception]): ...

class DataSourceNotFoundException(Exception):
    """Raised when a requested data source is not found in the registry."""

@dataclass
class DataSourceError:
    """Encapsulates information about a data source ingestion error."""
    data_stream: IDataSourceStream
    exception: BaseException
```