or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

clustering.md distributed-copy.md graph-processing.md index.md misc-examples.md relational-processing.md word-count.md

relational-processing.md docs/

0

# Relational Processing

1

2

SQL-like operations and analytics, including web log analysis, TPC-H benchmark queries, and accumulator examples. These examples demonstrate filtering, joining, grouping, and custom metrics collection patterns.

3

4

## Capabilities

5

6

### Web Log Analysis

7

8

Analytical processing of web server logs with filtering, joining, and aggregation operations.

9

10

```java { .api }

11

/**

12

* Web log analysis demonstrating relational operations on log data.

13

* Usage: WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>

14

*/

15

public class WebLogAnalysis {

16

public static void main(String[] args) throws Exception;

17

18

/**

19

* Filters documents by keyword content

20

*/

21

public static class FilterDocByKeyWords

22

implements FilterFunction<Tuple2<String, String>> {

23

/**

24

* Checks if document contains specified keywords

25

* @param value Tuple (document_url, content)

26

* @return true if document contains keywords, false otherwise

27

*/

28

public boolean filter(Tuple2<String, String> value);

29

}

30

31

/**

32

* Filters entries by rank threshold

33

*/

34

public static class FilterByRank

35

implements FilterFunction<Tuple3<Integer, String, Integer>> {

36

/**

37

* Checks if rank exceeds threshold

38

* @param value Tuple (rank, url, avg_duration)

39

* @return true if rank above threshold, false otherwise

40

*/

41

public boolean filter(Tuple3<Integer, String, Integer> value);

42

}

43

44

/**

45

* Filters visits by date criteria

46

*/

47

public static class FilterVisitsByDate

48

implements FilterFunction<Tuple2<String, String>> {

49

/**

50

* Checks if visit meets date criteria

51

* @param value Tuple (url, visit_date)

52

* @return true if visit matches date filter, false otherwise

53

*/

54

public boolean filter(Tuple2<String, String> value);

55

}

56

57

/**

58

* Anti-join operation for excluding visits

59

*/

60

public static class AntiJoinVisits

61

implements CoGroupFunction<

62

Tuple2<String, String>,

63

Tuple2<String, String>,

64

Tuple2<String, String>> {

65

/**

66

* Performs anti-join to exclude certain visits

67

* @param ranks Iterator of rank data

68

* @param visits Iterator of visit data

69

* @param out Collector for anti-join results

70

*/

71

public void coGroup(

72

Iterable<Tuple2<String, String>> ranks,

73

Iterable<Tuple2<String, String>> visits,

74

Collector<Tuple2<String, String>> out);

75

}

76

}

77

```

78

79

**Usage Examples:**

80

81

```java

82

// Run web log analysis with custom data

83

String[] args = {

84

"--documents", "/path/to/documents.txt",

85

"--ranks", "/path/to/ranks.txt",

86

"--visits", "/path/to/visits.txt",

87

"--output", "/path/to/output"

88

};

89

WebLogAnalysis.main(args);

90

91

// Use web log filters in custom analysis

92

DataSet<Tuple2<String, String>> documents = getDocumentDataSet(env);

93

DataSet<Tuple2<String, String>> filtered = documents

94

.filter(new WebLogAnalysis.FilterDocByKeyWords());

95

96

DataSet<Tuple3<Integer, String, Integer>> ranks = getRankDataSet(env);

97

DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks

98

.filter(new WebLogAnalysis.FilterByRank());

99

```

100

101

### TPC-H Benchmark Queries

102

103

Implementation of TPC-H decision support benchmark queries demonstrating complex relational operations.

104

105

