or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

algorithm-operators.md, index.md, linear-algebra.md, ml-environment.md, ml-pipeline.md, statistical-operations.md, table-utilities.md

docs/algorithm-operators.md

# Algorithm Operators

Base classes for algorithm operators that support both batch and stream processing, with output-table management and parameter configuration. The operator system provides the foundation for implementing ML algorithms on Flink.

## Capabilities

### AlgoOperator Abstract Class

Base class for all algorithm operators, providing common functionality for output-table management, parameter handling, and schema inspection.

```java { .api }

11

/**

12

* Base class for algorithm operators with output table management

13

*/

14

public abstract class AlgoOperator<T extends AlgoOperator<T>>

15

implements WithParams<T>, HasMLEnvironmentId<T>, Serializable {

16

17

/** Default constructor */

18

public AlgoOperator();

19

20

/** Constructor with parameters */

21

public AlgoOperator(Params params);

22

23

/** Get parameters */

24

public Params getParams();

25

26

/**

27

* Get primary output table

28

* @return Primary output table

29

*/

30

public Table getOutput();

31

32

/**

33

* Get side output tables

34

* @return Array of side output tables

35

*/

36

public Table[] getSideOutputs();

37

38

/**

39

* Get output column names

40

* @return Array of column names

41

*/

42

public String[] getColNames();

43

44

/**

45

* Get output column types

46

* @return Array of column type information

47

*/

48

public TypeInformation<?>[] getColTypes();

49

50

/**

51

* Get side output column names by index

52

* @param index Side output index

53

* @return Array of column names for specified side output

54

*/

55

public String[] getSideOutputColNames(int index);

56

57

/**

58

* Get side output column types by index

59

* @param index Side output index

60

* @return Array of column types for specified side output

61

*/

62

public TypeInformation<?>[] getSideOutputColTypes(int index);

63

64

/**

65

* Get output table schema

66

* @return TableSchema of primary output

67

*/

68

public TableSchema getSchema();

69

70

/**

71

* String representation of operator

72

* @return String description

73

*/

74

public String toString();

75

76

/** Set output table (protected) */

77

protected void setOutput(Table output);

78

79

/** Set side output tables (protected) */

80

protected void setSideOutputs(Table[] sideOutputs);

81

}

82

```

### BatchOperator Abstract Class

Base class for batch algorithm operators, providing operator linking and batch-specific functionality.

```java { .api }

89

/**

90

* Base class for batch algorithm operators

91

*/

92

public abstract class BatchOperator<T extends BatchOperator<T>> extends AlgoOperator<T> {

93

94

/** Default constructor */

95

public BatchOperator();

96

97

/** Constructor with parameters */

98

public BatchOperator(Params params);

99

100

/**

101

* Link to next batch operator in chain

102

* @param next Next operator to link to

103

* @return The next operator for method chaining

104

*/

105

public <B extends BatchOperator<?>> B link(B next);

106

107

/**

108

* Link from input batch operators (abstract - implemented by subclasses)

109

* @param inputs Input batch operators

110

* @return This operator instance configured with inputs

111

*/

112

public abstract T linkFrom(BatchOperator<?>... inputs);

113

114

/**

115

* Create batch operator from table

116

* @param table Input table

117

* @return BatchOperator wrapping the table

118

*/

119

public static BatchOperator<?> fromTable(Table table);

120

121

/**

122

* Validate and get first input operator (protected utility)

123

* @param inputs Input operators

124

* @return First input operator

125

*/

126

protected static BatchOperator<?> checkAndGetFirst(BatchOperator<?>... inputs);

127

}

128

```

**Usage Examples:**

