or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

clustering.md distributed-copy.md graph-processing.md index.md misc-examples.md relational-processing.md word-count.md
tile.json

relational-processing.mddocs/

Relational Processing

SQL-like operations and analytics including web log analysis, TPC-H benchmark queries, and accumulator examples. Features filtering, joining, grouping, and custom metrics collection patterns.

Capabilities

Web Log Analysis

Analytical processing of web server logs with filtering, joining, and aggregation operations.

/**
 * Web log analysis demonstrating relational operations on log data.
 * Usage: WebLogAnalysis --documents <path> --ranks <path> --visits <path> --output <path>
 */
public class WebLogAnalysis {
    /**
     * Program entry point.
     * @param args Command-line flags as listed in the usage line above
     */
    public static void main(String[] args) throws Exception;

    /**
     * Filters documents by keyword content
     */
    public static class FilterDocByKeyWords
            implements FilterFunction<Tuple2<String, String>> {
        /**
         * Checks if document contains specified keywords
         * @param value Tuple (document_url, content)
         * @return true if document contains keywords, false otherwise
         */
        public boolean filter(Tuple2<String, String> value);
    }

    /**
     * Filters entries by rank threshold
     */
    public static class FilterByRank
            implements FilterFunction<Tuple3<Integer, String, Integer>> {
        /**
         * Checks if rank exceeds threshold
         * @param value Tuple (rank, url, avg_duration)
         * @return true if rank above threshold, false otherwise
         */
        public boolean filter(Tuple3<Integer, String, Integer> value);
    }

    /**
     * Filters visits by date criteria
     */
    public static class FilterVisitsByDate
            implements FilterFunction<Tuple2<String, String>> {
        /**
         * Checks if visit meets date criteria
         * @param value Tuple (url, visit_date)
         * @return true if visit matches date filter, false otherwise
         */
        public boolean filter(Tuple2<String, String> value);
    }

    /**
     * Anti-join of rank entries against visits: the rank entries of a key group
     * are emitted only when that group contains no visit.
     * The first input and the output are rank tuples (rank, url, avg_duration);
     * the second input carries only the visited url.
     */
    public static class AntiJoinVisits
            implements CoGroupFunction<
                    Tuple3<Integer, String, Integer>,
                    Tuple1<String>,
                    Tuple3<Integer, String, Integer>> {
        /**
         * Emits every rank entry of the group when the group has no visit
         * @param ranks Iterable of rank entries sharing one url
         * @param visits Iterable of visited urls for the same key
         * @param out Collector receiving the rank entries that had no visit
         */
        public void coGroup(
                Iterable<Tuple3<Integer, String, Integer>> ranks,
                Iterable<Tuple1<String>> visits,
                Collector<Tuple3<Integer, String, Integer>> out);
    }
}

Usage Examples:

// Run web log analysis with custom data
String[] args = {
    "--documents", "/path/to/documents.txt",
    "--ranks", "/path/to/ranks.txt",
    "--visits", "/path/to/visits.txt",
    "--output", "/path/to/output"
};
WebLogAnalysis.main(args);

// Use web log filters in custom analysis.
// The sample-data helpers are static methods of WebLogData, so they are called through the class.
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple2<String, String>> filtered = documents
    .filter(new WebLogAnalysis.FilterDocByKeyWords());

DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks
    .filter(new WebLogAnalysis.FilterByRank());

TPC-H Benchmark Queries

Implementation of TPC-H decision support benchmark queries demonstrating complex relational operations.

/**
 * TPC-H Query 3: Shipping Priority Query
 * Usage: TPCHQuery3 --lineitem <path> --customer <path> --orders <path> --output <path>
 *
 * Declares the program entry point and the tuple-backed record types it works with.
 */
public class TPCHQuery3 {
    /**
     * Program entry point.
     * @param args Command-line flags as listed in the usage line above
     */
    public static void main(String[] args) throws Exception;
    
    /**
     * Line item record from TPC-H schema.
     * Constructor arguments, in order: orderkey, extendedprice, discount, shipdate.
     */
    public static class Lineitem extends Tuple4<Long, Double, Double, String> {
        public Lineitem();
        public Lineitem(Long orderkey, Double extendedprice, Double discount, String shipdate);
        
        public Long getOrderkey();
        public Double getExtendedprice();
        public Double getDiscount();
        public String getShipdate();
        
        public void setOrderkey(Long orderkey);
        public void setExtendedprice(Double extendedprice);
        public void setDiscount(Double discount);
        public void setShipdate(String shipdate);
    }
    
