
# Bags

Distributed list-like collections for processing semi-structured and unstructured data with functional programming patterns. Dask Bags handle data that doesn't fit the array or DataFrame model, such as JSON records, log files, or arbitrary Python objects.

## Capabilities

### Bag Creation

Create Bag collections from various data sources and Python objects.

```python { .api }
def from_sequence(seq, partition_size=None, name=None):
    """
    Create Bag from a sequence of items.

    Parameters:
    - seq: Sequence or iterable of items
    - partition_size: Number of items per partition
    - name: Custom name for bag

    Returns:
    dask.bag.Bag: Bag collection
    """

def from_delayed(values, name=None):
    """
    Create Bag from delayed objects.

    Parameters:
    - values: List of delayed objects
    - name: Custom name for bag

    Returns:
    dask.bag.Bag: Bag collection
    """

def from_url(urls, **kwargs):
    """
    Create Bag from URLs or file patterns.

    Parameters:
    - urls: URL, file path, or list of paths
    - **kwargs: Additional arguments for file reading

    Returns:
    dask.bag.Bag: Bag with file contents
    """

def range(start, stop=None, step=None, partition_size=None, name=None):
    """
    Create Bag from a numeric range.

    Parameters:
    - start: Start value, or stop if it is the only argument
    - stop: End value (exclusive)
    - step: Step between values
    - partition_size: Items per partition
    - name: Custom name

    Returns:
    dask.bag.Bag: Bag with range values
    """

def zip(*bags):
    """
    Zip multiple bags together.

    Parameters:
    - *bags: Bags to zip

    Returns:
    dask.bag.Bag: Bag of tuples
    """

def concat(bags):
    """
    Concatenate multiple bags.

    Parameters:
    - bags: Iterable of bags to concatenate

    Returns:
    dask.bag.Bag: Concatenated bag
    """
```
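
A minimal sketch of these constructors in action (in-memory sample data, two partitions of three items each):

```python
import dask.bag as db

# Build small bags from in-memory sequences
numbers = db.from_sequence([1, 2, 3, 4, 5, 6], partition_size=3)
letters = db.from_sequence(['a', 'b', 'c', 'd', 'e', 'f'], partition_size=3)

# Pair elements across equally partitioned bags, and append bags end to end
pairs = db.zip(numbers, letters)
combined = db.concat([numbers, numbers])

print(pairs.compute())             # [(1, 'a'), (2, 'b'), ...]
print(combined.count().compute())  # 12
```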

### File I/O Operations

Read and write bags from various file formats.

```python { .api }
def read_text(urlpath, encoding='utf-8', errors='strict',
              linedelimiter=None, compression=None, blocksize=None,
              sample=True, **kwargs):
    """
    Read text files into Bag of strings.

    Parameters:
    - urlpath: File path or pattern
    - encoding: Text encoding
    - errors: How to handle encoding errors
    - linedelimiter: Line separator character
    - compression: Compression format ('gzip', 'bz2', etc.)
    - blocksize: Size of each partition in bytes
    - sample: Whether to sample file for optimization
    - **kwargs: Additional storage options

    Returns:
    dask.bag.Bag: Bag of text lines
    """

def read_avro(urlpath, **kwargs):
    """
    Read Avro files into Bag.

    Parameters:
    - urlpath: File path or pattern
    - **kwargs: Additional arguments

    Returns:
    dask.bag.Bag: Bag of Avro records
    """

def to_textfiles(bag, path, name_function=None, compression=None,
                 encoding='utf-8', compute=True, **kwargs):
    """
    Write Bag to text files.

    Parameters:
    - bag: Bag to write
    - path: Output directory or file pattern
    - name_function: Function to generate filenames
    - compression: Compression format
    - encoding: Text encoding
    - compute: Whether to write immediately
    - **kwargs: Additional storage options

    Returns:
    Delayed objects, or None if compute=True
    """
```
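
A sketch of a read/filter/write round trip; the paths are placeholders, and `to_textfiles` is used in its method form (`bag.to_textfiles(...)`), assuming it mirrors the function above:

```python
import dask.bag as db

# Each element is one line of text; compression is handled transparently
lines = db.read_text('logs/*.txt.gz', compression='gzip')

# Keep error lines and write them out, one output file per partition
errors = lines.filter(lambda line: 'ERROR' in line)
errors.to_textfiles('errors/*.txt.gz', compression='gzip')
```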

### Core Bag Class

Main Bag class with functional programming interface.

```python { .api }
class Bag:
    """
    Distributed list-like collection with functional interface.

    Properties:
    - npartitions: int - Number of partitions
    - name: str - Collection name in task graph
    """

    def compute(self, scheduler=None, **kwargs):
        """
        Compute bag and return Python list.

        Returns:
        list: Computed results as Python list
        """

    def persist(self, scheduler=None, **kwargs):
        """
        Persist bag in memory for reuse.

        Returns:
        dask.bag.Bag: Persisted bag
        """

    def take(self, k, npartitions=1):
        """
        Take first k elements.

        Parameters:
        - k: Number of elements to take
        - npartitions: Number of partitions to search

        Returns:
        list: First k elements
        """

    def head(self, k=10, npartitions=1):
        """Alias for take()."""

    def __iter__(self):
        """Iterate over bag (triggers computation)."""

    def __len__(self):
        """Length of bag (triggers computation)."""
```
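
For example, persisting a bag and peeking at it before a full compute (sample data only):

```python
import dask.bag as db

b = db.from_sequence(range(100), partition_size=25)

# Keep intermediate results in memory for reuse across computations
b = b.persist()

# Peek at a few elements without materializing everything
print(b.take(5))    # first five elements: (0, 1, 2, 3, 4)

# Materialize the whole collection
values = b.compute()
print(len(values))  # 100
```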

### Transformation Operations

Functional programming operations for data transformation.

```python { .api }
def map(func, *bags, **kwargs):
    """
    Apply function to each element.

    Parameters:
    - func: Function to apply
    - *bags: Bags to map over
    - **kwargs: Additional arguments

    Returns:
    dask.bag.Bag: Bag with transformed elements
    """

def filter(predicate, bag):
    """
    Filter elements using predicate function.

    Parameters:
    - predicate: Function returning True/False
    - bag: Bag to filter

    Returns:
    dask.bag.Bag: Filtered bag
    """

def map_partitions(func, *bags, **kwargs):
    """
    Apply function to each partition.

    Parameters:
    - func: Function that takes and returns iterables
    - *bags: Input bags
    - **kwargs: Additional arguments to func

    Returns:
    dask.bag.Bag: Transformed bag
    """

def flatten(bag, nlevels=1):
    """
    Flatten nested sequences.

    Parameters:
    - bag: Bag with nested sequences
    - nlevels: Number of levels to flatten

    Returns:
    dask.bag.Bag: Flattened bag
    """

def pluck(key, bag, default=None):
    """
    Select values from dictionaries/objects.

    Parameters:
    - key: Key or attribute name to pluck
    - bag: Bag of dictionaries/objects
    - default: Default value if key missing

    Returns:
    dask.bag.Bag: Bag of plucked values
    """

def distinct(bag, key=None):
    """
    Remove duplicate elements.

    Parameters:
    - bag: Input bag
    - key: Function to compute comparison key

    Returns:
    dask.bag.Bag: Bag with unique elements
    """

def frequencies(bag, sort=False, normalize=False, split_every=None):
    """
    Count frequency of each element.

    Parameters:
    - bag: Input bag
    - sort: Sort results by frequency
    - normalize: Return proportions instead of counts
    - split_every: Tree reduction factor

    Returns:
    dask.bag.Bag: Bag of (item, count) pairs
    """

def topk(bag, k, key=None, split_every=None):
    """
    Find k largest elements.

    Parameters:
    - bag: Input bag
    - k: Number of top elements
    - key: Function to compute comparison key
    - split_every: Tree reduction factor

    Returns:
    dask.bag.Bag: Bag of top k elements
    """
```
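
A sketch chaining several of these transformations over made-up records:

```python
import dask.bag as db

people = db.from_sequence([
    {'name': 'Ada', 'tags': ['math', 'code']},
    {'name': 'Grace', 'tags': ['code', 'navy']},
    {'name': 'Ada', 'tags': ['math', 'code']},
])

names = people.pluck('name').distinct()
tags = people.pluck('tags').flatten()
top_tags = tags.frequencies(sort=True).topk(2, key=lambda kv: kv[1])

print(names.compute())     # e.g. ['Ada', 'Grace']
print(top_tags.compute())  # [('code', 3), ('math', 2)]
```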

### Reduction Operations

Aggregate operations that combine all elements into single values.

```python { .api }
def fold(binop, bag, initial=None, combine=None, split_every=None):
    """
    Fold bag using binary operation.

    Parameters:
    - binop: Binary function for folding
    - bag: Input bag
    - initial: Initial value for fold
    - combine: Function for combining intermediate results
    - split_every: Tree reduction factor

    Returns:
    Delayed object with folded result
    """

def reduce(func, bag, initial=None, split_every=None):
    """
    Reduce bag using function.

    Parameters:
    - func: Reduction function
    - bag: Input bag
    - initial: Initial value
    - split_every: Tree reduction factor

    Returns:
    Delayed object with reduced result
    """

def sum(bag, split_every=None):
    """Sum all elements."""

def count(bag, split_every=None):
    """Count number of elements."""

def min(bag, split_every=None):
    """Find minimum element."""

def max(bag, split_every=None):
    """Find maximum element."""

def mean(bag, split_every=None):
    """Compute mean of numeric elements."""

def std(bag, split_every=None):
    """Compute standard deviation."""

def var(bag, split_every=None):
    """Compute variance."""
```
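
For example (illustrative values; `fold` with multiplication computes a running product):

```python
import dask.bag as db

values = db.from_sequence([3, 1, 4, 1, 5, 9, 2, 6])

print(values.sum().compute())    # 31
print(values.count().compute())  # 8
print(values.mean().compute())   # 3.875

# fold with multiplication: per-partition products combined into one
product = values.fold(lambda acc, x: acc * x, initial=1)
print(product.compute())         # 6480
```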

### Grouping Operations

Group elements by key for further processing.

```python { .api }
def groupby(bag, key, npartitions=None, partition_size=None):
    """
    Group elements by key function.

    Parameters:
    - bag: Input bag
    - key: Function to compute grouping key
    - npartitions: Number of output partitions
    - partition_size: Target partition size

    Returns:
    dask.bag.Bag: Bag of (key, group) pairs
    """

def foldby(key, binop, bag, initial=None, combine=None, combine_initial=None):
    """
    Fold values by key.

    Parameters:
    - key: Function to compute grouping key
    - binop: Binary operation for folding
    - bag: Input bag
    - initial: Initial value for each group
    - combine: Function for combining results
    - combine_initial: Initial value for combining

    Returns:
    dask.bag.Bag: Bag of (key, folded_value) pairs
    """
```
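
A sketch of per-key folding over synthetic order records; `foldby` combines grouping and reduction in a single pass, which is usually much cheaper than `groupby` followed by a map:

```python
import dask.bag as db

orders = db.from_sequence([
    {'cust': 'a', 'amount': 10},
    {'cust': 'b', 'amount': 20},
    {'cust': 'a', 'amount': 5},
])

# Sum amounts per customer without a full shuffle
totals = orders.foldby(
    key=lambda r: r['cust'],
    binop=lambda acc, r: acc + r['amount'],
    initial=0,
    combine=lambda a, b: a + b,
)
print(sorted(totals.compute()))  # [('a', 15), ('b', 20)]
```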

### Conversion Operations

Convert bags to other collection types.

```python { .api }
def to_dataframe(bag, columns=None, meta=None):
    """
    Convert bag to Dask DataFrame.

    Parameters:
    - bag: Input bag of records/dictionaries
    - columns: Column names for DataFrame
    - meta: Metadata DataFrame

    Returns:
    dask.dataframe.DataFrame: Converted DataFrame
    """

def to_delayed(bag, optimize_graph=True):
    """
    Convert bag partitions to delayed objects.

    Parameters:
    - bag: Input bag
    - optimize_graph: Whether to optimize task graph

    Returns:
    list: List of delayed objects
    """
```
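
A brief sketch of both conversions (records are illustrative):

```python
import dask.bag as db

records = db.from_sequence([
    {'name': 'Ada', 'score': 90},
    {'name': 'Grace', 'score': 95},
])

# Structured analysis through dask.dataframe
df = records.to_dataframe()
print(df['score'].mean().compute())  # 92.5

# Drop down to delayed objects for custom per-partition handling
parts = records.to_delayed()
print(len(parts))  # one delayed object per partition
```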

### Item Access

Work with single lazy values derived from bags, such as reduction results.

```python { .api }
class Item:
    """
    Lazy reference to a single value derived from a bag,
    for example the result of a reduction such as sum() or mean().
    """

    def compute(self, **kwargs):
        """Compute and return the item value."""

    def key(self):
        """Get the task key for this item."""
```
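
A minimal sketch, assuming reductions such as `sum()` return an `Item` as described above:

```python
import dask.bag as db

b = db.from_sequence([1, 2, 3, 4])

# Reductions yield Item objects: lazy single values
total = b.sum()
print(type(total).__name__)  # Item
print(total.compute())       # 10
```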

## Usage Examples

### Basic Text Processing

```python
import dask.bag as db

# Read text files
lines = db.read_text('logs/*.txt')

# Process lines into words (Bags have no .str accessor, so use map)
words = (lines.map(str.strip)
              .map(str.split)
              .flatten())

# Count word frequencies
word_counts = words.frequencies(sort=True)
top_words = word_counts.take(10)

print(top_words)
```

### JSON Data Processing

```python
import dask.bag as db
import json

# Read newline-delimited JSON files
records = db.read_text('data/*.json').map(json.loads)

# Extract and process fields (a default of None guards against missing keys)
user_ages = records.pluck('age', None).filter(lambda x: x is not None)
avg_age = user_ages.mean().compute()

# Group by category
by_category = records.groupby(lambda x: x.get('category', 'unknown'))
category_counts = by_category.map(lambda x: (x[0], len(x[1]))).compute()
```

### Custom Data Processing Pipeline

```python
import dask.bag as db

def clean_record(record):
    """Clean and validate a data record."""
    if record.get('value', 0) > 0:
        return {
            'id': record['id'],
            'value': float(record['value']),
            'category': record.get('category', 'default')
        }
    return None

def aggregate_by_category(group):
    """Aggregate records in a group."""
    key, records = group
    values = [r['value'] for r in records if r is not None]
    return {
        'category': key,
        'count': len(values),
        'sum': sum(values),
        'avg': sum(values) / len(values) if values else 0
    }

# Process data pipeline
raw_data = db.from_sequence(data_source, partition_size=1000)
cleaned = raw_data.map(clean_record).filter(lambda x: x is not None)
grouped = cleaned.groupby(lambda x: x['category'])
aggregated = grouped.map(aggregate_by_category)

results = aggregated.compute()
```

### Parallel File Processing

```python
import dask.bag as db
import re

def extract_features(text_file_content):
    """Extract features from text content."""
    lines = text_file_content.split('\n')
    return {
        'line_count': len(lines),
        'word_count': len(text_file_content.split()),
        'char_count': len(text_file_content),
        'email_count': len(re.findall(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b', text_file_content))
    }

# blocksize=None gives one file per partition, but elements are still lines,
# so rejoin each partition into a single whole-file string
lines = db.read_text('documents/*.txt', blocksize=None)
files = lines.map_partitions(lambda part: [''.join(part)])
features = files.map(extract_features)
summary = features.compute()

print(f"Processed {len(summary)} files")
```

### Integration with Other Collections

```python
import dask.bag as db
import dask.dataframe as dd

# Start with bag of records
records = db.from_sequence(json_records, partition_size=10000)

# Convert to DataFrame for structured analysis
df = records.to_dataframe()

# Process with DataFrame operations
summary = df.groupby('category').value.agg(['mean', 'count', 'sum'])

# Back to bag for further processing
result_records = df.to_bag(format='dict')
final_result = result_records.take(100)
```