# Relational Processing

SQL-like operations and analytics, including web log analysis, TPC-H benchmark queries, and accumulator examples. The examples cover filtering, joining, anti-joins via co-group, grouped aggregation, and collecting custom metrics with accumulators.

## Capabilities

### Web Log Analysis

Relational processing of web server logs: documents and page rankings are filtered and joined on their URL, and an anti-join then removes the pages that have matching visits in the filtered date range.

```java { .api }
/**
 * Web log analysis demonstrating relational operations on log data.
 * Usage: WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>
 */
public class WebLogAnalysis {
    public static void main(String[] args) throws Exception;

    /**
     * Filters documents by keyword content
     */
    public static class FilterDocByKeyWords
            implements FilterFunction<Tuple2<String, String>> {
        /**
         * Checks whether the document contains the required keywords
         * @param value Tuple (document_url, content)
         * @return true if the document contains the keywords, false otherwise
         */
        public boolean filter(Tuple2<String, String> value);
    }

    /**
     * Filters rankings by a rank threshold
     */
    public static class FilterByRank
            implements FilterFunction<Tuple3<Integer, String, Integer>> {
        /**
         * Checks whether the rank exceeds the threshold
         * @param value Tuple (rank, url, avg_duration)
         * @return true if the rank is above the threshold, false otherwise
         */
        public boolean filter(Tuple3<Integer, String, Integer> value);
    }

    /**
     * Filters visits by date
     */
    public static class FilterVisitsByDate
            implements FilterFunction<Tuple2<String, String>> {
        /**
         * Checks whether the visit date matches the date filter
         * @param value Tuple (url, visit_date)
         * @return true if the visit matches the date filter, false otherwise
         */
        public boolean filter(Tuple2<String, String> value);
    }

    /**
     * Anti-join that keeps only the rankings of URLs without a matching visit
     */
    public static class AntiJoinVisits
            implements CoGroupFunction<
                    Tuple3<Integer, String, Integer>,
                    Tuple1<String>,
                    Tuple3<Integer, String, Integer>> {
        /**
         * Emits the rank records of a URL only if no visit shares that URL
         * @param ranks Rank records (rank, url, avg_duration) with the current URL key
         * @param visits Visited URLs with the same key
         * @param out Collector for the rank records that survive the anti-join
         */
        public void coGroup(
                Iterable<Tuple3<Integer, String, Integer>> ranks,
                Iterable<Tuple1<String>> visits,
                Collector<Tuple3<Integer, String, Integer>> out);
    }
}
```

**Usage Examples:**

```java
// Run web log analysis with custom data
String[] args = {
    "--documents", "/path/to/documents.txt",
    "--ranks", "/path/to/ranks.txt",
    "--visits", "/path/to/visits.txt",
    "--output", "/path/to/output"
};
WebLogAnalysis.main(args);

// Use the web log filters in a custom pipeline, here on the default data from WebLogData
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple2<String, String>> filtered = documents
    .filter(new WebLogAnalysis.FilterDocByKeyWords());

DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks
    .filter(new WebLogAnalysis.FilterByRank());
```
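The three filter functions implement the selection predicates of the web log query. As a dependency-free illustration, the sketch below restates each predicate on plain Java values. The keyword list, the rank threshold of 40, the filter year 2007, and the requirement that every keyword occur are assumptions made for this sketch rather than values taken from `WebLogAnalysis`; visit dates are assumed to use the `yyyy-MM-dd` form.

```java
import java.util.List;

/** Dependency-free sketch of the web log selection predicates (hypothetical class). */
final class WebLogPredicatesSketch {
    // Assumed parameters, chosen for illustration only.
    static final List<String> KEYWORDS = List.of("editors", "oscillations");
    static final int RANK_THRESHOLD = 40;
    static final int FILTER_YEAR = 2007;

    private WebLogPredicatesSketch() {}

    /** Keyword predicate: keep a document whose content contains every keyword. */
    static boolean containsAllKeywords(String content) {
        for (String keyword : KEYWORDS) {
            if (!content.contains(keyword)) {
                return false;
            }
        }
        return true;
    }

    /** Rank predicate: keep a ranking whose rank is strictly above the threshold. */
    static boolean rankAboveThreshold(int rank) {
        return rank > RANK_THRESHOLD;
    }

    /** Date predicate: keep a visit whose yyyy-MM-dd date lies in the filter year. */
    static boolean visitInFilterYear(String visitDate) {
        return Integer.parseInt(visitDate.substring(0, 4)) == FILTER_YEAR;
    }
}
```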
### TPC-H Benchmark Queries

Implementations of TPC-H decision-support benchmark queries that combine selections, multi-way joins, and grouped aggregation.

```java { .api }
/**
 * TPC-H Query 3: Shipping Priority Query
 * Usage: TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path>
 */
public class TPCHQuery3 {
    public static void main(String[] args) throws Exception;

    /**
     * Line item record from the TPC-H schema: (orderkey, extendedprice, discount, shipdate)
     */
    public static class Lineitem extends Tuple4<Long, Double, Double, String> {
        public Lineitem();
        public Lineitem(Long orderkey, Double extendedprice, Double discount, String shipdate);

        public Long getOrderkey();
        public Double getExtendedprice();
        public Double getDiscount();
        public String getShipdate();

        public void setOrderkey(Long orderkey);
        public void setExtendedprice(Double extendedprice);
        public void setDiscount(Double discount);
        public void setShipdate(String shipdate);
    }

    /**
     * Customer record from the TPC-H schema: (custkey, mktsegment)
     */
    public static class Customer extends Tuple2<Long, String> {
        public Customer();
        public Customer(Long custkey, String mktsegment);

        public Long getCustkey();
        public String getMktsegment();

        public void setCustkey(Long custkey);
        public void setMktsegment(String mktsegment);
    }

    /**
     * Order record from the TPC-H schema: (orderkey, custkey, orderdate, shippriority)
     */
    public static class Order extends Tuple4<Long, Long, String, Long> {
        public Order();
        public Order(Long orderkey, Long custkey, String orderdate, Long shippriority);

        public Long getOrderkey();
        public Long getCustkey();
        public String getOrderdate();
        public Long getShippriority();

        public void setOrderkey(Long orderkey);
        public void setCustkey(Long custkey);
        public void setOrderdate(String orderdate);
        public void setShippriority(Long shippriority);
    }

    /**
     * Result record of the shipping priority query: (orderkey, revenue, orderdate, shippriority)
     */
    public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
        public ShippingPriorityItem();
        public ShippingPriorityItem(Long orderkey, Double revenue, String orderdate, Long shippriority);

        public Long getOrderkey();
        public Double getRevenue();
        public String getOrderdate();
        public Long getShippriority();

        public void setOrderkey(Long orderkey);
        public void setRevenue(Double revenue);
        public void setOrderdate(String orderdate);
        public void setShippriority(Long shippriority);
    }
}

/**
 * TPC-H Query 10: Returned Item Reporting Query
 * Usage: TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>
 */
public class TPCHQuery10 {
    public static void main(String[] args) throws Exception;
}
```

**Usage Examples:**

```java
// Run TPC-H Query 3
String[] args = {
    "--lineitem", "/path/to/lineitem.tbl",
    "--customer", "/path/to/customer.tbl",
    "--orders", "/path/to/orders.tbl",
    "--output", "/path/to/query3_results"
};
TPCHQuery3.main(args);

// Use the TPC-H data types in custom queries
TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");
// Order fields: orderkey, custkey, orderdate, shippriority
TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1995-03-10", 0L);
TPCHQuery3.Lineitem item = new TPCHQuery3.Lineitem(1L, 25000.0, 0.05, "1995-03-15");

// Revenue of a line item: extendedprice * (1 - discount)
double revenue = item.getExtendedprice() * (1.0 - item.getDiscount());
```
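TPC-H Query 3 keeps the customers of one market segment, their orders placed before a cutoff date, and the line items of those orders shipped after the same date, then sums `extendedprice * (1 - discount)` per (orderkey, orderdate, shippriority). The single-JVM sketch below spells out those steps over in-memory lists. The `Query3Sketch` class and its record types are hypothetical, dates are assumed to be `yyyy-MM-dd` strings (so string comparison is chronological), and the benchmark's final ordering by revenue is left out.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** In-memory sketch of TPC-H Query 3 semantics (hypothetical class, not the Flink job). */
final class Query3Sketch {
    record Customer(long custkey, String mktsegment) {}
    record Order(long orderkey, long custkey, String orderdate, long shippriority) {}
    record Lineitem(long orderkey, double extendedprice, double discount, String shipdate) {}

    private Query3Sketch() {}

    /** Sums extendedprice * (1 - discount) per group "orderkey|orderdate|shippriority". */
    static Map<String, Double> revenue(List<Customer> customers, List<Order> orders,
                                       List<Lineitem> lineitems, String segment, String date) {
        // Selection on customer: remember the keys of customers in the segment.
        Set<Long> custkeys = new HashSet<>();
        for (Customer c : customers) {
            if (c.mktsegment().equals(segment)) {
                custkeys.add(c.custkey());
            }
        }
        // Customer-order join on custkey, keeping orders placed before the date.
        Map<Long, Order> ordersByKey = new HashMap<>();
        for (Order o : orders) {
            if (custkeys.contains(o.custkey()) && o.orderdate().compareTo(date) < 0) {
                ordersByKey.put(o.orderkey(), o);
            }
        }
        // Order-lineitem join on orderkey for items shipped after the date, then grouped sum.
        Map<String, Double> result = new TreeMap<>();
        for (Lineitem l : lineitems) {
            Order o = ordersByKey.get(l.orderkey());
            if (o != null && l.shipdate().compareTo(date) > 0) {
                String group = o.orderkey() + "|" + o.orderdate() + "|" + o.shippriority();
                result.merge(group, l.extendedprice() * (1.0 - l.discount()), Double::sum);
            }
        }
        return result;
    }
}
```

The hash set and hash map lookups stand in for the customer-order and order-lineitem equi-joins, and `Map.merge` performs the grouped sum.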
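### Accumulator Examples

Custom accumulators collect metrics, such as counts of empty fields, while a job runs. Each parallel task updates its own local accumulator, and Flink merges the local values into the result returned when the job finishes.

```java { .api }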
/**
 * Example using a custom accumulator to count empty fields during data processing.
 * Usage: EmptyFieldsCountAccumulator --input <path> --output <path>
 */
public class EmptyFieldsCountAccumulator {
    public static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";

    public static void main(final String[] args) throws Exception;

    /**
     * Record type: a triple of strings
     */
    public static class StringTriple extends Tuple3<String, String, String> {
        public StringTriple();
        public StringTriple(String first, String second, String third);

        public String getFirst();
        public String getSecond();
        public String getThird();

        public void setFirst(String first);
        public void setSecond(String second);
        public void setThird(String third);
    }

    /**
     * Filter that drops records containing an empty field and counts the
     * empty fields per field position in a VectorAccumulator
     */
    public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> {
        private VectorAccumulator emptyFieldCounter;

        /**
         * Registers the accumulator with the runtime context under EMPTY_FIELD_ACCUMULATOR
         * @param parameters Configuration parameters
         */
        @Override
        public void open(Configuration parameters) throws Exception;

        /**
         * Counts the empty fields of a record and filters it
         * @param value Input string triple
         * @return true to keep the record (no empty fields), false to drop it
         */
        @Override
        public boolean filter(StringTriple value) throws Exception;
    }

    /**
     * Accumulator holding a vector of counters: add(i) increments the counter at
     * position i, and merge adds two vectors component-wise
     */
    public static class VectorAccumulator implements Accumulator<Integer, ArrayList<Integer>> {
        private ArrayList<Integer> localValue = new ArrayList<>();

        /**
         * Increments the counter at the given position, growing the vector if needed
         * @param value Position of the counter to increment
         */
        @Override
        public void add(Integer value);

        /**
         * Returns the local counter vector
         * @return Local accumulator value
         */
        @Override
        public ArrayList<Integer> getLocalValue();

        /**
         * Resets the local counter vector
         */
        @Override
        public void resetLocal();

        /**
         * Adds the counters of another accumulator component-wise
         * @param other Other accumulator to merge
         */
        @Override
        public void merge(Accumulator<Integer, ArrayList<Integer>> other);

        /**
         * Creates a copy of this accumulator
         * @return Cloned accumulator instance
         */
        @Override
        public Accumulator<Integer, ArrayList<Integer>> clone();
    }
}
```

**Usage Examples:**

```java
// Run the accumulator example
String[] args = {
    "--input", "/path/to/data.txt",
    "--output", "/path/to/filtered_output"
};
EmptyFieldsCountAccumulator.main(args);

// Use the custom accumulator in your own job
// (getStringTripleDataSet is a hypothetical helper that loads the input records)
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<EmptyFieldsCountAccumulator.StringTriple> data = getStringTripleDataSet(env);
DataSet<EmptyFieldsCountAccumulator.StringTriple> filtered = data
    .filter(new EmptyFieldsCountAccumulator.EmptyFieldFilter());

// execute() needs at least one sink in the plan
filtered.writeAsText("/path/to/filtered_output");

// Access the merged accumulator result after execution;
// element i is the number of empty values found in field i
JobExecutionResult result = env.execute("Accumulator Job");
ArrayList<Integer> emptyCounts =
    result.getAccumulatorResult(EmptyFieldsCountAccumulator.EMPTY_FIELD_ACCUMULATOR);
```
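How a vector of counters behaves across parallel tasks can be shown without Flink. The sketch below assumes, as in the empty-field use case, that `add(i)` counts one occurrence at field position `i` and that merging adds two vectors component-wise. `CounterVectorSketch` is a hypothetical standalone class; it does not implement Flink's `Accumulator` interface.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone sketch of a per-position counter vector (hypothetical, not Flink's Accumulator). */
final class CounterVectorSketch {
    private final ArrayList<Integer> counts = new ArrayList<>();

    /** Increments the counter at the given position, growing the vector with zeros if needed. */
    void add(int position) {
        increment(position, 1);
    }

    /** Adds another vector component-wise, as when merging two task-local results. */
    void merge(CounterVectorSketch other) {
        for (int i = 0; i < other.counts.size(); i++) {
            increment(i, other.counts.get(i));
        }
    }

    /** Returns a copy of the current counts. */
    List<Integer> value() {
        return new ArrayList<>(counts);
    }

    private void increment(int position, int delta) {
        while (counts.size() <= position) {
            counts.add(0);
        }
        counts.set(position, counts.get(position) + delta);
    }
}
```

For example, if one task records positions 1 and 1 and another records 0 and 2, the merged vector is `[1, 2, 1]`: one empty first field, two empty second fields and one empty third field.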
### Relational Data Providers

Utility classes providing default relational datasets for testing and examples.

```java { .api }
/**
 * Provides default web log data sets
 */
public class WebLogData {
    /**
     * Default document data as object arrays
     */
    public static final Object[][] DOCUMENTS;

    /**
     * Default rank data as object arrays
     */
    public static final Object[][] RANKS;

    /**
     * Default visit data as object arrays
     */
    public static final Object[][] VISITS;

    /**
     * Creates DataSet with default document data
     * @param env Execution environment
     * @return DataSet containing default documents
     */
    public static DataSet<Tuple2<String, String>> getDocumentDataSet(ExecutionEnvironment env);

    /**
     * Creates DataSet with default rank data
     * @param env Execution environment
     * @return DataSet containing default ranks
     */
    public static DataSet<Tuple3<Integer, String, Integer>> getRankDataSet(ExecutionEnvironment env);

    /**
     * Creates DataSet with default visit data
     * @param env Execution environment
     * @return DataSet containing default visits
     */
    public static DataSet<Tuple2<String, String>> getVisitDataSet(ExecutionEnvironment env);
}

/**
 * Generates web log data files for testing
 */
public class WebLogDataGenerator {
    public static void main(String[] args) throws Exception;
}
```

**Usage Examples:**

```java
// Use the default web log data
import org.apache.flink.examples.java.relational.util.WebLogData;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);

// Generate web log data files. The generator takes two positional arguments,
// the number of documents and the number of visits, and writes the
// documents, ranks and visits files to the JVM temp directory (java.io.tmpdir)
String[] generatorArgs = {"1000", "5000"};
WebLogDataGenerator.main(generatorArgs);
```

## Common Relational Processing Patterns

### Filtering and Selection

Standard filtering patterns used across relational examples:

```java
// Document filtering by keyword
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple2<String, String>> keywordDocs = documents
    .filter(new WebLogAnalysis.FilterDocByKeyWords());

// Rank-based filtering
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks
    .filter(new WebLogAnalysis.FilterByRank());

// Date-based filtering
DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
DataSet<Tuple2<String, String>> filteredVisits = visits
    .filter(new WebLogAnalysis.FilterVisitsByDate());
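```

### Joining and Co-grouping

Join operations combine related datasets on a common key. A co-group, which hands both groups for each key to a user function, can express an anti-join that keeps only the records without a match on the other side:

```java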
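// Inner join of customers and orders on the customer key
// (getCustomerDataSet and getOrderDataSet are hypothetical loaders)
DataSet<TPCHQuery3.Customer> customers = getCustomerDataSet(env);
DataSet<TPCHQuery3.Order> orders = getOrderDataSet(env);

// Tuple types are keyed by field position: Customer.f0 and Order.f1 hold the custkey
DataSet<Tuple2<TPCHQuery3.Customer, TPCHQuery3.Order>> joined = customers
    .join(orders)
    .where(0)
    .equalTo(1);

// Anti-join using co-group: keep the rank records whose URL has no visit in the date range
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple1<String>> visitedUrls = WebLogData.getVisitDataSet(env)
    .filter(new WebLogAnalysis.FilterVisitsByDate())
    .project(0);

DataSet<Tuple3<Integer, String, Integer>> antiJoined = ranks
    .coGroup(visitedUrls)
    .where(1)      // URL field of the rank record
    .equalTo(0)    // URL field of the visit record
    .with(new WebLogAnalysis.AntiJoinVisits());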
```
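The difference between the two operations can be sketched on in-memory lists. The inner join below builds a hash table on the right input and probes it with the left input, emitting one pair per match. The anti-join follows the co-group idea of emitting a left record only when the right-hand group for its key is empty; because only emptiness matters, the sketch collects the right-side keys into a set instead of building full groups. `JoinSketch` and its methods are hypothetical helpers that run in one JVM, whereas Flink distributes the inputs across parallel tasks.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

/** Single-JVM sketch of an equi-join and a co-group style anti-join (hypothetical helpers). */
final class JoinSketch {
    private JoinSketch() {}

    /** Inner equi-join: hash the right side by key, then probe with each left record. */
    static <L, R, K> List<Map.Entry<L, R>> innerJoin(List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey) {
        Map<K, List<R>> table = new HashMap<>();
        for (R r : right) {
            table.computeIfAbsent(rightKey.apply(r), k -> new ArrayList<>()).add(r);
        }
        List<Map.Entry<L, R>> out = new ArrayList<>();
        for (L l : left) {
            for (R r : table.getOrDefault(leftKey.apply(l), List.of())) {
                out.add(Map.entry(l, r));
            }
        }
        return out;
    }

    /** Anti-join: keep left records whose key has no record on the right side. */
    static <L, R, K> List<L> antiJoin(List<L> left, List<R> right,
            Function<L, K> leftKey, Function<R, K> rightKey) {
        Set<K> rightKeys = new HashSet<>();
        for (R r : right) {
            rightKeys.add(rightKey.apply(r));
        }
        List<L> out = new ArrayList<>();
        for (L l : left) {
            if (!rightKeys.contains(leftKey.apply(l))) {
                out.add(l);
            }
        }
        return out;
    }
}
```

For rank rows keyed by `url_a`, `url_b` and `url_c` and visits to `url_b` only, `antiJoin` returns the `url_a` and `url_c` rows, while `innerJoin` pairs the `url_b` row with each of its visits.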
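### TPC-H Data Format Requirements

The TPC-H examples expect pipe-delimited (`|`) text files such as the `.tbl` files written by the TPC-H `dbgen` tool. Query 3 uses only some of the columns: `c_custkey` and `c_mktsegment` from customer; `o_orderkey`, `o_custkey`, `o_orderdate` and `o_shippriority` from orders; and `l_orderkey`, `l_extendedprice`, `l_discount` and `l_shipdate` from lineitem.

**Customer table format:**

```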
1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even...
2|Customer#000000002|XSTf4,NCwDVaWNE6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts...
```

**Orders table format:**

```
1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions of fi|
2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0|foxes. pending|
```
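Because the fields are separated by a literal `|` and each orders row ends with a trailing delimiter, a row can be split on the pipe and the needed columns picked by position. The sketch below extracts the four Query 3 columns from an orders row: `o_orderkey`, `o_custkey`, `o_orderdate` and `o_shippriority`, which are fields 0, 1, 4 and 7 of the TPC-H orders table. `OrdersRowParser` is a hypothetical helper that illustrates the format; it is not the input code of the Flink examples.

```java
import java.util.regex.Pattern;

/** Hypothetical helper that projects the Query 3 columns out of a TPC-H orders row. */
final class OrdersRowParser {
    private static final Pattern PIPE = Pattern.compile("\\|");

    private OrdersRowParser() {}

    /** Returns {orderkey, custkey, orderdate, shippriority} as strings. */
    static String[] query3Columns(String row) {
        // split() discards the trailing empty strings produced by the final '|'
        String[] fields = PIPE.split(row);
        if (fields.length < 8) {
            throw new IllegalArgumentException("expected at least 8 fields: " + row);
        }
        return new String[] {fields[0], fields[1], fields[4], fields[7]};
    }
}
```

Applied to the first sample orders row, `query3Columns` returns `{"1", "36901", "1996-01-02", "0"}`.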
### Accumulator Usage Pattern

Standard pattern for using custom accumulators:

```java
// Create the accumulator and register it in open().
// IntCounter is Flink's built-in counter (org.apache.flink.api.common.accumulators.IntCounter).
private final IntCounter myCounter = new IntCounter();

@Override
public void open(Configuration parameters) throws Exception {
    super.open(parameters);
    getRuntimeContext().addAccumulator("my-counter", this.myCounter);
}

// Update the accumulator in the user function
@Override
public boolean filter(MyType value) throws Exception {
    if (someCondition(value)) {
        myCounter.add(1);
        return true;
    }
    return false;
}

// Read the merged result after job execution
JobExecutionResult result = env.execute("My Job");
Integer count = result.getAccumulatorResult("my-counter");
```

## Types

### Relational Data Types

```java { .api }
// Web log tuples
Tuple2<String, String> document = new Tuple2<>("url", "content");       // (url, content)
Tuple3<Integer, String, Integer> rank = new Tuple3<>(50, "url", 1000);  // (rank, url, avg_duration)
Tuple2<String, String> visit = new Tuple2<>("url", "2023-01-01");       // (url, visit_date)

// TPC-H business objects
TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");
TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1995-03-10", 0L);
TPCHQuery3.Lineitem lineitem = new TPCHQuery3.Lineitem(1L, 1000.0, 0.05, "1995-03-15");

// Accumulator types
EmptyFieldsCountAccumulator.StringTriple triple = new EmptyFieldsCountAccumulator.StringTriple("a", "", "c");
ArrayList<Integer> accumulatorResult = new ArrayList<>();
```