or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs/

- `algorithm-operators.md`
- `environment-management.md`
- `index.md`
- `linear-algebra.md`
- `parameter-system.md`
- `pipeline-base-classes.md`
- `pipeline-framework.md`
- `utility-libraries.md`

`docs/algorithm-operators.md`

0

# Algorithm Operators

1

2

Batch and stream operators for building custom ML algorithms. Operators can be linked into processing chains. They integrate with Flink's Table API: each operator consumes the output of its upstream operators and exposes its own results as `Table` objects.

3

4

## Capabilities

5

6

### AlgoOperator Base Class

7

8

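Abstract base class for all algorithm operators. It provides the functionality every operator shares: parameter handling, access to the primary and side output tables, and inspection of output column names, types, and schema.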

9

10

```java { .api }

11

/**

12

* Base class for algorithm operators with parameter support

13

* @param <T> The concrete operator type for method chaining

14

*/

15

public abstract class AlgoOperator<T extends AlgoOperator<T>>

16

implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {

17

18

/** Create operator with empty parameters */

19

public AlgoOperator();

20

21

/** Create operator with initial parameters */

22

public AlgoOperator(Params params);

23

24

/** Get all operator parameters */

25

public Params getParams();

26

27

/** Get primary output table */

28

public Table getOutput();

29

30

/** Get all side output tables */

31

public Table[] getSideOutputs();

32

33

/** Get output column names */

34

public String[] getColNames();

35

36

/** Get output column types */

37

public TypeInformation<?>[] getColTypes();

38

39

/** Get side output column names by index */

40

public String[] getSideOutputColNames(int index);

41

42

/** Get side output column types by index */

43

public TypeInformation<?>[] getSideOutputColTypes(int index);

44

45

/** Get output table schema */

46

public TableSchema getSchema();

47

48

/** Set side output tables (protected) */

49

protected void setSideOutputs(Table[] sideOutputs);

50

51

/** Set primary output table (protected) */

52

protected void setOutput(Table output);

53

54

/** Validate minimum number of input operators */

55

public static void checkMinOpSize(int size, AlgoOperator<?>... inputs);

56

57

/** Validate exact number of input operators */

58

public static void checkOpSize(int size, AlgoOperator<?>... inputs);

59

}

60

```

61

62

### BatchOperator Class

63

64

Base class for batch algorithm operators with linking and chaining capabilities.

65

66

```java { .api }

67

/**

68

* Base class for batch algorithm operators

69

* Provides operator linking and batch-specific functionality

70

* @param <T> The concrete batch operator type

71

*/

72

public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {

73

74

/** Create batch operator with empty parameters */

75

public BatchOperator();

76

77

/** Create batch operator with initial parameters */

78

public BatchOperator(Params params);

79

80

/** Link this operator to the next operator in chain */

81

public <B extends BatchOperator<?>> B link(B next);

82

83

/** Link this operator from input operators (abstract) */

84

public abstract T linkFrom(BatchOperator<?>... inputs);

85

86

/** Create batch operator from existing table */

87

public static BatchOperator<?> fromTable(Table table);

88

89

/** Get first input operator, validating inputs exist */

90

public static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);

91

}

92

```

93

94

**Usage Examples:**

95

96

```java

97

import org.apache.flink.ml.operator.batch.BatchOperator;

98

import org.apache.flink.table.api.Table;

99

100

// Create operators from tables

101

Table inputTable = // ... your input table

102

BatchOperator<?> source = BatchOperator.fromTable(inputTable);

103

104

// Chain operators

105

BatchOperator<?> result = source

106

.link(new MyBatchTransformer())

107

.link(new MyBatchEstimator())

108

.link(new MyBatchSink());

109

110

// Custom batch operator implementation

111

public class MyBatchTransformer extends BatchOperator<MyBatchTransformer> {

112

@Override

113

public MyBatchTransformer linkFrom(BatchOperator<?>... inputs) {

114

BatchOperator.checkOpSize(1, inputs);

115

BatchOperator<?> input = inputs[0];

116

117

// Process input table

118

Table processedTable = // ... transformation logic

119

this.setOutput(processedTable);

120

121

return this;

122

}

123

}

124

```

125

126

### StreamOperator Class

127

128

Base class for stream algorithm operators. It is the streaming counterpart of `BatchOperator` and offers the same linking and chaining API for streaming tables.

129

130

```java { .api }

131

/**

132

* Base class for stream algorithm operators

133

* Provides operator linking and stream-specific functionality

134

* @param <T> The concrete stream operator type

135

*/

136

public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {

137

138

/** Create stream operator with empty parameters */

139

public StreamOperator();

140

141

/** Create stream operator with initial parameters */

142

public StreamOperator(Params params);

143

144

/** Link this operator to the next operator in chain */

145

public <S extends StreamOperator<?>> S link(S next);

146

147

/** Link this operator from input operators (abstract) */

148

public abstract T linkFrom(StreamOperator<?>... inputs);

149

150

/** Create stream operator from existing table */

151

public static StreamOperator<?> fromTable(Table table);

152

153

/** Get first input operator, validating inputs exist */

154

public static StreamOperator<?> checkAndGetFirst(StreamOperator<?>... inputs);

155

}

156

```

157

158

**Usage Examples:**

159

160

```java

161

import org.apache.flink.ml.operator.stream.StreamOperator;

162

import org.apache.flink.table.api.Table;

163

164

// Create operators from streaming tables

165

Table streamingTable = // ... your streaming table

166

StreamOperator<?> source = StreamOperator.fromTable(streamingTable);

167

168

// Chain streaming operators

169

StreamOperator<?> result = source

170

.link(new MyStreamTransformer())

171

.link(new MyStreamProcessor())

172

.link(new MyStreamSink());

173

174

// Custom stream operator implementation

175

public class MyStreamTransformer extends StreamOperator<MyStreamTransformer> {

176

@Override

177

public MyStreamTransformer linkFrom(StreamOperator<?>... inputs) {

178

StreamOperator.checkOpSize(1, inputs);

179

StreamOperator<?> input = inputs[0];

180

181

// Process streaming table

182

Table processedStream = // ... streaming transformation logic

183

this.setOutput(processedStream);

184

185

return this;

186

}

187

}

188

```

189

190

### Source Operators

191

192

Specialized source operators that wrap an existing `Table` so it can serve as the entry point of an operator chain. They accept no upstream inputs.

193

194

#### TableSourceBatchOp

195

196

```java { .api }

197

/**

198

* Transform Table to batch source operator

199

* Entry point for batch operator chains

200

*/

201

public final class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {

202

203

/** Create batch source from table */

204

public TableSourceBatchOp(Table table);

205

206

/** Not supported - throws UnsupportedOperationException */

207

public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);

208

}

209

```

210

211

#### TableSourceStreamOp

212

213

```java { .api }

214

/**

215

* Transform Table to stream source operator

216

* Entry point for stream operator chains

217

*/

218

public final class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {

219

220

/** Create stream source from table */

221

public TableSourceStreamOp(Table table);

222

223

/** Not supported - throws UnsupportedOperationException */

224

public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);

225

}

226

```

227

228

**Usage Examples:**

229

230

```java

231

import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;

232

import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;

233

234

// Batch processing chain

235

Table batchData = // ... your batch table

236

BatchOperator<?> batchChain = new TableSourceBatchOp(batchData)

237

.link(new FeatureScaler())

238

.link(new LinearRegression())

239

.link(new ModelEvaluator());

240

241

// Stream processing chain

242

Table streamData = // ... your streaming table

243

StreamOperator<?> streamChain = new TableSourceStreamOp(streamData)

244

.link(new StreamingFeatureTransformer())

245

.link(new OnlinePredictor())

246

.link(new AlertSystem());

247

```

248

249

## Operator Chaining Patterns

250

251

### Linear Chaining

252

253

The most common pattern, in which operators are linked one after another in sequence:

254

255

```java

256

// Simple linear chain

257

BatchOperator<?> result = source

258

.link(preprocessor)

259

.link(featureExtractor)

260

.link(classifier);

261

```

262

263

### Multi-Input Operators

264

265

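Operators that consume multiple inputs. `link(next)` connects only a single upstream operator, so a multi-input operator is wired by calling `linkFrom` directly with all of its inputs: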

266

267

```java

268

public class JoinOperator extends BatchOperator<JoinOperator> {

269

@Override

270

public JoinOperator linkFrom(BatchOperator<?>... inputs) {

271

BatchOperator.checkOpSize(2, inputs); // Require exactly 2 inputs

272

273

BatchOperator<?> left = inputs[0];

274

BatchOperator<?> right = inputs[1];

275

276

// Join logic here

277

Table joined = // ... join left and right tables

278

this.setOutput(joined);

279

280

return this;

281

}

282

}

283

284

// Usage

285

BatchOperator<?> joined = new JoinOperator()

286

.linkFrom(leftOperator, rightOperator);

287

```

288

289

### Multi-Output Operators

290

291

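Operators that produce a primary output plus additional side outputs. The operator sets them with `setOutput` and `setSideOutputs`, and callers read them back with `getOutput` and `getSideOutputs`: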

292

293

```java

294

public class SplitOperator extends BatchOperator<SplitOperator> {

295

@Override

296

public SplitOperator linkFrom(BatchOperator<?>... inputs) {

297

BatchOperator.checkOpSize(1, inputs);

298

299

Table input = inputs[0].getOutput();

300

301

// Split logic

302

Table mainOutput = // ... main result

303

Table[] sideOutputs = new Table[]{

304

// ... additional outputs

305

};

306

307

this.setOutput(mainOutput);

308

this.setSideOutputs(sideOutputs);

309

310

return this;

311

}

312

}

313

314

// Usage

315

SplitOperator splitter = new SplitOperator().linkFrom(source);

316

Table main = splitter.getOutput();

317

Table[] sides = splitter.getSideOutputs();

318

```

319

320

### Parameter Configuration

321

322

All operators support parameter configuration through the `WithParams` interface, which `AlgoOperator` implements:

323

324

```java

325

public class ConfigurableOperator extends BatchOperator<ConfigurableOperator> {

326

// Parameter definitions

327

public static final ParamInfo<Integer> NUM_ITERATIONS = ParamInfoFactory

328

.createParamInfo("numIterations", Integer.class)

329

.setHasDefaultValue(10)

330

.build();

331

332

public static final ParamInfo<Double> LEARNING_RATE = ParamInfoFactory

333

.createParamInfo("learningRate", Double.class)

334

.setHasDefaultValue(0.01)

335

.build();

336

337

// Convenience methods

338

public ConfigurableOperator setNumIterations(int numIter) {

339

return set(NUM_ITERATIONS, numIter);

340

}

341

342

public int getNumIterations() {

343

return get(NUM_ITERATIONS);

344

}

345

346

@Override

347

public ConfigurableOperator linkFrom(BatchOperator<?>... inputs) {

348

// Use parameters in processing

349

int numIter = getNumIterations();

350

double lr = get(LEARNING_RATE);

351

352

// Processing logic using parameters

353

// ...

354

355

return this;

356

}

357

}

358

359

// Usage with parameters

360

BatchOperator<?> configured = new ConfigurableOperator()

361

.setNumIterations(20)

362

.set(ConfigurableOperator.LEARNING_RATE, 0.001)

363

.linkFrom(source);

364

```

365

366

## Integration with Pipeline Framework

367

368

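Algorithm operators can be used inside the higher-level pipeline framework. An `EstimatorBase` subclass can train with batch operators in `fit`. A `ModelBase` subclass can predict with batch or stream operators in its two `transform` overloads: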

369

370

```java

371

public class MyEstimatorFromOperator extends EstimatorBase<MyEstimatorFromOperator, MyModelFromOperator> {

372

373

@Override

374

protected MyModelFromOperator fit(BatchOperator input) {

375

// Use batch operators for training

376

BatchOperator<?> trained = input

377

.link(new FeaturePreprocessor())

378

.link(new TrainingOperator())

379

.link(new ModelExtractor());

380

381

// Extract model data

382

Table modelData = trained.getOutput();

383

384

return new MyModelFromOperator(this.getParams()).setModelData(modelData);

385

}

386

}

387

388

public class MyModelFromOperator extends ModelBase<MyModelFromOperator> {

389

390

@Override

391

protected BatchOperator transform(BatchOperator input) {

392

// Use batch operators for prediction

393

return input

394

.link(new FeaturePreprocessor())

395

.link(new PredictionOperator().setModelData(this.getModelData()));

396

}

397

398

@Override

399

protected StreamOperator transform(StreamOperator input) {

400

// Use stream operators for real-time prediction

401

return input

402

.link(new StreamFeaturePreprocessor())

403

.link(new StreamPredictionOperator().setModelData(this.getModelData()));

404

}

405

}

406

```

407

408

This lets you implement custom algorithms with the low-level operator framework while exposing them through the high-level pipeline abstractions (`EstimatorBase` and `ModelBase`). Those algorithms then remain compatible with the rest of the pipeline framework.