
# Data Transformations

Comprehensive transformation operations for processing distributed datasets. These operations form the core of data processing pipelines, enabling map-reduce style computations, filtering, aggregations, and advanced data manipulation patterns.

## Capabilities

### Basic Transformations

#### Map Operations

Applies a function to each element, producing a 1-to-1 transformation.

```python { .api }
def map(self, operator):
    """
    Applies a MapFunction to each element (1-to-1 transformation).

    Parameters:
        operator (MapFunction or lambda): Transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
```


#### Flat Map Operations

Applies a function to each element, producing zero or more output elements.

```python { .api }
def flat_map(self, operator):
    """
    Applies a FlatMapFunction to each element (1-to-many transformation).

    Parameters:
        operator (FlatMapFunction or lambda): Transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
```
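
The difference from `map` is that each input element may produce zero, one, or many outputs. A plain-Python sketch of that contract (illustrative only; `flat_map_local` is a hypothetical helper, not part of the API):

```python
def flat_map_local(elements, fn):
    """Apply fn to each element and flatten the results, mirroring flat_map's 1-to-many contract."""
    out = []
    for element in elements:
        out.extend(fn(element))  # fn returns an iterable of zero or more outputs
    return out

# Splitting lines into words: one input element can yield many (or zero) outputs
words = flat_map_local(["a b", "", "c"], lambda line: line.split())
# words == ["a", "b", "c"]
```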


#### Filter Operations

Filters elements using a predicate function.

```python { .api }
def filter(self, operator):
    """
    Filters elements using predicate function.

    Parameters:
        operator (FilterFunction or lambda): Predicate function returning boolean

    Returns:
        OperatorSet: Filtered dataset
    """
```


#### Map Partition Operations

Applies a function to entire partitions rather than individual elements.

```python { .api }
def map_partition(self, operator):
    """
    Applies MapPartitionFunction to entire partitions.

    Parameters:
        operator (MapPartitionFunction or lambda): Partition transformation function

    Returns:
        OperatorSet: Transformed dataset
    """
```
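
The practical benefit over `map` is that per-partition setup work (opening a connection, building a lookup table) is paid once per partition instead of once per element. A local sketch of the per-partition contract (plain Python, hypothetical helper name):

```python
def map_partition_local(partitions, fn):
    """Call fn once per partition, passing an iterator over that partition's elements."""
    out = []
    for partition in partitions:
        out.extend(fn(iter(partition)))  # fn sees the whole partition at once
    return out

# Count elements per partition: the function runs once per partition, not per element
counts = map_partition_local([[1, 2, 3], [4, 5]], lambda it: [sum(1 for _ in it)])
# counts == [3, 2]
```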


### Reduce Operations

#### Element Reduction

Reduces the entire DataSet to a single element using a ReduceFunction.

```python { .api }
def reduce(self, operator):
    """
    Reduces DataSet to single element using ReduceFunction.

    The transformation consecutively calls a ReduceFunction until only a single element remains.

    Parameters:
        operator (ReduceFunction or lambda): Reduction function combining two elements

    Returns:
        OperatorSet: Reduced dataset with single element
    """
```
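
Because elements may be combined in any order across partitions, the reduction function should be associative (and, in practice, commutative) to give deterministic results. Locally, the contract is the same as Python's `functools.reduce`:

```python
from functools import reduce as local_reduce

values = [3, 1, 4, 1, 5]

# The ReduceFunction repeatedly combines two elements into one until one remains
total = local_reduce(lambda a, b: a + b, values)
# total == 14
```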


#### Group Reduction

Applies a GroupReduceFunction to grouped elements or the entire DataSet.

```python { .api }
def reduce_group(self, operator, combinable=False):
    """
    Applies a GroupReduceFunction to grouped DataSet.

    The transformation calls a GroupReduceFunction once for each group, or once for the
    entire DataSet if not grouped. The function can iterate over all elements and emit
    any number of outputs.

    Parameters:
        operator (GroupReduceFunction or lambda): Group reduction function
        combinable (bool): Whether function is combinable for optimization

    Returns:
        OperatorSet: Transformed dataset
    """
```
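
Unlike `reduce`, the function receives the whole group at once and may emit any number of outputs through a collector. A plain-Python sketch of that contract (the helper and the collector-as-callable are illustrative simplifications):

```python
def reduce_group_local(groups, fn):
    """Call fn once per group; fn emits outputs by calling the collector."""
    out = []
    for group in groups:
        collected = []
        fn(iter(group), collected.append)  # collector here is just list.append
        out.extend(collected)
    return out

# Emit one (key, count) pair per group -- the function sees the full group
def count_group(iterator, collect):
    items = list(iterator)
    collect((items[0][0], len(items)))

result = reduce_group_local([[("a", 1), ("a", 2)], [("b", 3)]], count_group)
# result == [("a", 2), ("b", 1)]
```

Setting `combinable=True` tells the runtime the function can be applied to partial groups first, allowing pre-aggregation before data is shuffled.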


### Aggregation Operations

#### Generic Aggregation

Applies aggregation operations to specified fields.

```python { .api }
def aggregate(self, aggregation, field):
    """
    Applies aggregation operation to specified field.

    Parameters:
        aggregation (Aggregation): Aggregation type (Sum, Min, Max)
        field (int): Field index to aggregate

    Returns:
        OperatorSet: Aggregated dataset
    """
```


#### Built-in Aggregations

Convenience methods for common aggregations.

```python { .api }
def min(self, field):
    """
    Finds minimum value in specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with minimum value
    """

def max(self, field):
    """
    Finds maximum value in specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with maximum value
    """

def sum(self, field):
    """
    Sums values in specified field.

    Parameters:
        field (int): Field index

    Returns:
        OperatorSet: Dataset with sum
    """
```


### Grouping Operations

#### Group By Keys

Groups DataSet by specified key fields.

```python { .api }
def group_by(self, *keys):
    """
    Groups DataSet by specified key fields.

    Parameters:
        *keys (int): Field indices for grouping keys

    Returns:
        UnsortedGrouping: Grouped dataset supporting group-wise operations
    """
```
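
The key-field semantics can be illustrated locally with a dict of lists (a hypothetical helper; the real grouping is lazy and distributed, and never materializes groups like this):

```python
from collections import defaultdict

def group_by_local(elements, key_index):
    """Group tuples by the value at key_index, mirroring group_by's key-field semantics."""
    groups = defaultdict(list)
    for element in elements:
        groups[element[key_index]].append(element)
    return dict(groups)

grouped = group_by_local([("apple", 5), ("banana", 3), ("apple", 2)], 0)
# grouped["apple"] == [("apple", 5), ("apple", 2)]
```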


### Utility Operations

#### Distinct Elements

Removes duplicate records based on specified fields.

```python { .api }
def distinct(self, *fields):
    """
    Removes duplicate records based on specified fields.

    Parameters:
        *fields (int): Field indices for uniqueness comparison

    Returns:
        OperatorSet: Dataset with unique records
    """
```
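
A local sketch of the uniqueness contract (hypothetical helper; note that in the distributed setting, which of several duplicates survives is not specified, whereas this sketch keeps the first seen):

```python
def distinct_local(elements, *fields):
    """Keep one record per unique combination of the selected fields (here: the first seen)."""
    seen = set()
    out = []
    for element in elements:
        key = tuple(element[f] for f in fields) if fields else element
        if key not in seen:
            seen.add(key)
            out.append(element)
    return out

unique = distinct_local([("a", 1), ("a", 2), ("b", 1)], 0)
# one record per distinct field-0 value: [("a", 1), ("b", 1)]
```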


#### First N Elements

Returns the first n elements from the DataSet.

```python { .api }
def first(self, count):
    """
    Returns first n elements.

    Parameters:
        count (int): Number of elements to return

    Returns:
        OperatorSet: Dataset with first n elements
    """
```


#### Field Projection

Projects (selects) specified fields from tuple elements.

```python { .api }
def project(self, *fields):
    """
    Projects (selects) specified fields from tuples.

    Parameters:
        *fields (int): Field indices to project

    Returns:
        OperatorSet: Dataset with projected fields
    """
```
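
Projection narrows each tuple to the listed field indices, in the order they are listed. A plain-Python sketch (hypothetical helper):

```python
def project_local(tuples, *fields):
    """Select the given field indices from each tuple, in the order listed."""
    return [tuple(t[f] for f in fields) for t in tuples]

projected = project_local([("apple", 5, "fruit"), ("carrot", 2, "veg")], 2, 0)
# fields can be reordered: [("fruit", "apple"), ("veg", "carrot")]
```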


### Partitioning Operations

#### Hash Partitioning

Hash-partitions the DataSet on the specified fields, so that all records with equal key values end up in the same partition.

```python { .api }
def partition_by_hash(self, *fields):
    """
    Hash-partitions DataSet by specified fields.

    Parameters:
        *fields (int): Fields to use for hash partitioning

    Returns:
        OperatorSet: Hash-partitioned dataset
    """
```


#### Rebalancing

Re-balances the DataSet evenly across available partitions, counteracting data skew.

```python { .api }
def rebalance(self):
    """
    Re-balances DataSet across available partitions.

    Returns:
        OperatorSet: Rebalanced dataset
    """
```


### Advanced Operations

#### Element Counting per Partition

Counts elements in each partition.

```python { .api }
def count_elements_per_partition(self):
    """
    Counts elements in each partition.

    Returns:
        OperatorSet: Dataset with partition element counts
    """
```


#### Index Assignment

Adds unique index to each element.

```python { .api }
def zip_with_index(self):
    """
    Adds unique index to each element.

    Returns:
        OperatorSet: Dataset with indexed elements
    """
```
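
Locally, the result shape is the same as Python's `enumerate`: each element is paired with an index. (In the distributed setting the indices are unique, but which element receives which index depends on how elements are spread across partitions.)

```python
# Pair each element with an index, as zip_with_index does for a distributed dataset
indexed = list(enumerate(["a", "b", "c"]))
# indexed == [(0, "a"), (1, "b"), (2, "c")]
```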


### Operation Configuration

#### Operation Naming

Assigns a name to an operation for debugging and monitoring.

```python { .api }
def name(self, name):
    """
    Sets name for the operation (debugging/monitoring).

    Parameters:
        name (str): Operation name

    Returns:
        DataSet: Self for method chaining
    """
```


#### Parallelism Configuration

Sets parallelism for specific operations.

```python { .api }
def set_parallelism(self, parallelism):
    """
    Sets parallelism for this specific operation.

    Parameters:
        parallelism (int): Degree of parallelism

    Returns:
        DataSet: Self for method chaining
    """
```


## Grouping Classes

### UnsortedGrouping

Represents a grouped DataSet supporting group-wise operations.

```python { .api }
class UnsortedGrouping:
    def reduce(self, operator):
        """Reduces each group to a single element."""

    def aggregate(self, aggregation, field):
        """Aggregates each group on the specified field."""

    def min(self, field):
        """Finds the minimum in each group."""

    def max(self, field):
        """Finds the maximum in each group."""

    def sum(self, field):
        """Sums values in each group."""
```


### SortedGrouping

Extends UnsortedGrouping with intra-group sorting capabilities.

```python { .api }
class SortedGrouping(UnsortedGrouping):
    def sort_group(self, field, order):
        """
        Sorts elements within each group.

        Parameters:
            field (int): Field to sort by
            order (Order): Sort direction (ASCENDING, DESCENDING)

        Returns:
            SortedGrouping: Self for method chaining
        """
```


## Usage Examples

### Basic Transformations

```python
from flink.plan.Environment import get_environment

env = get_environment()
data = env.from_elements(1, 2, 3, 4, 5)

# Map transformation
doubled = data.map(lambda x: x * 2)

# Filter transformation
evens = data.filter(lambda x: x % 2 == 0)

# Flat map transformation
pairs = data.flat_map(lambda x: [x, x])
```


### Aggregations and Grouping

```python
# Create data with tuples
data = env.from_elements(("apple", 5), ("banana", 3), ("apple", 2), ("banana", 7))

# Group by first field and sum second field
result = data.group_by(0).sum(1)

# Alternative using aggregate
from flink.functions.Aggregation import Sum
result = data.group_by(0).aggregate(Sum(), 1)
```


### Advanced Processing Patterns

```python
from flink.functions.GroupReduceFunction import GroupReduceFunction

class WordCounter(GroupReduceFunction):
    def reduce(self, iterator, collector):
        count = 0
        word = None
        for element in iterator:
            word = element[0]
            count += element[1]
        collector.collect((word, count))

# Word counting pipeline: emit (word, 1) pairs so group_by(0) groups by the word
text_data = env.read_text("input.txt")
words = text_data.flat_map(lambda line: [(word, 1) for word in line.lower().split()])
word_counts = words.group_by(0).reduce_group(WordCounter())
```


### Performance Optimization

```python
# Hash partition for better distribution
partitioned_data = data.partition_by_hash(0)

# Rebalance for even load distribution
balanced_data = data.rebalance()

# Configure operation parallelism
result = data.map(lambda x: x * 2).set_parallelism(8).name("Double Values")
```