```java

133

import org.apache.flink.ml.operator.batch.BatchOperator;

134

import org.apache.flink.table.api.Table;

135

136

// Example custom batch operator implementation

137

public class MyBatchOperator extends BatchOperator<MyBatchOperator> {

138

139

public MyBatchOperator() {

140

super();

141

}

142

143

public MyBatchOperator(Params params) {

144

super(params);

145

}

146

147

@Override

148

public MyBatchOperator linkFrom(BatchOperator<?>... inputs) {

149

// Validate inputs

150

BatchOperator<?> input = checkAndGetFirst(inputs);

151

152

// Get input table

153

Table inputTable = input.getOutput();

154

155

// Process the table (example transformation)

156

Table outputTable = inputTable.select("*"); // Your processing logic here

157

158

// Set the output

159

this.setOutput(outputTable);

160

161

return this;

162

}

163

}

164

165

// Usage - operator chaining

166

BatchOperator<?> source = BatchOperator.fromTable(inputTable);

167

168

MyBatchOperator operator1 = new MyBatchOperator()

169

.setParam("parameter1", "value1");

170

171

MyBatchOperator operator2 = new MyBatchOperator()

172

.setParam("parameter2", "value2");

173

174

// Chain operators

175

Table result = source

176

.link(operator1.linkFrom(source))

177

.link(operator2.linkFrom(operator1))

178

.getOutput();

179

```

### StreamOperator Abstract Class

Base class for stream algorithm operators, providing operator linking and stream-specific functionality.

```java { .api }

186

/**

187

* Base class for stream algorithm operators

188

*/

189

public abstract class StreamOperator<T extends StreamOperator<T>> extends AlgoOperator<T> {

190

191

/** Default constructor */

192

public StreamOperator();

193

194

/** Constructor with parameters */

195

public StreamOperator(Params params);

196

197

/**

198

* Link to next stream operator in chain

199

* @param next Next operator to link to

200

* @return The next operator for method chaining

201

*/

202

public <S extends StreamOperator<?>> S link(S next);

203

204

/**

205

* Link from input stream operators (abstract - implemented by subclasses)

206

* @param inputs Input stream operators

207

* @return This operator instance configured with inputs

208

*/

209

public abstract T linkFrom(StreamOperator<?>... inputs);

210

211

/**

212

* Create stream operator from table

213

* @param table Input table

214

* @return StreamOperator wrapping the table

215

*/

216

public static StreamOperator<?> fromTable(Table table);

217

}

218

```

**Usage Examples:**

```java

223

import org.apache.flink.ml.operator.stream.StreamOperator;

224

import org.apache.flink.table.api.Table;

225

226

// Example custom stream operator implementation

227

public class MyStreamOperator extends StreamOperator<MyStreamOperator> {

228

229

public MyStreamOperator() {

230

super();

231

}

232

233

@Override

234

public MyStreamOperator linkFrom(StreamOperator<?>... inputs) {

235

StreamOperator<?> input = inputs[0]; // Get first input

236

237

// Get input table

238

Table inputTable = input.getOutput();

239

240

// Process the streaming table

241

Table outputTable = inputTable.select("*"); // Your streaming logic here

242

243

// Set the output

244

this.setOutput(outputTable);

245

246

return this;

247

}

248

}

249

250

// Usage - stream processing

251

StreamOperator<?> streamSource = StreamOperator.fromTable(streamingTable);

252

253

MyStreamOperator streamProcessor = new MyStreamOperator();

254

255

Table streamResult = streamSource

256

.link(streamProcessor.linkFrom(streamSource))

257

.getOutput();

258

```

### Table Source Operators

Specialized operators that wrap an existing `Table` as a batch or stream operator, so that it can serve as the starting point of an operator chain.

```java { .api }

265

/**

266

* Batch operator for table sources

267

*/

268

public class TableSourceBatchOp extends BatchOperator<TableSourceBatchOp> {

269

/**

270

* Constructor from table

271

* @param table Source table

272

*/

273

public TableSourceBatchOp(Table table);

274

275

@Override

276

public TableSourceBatchOp linkFrom(BatchOperator<?>... inputs);

277

}

278

279

/**

280

* Stream operator for table sources

281

*/

282

public class TableSourceStreamOp extends StreamOperator<TableSourceStreamOp> {

283

/**

284

* Constructor from table

285

* @param table Source table

286

*/

287

public TableSourceStreamOp(Table table);

288

289

@Override

290

public TableSourceStreamOp linkFrom(StreamOperator<?>... inputs);

291

}

292

```

**Usage Examples:**

