# Chunked Operations

High-performance batch processing for large datasets: the `ChunkedInsert` and `ChunkedUpdate` context managers handle chunked inserts and updates automatically, with configurable batch sizes and callback support. These classes optimize memory usage and database performance for bulk operations.
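
Both classes follow the same batching pattern: rows accumulate in an in-memory queue, the queue is written to the table in bulk whenever it reaches `chunksize` (and once more when the context exits), and the optional callback sees each batch just before it is written. A simplified, illustrative sketch of that pattern (not the library's actual implementation):

```python
# Illustrative sketch of the batching pattern; not the library's implementation.
class _QueueAndFlush:
    def __init__(self, table, chunksize=1000, callback=None):
        self.table = table
        self.chunksize = chunksize
        self.callback = callback
        self.queue = []

    def insert(self, row):
        self.queue.append(row)
        if len(self.queue) >= self.chunksize:
            self.flush()                      # write a full batch as soon as it is ready

    def flush(self):
        if not self.queue:
            return
        if self.callback is not None:
            self.callback(self.queue)         # inspect or log the batch before it is written
        self.table.insert_many(self.queue)    # one bulk statement per batch
        self.queue = []

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.flush()                          # remaining rows are written on context exit
```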

## Capabilities

### Chunked Insert Operations

Batch insert operations with automatic chunking and memory management.

```python { .api }
# Import pattern
from dataset import chunked


class ChunkedInsert:
    def __init__(self, table, chunksize=1000, callback=None):
        """
        Initialize chunked insert context manager.

        Parameters:
        - table: Table instance to insert into
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch insert;
          receives the queue (list of rows) as parameter
        """

    def insert(self, item):
        """
        Add an item to the insert queue.

        Parameters:
        - item: dict, row data to insert
        """

    def flush(self):
        """Force processing of queued items."""

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""
```
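
`flush()` can also be called explicitly to force a write before `chunksize` rows have accumulated, for example at a natural boundary in the input. A brief sketch; the `events` table name and the `batches` input are illustrative:

```python
import dataset
from dataset import chunked

db = dataset.connect('sqlite:///example.db')
table = db['events']                    # illustrative table name

batches = [                             # illustrative input: rows arriving in groups
    [{'type': 'click', 'page': '/home'}, {'type': 'view', 'page': '/docs'}],
    [{'type': 'click', 'page': '/pricing'}],
]

with chunked.ChunkedInsert(table, chunksize=1000) as inserter:
    for batch in batches:
        for row in batch:
            inserter.insert(row)
        inserter.flush()                # force a write at the end of each source batch
# any rows still queued are flushed automatically on context exit
```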

### Chunked Update Operations

Batch update operations with automatic chunking and grouping by field sets.

```python { .api }
class ChunkedUpdate:
    def __init__(self, table, keys, chunksize=1000, callback=None):
        """
        Initialize chunked update context manager.

        Parameters:
        - table: Table instance to update
        - keys: list, column names to use as update filters
        - chunksize: int, number of rows per batch (default 1000)
        - callback: callable, function called before each batch update;
          receives the queue (list of rows) as parameter
        """

    def update(self, item):
        """
        Add an item to the update queue.

        Parameters:
        - item: dict, row data to update (must include key columns)
        """

    def flush(self):
        """Force processing of queued items."""

    def __enter__(self):
        """Enter context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Exit context manager and flush remaining items."""
```
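
The grouping by field sets mentioned above means that queued rows sharing the same set of columns are written together in one batch update, so every queued row does not need to carry an identical column set. A short sketch, assuming a `products` table that already has `id`, `price`, and `stock` columns:

```python
import dataset
from dataset import chunked

db = dataset.connect('sqlite:///example.db')
table = db['products']                  # assumed to already have id, price and stock columns

with chunked.ChunkedUpdate(table, keys=['id']) as updater:
    updater.update({'id': 1, 'price': 9.99})               # price-only update
    updater.update({'id': 2, 'price': 4.50, 'stock': 12})  # price and stock update
    updater.update({'id': 3, 'stock': 0})                  # stock-only update
# on flush, rows with the same column set are grouped and applied together
```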

### Exception Handling

Exception types for error handling in chunked operations.

```python { .api }
class InvalidCallback(ValueError):
    """Raised when an invalid callback is provided to chunked operations."""
```
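
Since `InvalidCallback` subclasses `ValueError`, it can be caught specifically or as a plain `ValueError`. A minimal sketch, assuming the typical trigger is passing a non-callable `callback` argument:

```python
import dataset
from dataset import chunked

db = dataset.connect('sqlite:///example.db')
table = db['products']

try:
    # a string is not callable, so constructing the inserter is expected to fail
    inserter = chunked.ChunkedInsert(table, callback='not-a-function')
except chunked.InvalidCallback:
    print('callback must be a callable that accepts the queued rows')
```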

## Usage Examples

### Basic Chunked Insert

```python
import dataset
from dataset import chunked

db = dataset.connect('sqlite:///example.db')
table = db['products']

# Basic chunked insert
with chunked.ChunkedInsert(table) as inserter:
    for i in range(10000):
        inserter.insert({
            'name': f'Product {i}',
            'price': i * 0.99,
            'category': f'Category {i % 10}'
        })
# Automatically flushes remaining items on context exit
```

### Chunked Insert with Custom Chunk Size

```python
# Custom chunk size for memory optimization
with chunked.ChunkedInsert(table, chunksize=500) as inserter:
    for record in large_dataset:
        inserter.insert({
            'name': record.name,
            'value': record.value,
            'timestamp': record.created_at
        })
```

### Chunked Insert with Callback

```python
def progress_callback(queue):
    """Called before each batch insert."""
    print(f"Inserting batch of {len(queue)} records")
    # Could also log, validate, or transform data here

def validation_callback(queue):
    """Validate data before insertion."""
    for item in queue:
        if 'required_field' not in item:
            raise ValueError(f"Missing required field in {item}")

with chunked.ChunkedInsert(table, callback=progress_callback) as inserter:
    for data in data_source:
        inserter.insert(data)

# Multiple callbacks via wrapper
def combined_callback(queue):
    validation_callback(queue)
    progress_callback(queue)

with chunked.ChunkedInsert(table, callback=combined_callback) as inserter:
    for data in data_source:
        inserter.insert(data)
```

### Basic Chunked Update

```python
from datetime import datetime

# Update records based on ID
with chunked.ChunkedUpdate(table, keys=['id']) as updater:
    for record in updated_records:
        updater.update({
            'id': record.id,
            'name': record.new_name,
            'price': record.new_price,
            'updated_at': datetime.now()
        })
```

### Chunked Update with Multiple Keys

```python
# Update using composite key (category + name)
with chunked.ChunkedUpdate(table, keys=['category', 'name']) as updater:
    for update_data in updates:
        updater.update({
            'category': update_data.category,
            'name': update_data.name,
            'price': update_data.new_price,
            'stock': update_data.new_stock
        })
```

### Chunked Update with Callback

```python
def update_callback(queue):
    """Called before each batch update."""
    print(f"Updating batch of {len(queue)} records")

    # Group by operation type for logging
    operations = {}
    for item in queue:
        op_type = item.get('operation_type', 'unknown')
        operations[op_type] = operations.get(op_type, 0) + 1

    for op_type, count in operations.items():
        print(f"  {op_type}: {count} records")

with chunked.ChunkedUpdate(table, keys=['id'], callback=update_callback) as updater:
    for update in batch_updates:
        updater.update(update)
```

### Memory-Efficient Large Dataset Processing

```python
from datetime import datetime

def process_large_csv(filename, table):
    """Process a large CSV file with minimal memory usage."""
    import csv

    with open(filename, 'r') as file:
        reader = csv.DictReader(file)

        with chunked.ChunkedInsert(table, chunksize=1000) as inserter:
            for row in reader:
                # Transform data as needed
                processed_row = {
                    'name': row['Name'].strip(),
                    'email': row['Email'].lower(),
                    'age': int(row['Age']) if row['Age'] else None,
                    'created_at': datetime.now()
                }
                inserter.insert(processed_row)

# Process million-record CSV with constant memory usage
process_large_csv('large_dataset.csv', db['users'])
```

### Data Synchronization Pattern

```python
from datetime import datetime

def sync_external_data(external_data, table):
    """Sync data from external source with progress tracking."""

    def progress_callback(queue):
        # Log progress
        print(f"Processing {len(queue)} records")

        # Could also:
        # - Update progress bar
        # - Log to file
        # - Send metrics to monitoring system
        # - Validate data integrity

    # Batch the transformed records through the chunked inserter
    with chunked.ChunkedInsert(table, callback=progress_callback) as inserter:
        for external_record in external_data:
            # Transform external format to internal format
            internal_record = transform_record(external_record)
            inserter.insert(internal_record)

    # Note: ChunkedInsert always inserts. To update existing rows matched on
    # external_id instead, call table.upsert(internal_record, ['external_id'])
    # per record, which trades batching for insert-or-update semantics.

def transform_record(external_record):
    """Transform external record format to internal format."""
    return {
        'external_id': external_record['id'],
        'name': external_record['full_name'],
        'email': external_record['email_address'],
        'last_sync': datetime.now()
    }
```
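
When both batching and insert-or-update behaviour are needed, one option is to route each record to a `ChunkedInsert` or a `ChunkedUpdate` depending on whether its `external_id` already exists. A sketch reusing `transform_record` from the example above, under the assumptions that `external_id` is an existing column and that the set of existing ids fits in memory:

```python
def sync_external_data_batched(external_data, table):
    """Batched sync: unknown external_ids are inserted, known ones are updated."""
    # Assumes the table already has an external_id column.
    existing_ids = {row['external_id'] for row in table.all()}

    with chunked.ChunkedInsert(table) as inserter, \
         chunked.ChunkedUpdate(table, keys=['external_id']) as updater:
        for external_record in external_data:
            record = transform_record(external_record)
            if record['external_id'] in existing_ids:
                updater.update(record)
            else:
                inserter.insert(record)
                existing_ids.add(record['external_id'])  # avoid duplicate inserts in one run
```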

### Error Handling and Recovery

```python
def robust_bulk_insert(data, table):
    """Bulk insert with error handling and recovery."""

    failed_records = []

    def error_tracking_callback(queue):
        """Report each batch before it is written; collect it if reporting fails."""
        try:
            # This gets called before the actual insert
            print(f"About to process batch of {len(queue)} records")
        except Exception as e:
            print(f"Callback error: {e}")
            # Could log problematic records
            failed_records.extend(queue)

    try:
        with chunked.ChunkedInsert(table,
                                   chunksize=100,  # Smaller chunks for easier recovery
                                   callback=error_tracking_callback) as inserter:
            for record in data:
                try:
                    # Validate record before queuing
                    validate_record(record)
                    inserter.insert(record)
                except ValidationError as e:
                    print(f"Skipping invalid record: {e}")
                    failed_records.append(record)

    except Exception as e:
        print(f"Bulk insert failed: {e}")
        print(f"Failed records count: {len(failed_records)}")

        # Could retry failed records individually
        for record in failed_records:
            try:
                table.insert(record)
            except Exception as record_error:
                print(f"Individual insert failed: {record_error}")

def validate_record(record):
    """Validate record before insertion."""
    required_fields = ['name', 'email']
    for field in required_fields:
        if field not in record or not record[field]:
            raise ValidationError(f"Missing required field: {field}")

class ValidationError(Exception):
    pass
```

### Performance Comparison

```python
import time

def performance_comparison(data, table):
    """Compare performance of different insertion methods."""

    # Method 1: Individual inserts (slowest)
    start = time.time()
    for record in data[:1000]:  # Small sample for timing
        table.insert(record)
    individual_time = time.time() - start
    print(f"Individual inserts: {individual_time:.2f}s")

    # Method 2: insert_many (faster)
    start = time.time()
    table.insert_many(data[:1000])
    bulk_time = time.time() - start
    print(f"Bulk insert_many: {bulk_time:.2f}s")

    # Method 3: ChunkedInsert (memory efficient for large datasets)
    start = time.time()
    with chunked.ChunkedInsert(table, chunksize=100) as inserter:
        for record in data[:1000]:
            inserter.insert(record)
    chunked_time = time.time() - start
    print(f"Chunked insert: {chunked_time:.2f}s")

    print(f"Speedup (individual vs bulk): {individual_time/bulk_time:.1f}x")
    print(f"Speedup (individual vs chunked): {individual_time/chunked_time:.1f}x")
```