# State Management

Persistent state management for data sources, enabling incremental ingestion and resume capabilities. The state management system allows data sources to track their progress and resume from where they left off.
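The core pattern, independent of any VDK classes, is a cursor that survives restarts: read the cursor, process only newer records, and write the cursor back after each record. A minimal plain-Python sketch (the dict stands in for a real storage backend):

```python
# Minimal sketch of incremental ingestion with persisted state.
# "state_storage" is just a dict standing in for a real backend.

def ingest(records, state_storage):
    # Resume from the last processed id (0 on the first run)
    last_id = state_storage.get("last_id", 0)
    processed = []
    for record in records:
        if record["id"] > last_id:
            processed.append(record)
            # Persist progress so a restart resumes from here
            state_storage["last_id"] = record["id"]
    return processed


storage = {}
batch = [{"id": i} for i in range(1, 6)]
first = ingest(batch, storage)   # processes ids 1..5, cursor ends at 5
second = ingest(batch, storage)  # nothing new: everything is at or below the cursor
```

Rerunning the same batch yields no duplicates, which is exactly the resume behavior the state system provides for real sources.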
## Capabilities

### Data Source State Interface

Abstract interface for managing data source state, with support for both stream-specific and general state tracking.
```python { .api }
class IDataSourceState:
    """
    Abstract interface for data source state management.
    """

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """
        Read the state of a stream within a data source.

        Args:
            stream_name: Name of the stream

        Returns:
            Dictionary containing the stream's state
        """

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """
        Update the state of a stream within the data source.

        Args:
            stream_name: Name of the stream
            state: New state dictionary for the stream
        """

    def read_others(self, key: str) -> Dict[str, Any]:
        """
        Read state of a data source that is not related to streams.

        Args:
            key: State key identifier

        Returns:
            Dictionary containing the state for the given key
        """

    def update_others(self, key: str, state: Dict[str, Any]):
        """
        Update state of a data source that is not related to streams.

        Args:
            key: State key identifier
            state: New state dictionary for the key
        """
```
### Data Source State Implementation

Concrete implementation of state management that uses pluggable storage backends.
```python { .api }
class DataSourceState(IDataSourceState):
    """
    Concrete implementation of data source state management.
    """

    def __init__(self, state_storage: IDataSourceStateStorage, source: str):
        """
        Initialize the state manager.

        Args:
            state_storage: Storage backend for persisting state
            source: Data source identifier
        """

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """Read stream state from storage."""

    def read_others(self, key: str) -> Dict[str, Any]:
        """Read general state from storage."""

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """Update stream state in storage."""

    def update_others(self, key: str, state: Dict[str, Any]):
        """Update general state in storage."""
```
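To make the delegation concrete, here is a hypothetical sketch of how such an implementation might behave, assuming the per-source state is a single dict with separate sections for streams and other keys. The `DictStorage` and `SketchDataSourceState` names and the `"streams"`/`"others"` layout are illustrative assumptions, not VDK's actual internals:

```python
from typing import Any, Dict


class DictStorage:
    """Stand-in storage backend: keeps the complete per-source state in a dict."""

    def __init__(self):
        self._data: Dict[str, Dict[str, Any]] = {}

    def read(self, data_source: str) -> Dict[str, Any]:
        return self._data.get(data_source, {})

    def write(self, data_source: str, state: Dict[str, Any]):
        self._data[data_source] = state


class SketchDataSourceState:
    """Hypothetical sketch: read the whole source state, index into it, write it back."""

    def __init__(self, state_storage: DictStorage, source: str):
        self._storage = state_storage
        self._source = source

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        return self._storage.read(self._source).get("streams", {}).get(stream_name, {})

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        full = self._storage.read(self._source)
        full.setdefault("streams", {})[stream_name] = state
        self._storage.write(self._source, full)

    def read_others(self, key: str) -> Dict[str, Any]:
        return self._storage.read(self._source).get("others", {}).get(key, {})

    def update_others(self, key: str, state: Dict[str, Any]):
        full = self._storage.read(self._source)
        full.setdefault("others", {})[key] = state
        self._storage.write(self._source, full)
```

The point of the pattern is that the state manager owns the layout while the storage backend only sees opaque dictionaries, which is what makes the backends pluggable.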
### State Storage Interface

Abstract interface for state storage backends that can be implemented for different persistence mechanisms.
```python { .api }
class IDataSourceStateStorage:
    """
    Abstract interface for state storage backends.
    """

    def read(self, data_source: str) -> Dict[str, Any]:
        """
        Read the state from the underlying storage.

        Args:
            data_source: The data source name

        Returns:
            Dictionary containing the complete state for the data source
        """

    def write(self, data_source: str, state: Dict[str, Any]):
        """
        Write (persist) the current state of the data source to the underlying storage.

        Args:
            data_source: The data source name
            state: Complete state dictionary to persist
        """
```
### State Storage Implementations

Built-in implementations for different storage backends.
```python { .api }
class InMemoryDataSourceStateStorage(IDataSourceStateStorage):
    """
    In-memory state storage, useful for testing purposes.
    """

    def __init__(self): ...

    def read(self, data_source: str) -> Dict[str, Any]:
        """Read state from memory."""

    def write(self, data_source: str, state: Dict[str, Any]):
        """Write state to memory."""


class PropertiesBasedDataSourceStorage(IDataSourceStateStorage):
    """
    State storage implementation using the VDK properties system.
    """

    KEY = ".vdk.data_sources.state"

    def __init__(self, properties: IProperties):
        """
        Initialize properties-based storage.

        Args:
            properties: VDK properties interface
        """

    def read(self, data_source: str) -> Dict[str, Any]:
        """Read state from VDK properties."""

    def write(self, data_source: str, state: Dict[str, Any]):
        """Write state to VDK properties."""
```
### State Factory

Factory class for creating state managers with specific storage backends.
```python { .api }
class DataSourceStateFactory:
    """
    Factory for creating data source state managers.
    """

    def __init__(self, storage: IDataSourceStateStorage):
        """
        Initialize the factory with a storage backend.

        Args:
            storage: State storage implementation
        """

    def get_data_source_state(self, source: str) -> IDataSourceState:
        """
        Create a state manager for a specific data source.

        Args:
            source: Data source identifier

        Returns:
            Data source state manager instance
        """
```
## Usage Examples

### Basic State Management in Data Sources
```python
from datetime import datetime
from typing import Iterable

from vdk.plugin.data_sources.data_source import (
    DataSourcePayload,
    IDataSource,
    IDataSourceStream,
)
from vdk.plugin.data_sources.state import IDataSourceState


class IncrementalDataSourceStream(IDataSourceStream):
    def __init__(self, stream_name: str, state: IDataSourceState):
        self._stream_name = stream_name
        self._state = state

    def name(self) -> str:
        return self._stream_name

    def read(self) -> Iterable[DataSourcePayload]:
        # Read the last processed state
        last_state = self._state.read_stream(self._stream_name)
        last_id = last_state.get("last_id", 0)

        # Simulate reading records starting from the last processed ID
        for record_id in range(last_id + 1, last_id + 11):  # Process 10 records
            data = {"id": record_id, "value": f"data_{record_id}"}

            # Yield payload with state information
            yield DataSourcePayload(
                data=data,
                metadata={"timestamp": datetime.now()},
                state={"last_id": record_id},  # State is persisted automatically
            )


class IncrementalDataSource(IDataSource):
    def configure(self, config):
        self._config = config

    def connect(self, state: IDataSourceState):
        self._state = state
        # Create streams that can access state
        self._streams = [
            IncrementalDataSourceStream("stream_1", state),
            IncrementalDataSourceStream("stream_2", state),
        ]

    def disconnect(self):
        self._streams = []

    def streams(self):
        return self._streams
```
### Custom State Storage Backend
```python
import json
import os
from typing import Any, Dict

from vdk.plugin.data_sources.state import IDataSourceStateStorage


class FileBasedStateStorage(IDataSourceStateStorage):
    """Custom file-based state storage implementation."""

    def __init__(self, state_directory: str):
        self.state_directory = state_directory
        os.makedirs(state_directory, exist_ok=True)

    def _get_state_file_path(self, data_source: str) -> str:
        return os.path.join(self.state_directory, f"{data_source}_state.json")

    def read(self, data_source: str) -> Dict[str, Any]:
        state_file = self._get_state_file_path(data_source)
        if os.path.exists(state_file):
            with open(state_file, "r") as f:
                return json.load(f)
        return {}

    def write(self, data_source: str, state: Dict[str, Any]):
        state_file = self._get_state_file_path(data_source)
        with open(state_file, "w") as f:
            json.dump(state, f, indent=2, default=str)
```
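One caveat with `default=str`: non-JSON values such as `datetime` serialize without error, but they come back from `json.load` as plain strings, so readers of the state must parse them explicitly. A standalone illustration:

```python
import json
from datetime import datetime

# default=str lets json.dumps handle a datetime by stringifying it...
state = {"last_sync_time": datetime(2024, 1, 2, 3, 4, 5)}
serialized = json.dumps(state, default=str)

# ...but the round trip yields a string, not a datetime
restored = json.loads(serialized)
value = restored["last_sync_time"]      # "2024-01-02 03:04:05"
parsed = datetime.fromisoformat(value)  # the caller must parse explicitly
```

Storing ISO-formatted strings in state (via `.isoformat()`) and parsing them on read keeps this asymmetry explicit.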
### Database-Backed State Storage
```python
import json
import sqlite3
from typing import Any, Dict

from vdk.plugin.data_sources.state import IDataSourceStateStorage


class DatabaseStateStorage(IDataSourceStateStorage):
    """Database-backed state storage implementation."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS data_source_state (
                    source_name TEXT PRIMARY KEY,
                    state_data TEXT,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def read(self, data_source: str) -> Dict[str, Any]:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT state_data FROM data_source_state WHERE source_name = ?",
                (data_source,),
            )
            row = cursor.fetchone()
            if row:
                return json.loads(row[0])
            return {}

    def write(self, data_source: str, state: Dict[str, Any]):
        state_json = json.dumps(state, default=str)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                """
                INSERT OR REPLACE INTO data_source_state (source_name, state_data)
                VALUES (?, ?)
                """,
                (data_source, state_json),
            )
```
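`INSERT OR REPLACE` gives this storage upsert semantics: repeated writes for the same `source_name` replace the previous row instead of accumulating history. A standalone illustration with an in-memory SQLite database:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE data_source_state (source_name TEXT PRIMARY KEY, state_data TEXT)"
)

# Two writes for the same source: the second replaces the first
for payload in ('{"last_id": 1}', '{"last_id": 2}'):
    conn.execute(
        "INSERT OR REPLACE INTO data_source_state (source_name, state_data)"
        " VALUES (?, ?)",
        ("my-source", payload),
    )

# Exactly one row remains, holding the latest state
rows = conn.execute("SELECT state_data FROM data_source_state").fetchall()
```

If you need an audit trail of state changes, use a separate history table; the primary-key upsert keeps only the latest snapshot.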
### State Management with Different State Types
```python
from datetime import datetime

from vdk.plugin.data_sources.data_source import IDataSource
from vdk.plugin.data_sources.state import IDataSourceState


class ComplexDataSource(IDataSource):
    def connect(self, state: IDataSourceState):
        self._state = state

        # Read general (non-stream) state
        connection_state = state.read_others("connection")
        if connection_state:
            print(f"Resuming connection from: {connection_state}")

        # Read stream-specific state
        for stream_name in ["orders", "customers", "products"]:
            stream_state = state.read_stream(stream_name)
            last_sync = stream_state.get("last_sync_time")
            if last_sync:
                print(f"Stream {stream_name} last synced at: {last_sync}")

        # Update general state
        state.update_others("connection", {
            "connected_at": datetime.now().isoformat(),
            "server_version": "1.2.3",
        })
```
### State Factory Usage
```python
from vdk.plugin.data_sources.state import DataSourceStateFactory, InMemoryDataSourceStateStorage

# Create a factory with in-memory storage for testing
factory = DataSourceStateFactory(InMemoryDataSourceStateStorage())

# Get a state manager for a specific data source
source_state = factory.get_data_source_state("my-database")

# Use the state manager
source_state.update_stream("table1", {"last_row_id": 1000})
source_state.update_others("metadata", {"schema_version": "2.1"})

# Read state back
table_state = source_state.read_stream("table1")
metadata = source_state.read_others("metadata")
```
### Integration with Data Ingestion System

The state management system is automatically integrated with the data ingestion pipeline. When a `DataSourcePayload` includes state information, it is persisted automatically after successful ingestion:
```python
# In your data source stream implementation
def read(self) -> Iterable[DataSourcePayload]:
    # Read the current state
    current_state = self._state.read_stream(self.name())
    last_processed = current_state.get("last_processed_timestamp", 0)

    # Query only records newer than the last processed timestamp
    new_records = self._fetch_records_since(last_processed)

    for record in new_records:
        # Yield payload with updated state; the state is persisted
        # automatically after successful ingestion
        yield DataSourcePayload(
            data=record,
            metadata={"record_timestamp": record["timestamp"]},
            state={"last_processed_timestamp": record["timestamp"]},
        )
```