# State Management

Persistent state management for data sources enabling incremental ingestion and resume capabilities. The state management system allows data sources to track their progress and resume from where they left off.

## Capabilities

### Data Source State Interface

Abstract interface for managing data source state with support for stream-specific and general state tracking.

```python { .api }

class IDataSourceState:
    """
    Abstract interface for data source state management.
    """
    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """
        Read the state of a stream within a data source.

        Args:
            stream_name: Name of the stream

        Returns:
            Dictionary containing the stream's state
        """

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """
        Update the state of a stream within the data source.

        Args:
            stream_name: Name of the stream
            state: New state dictionary for the stream
        """

    def read_others(self, key: str) -> Dict[str, Any]:
        """
        Read state of a data source that is not related to streams.

        Args:
            key: State key identifier

        Returns:
            Dictionary containing the state for the given key
        """

    def update_others(self, key: str, state: Dict[str, Any]):
        """
        Update state of a data source not related to streams.

        Args:
            key: State key identifier
            state: New state dictionary for the key
        """
```

### Data Source State Implementation

Concrete implementation of state management that uses pluggable storage backends.

```python { .api }
class DataSourceState(IDataSourceState):
    """
    Concrete implementation of data source state management.
    """
    def __init__(self, state_storage: IDataSourceStateStorage, source: str):
        """
        Initialize state manager.

        Args:
            state_storage: Storage backend for persisting state
            source: Data source identifier
        """

    def read_stream(self, stream_name: str) -> Dict[str, Any]:
        """Read stream state from storage"""

    def read_others(self, key: str) -> Dict[str, Any]:
        """Read general state from storage"""

    def update_stream(self, stream_name: str, state: Dict[str, Any]):
        """Update stream state in storage"""

    def update_others(self, key: str, state: Dict[str, Any]):
        """Update general state in storage"""
```

### State Storage Interface

Abstract interface for state storage backends that can be implemented for different persistence mechanisms.

```python { .api }
class IDataSourceStateStorage:
    """
    Abstract interface for state storage backends.
    """
    def read(self, data_source: str) -> Dict[str, Any]:
        """
        Read the state from underlying storage.

        Args:
            data_source: The data source name

        Returns:
            Dictionary containing the complete state for the data source
        """

    def write(self, data_source: str, state: Dict[str, Any]):
        """
        Write (persist) the current state of the data source to underlying storage.

        Args:
            data_source: The data source name
            state: Complete state dictionary to persist
        """
```

### State Storage Implementations

Built-in implementations for different storage backends.

```python { .api }
class InMemoryDataSourceStateStorage(IDataSourceStateStorage):
    """
    In-memory state storage useful for testing purposes.
    """
    def __init__(self): ...

    def read(self, data_source: str) -> Dict[str, Any]:
        """Read state from memory"""

    def write(self, data_source: str, state: Dict[str, Any]):
        """Write state to memory"""

class PropertiesBasedDataSourceStorage(IDataSourceStateStorage):
    """
    State storage implementation using VDK properties system.
    """
    KEY = ".vdk.data_sources.state"

    def __init__(self, properties: IProperties):
        """
        Initialize properties-based storage.

        Args:
            properties: VDK properties interface
        """

    def read(self, data_source: str) -> Dict[str, Any]:
        """Read state from VDK properties"""

    def write(self, data_source: str, state: Dict[str, Any]):
        """Write state to VDK properties"""
```

### State Factory

Factory class for creating state managers with specific storage backends.

```python { .api }
class DataSourceStateFactory:
    """
    Factory for creating data source state managers.
    """
    def __init__(self, storage: IDataSourceStateStorage):
        """
        Initialize factory with storage backend.

        Args:
            storage: State storage implementation
        """

    def get_data_source_state(self, source: str) -> IDataSourceState:
        """
        Create a state manager for a specific data source.

        Args:
            source: Data source identifier

        Returns:
            Data source state manager instance
        """
```

## Usage Examples

### Basic State Management in Data Sources

```python
from vdk.plugin.data_sources.data_source import IDataSource, IDataSourceStream
from vdk.plugin.data_sources.state import IDataSourceState

class IncrementalDataSourceStream(IDataSourceStream):
    def __init__(self, stream_name: str, state: IDataSourceState):
        self._stream_name = stream_name
        self._state = state

    def name(self) -> str:
        return self._stream_name

    def read(self) -> Iterable[DataSourcePayload]:
        # Read last processed state
        last_state = self._state.read_stream(self._stream_name)
        last_id = last_state.get("last_id", 0)

        # Simulate reading records starting from last processed ID
        for record_id in range(last_id + 1, last_id + 11):  # Process 10 records
            data = {"id": record_id, "value": f"data_{record_id}"}

            # Yield payload with state information
            yield DataSourcePayload(
                data=data,
                metadata={"timestamp": datetime.now()},
                state={"last_id": record_id}  # State will be automatically persisted
            )

class IncrementalDataSource(IDataSource):
    def configure(self, config):
        self._config = config

    def connect(self, state: IDataSourceState):
        self._state = state
        # Create streams that can access state
        self._streams = [
            IncrementalDataSourceStream("stream_1", state),
            IncrementalDataSourceStream("stream_2", state)
        ]

    def disconnect(self):
        self._streams = []

    def streams(self):
        return self._streams
```