    /**
     * Customer record from TPC-H schema.
     * Constructor arguments, in order: custkey, mktsegment.
     */
    public static class Customer extends Tuple2<Long, String> {
        public Customer();
        public Customer(Long custkey, String mktsegment);
        
        public Long getCustkey();
        public String getMktsegment();
        
        public void setCustkey(Long custkey);
        public void setMktsegment(String mktsegment);
    }
    
    /**
     * Order record from TPC-H schema.
     * Constructor arguments, in order: orderkey, custkey, orderpriority, shippriority.
     *
     * NOTE(review): the String field is named orderpriority here, while
     * ShippingPriorityItem below carries an orderdate and no record on the order
     * side declares a date. Confirm which orders column this field actually holds.
     */
    public static class Order extends Tuple4<Long, Long, String, Long> {
        public Order();
        public Order(Long orderkey, Long custkey, String orderpriority, Long shippriority);
        
        public Long getOrderkey();
        public Long getCustkey();
        public String getOrderpriority();
        public Long getShippriority();
        
        public void setOrderkey(Long orderkey);
        public void setCustkey(Long custkey);
        public void setOrderpriority(String orderpriority);
        public void setShippriority(Long shippriority);
    }
    
    /**
     * Result record for shipping priority query.
     * Constructor arguments, in order: orderkey, revenue, orderdate, shippriority.
     */
    public static class ShippingPriorityItem extends Tuple4<Long, Double, String, Long> {
        public ShippingPriorityItem();
        public ShippingPriorityItem(Long orderkey, Double revenue, String orderdate, Long shippriority);
        
        public Long getOrderkey();
        public Double getRevenue();
        public String getOrderdate();
        public Long getShippriority();
        
        public void setOrderkey(Long orderkey);
        public void setRevenue(Double revenue);
        public void setOrderdate(String orderdate);
        public void setShippriority(Long shippriority);
    }
}

/**
 * TPC-H Query 10: Customer Return Query
 * Usage: TPCHQuery10 --customer <path> --orders <path> --lineitem <path> --nation <path> --output <path>
 */
public class TPCHQuery10 {
    /**
     * Program entry point.
     * @param args Command-line flags as listed in the usage line above
     */
    public static void main(String[] args) throws Exception;
}

Usage Examples:

// Launch TPC-H Query 3 with explicit input and output locations
String[] args = {
    "--lineitem", "/path/to/lineitem.tbl",
    "--customer", "/path/to/customer.tbl",
    "--orders", "/path/to/orders.tbl",
    "--output", "/path/to/query3_results"
};
TPCHQuery3.main(args);

// Build TPC-H records by hand for use in custom queries
TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");
TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1-URGENT", 0L);
TPCHQuery3.Lineitem item = new TPCHQuery3.Lineitem(1L, 25000.0, 0.05, "1995-03-15");

// Revenue is the extended price reduced by the discount fraction
double discountFactor = 1.0 - item.getDiscount();
double revenue = item.getExtendedprice() * discountFactor;

Accumulator Examples

Demonstration of custom accumulators for collecting metrics during job execution.

/**
 * Example using custom accumulators to count empty fields in data processing.
 * Usage: EmptyFieldsCountAccumulator --input <path> --output <path>
 */
public class EmptyFieldsCountAccumulator {
    /** Name under which this example's accumulator result is looked up after job execution. */
    public static final String EMPTY_FIELD_ACCUMULATOR = "empty-fields";
    
    /**
     * Program entry point.
     * @param args Command-line flags as listed in the usage line above
     */
    public static void main(final String[] args) throws Exception;
    
    /**
     * String triple data type for processing.
     * Constructor arguments, in order: first, second, third.
     */
    public static class StringTriple extends Tuple3<String, String, String> {
        public StringTriple();
        public StringTriple(String first, String second, String third);
        
        public String getFirst();
        public String getSecond();
        public String getThird();
        
        public void setFirst(String first);
        public void setSecond(String second);
        public void setThird(String third);
    }
    
    /**
     * Filter that counts empty fields using accumulator
     */
    public static final class EmptyFieldFilter extends RichFilterFunction<StringTriple> {
        // NOTE(review): declared without an initializer in this listing; confirm where
        // the instance is created before open() makes it available to the job.
        private VectorAccumulator emptyFieldCounter;
        
        /**
         * Initialize accumulator
         * @param parameters Configuration parameters
         */
        @Override
        public void open(Configuration parameters) throws Exception;
        
        /**
         * Filter records and count empty fields
         * @param value Input string triple
         * @return true to keep record, false to filter out
         */
        @Override
        public boolean filter(StringTriple value) throws Exception;
    }
    