```java { .api }

106

/**

107

* TPC-H Query 3: Shipping Priority Query

108

* Usage: TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path>

109

*/

110

public class TPCHQuery3 {

111

public static void main(String[] args) throws Exception;

112

113

/**

114

* Line item record from TPC-H schema

115

*/

116

public static class Lineitem extends Tuple4<Long, Double, Double, String> {

117

public Lineitem();

118

public Lineitem(Long orderkey, Double extendedprice, Double discount, String shipdate);

119

120

public Long getOrderkey();

121

public Double getExtendedprice();

122

public Double getDiscount();

123

public String getShipdate();

124

125

public void setOrderkey(Long orderkey);

126

public void setExtendedprice(Double extendedprice);

127

public void setDiscount(Double discount);

128

public void setShipdate(String shipdate);

129

}

130

131

/**

132

* Customer record from TPC-H schema

133

*/

134

public static class Customer extends Tuple2<Long, String> {

135

public Customer();

136

public Customer(Long custkey, String mktsegment);

137

138

public Long getCustkey();

139

public String getMktsegment();

140

141

public void setCustkey(Long custkey);

142

public void setMktsegment(String mktsegment);

143

}

144

145

/**

146

* Order record from TPC-H schema

147

*/

148

public static class Order extends Tuple4<Long, Long, String, Long> {

149

public Order();

150

public Order(Long orderkey, Long custkey, String orderpriority, Long shippriority);

151

152

public Long getOrderkey();

153

public Long getCustkey();

154

public String getOrderpriority();

155

public Long getShippriority();

156

157

public void setOrderkey(Long orderkey);

158

public void setCustkey(Long custkey);

159

public void setOrderpriority(String orderpriority);

160

public void setShippriority(Long shippriority);

161

}

162

163

/**

164

* Result record for shipping priority query

165

*/

166

public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {

167

public ShippingPriorityItem();

168

public ShippingPriorityItem(Long orderkey, Double revenue, String orderdate, Long shippriority);

169

170

public Long getOrderkey();

171

public Double getRevenue();

172

public String getOrderdate();

173

public Long getShippriority();

174

175

public void setOrderkey(Long orderkey);

176

public void setRevenue(Double revenue);

177

public void setOrderdate(String orderdate);

178

public void setShippriority(Long shippriority);

179

}

180

}

181

182

/**

183

* TPC-H Query 10: Customer Return Query

184

* Usage: TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>

185

*/

186

public class TPCHQuery10 {

187

public static void main(String[] args) throws Exception;

188

}

189

```

190

191

**Usage Examples:**

192

193

```java

194

// Run TPC-H Query 3

195

String[] args = {

196

"--lineitem", "/path/to/lineitem.tbl",

197

"--customer", "/path/to/customer.tbl",

198

"--orders", "/path/to/orders.tbl",

199

"--output", "/path/to/query3_results"

200

};

201

TPCHQuery3.main(args);

202

203

// Use TPC-H data types in custom queries

204

TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");

205

TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1-URGENT", 0L);

206

TPCHQuery3.Lineitem item = new TPCHQuery3.Lineitem(1L, 25000.0, 0.05, "1995-03-15");

207

208

// Calculate revenue

209

double revenue = item.getExtendedprice() * (1.0 - item.getDiscount());

210

```

211

212

### Accumulator Examples

213

214

Demonstration of custom accumulators for collecting metrics during job execution.

215

216