### Custom State Storage Backend

```python
import json
import os
from vdk.plugin.data_sources.state import IDataSourceStateStorage

class FileBasedStateStorage(IDataSourceStateStorage):
    """Custom file-based state storage implementation."""

    def __init__(self, state_directory: str):
        self.state_directory = state_directory
        os.makedirs(state_directory, exist_ok=True)

    def _get_state_file_path(self, data_source: str) -> str:
        return os.path.join(self.state_directory, f"{data_source}_state.json")

    def read(self, data_source: str) -> Dict[str, Any]:
        state_file = self._get_state_file_path(data_source)
        if os.path.exists(state_file):
            with open(state_file, 'r') as f:
                return json.load(f)
        return {}

    def write(self, data_source: str, state: Dict[str, Any]):
        state_file = self._get_state_file_path(data_source)
        with open(state_file, 'w') as f:
            json.dump(state, f, indent=2, default=str)
```

### Database-Backed State Storage

```python
import sqlite3
import json
from typing import Any, Dict

class DatabaseStateStorage(IDataSourceStateStorage):
    """Database-backed state storage implementation."""

    def __init__(self, db_path: str):
        self.db_path = db_path
        self._init_database()

    def _init_database(self):
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                CREATE TABLE IF NOT EXISTS data_source_state (
                    source_name TEXT PRIMARY KEY,
                    state_data TEXT,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                )
            """)

    def read(self, data_source: str) -> Dict[str, Any]:
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                "SELECT state_data FROM data_source_state WHERE source_name = ?",
                (data_source,)
            )
            row = cursor.fetchone()
            if row:
                return json.loads(row[0])
        return {}

    def write(self, data_source: str, state: Dict[str, Any]):
        state_json = json.dumps(state, default=str)
        with sqlite3.connect(self.db_path) as conn:
            conn.execute("""
                INSERT OR REPLACE INTO data_source_state (source_name, state_data)
                VALUES (?, ?)
            """, (data_source, state_json))
```

### State Management with Different State Types

```python
class ComplexDataSource(IDataSource):
    def connect(self, state: IDataSourceState):
        self._state = state

        # Read different types of state
        connection_state = state.read_others("connection")
        if connection_state:
            print(f"Resuming connection from: {connection_state}")

        # Read stream-specific state
        for stream_name in ["orders", "customers", "products"]:
            stream_state = state.read_stream(stream_name)
            last_sync = stream_state.get("last_sync_time")
            if last_sync:
                print(f"Stream {stream_name} last synced at: {last_sync}")

        # Update general state
        state.update_others("connection", {
            "connected_at": datetime.now().isoformat(),
            "server_version": "1.2.3"
        })
```

### State Factory Usage

```python
from vdk.plugin.data_sources.state import DataSourceStateFactory, InMemoryDataSourceStateStorage

# Create factory with in-memory storage for testing
factory = DataSourceStateFactory(InMemoryDataSourceStateStorage())

# Get state manager for specific data source
source_state = factory.get_data_source_state("my-database")

# Use state manager
source_state.update_stream("table1", {"last_row_id": 1000})
source_state.update_others("metadata", {"schema_version": "2.1"})

# Read state back
table_state = source_state.read_stream("table1")
metadata = source_state.read_others("metadata")
```

### Integration with Data Ingestion System

The state management system is automatically integrated with the data ingestion pipeline. When a `DataSourcePayload` includes state information, it's automatically persisted after successful ingestion:

```python
# In your data source stream implementation
def read(self) -> Iterable[DataSourcePayload]:
    # Read current state
    current_state = self._state.read_stream(self.name())
    last_processed = current_state.get("last_processed_timestamp", 0)

    # Query new data since last processed timestamp
    new_records = self._fetch_records_since(last_processed)

    for record in new_records:
        # Yield payload with updated state
        yield DataSourcePayload(
            data=record,
            metadata={"record_timestamp": record["timestamp"]},
            state={"last_processed_timestamp": record["timestamp"]}
            # This state will be automatically persisted after successful ingestion
        )
```