SQL-like operations and analytics, including web log analysis, TPC-H benchmark queries, and accumulator examples. The examples cover filtering, joining, grouping, and custom metrics collection patterns.
Analytical processing of web server logs with filtering, joining, and aggregation operations.
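To make the filter and anti-join steps concrete, here is a minimal plain-Java sketch of the same idea on in-memory rows. It does not use Flink, and the class name, method names, row layout, and sample values are all assumptions made for illustration, not part of the example program.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Plain-Java illustration only; none of these names come from the Flink example.
final class WebLogSketch {

    /**
     * Returns URLs ranked above minRank that have no visit in the given year.
     * ranks rows are {url, rank}; visits rows are {url, date as yyyy-MM-dd}.
     */
    static List<String> unvisitedHighRanks(String[][] ranks, String[][] visits,
                                           int minRank, String year) {
        // Filter: collect the URLs visited in the requested year.
        Set<String> visitedUrls = new HashSet<>();
        for (String[] visit : visits) {
            if (visit[1].startsWith(year)) {
                visitedUrls.add(visit[0]);
            }
        }
        // Filter + anti-join: keep high ranks whose URL is not in the visited set.
        List<String> result = new ArrayList<>();
        for (String[] rank : ranks) {
            if (Integer.parseInt(rank[1]) > minRank && !visitedUrls.contains(rank[0])) {
                result.add(rank[0]);
            }
        }
        return result;
    }

    /** Runs the sketch on made-up sample rows. */
    static List<String> demo() {
        String[][] ranks = {{"url_a", "50"}, {"url_b", "80"}, {"url_c", "10"}};
        String[][] visits = {{"url_a", "2007-03-01"}, {"url_b", "2010-06-15"}};
        return unvisitedHighRanks(ranks, visits, 40, "2007");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [url_b]
    }
}
```

In a distributed job the set lookup is replaced by a co-group on the URL key, so that matching ranks and visits meet on the same worker.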
/**
* Web log analysis demonstrating relational operations on log data.
* Usage: WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>
*/
public class WebLogAnalysis {
public static void main(String[] args) throws Exception;
/**
* Filters documents by keyword content
*/
public static class FilterDocByKeyWords
implements FilterFunction<Tuple2<String, String>> {
/**
* Checks if document contains specified keywords
* @param value Tuple (document_url, content)
* @return true if document contains keywords, false otherwise
*/
public boolean filter(Tuple2<String, String> value);
}
/**
* Filters entries by rank threshold
*/
public static class FilterByRank
implements FilterFunction<Tuple3<Integer, String, Integer>> {
/**
* Checks if rank exceeds threshold
* @param value Tuple (rank, url, visitor_count)
* @return true if rank above threshold, false otherwise
*/
public boolean filter(Tuple3<Integer, String, Integer> value);
}
/**
* Filters visits by date criteria
*/
public static class FilterVisitsByDate
implements FilterFunction<Tuple2<String, String>> {
/**
* Checks if visit meets date criteria
* @param value Tuple (url, visit_date)
* @return true if visit matches date filter, false otherwise
*/
public boolean filter(Tuple2<String, String> value);
}
/**
* Anti-join that keeps rank entries without a matching visit
*/
public static class AntiJoinVisits
implements CoGroupFunction<
Tuple3<Integer, String, Integer>,
Tuple1<String>,
Tuple3<Integer, String, Integer>> {
/**
* Emits the rank tuples of a group only when the group contains no visits
* @param ranks Iterable of rank tuples for one URL
* @param visits Iterable of visit URLs for the same URL
* @param out Collector for rank tuples that have no matching visit
*/
public void coGroup(
Iterable<Tuple3<Integer, String, Integer>> ranks,
Iterable<Tuple1<String>> visits,
Collector<Tuple3<Integer, String, Integer>> out);
}
}

Usage Examples:
// Run web log analysis with custom data
String[] args = {
"--documents", "/path/to/documents.txt",
"--ranks", "/path/to/ranks.txt",
"--visits", "/path/to/visits.txt",
"--output", "/path/to/output"
};
WebLogAnalysis.main(args);
// Use web log filters in custom analysis
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple2<String, String>> filtered = documents
.filter(new WebLogAnalysis.FilterDocByKeyWords());
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks
.filter(new WebLogAnalysis.FilterByRank());

Implementation of TPC-H decision support benchmark queries demonstrating complex relational operations.
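As a rough picture of what such a query computes, the following plain-Java sketch filters customers by market segment, joins them to orders and line items, and sums discounted revenue per order. It is a simplified stand-in that leaves out the date filters of the real Query 3 and does not use Flink. The class name, method names, row layouts, and sample values are assumptions made for illustration.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

// Plain-Java illustration only; names and sample rows are invented for this sketch.
final class ShippingPrioritySketch {

    /**
     * customers rows: {custkey, mktsegment}; orders rows: {orderkey, custkey};
     * lineitems rows: {orderkey, extendedprice, discount}.
     */
    static Map<Long, Double> revenueByOrder(String segment, Object[][] customers,
                                            long[][] orders, double[][] lineitems) {
        // Filter: customers in the requested market segment.
        Set<Long> custkeys = new HashSet<>();
        for (Object[] customer : customers) {
            if (segment.equals(customer[1])) {
                custkeys.add((Long) customer[0]);
            }
        }
        // Join customers with orders on custkey.
        Set<Long> orderkeys = new HashSet<>();
        for (long[] order : orders) {
            if (custkeys.contains(order[1])) {
                orderkeys.add(order[0]);
            }
        }
        // Join with line items on orderkey, then group by order and sum revenue.
        Map<Long, Double> revenue = new LinkedHashMap<>();
        for (double[] item : lineitems) {
            long orderkey = (long) item[0];
            if (orderkeys.contains(orderkey)) {
                revenue.merge(orderkey, item[1] * (1.0 - item[2]), Double::sum);
            }
        }
        return revenue;
    }

    /** Runs the sketch on made-up sample rows. */
    static Map<Long, Double> demo() {
        Object[][] customers = {{1L, "BUILDING"}, {2L, "AUTOMOBILE"}};
        long[][] orders = {{10L, 1L}, {20L, 2L}};
        double[][] lineitems = {{10, 1000.0, 0.5}, {10, 200.0, 0.25}, {20, 400.0, 0.0}};
        return revenueByOrder("BUILDING", customers, orders, lineitems);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints {10=650.0}
    }
}
```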
/**
* TPC-H Query 3: Shipping Priority Query
* Usage: TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path>
*/
public class TPCHQuery3 {
public static void main(String[] args) throws Exception;
/**
* Line item record from TPC-H schema
*/
public static class Lineitem extends Tuple4<Long, Double, Double, String> {
public Lineitem();
public Lineitem(Long orderkey, Double extendedprice, Double discount, String shipdate);
public Long getOrderkey();
public Double getExtendedprice();
public Double getDiscount();
public String getShipdate();
public void setOrderkey(Long orderkey);
public void setExtendedprice(Double extendedprice);
public void setDiscount(Double discount);
public void setShipdate(String shipdate);
}
/**
* Customer record from TPC-H schema
*/
public static class Customer extends Tuple2<Long, String> {
public Customer();
public Customer(Long custkey, String mktsegment);
public Long getCustkey();
public String getMktsegment();
public void setCustkey(Long custkey);
public void setMktsegment(String mktsegment);
}
/**
* Order record from TPC-H schema
*/
public static class Order extends Tuple4<Long, Long, String, Long> {
public Order();
public Order(Long orderkey, Long custkey, String orderpriority, Long shippriority);
public Long getOrderkey();
public Long getCustkey();
public String getOrderpriority();
public Long getShippriority();
public void setOrderkey(Long orderkey);
public void setCustkey(Long custkey);
public void setOrderpriority(String orderpriority);
public void setShippriority(Long shippriority);
}
/**
* Result record for shipping priority query
*/
public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
public ShippingPriorityItem();
public ShippingPriorityItem(Long orderkey, Double revenue, String orderdate, Long shippriority);
public Long getOrderkey();
public Double getRevenue();
public String getOrderdate();
public Long getShippriority();
public void setOrderkey(Long orderkey);
public void setRevenue(Double revenue);
public void setOrderdate(String orderdate);
public void setShippriority(Long shippriority);
}
}
/**
* TPC-H Query 10: Customer Return Query
* Usage: TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>
*/
public class TPCHQuery10 {
public static void main(String[] args) throws Exception;
}

Usage Examples:
// Run TPC-H Query 3
String[] args = {
"--lineitem", "/path/to/lineitem.tbl",
"--customer", "/path/to/customer.tbl",
"--orders", "/path/to/orders.tbl",
"--output", "/path/to/query3_results"
};
TPCHQuery3.main(args);
// Use TPC-H data types in custom queries
TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");
TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1-URGENT", 0L);
TPCHQuery3.Lineitem item = new TPCHQuery3.Lineitem(1L, 25000.0, 0.05, "1995-03-15");
// Calculate revenue
double revenue = item.getExtendedprice() * (1.0 - item.getDiscount());

Demonstration of custom accumulators for collecting metrics during job execution.
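The core idea of an accumulator is that each parallel task collects a partial result locally and the partial results are merged once the job finishes. The plain-Java sketch below shows that add-then-merge behavior for a per-column counter of empty fields. It is an assumption about how a vector-style accumulator could behave, written without Flink's `Accumulator` interface. The class name, method names, and sample rows are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration only; it mirrors the add/merge idea without using Flink.
final class VectorCounterSketch {

    private final ArrayList<Integer> counts = new ArrayList<>();

    /** Adds delta to the counter at the given position, growing the list as needed. */
    private void increment(int position, int delta) {
        while (counts.size() <= position) {
            counts.add(0);
        }
        counts.set(position, counts.get(position) + delta);
    }

    /** Records one empty field at the given column position. */
    void add(int position) {
        increment(position, 1);
    }

    /** Folds another partial result into this one, element by element. */
    void merge(VectorCounterSketch other) {
        for (int i = 0; i < other.counts.size(); i++) {
            increment(i, other.counts.get(i));
        }
    }

    /** Returns a copy of the current per-column counts. */
    List<Integer> value() {
        return new ArrayList<>(counts);
    }

    /** Counts empty fields per column for one partition of rows. */
    static VectorCounterSketch countEmpty(String[][] rows) {
        VectorCounterSketch counter = new VectorCounterSketch();
        for (String[] row : rows) {
            for (int i = 0; i < row.length; i++) {
                if (row[i].isEmpty()) {
                    counter.add(i);
                }
            }
        }
        return counter;
    }

    /** Two made-up partitions are counted separately, then merged. */
    static List<Integer> demo() {
        VectorCounterSketch a = countEmpty(new String[][] {{"a", "", "c"}, {"", "", "f"}});
        VectorCounterSketch b = countEmpty(new String[][] {{"g", "h", ""}});
        a.merge(b);
        return a.value();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 1]
    }
}
```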
/**
* Example using custom accumulators to count empty fields in data processing.
* Usage: EmptyFieldsCountAccumulator --input <path> --output <path>
*/
public class EmptyFieldsCountAccumulator {
public static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
public static void main(final String[] args) throws Exception;
/**
* String triple data type for processing
*/
public static class StringTriple extends Tuple3<String, String, String> {
public StringTriple();
public StringTriple(String first, String second, String third);
public String getFirst();
public String getSecond();
public String getThird();
public void setFirst(String first);
public void setSecond(String second);
public void setThird(String third);
}
/**
* Filter that counts empty fields using accumulator
*/
public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> {
private VectorAccumulator emptyFieldCounter;
/**
* Initialize accumulator
* @param parameters Configuration parameters
*/
@Override
public void open(Configuration parameters) throws Exception;
/**
* Filter records and count empty fields
* @param value Input string triple
* @return true to keep record, false to filter out
*/
@Override
public boolean filter(StringTriple value) throws Exception;
}
/**
* Custom vector accumulator for collecting integer lists
*/
public static class VectorAccumulator implements Accumulator<Integer, ArrayList<Integer>> {
private ArrayList<Integer> localValue = new ArrayList<>();
/**
* Add value to accumulator
* @param value Value to add
*/
@Override
public void add(Integer value);
/**
* Get local accumulated value
* @return Local accumulator value
*/
@Override
public ArrayList<Integer> getLocalValue();
/**
* Reset local accumulator
*/
@Override
public void resetLocal();
/**
* Merge accumulator values
* @param other Other accumulator to merge
*/
@Override
public void merge(Accumulator<Integer, ArrayList<Integer>> other);
/**
* Clone accumulator
* @return Cloned accumulator instance
*/
@Override
public Accumulator<Integer, ArrayList<Integer>> clone();
}
}

Usage Examples:
// Run accumulator example
String[] args = {
"--input", "/path/to/data.txt",
"--output", "/path/to/filtered_output"
};
EmptyFieldsCountAccumulator.main(args);
// Use custom accumulator in job
DataSet<EmptyFieldsCountAccumulator.StringTriple> data = getStringTripleDataSet(env);
DataSet<EmptyFieldsCountAccumulator.StringTriple> filtered = data
.filter(new EmptyFieldsCountAccumulator.EmptyFieldFilter());
// Access accumulator results after execution
JobExecutionResult result = env.execute("Accumulator Job");
Map<String, Object> accumulatorResults = result.getAllAccumulatorResults();
ArrayList<Integer> emptyCounts = (ArrayList<Integer>) accumulatorResults
.get(EmptyFieldsCountAccumulator.EMPTY_FIELD_ACCUMULATOR);

Utility classes providing default relational datasets for testing and examples.
/**
* Provides default web log data sets
*/
public class WebLogData {
/**
* Default document data as object arrays
*/
public static final Object[][] DOCUMENTS;
/**
* Default rank data as object arrays
*/
public static final Object[][] RANKS;
/**
* Default visit data as object arrays
*/
public static final Object[][] VISITS;
/**
* Creates DataSet with default document data
* @param env Execution environment
* @return DataSet containing default documents
*/
public static DataSet<Tuple2<String, String>> getDocumentDataSet(ExecutionEnvironment env);
/**
* Creates DataSet with default rank data
* @param env Execution environment
* @return DataSet containing default ranks
*/
public static DataSet<Tuple3<Integer, String, Integer>> getRankDataSet(ExecutionEnvironment env);
/**
* Creates DataSet with default visit data
* @param env Execution environment
* @return DataSet containing default visits
*/
public static DataSet<Tuple2<String, String>> getVisitDataSet(ExecutionEnvironment env);
}
/**
* Generates web log data files for testing
*/
public class WebLogDataGenerator {
public static void main(String[] args) throws Exception;
}

Usage Examples:
// Use default web log data
import org.apache.flink.examples.java.relational.util.WebLogData;
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
// Generate custom web log data
String[] generatorArgs = {
"--output", "/path/to/weblog_data",
"--documents", "1000",
"--visits", "5000"
};
WebLogDataGenerator.main(generatorArgs);

Standard filtering patterns used across relational examples:
// Document filtering by keyword
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple2<String, String>> keywordDocs = documents
.filter(new WebLogAnalysis.FilterDocByKeyWords());
// Rank-based filtering
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks
.filter(new WebLogAnalysis.FilterByRank());
// Date-based filtering
DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
DataSet<Tuple2<String, String>> recentVisits = visits
.filter(new WebLogAnalysis.FilterVisitsByDate());

Join operations for combining related datasets:
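// Inner join example
// (getCustomerDataSet and getOrderDataSet stand in for your own data sources)
DataSet<TPCHQuery3.Customer> customers = getCustomerDataSet(env);
DataSet<TPCHQuery3.Order> orders = getOrderDataSet(env);
// Tuple fields are addressed by position: custkey is field 0 of Customer and field 1 of Order
DataSet<Tuple2<TPCHQuery3.Customer, TPCHQuery3.Order>> joined = customers
    .join(orders)
    .where(0)
    .equalTo(1);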
// Anti-join using co-group: keep ranks whose URL has no visit
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
// Only the URL (field 0) of each visit is needed as the co-group key
DataSet<Tuple1<String>> visitUrls = WebLogData.getVisitDataSet(env).project(0);
DataSet<Tuple3<Integer, String, Integer>> antiJoined = ranks
    .coGroup(visitUrls)
    .where(1)
    .equalTo(0)
    .with(new WebLogAnalysis.AntiJoinVisits());

TPC-H examples expect pipe-delimited files.
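Reading such a file comes down to splitting each row on `|` and picking out the columns a query needs. The sketch below parses one row of the orders sample in this section into the four fields of the `Order` type (columns 0, 1, 5, and 7 of that sample layout). It is a plain-Java illustration, not the parsing code of the examples, and the class and method names are hypothetical.

```java
// Plain-Java illustration only; the class and field names are invented for this sketch.
final class TblRowSketch {

    final long orderkey;
    final long custkey;
    final String orderpriority;
    final long shippriority;

    private TblRowSketch(long orderkey, long custkey, String orderpriority, long shippriority) {
        this.orderkey = orderkey;
        this.custkey = custkey;
        this.orderpriority = orderpriority;
        this.shippriority = shippriority;
    }

    /** Parses one orders row, reading columns 0, 1, 5 and 7 of the sample layout. */
    static TblRowSketch parseOrder(String line) {
        // '|' is a regex metacharacter, so it is escaped. With the default limit,
        // the empty string after the trailing '|' is dropped.
        String[] fields = line.split("\\|");
        return new TblRowSketch(
            Long.parseLong(fields[0]),
            Long.parseLong(fields[1]),
            fields[5],
            Long.parseLong(fields[7]));
    }

    public static void main(String[] args) {
        TblRowSketch order = parseOrder(
            "1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions of fi|");
        System.out.println(order.orderkey + " " + order.custkey + " "
            + order.orderpriority + " " + order.shippriority); // prints 1 36901 5-LOW 0
    }
}
```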
Customer table format:
1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even...
2|Customer#000000002|XSTf4,NCwDVaWNE6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts...

Orders table format:
1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions of fi|
2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0|foxes. pending|

Standard pattern for using custom accumulators:
// Create the accumulator, then register it under a name in open()
private final MyAccumulator myCounter = new MyAccumulator();
@Override
public void open(Configuration parameters) throws Exception {
    getRuntimeContext().addAccumulator("my-counter", this.myCounter);
}
// Use accumulator in user function
@Override
public boolean filter(MyType value) throws Exception {
if (someCondition(value)) {
myCounter.add(1);
return true;
}
return false;
}
// Access results after job execution
JobExecutionResult result = env.execute("My Job");
Object accumulatorValue = result.getAccumulatorResult("my-counter");

// Web log tuples
Tuple2<String, String> document = new Tuple2<>("url", "content");
Tuple3<Integer, String, Integer> rank = new Tuple3<>(50, "url", 1000);
Tuple2<String, String> visit = new Tuple2<>("url", "2023-01-01");
// TPC-H business objects
TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");
TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1-URGENT", 0L);
TPCHQuery3.Lineitem lineitem = new TPCHQuery3.Lineitem(1L, 1000.0, 0.05, "1995-03-15");
// Accumulator types
EmptyFieldsCountAccumulator.StringTriple triple = new EmptyFieldsCountAccumulator.StringTriple("a", "", "c");
ArrayList<Integer> accumulatorResult = new ArrayList<>();