```java { .api }

217

/**

218

* Example using custom accumulators to count empty fields in data processing.

219

* Usage: EmptyFieldsCountAccumulator --input <path> --output <path>

220

*/

221

public class EmptyFieldsCountAccumulator {

222

public static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";

223

224

public static void main(final String[] args) throws Exception;

225

226

/**

227

* String triple data type for processing

228

*/

229

public static class StringTriple extends Tuple3<String, String, String> {

230

public StringTriple();

231

public StringTriple(String first, String second, String third);

232

233

public String getFirst();

234

public String getSecond();

235

public String getThird();

236

237

public void setFirst(String first);

238

public void setSecond(String second);

239

public void setThird(String third);

240

}

241

242

/**

243

* Filter that counts empty fields using accumulator

244

*/

245

public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> {

246

private VectorAccumulator emptyFieldCounter;

247

248

/**

249

* Initialize accumulator

250

* @param parameters Configuration parameters

251

*/

252

@Override

253

public void open(Configuration parameters) throws Exception;

254

255

/**

256

* Filter records and count empty fields

257

* @param value Input string triple

258

* @return true to keep record, false to filter out

259

*/

260

@Override

261

public boolean filter(StringTriple value) throws Exception;

262

}

263

264

/**

265

* Custom vector accumulator for collecting integer lists

266

*/

267

public static class VectorAccumulator implements Accumulator<Integer, ArrayList<Integer>> {

268

private ArrayList<Integer> localValue = new ArrayList<>();

269

270

/**

271

* Add value to accumulator

272

* @param value Value to add

273

*/

274

@Override

275

public void add(Integer value);

276

277

/**

278

* Get local accumulated value

279

* @return Local accumulator value

280

*/

281

@Override

282

public ArrayList<Integer> getLocalValue();

283

284

/**

285

* Reset local accumulator

286

*/

287

@Override

288

public void resetLocal();

289

290

/**

291

* Merge accumulator values

292

* @param other Other accumulator to merge

293

*/

294

@Override

295

public void merge(Accumulator<Integer, ArrayList<Integer>> other);

296

297

/**

298

* Clone accumulator

299

* @return Cloned accumulator instance

300

*/

301

@Override

302

public Accumulator<Integer, ArrayList<Integer>> clone();

303

}

304

}

305

```

306

307

**Usage Examples:**

308

309

```java

310

// Run accumulator example

311

String[] args = {

312

"--input", "/path/to/data.txt",

313

"--output", "/path/to/filtered_output"

314

};

315

EmptyFieldsCountAccumulator.main(args);

316

317

// Use custom accumulator in job

318

DataSet<EmptyFieldsCountAccumulator.StringTriple> data = getStringTripleDataSet(env);

319

DataSet<EmptyFieldsCountAccumulator.StringTriple> filtered = data

320

.filter(new EmptyFieldsCountAccumulator.EmptyFieldFilter());

321

322

// Access accumulator results after execution

323

JobExecutionResult result = env.execute("Accumulator Job");

324

Map<String, Object> accumulatorResults = result.getAllAccumulatorResults();

325

ArrayList<Integer> emptyCounts = (ArrayList<Integer>) accumulatorResults

326

.get(EmptyFieldsCountAccumulator.EMPTY_FIELD_ACCUMULATOR);

327

```

328

329

### Relational Data Providers

330

331

Utility classes providing default relational datasets for testing and examples.

332

333

```java { .api }

334

/**

335

* Provides default web log data sets

336

*/

337

public class WebLogData {

338

/**

339

* Default document data as object arrays

340

*/

341

public static final Object[][] DOCUMENTS;

342

343

/**

344

* Default rank data as object arrays

345

*/

346

public static final Object[][] RANKS;

347

348

/**

349

* Default visit data as object arrays

350

*/

351

public static final Object[][] VISITS;

352

353

/**

354

* Creates DataSet with default document data

355

* @param env Execution environment

356

* @return DataSet containing default documents

357

*/

358

public static DataSet<Tuple2<String, String>> getDocumentDataSet(ExecutionEnvironment env);

359

360

/**

361

* Creates DataSet with default rank data

362

* @param env Execution environment

363

* @return DataSet containing default ranks

364

*/

365

public static DataSet<Tuple3<Integer, String, Integer>> getRankDataSet(ExecutionEnvironment env);

366

367

/**

368

* Creates DataSet with default visit data

369

* @param env Execution environment

370

* @return DataSet containing default visits

371

*/

372

public static DataSet<Tuple2<String, String>> getVisitDataSet(ExecutionEnvironment env);

373

}

374

375

/**

376

* Generates web log data files for testing

377

*/

378

public class WebLogDataGenerator {

379

public static void main(String[] args) throws Exception;

380

}

381

```

382

383

**Usage Examples:**

384

385

```java

386

// Use default web log data

387

import org.apache.flink.examples.java.relational.util.WebLogData;

388

389

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

390

DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);

391

DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);

392

DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);

393

394

// Generate custom web log data

395

String[] generatorArgs = {

396

"--output", "/path/to/weblog_data",

397

"--documents", "1000",

398

"--visits", "5000"

399

};

400

WebLogDataGenerator.main(generatorArgs);

401

```