    /**
     * Custom vector accumulator for collecting integer lists.
     * Accepts Integer values and exposes an ArrayList of Integer as its result.
     */
    public static class VectorAccumulator implements Accumulator<Integer, ArrayList<Integer>> {
        // Backing list for the locally accumulated result; starts empty.
        private ArrayList<Integer> localValue = new ArrayList<>();
        
        /**
         * Add value to accumulator
         * @param value Value to add
         */
        @Override
        public void add(Integer value);
        
        /**
         * Get local accumulated value
         * @return Local accumulator value
         */
        @Override
        public ArrayList<Integer> getLocalValue();
        
        /**
         * Reset local accumulator
         */
        @Override
        public void resetLocal();
        
        /**
         * Merge accumulator values
         * @param other Other accumulator to merge
         */  
        @Override
        public void merge(Accumulator<Integer, ArrayList<Integer>> other);
        
        /**
         * Clone accumulator
         * @return Cloned accumulator instance
         */
        @Override
        public Accumulator<Integer, ArrayList<Integer>> clone();
    }
}

Usage Examples:

// Run accumulator example
String[] args = {
    "--input", "/path/to/data.txt",
    "--output", "/path/to/filtered_output"
};
EmptyFieldsCountAccumulator.main(args);

// Use custom accumulator in job
DataSet<EmptyFieldsCountAccumulator.StringTriple> data = getStringTripleDataSet(env);
DataSet<EmptyFieldsCountAccumulator.StringTriple> filtered = data
    .filter(new EmptyFieldsCountAccumulator.EmptyFieldFilter());

// A DataSet program needs at least one sink; without it execute() has no plan to run
filtered.writeAsCsv("/path/to/filtered_output");

// Access accumulator results after execution.
// getAccumulatorResult is generic, so no map lookup or unchecked cast is needed.
JobExecutionResult result = env.execute("Accumulator Job");
ArrayList<Integer> emptyCounts =
    result.getAccumulatorResult(EmptyFieldsCountAccumulator.EMPTY_FIELD_ACCUMULATOR);

Relational Data Providers

Utility classes providing default relational datasets for testing and examples.

/**
 * Provides default web log data sets, both as raw object arrays and as
 * DataSets created on a given execution environment.
 */
public class WebLogData {
    /**
     * Default document data as object arrays
     */
    public static final Object[][] DOCUMENTS;
    
    /**
     * Default rank data as object arrays
     */
    public static final Object[][] RANKS;
    
    /**
     * Default visit data as object arrays
     */
    public static final Object[][] VISITS;
    
    /**
     * Creates DataSet with default document data
     * @param env Execution environment
     * @return DataSet containing default documents as two-string tuples
     */
    public static DataSet<Tuple2<String, String>> getDocumentDataSet(ExecutionEnvironment env);
    
    /**
     * Creates DataSet with default rank data
     * @param env Execution environment
     * @return DataSet containing default ranks as (Integer, String, Integer) tuples
     */
    public static DataSet<Tuple3<Integer, String, Integer>> getRankDataSet(ExecutionEnvironment env);
    
    /**
     * Creates DataSet with default visit data
     * @param env Execution environment
     * @return DataSet containing default visits as two-string tuples
     */
    public static DataSet<Tuple2<String, String>> getVisitDataSet(ExecutionEnvironment env);
}

/**
 * Generates web log data files for testing
 *
 * NOTE(review): no usage line is given here. The upstream Flink generator is
 * believed to take two positional integers, <numberOfDocuments> <numberOfVisits> —
 * confirm before documenting flags.
 */
public class WebLogDataGenerator {
    /**
     * Program entry point.
     * @param args Command-line arguments for the generator
     */
    public static void main(String[] args) throws Exception;
}

Usage Examples:

// Use default web log data
import org.apache.flink.examples.java.relational.util.WebLogData;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);

// Generate custom web log data.
// The generator takes two positional arguments: <numberOfDocuments> <numberOfVisits>.
// Both are parsed as integers, so flag-style arguments such as "--documents" would fail.
// Output files (documents, ranks, visits) are written to the JVM temp directory (java.io.tmpdir).
String[] generatorArgs = {"1000", "5000"};
WebLogDataGenerator.main(generatorArgs);

Common Relational Processing Patterns

Filtering and Selection

Standard filtering patterns used across relational examples:

// The sample-data helpers are static methods of WebLogData, so they are called through the class.

// Document filtering by keyword
DataSet<Tuple2<String, String>> documents = WebLogData.getDocumentDataSet(env);
DataSet<Tuple2<String, String>> keywordDocs = documents
    .filter(new WebLogAnalysis.FilterDocByKeyWords());