```java

297

import org.apache.flink.ml.operator.batch.source.TableSourceBatchOp;

298

import org.apache.flink.ml.operator.stream.source.TableSourceStreamOp;

299

300

// Create operators from tables

301

Table batchTable = getBatchTable();

302

Table streamTable = getStreamTable();

303

304

// Batch table source

305

TableSourceBatchOp batchSource = new TableSourceBatchOp(batchTable);

306

307

// Stream table source

308

TableSourceStreamOp streamSource = new TableSourceStreamOp(streamTable);

309

310

// Use in operator chains

311

MyBatchOperator batchProcessor = new MyBatchOperator();

312

Table batchResult = batchSource

313

.link(batchProcessor.linkFrom(batchSource))

314

.getOutput();

315

316

MyStreamOperator streamProcessor = new MyStreamOperator();

317

Table streamResult = streamSource

318

.link(streamProcessor.linkFrom(streamSource))

319

.getOutput();

320

```

### Operator Pattern Examples

Common patterns for implementing algorithm operators with proper input validation and output management.

**Usage Examples:**

```java

329

// Multi-input operator example

330

public class JoinOperator extends BatchOperator<JoinOperator> {

331

332

@Override

333

public JoinOperator linkFrom(BatchOperator<?>... inputs) {

334

// Validate multiple inputs

335

if (inputs.length != 2) {

336

throw new IllegalArgumentException("JoinOperator requires exactly 2 inputs");

337

}

338

339

BatchOperator<?> left = inputs[0];

340

BatchOperator<?> right = inputs[1];

341

342

// Get input tables

343

Table leftTable = left.getOutput();

344

Table rightTable = right.getOutput();

345

346

// Perform join operation

347

String joinCondition = getParams().get(JOIN_CONDITION);

348

Table joinedTable = leftTable.join(rightTable, joinCondition);

349

350

// Set output

351

this.setOutput(joinedTable);

352

353

return this;

354

}

355

}

356

357

// Side output operator example

358

public class SplitOperator extends BatchOperator<SplitOperator> {

359

360

@Override

361

public SplitOperator linkFrom(BatchOperator<?>... inputs) {

362

BatchOperator<?> input = checkAndGetFirst(inputs);

363

Table inputTable = input.getOutput();

364

365

// Split condition

366

String condition = getParams().get(SPLIT_CONDITION);

367

368

// Create main and side outputs

369

Table mainOutput = inputTable.filter(condition);

370

Table sideOutput = inputTable.filter("!(" + condition + ")");

371

372

// Set outputs

373

this.setOutput(mainOutput);

374

this.setSideOutputs(new Table[]{sideOutput});

375

376

return this;

377

}

378

}

379

380

// Parameter-driven operator example

381

public class SelectOperator extends BatchOperator<SelectOperator>

382

implements HasSelectedCols<SelectOperator> {

383

384

@Override

385

public SelectOperator linkFrom(BatchOperator<?>... inputs) {

386

BatchOperator<?> input = checkAndGetFirst(inputs);

387

Table inputTable = input.getOutput();

388

389

// Get selected columns from parameters

390

String[] selectedCols = getSelectedCols();

391

if (selectedCols == null || selectedCols.length == 0) {

392

selectedCols = inputTable.getSchema().getFieldNames();

393

}

394

395

// Select columns

396

Table selectedTable = inputTable.select(String.join(",", selectedCols));

397

398

this.setOutput(selectedTable);

399

return this;

400

}

401

}

402

403

// Usage

404

SelectOperator selector = new SelectOperator()

405

.setSelectedCols(new String[]{"col1", "col2", "col3"});

406

407

JoinOperator joiner = new JoinOperator()

408

.set(JOIN_CONDITION, "a.id = b.id");

409

410

SplitOperator splitter = new SplitOperator()

411

.set(SPLIT_CONDITION, "age > 18");

412

413

// Build pipeline

414

Table result = source

415

.link(selector.linkFrom(source))

416

.link(joiner.linkFrom(selector, anotherSource))

417

.link(splitter.linkFrom(joiner))

418

.getOutput();

419

420

// Access side outputs

421

Table sideOutput = splitter.getSideOutputs()[0];

422

```