402

403

## Common Relational Processing Patterns

404

405

### Filtering and Selection

406

407

Standard filtering patterns used across relational examples:

408

409

```java

410

// Document filtering by keyword

411

DataSet<Tuple2<String, String>> documents = getDocumentDataSet(env);

412

DataSet<Tuple2<String, String>> keywordDocs = documents

413

.filter(new WebLogAnalysis.FilterDocByKeyWords());

414

415

// Rank-based filtering

416

DataSet<Tuple3<Integer, String, Integer>> ranks = getRankDataSet(env);

417

DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks

418

.filter(new WebLogAnalysis.FilterByRank());

419

420

// Date-based filtering

421

DataSet<Tuple2<String, String>> visits = getVisitDataSet(env);

422

DataSet<Tuple2<String, String>> recentVisits = visits

423

.filter(new WebLogAnalysis.FilterVisitsByDate());

424

```

425

426

### Joining and Co-grouping

427

428

Join operations for combining related datasets:

429

430

```java

431

// Inner join example

432

DataSet<TPCHQuery3.Customer> customers = getCustomerDataSet(env);

433

DataSet<TPCHQuery3.Order> orders = getOrderDataSet(env);

434

435

DataSet<Tuple2<TPCHQuery3.Customer, TPCHQuery3.Order>> joined = customers

436

.join(orders)

437

.where("custkey")

438

.equalTo("custkey");

439

440

// Anti-join using co-group

441

DataSet<Tuple2<String, String>> ranks = getRankDataSet(env);

442

DataSet<Tuple2<String, String>> visits = getVisitDataSet(env);

443

444

DataSet<Tuple2<String, String>> antiJoined = ranks

445

.coGroup(visits)

446

.where(1)

447

.equalTo(0)

448

.with(new WebLogAnalysis.AntiJoinVisits());

449

```

450

451

### TPC-H Data Format Requirements

452

453

TPC-H examples expect pipe-delimited files:

454

455

**Customer table format:**

456

```

457

1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even...

458

2|Customer#000000002|XSTf4,NCwDVaWNE6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts...

459

```

460

461

**Orders table format:**

462

```

463

1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions of fi|

464

2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0|foxes. pending|

465

```

466

467

### Accumulator Usage Pattern

468

469

Standard pattern for using custom accumulators:

470

471

```java

472

// Create the accumulator as a field and register it in open()

473

private MyAccumulator myCounter = new MyAccumulator();

474

475

@Override

476

public void open(Configuration parameters) throws Exception {

477

getRuntimeContext().addAccumulator("my-counter", this.myCounter);

478

}

479

480

// Use accumulator in user function

481

@Override

482

public boolean filter(MyType value) throws Exception {

483

if (someCondition(value)) {

484

myCounter.add(1);

485

return true;

486

}

487

return false;

488

}

489

490

// Access results after job execution

491

JobExecutionResult result = env.execute("My Job");

492

Object accumulatorValue = result.getAccumulatorResult("my-counter");

493

```

494

495

## Types

496

497

### Relational Data Types

498

499

```java { .api }

500

// Web log tuples

501

Tuple2<String, String> document = new Tuple2<>("url", "content");

502

Tuple3<Integer, String, Integer> rank = new Tuple3<>(50, "url", 1000);

503

Tuple2<String, String> visit = new Tuple2<>("url", "2023-01-01");

504

505

// TPC-H business objects

506

TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");

507

TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1-URGENT", 0L);

508

TPCHQuery3.Lineitem lineitem = new TPCHQuery3.Lineitem(1L, 1000.0, 0.05, "1995-03-15");

509

510

// Accumulator types

511

EmptyFieldsCountAccumulator.StringTriple triple = new EmptyFieldsCountAccumulator.StringTriple("a", "", "c");

512

ArrayList<Integer> accumulatorResult = new ArrayList<>();

513

```