// Rank-based filtering
DataSet<Tuple3<Integer, String, Integer>> ranks = WebLogData.getRankDataSet(env);
DataSet<Tuple3<Integer, String, Integer>> highRanks = ranks
    .filter(new WebLogAnalysis.FilterByRank());

// Date-based filtering
DataSet<Tuple2<String, String>> visits = WebLogData.getVisitDataSet(env);
DataSet<Tuple2<String, String>> recentVisits = visits
    .filter(new WebLogAnalysis.FilterVisitsByDate());

Joining and Co-grouping

Join operations for combining related datasets:

// Inner join example
DataSet<TPCHQuery3.Customer> customers = getCustomerDataSet(env);
DataSet<TPCHQuery3.Order> orders = getOrderDataSet(env);

// Customer and Order are Tuple subclasses, so join keys are given by position,
// not by name: custkey is field 0 of Customer and field 1 of Order.
DataSet<Tuple2<TPCHQuery3.Customer, TPCHQuery3.Order>> joined = customers
    .join(orders)
    .where(0)
    .equalTo(1);

// Anti-join using co-group.
// AntiJoinVisits takes rank tuples (rank, url, avg_duration) as its first input,
// single-field url tuples as its second input, and emits rank tuples.
DataSet<Tuple3<Integer, String, Integer>> ranks = getRankDataSet(env);
DataSet<Tuple1<String>> visitedUrls = getVisitDataSet(env).project(0);

DataSet<Tuple3<Integer, String, Integer>> antiJoined = ranks
    .coGroup(visitedUrls)
    .where(1)      // url of the rank entry
    .equalTo(0)    // url of the visit
    .with(new WebLogAnalysis.AntiJoinVisits());

TPC-H Data Format Requirements

TPC-H examples expect pipe-delimited files:

Customer table format:

1|Customer#000000001|IVhzIApeRb ot,c,E|15|25-989-741-2988|711.56|BUILDING|to the even...
2|Customer#000000002|XSTf4,NCwDVaWNE6tEgvwfmRchLXak|13|23-768-687-3665|121.65|AUTOMOBILE|l accounts...

Orders table format:

1|36901|O|173665.47|1996-01-02|5-LOW|Clerk#000000951|0|nstructions of fi|
2|78002|O|46929.18|1996-12-01|1-URGENT|Clerk#000000880|0|foxes. pending|

Accumulator Usage Pattern

Standard pattern for using custom accumulators:

// Create the accumulator together with the function, then register it in open()
private final MyAccumulator myCounter = new MyAccumulator();

@Override
public void open(Configuration parameters) throws Exception {
    // addAccumulator registers this instance under the given name.
    // getAccumulator only looks up an accumulator that is already registered,
    // so it cannot be used to create one.
    getRuntimeContext().addAccumulator("my-counter", this.myCounter);
}

// Use accumulator in user function
@Override
public boolean filter(MyType value) throws Exception {
    if (someCondition(value)) {
        myCounter.add(1);
        return true;
    }
    return false;
}

// Access results after job execution, using the same name passed to addAccumulator
JobExecutionResult result = env.execute("My Job");
Object accumulatorValue = result.getAccumulatorResult("my-counter");

Types

Relational Data Types

// Web log tuples
// document: (document_url, content)
Tuple2<String, String> document = new Tuple2<>("url", "content");
// rank: rank value first, url second, followed by one more integer field
Tuple3<Integer, String, Integer> rank = new Tuple3<>(50, "url", 1000);
// visit: (url, visit_date)
Tuple2<String, String> visit = new Tuple2<>("url", "2023-01-01");

// TPC-H business objects
// Customer(custkey, mktsegment)
TPCHQuery3.Customer customer = new TPCHQuery3.Customer(1L, "BUILDING");
// Order(orderkey, custkey, orderpriority, shippriority)
TPCHQuery3.Order order = new TPCHQuery3.Order(1L, 1L, "1-URGENT", 0L);
// Lineitem(orderkey, extendedprice, discount, shipdate)
TPCHQuery3.Lineitem lineitem = new TPCHQuery3.Lineitem(1L, 1000.0, 0.05, "1995-03-15");

// Accumulator types
// StringTriple(first, second, third); the second field is left empty here
EmptyFieldsCountAccumulator.StringTriple triple = new EmptyFieldsCountAccumulator.StringTriple("a", "", "c");
// Same result type that VectorAccumulator exposes
ArrayList<Integer> accumulatorResult = new ArrayList<>();