CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-phoenix--phoenix-core

Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support

Overview
Eval results
Files

mapreduce.mddocs/

MapReduce Integration

Phoenix provides comprehensive MapReduce integration enabling distributed processing of large datasets stored in Phoenix tables. The integration includes input/output formats, bulk loading tools, and utilities for efficient data processing workflows.

Core Imports

import org.apache.phoenix.mapreduce.*;
import org.apache.phoenix.mapreduce.bulkload.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.io.*;
import org.apache.phoenix.util.ColumnInfo;

Input/Output Formats

PhoenixInputFormat

MapReduce InputFormat for reading data from Phoenix tables with SQL-based filtering and projection.

public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable, T> {
    // Configuration methods
    public static void setInput(Job job, Class<? extends DBWritable> inputClass,
                               String tableName, String conditions)
    public static void setInput(Job job, Class<? extends DBWritable> inputClass,
                               String selectStatement)
    public static void setInput(Job job, Class<? extends DBWritable> inputClass,
                               String tableName, String conditions, String... fieldNames)

    // InputFormat implementation
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
    public RecordReader<NullWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException

    // Phoenix-specific configuration
    public static void setBatchSize(Configuration configuration, long batchSize)
    public static void setSelectColumnList(Job job, String... columns)
    public static void setSchemaType(Configuration configuration, SchemaType schemaType)
}

PhoenixOutputFormat

MapReduce OutputFormat for writing data to Phoenix tables with automatic batching and transaction management.

public class PhoenixOutputFormat<T extends DBWritable> extends OutputFormat<NullWritable, T> {
    // Configuration methods
    public static void setOutput(Job job, String tableName, String... fieldNames)
    public static void setOutput(Job job, String tableName, List<String> fieldNames)

    // OutputFormat implementation
    public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context)
        throws IOException, InterruptedException
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException

    // Phoenix-specific configuration
    public static void setBatchSize(Configuration configuration, long batchSize)
    public static void setUpsertStatement(Job job, String upsertStatement)
}

Usage:

// Configure Phoenix input format
public class PhoenixMapReduceJob {
    public static void configureInputJob(Job job) throws IOException {
        // Set input configuration
        PhoenixInputFormat.setInput(job,
                                  EmployeeRecord.class,
                                  "employees",
                                  "department = 'ENGINEERING' AND salary > 50000",
                                  "id", "name", "salary", "department");

        // Optional: Configure batch size for better performance
        PhoenixInputFormat.setBatchSize(job.getConfiguration(), 5000);

        // Set mapper class
        job.setMapperClass(EmployeeProcessingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    }

    public static void configureOutputJob(Job job) throws IOException {
        // Set output configuration
        PhoenixOutputFormat.setOutput(job, "employee_summary", "department", "avg_salary", "employee_count");

        // Optional: Configure batch size
        PhoenixOutputFormat.setBatchSize(job.getConfiguration(), 1000);

        // Set reducer class
        job.setReducerClass(SummaryReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SummaryRecord.class);
    }
}

// DBWritable implementation for input records
public class EmployeeRecord implements DBWritable, Writable {
    private long id;
    private String name;
    private BigDecimal salary;
    private String department;

    // DBWritable implementation
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getLong("id");
        name = resultSet.getString("name");
        salary = resultSet.getBigDecimal("salary");
        department = resultSet.getString("department");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, id);
        statement.setString(2, name);
        statement.setBigDecimal(3, salary);
        statement.setString(4, department);
    }

    // Writable implementation
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        Text.writeString(out, name != null ? name : "");
        out.writeUTF(salary != null ? salary.toString() : "0");
        Text.writeString(out, department != null ? department : "");
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = Text.readString(in);
        salary = new BigDecimal(in.readUTF());
        department = Text.readString(in);
    }

    // Getters and setters
    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public BigDecimal getSalary() { return salary; }
    public void setSalary(BigDecimal salary) { this.salary = salary; }
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
}

// DBWritable implementation for output records
public class SummaryRecord implements DBWritable, Writable {
    private String department;
    private BigDecimal avgSalary;
    private int employeeCount;

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        department = resultSet.getString("department");
        avgSalary = resultSet.getBigDecimal("avg_salary");
        employeeCount = resultSet.getInt("employee_count");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, department);
        statement.setBigDecimal(2, avgSalary);
        statement.setInt(3, employeeCount);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, department != null ? department : "");
        out.writeUTF(avgSalary != null ? avgSalary.toString() : "0");
        out.writeInt(employeeCount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        department = Text.readString(in);
        avgSalary = new BigDecimal(in.readUTF());
        employeeCount = in.readInt();
    }

    // Getters and setters
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
    public BigDecimal getAvgSalary() { return avgSalary; }
    public void setAvgSalary(BigDecimal avgSalary) { this.avgSalary = avgSalary; }
    public int getEmployeeCount() { return employeeCount; }
    public void setEmployeeCount(int employeeCount) { this.employeeCount = employeeCount; }
}

Bulk Loading Tools

BulkLoadTool

Tool for efficient bulk loading of large datasets into Phoenix tables.

public class BulkLoadTool extends Configured implements Tool {
    // Main execution method
    public int run(String[] args) throws Exception

    // Configuration options
    public static class Options {
        public String getInputPath()
        public String getTableName()
        public String getZkQuorum()
        public char getFieldDelimiter()
        public char getQuoteChar()
        public char getEscapeChar()
        public String getArrayElementSeparator()
        public boolean isStrict()
        public List<ColumnInfo> getColumns()
    }

    // Bulk loading methods
    public static void bulkLoad(Configuration conf, String inputPath, String tableName,
                               List<ColumnInfo> columnInfos) throws Exception
    public static Job createBulkLoadJob(Configuration conf, String inputPath, String tableName,
                                      List<ColumnInfo> columnInfos) throws IOException
}

Usage:

// Bulk load CSV data into Phoenix table
public class BulkLoadExample {
    public static void performBulkLoad() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");

        // Define column information
        List<ColumnInfo> columnInfos = Arrays.asList(
            new ColumnInfo("id", Types.BIGINT),
            new ColumnInfo("name", Types.VARCHAR),
            new ColumnInfo("email", Types.VARCHAR),
            new ColumnInfo("salary", Types.DECIMAL),
            new ColumnInfo("department", Types.VARCHAR),
            new ColumnInfo("hire_date", Types.DATE)
        );

        // Configure decimal precision
        ColumnInfo salaryColumn = columnInfos.get(3);
        salaryColumn.setPrecision(10);
        salaryColumn.setScale(2);

        // Perform bulk load
        String inputPath = "hdfs://namenode:port/path/to/csv/files";
        String tableName = "employees";

        BulkLoadTool.bulkLoad(conf, inputPath, tableName, columnInfos);
        System.out.println("Bulk load completed successfully");
    }

    public static void performCustomBulkLoad() throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Create bulk load job with custom configuration
        List<ColumnInfo> columnInfos = createColumnInfos();
        Job job = BulkLoadTool.createBulkLoadJob(conf, "input/path", "target_table", columnInfos);

        // Customize job settings
        job.setJobName("Custom Phoenix Bulk Load");
        job.getConfiguration().set("phoenix.bulk.load.delimiter", "|");
        job.getConfiguration().set("phoenix.bulk.load.quote", "\"");
        job.getConfiguration().setBoolean("phoenix.bulk.load.strict", true);

        // Submit and wait for completion
        boolean success = job.waitForCompletion(true);
        if (success) {
            System.out.println("Bulk load job completed successfully");
        } else {
            System.err.println("Bulk load job failed");
        }
    }

    private static List<ColumnInfo> createColumnInfos() {
        return Arrays.asList(
            new ColumnInfo("transaction_id", Types.BIGINT),
            new ColumnInfo("customer_id", Types.BIGINT),
            new ColumnInfo("amount", Types.DECIMAL),
            new ColumnInfo("transaction_date", Types.TIMESTAMP),
            new ColumnInfo("merchant", Types.VARCHAR),
            new ColumnInfo("category", Types.VARCHAR)
        );
    }
}

// Command line bulk load
public class BulkLoadRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Set up tool options
        BulkLoadTool tool = new BulkLoadTool();
        tool.setConf(conf);

        // Command line arguments: input_path table_name zk_quorum
        String[] bulkLoadArgs = {
            "hdfs://namenode:port/data/transactions.csv",
            "transactions",
            "zk1,zk2,zk3:2181"
        };

        int result = tool.run(bulkLoadArgs);
        System.exit(result);
    }
}

MapReduce Job Examples

Data Processing Job

// Complete MapReduce job for Phoenix data processing
public class PhoenixDataProcessingJob extends Configured implements Tool {

    // Mapper class
    public static class DataProcessingMapper
            extends Mapper<NullWritable, EmployeeRecord, Text, LongWritable> {

        @Override
        protected void map(NullWritable key, EmployeeRecord employee, Context context)
                throws IOException, InterruptedException {

            // Process employee data
            String department = employee.getDepartment();
            BigDecimal salary = employee.getSalary();

            // Emit department and salary
            if (department != null && salary != null) {
                context.write(new Text(department), new LongWritable(salary.longValue()));

                // Additional processing based on salary ranges
                if (salary.compareTo(new BigDecimal("100000")) > 0) {
                    context.write(new Text("HIGH_EARNERS"), new LongWritable(1));
                }
            }
        }
    }

    // Reducer class
    public static class SalaryAggregationReducer
            extends Reducer<Text, LongWritable, NullWritable, SummaryRecord> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {

            long sum = 0;
            int count = 0;

            for (LongWritable value : values) {
                sum += value.get();
                count++;
            }

            // Create summary record
            SummaryRecord summary = new SummaryRecord();
            summary.setDepartment(key.toString());
            summary.setAvgSalary(new BigDecimal(sum / count));
            summary.setEmployeeCount(count);

            context.write(NullWritable.get(), summary);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Phoenix Data Processing");

        job.setJarByClass(PhoenixDataProcessingJob.class);

        // Configure input
        PhoenixInputFormat.setInput(job, EmployeeRecord.class,
                                  "employees",
                                  "status = 'ACTIVE'",
                                  "id", "name", "salary", "department");

        // Configure output
        PhoenixOutputFormat.setOutput(job, "department_summary",
                                    "department", "avg_salary", "employee_count");

        // Configure mapper and reducer
        job.setMapperClass(DataProcessingMapper.class);
        job.setReducerClass(SalaryAggregationReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SummaryRecord.class);

        // Configure input and output formats
        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new PhoenixDataProcessingJob(), args);
        System.exit(result);
    }
}

ETL Pipeline Job

// ETL (Extract, Transform, Load) pipeline using Phoenix MapReduce
public class PhoenixETLPipeline extends Configured implements Tool {

    // Mapper for data transformation
    public static class ETLMapper
            extends Mapper<NullWritable, TransactionRecord, NullWritable, TransformedRecord> {

        private static final BigDecimal CURRENCY_CONVERSION_RATE = new BigDecimal("1.25");

        @Override
        protected void map(NullWritable key, TransactionRecord transaction, Context context)
                throws IOException, InterruptedException {

            // Transform the data
            TransformedRecord transformed = new TransformedRecord();
            transformed.setTransactionId(transaction.getTransactionId());
            transformed.setCustomerId(transaction.getCustomerId());

            // Convert currency
            BigDecimal originalAmount = transaction.getAmount();
            BigDecimal convertedAmount = originalAmount.multiply(CURRENCY_CONVERSION_RATE);
            transformed.setConvertedAmount(convertedAmount);

            // Categorize transaction
            String category = categorizeTransaction(transaction.getMerchant(), originalAmount);
            transformed.setCategory(category);

            // Add processing timestamp
            transformed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));

            // Emit transformed record
            context.write(NullWritable.get(), transformed);
        }

        private String categorizeTransaction(String merchant, BigDecimal amount) {
            if (merchant.toLowerCase().contains("grocery") ||
                merchant.toLowerCase().contains("supermarket")) {
                return "GROCERY";
            } else if (merchant.toLowerCase().contains("gas") ||
                      merchant.toLowerCase().contains("fuel")) {
                return "FUEL";
            } else if (amount.compareTo(new BigDecimal("500")) > 0) {
                return "LARGE_PURCHASE";
            } else {
                return "OTHER";
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Phoenix ETL Pipeline");

        job.setJarByClass(PhoenixETLPipeline.class);

        // Configure input from source table
        PhoenixInputFormat.setInput(job, TransactionRecord.class,
                                  "raw_transactions",
                                  "processed_flag IS NULL OR processed_flag = false",
                                  "transaction_id", "customer_id", "amount",
                                  "transaction_date", "merchant");

        // Configure output to transformed table
        PhoenixOutputFormat.setOutput(job, "transformed_transactions",
                                    "transaction_id", "customer_id", "converted_amount",
                                    "category", "processed_timestamp");

        // This is a map-only job (no reducer needed)
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(TransformedRecord.class);

        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        // Set batch size for better performance
        PhoenixInputFormat.setBatchSize(conf, 10000);
        PhoenixOutputFormat.setBatchSize(conf, 5000);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new PhoenixETLPipeline(), args);
        System.exit(result);
    }
}

// Supporting record classes
public class TransactionRecord implements DBWritable, Writable {
    private long transactionId;
    private long customerId;
    private BigDecimal amount;
    private Timestamp transactionDate;
    private String merchant;

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        transactionId = resultSet.getLong("transaction_id");
        customerId = resultSet.getLong("customer_id");
        amount = resultSet.getBigDecimal("amount");
        transactionDate = resultSet.getTimestamp("transaction_date");
        merchant = resultSet.getString("merchant");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, transactionId);
        statement.setLong(2, customerId);
        statement.setBigDecimal(3, amount);
        statement.setTimestamp(4, transactionDate);
        statement.setString(5, merchant);
    }

    // Writable implementation and getters/setters omitted for brevity...
}

public class TransformedRecord implements DBWritable, Writable {
    private long transactionId;
    private long customerId;
    private BigDecimal convertedAmount;
    private String category;
    private Timestamp processedTimestamp;

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        transactionId = resultSet.getLong("transaction_id");
        customerId = resultSet.getLong("customer_id");
        convertedAmount = resultSet.getBigDecimal("converted_amount");
        category = resultSet.getString("category");
        processedTimestamp = resultSet.getTimestamp("processed_timestamp");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, transactionId);
        statement.setLong(2, customerId);
        statement.setBigDecimal(3, convertedAmount);
        statement.setString(4, category);
        statement.setTimestamp(5, processedTimestamp);
    }

    // Writable implementation and getters/setters omitted for brevity...
}

Advanced MapReduce Patterns

Multi-Table Join Job

// MapReduce job performing joins across multiple Phoenix tables
public class MultiTableJoinJob extends Configured implements Tool {

    // Mapper for customer data
    public static class CustomerMapper
            extends Mapper<NullWritable, CustomerRecord, LongWritable, Text> {

        @Override
        protected void map(NullWritable key, CustomerRecord customer, Context context)
                throws IOException, InterruptedException {

            // Emit customer ID as key with customer data as value
            String customerData = "CUSTOMER:" + customer.getName() + "," +
                                customer.getEmail() + "," + customer.getSegment();
            context.write(new LongWritable(customer.getCustomerId()), new Text(customerData));
        }
    }

    // Mapper for order data
    public static class OrderMapper
            extends Mapper<NullWritable, OrderRecord, LongWritable, Text> {

        @Override
        protected void map(NullWritable key, OrderRecord order, Context context)
                throws IOException, InterruptedException {

            // Emit customer ID as key with order data as value
            String orderData = "ORDER:" + order.getOrderId() + "," +
                             order.getOrderDate() + "," + order.getTotalAmount();
            context.write(new LongWritable(order.getCustomerId()), new Text(orderData));
        }
    }

    // Reducer to perform the join
    public static class JoinReducer
            extends Reducer<LongWritable, Text, NullWritable, CustomerOrderRecord> {

        @Override
        protected void reduce(LongWritable customerId, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            List<String> customerData = new ArrayList<>();
            List<String> orderData = new ArrayList<>();

            // Separate customer and order data
            for (Text value : values) {
                String valueStr = value.toString();
                if (valueStr.startsWith("CUSTOMER:")) {
                    customerData.add(valueStr.substring(9));
                } else if (valueStr.startsWith("ORDER:")) {
                    orderData.add(valueStr.substring(6));
                }
            }

            // Perform join - emit record for each customer-order combination
            for (String customer : customerData) {
                for (String order : orderData) {
                    CustomerOrderRecord joined = createJoinedRecord(customerId.get(), customer, order);
                    context.write(NullWritable.get(), joined);
                }
            }
        }

        private CustomerOrderRecord createJoinedRecord(long customerId, String customerData, String orderData) {
            String[] customerParts = customerData.split(",");
            String[] orderParts = orderData.split(",");

            CustomerOrderRecord record = new CustomerOrderRecord();
            record.setCustomerId(customerId);
            record.setCustomerName(customerParts[0]);
            record.setCustomerEmail(customerParts[1]);
            record.setCustomerSegment(customerParts[2]);
            record.setOrderId(Long.parseLong(orderParts[0]));
            record.setOrderDate(Date.valueOf(orderParts[1]));
            record.setOrderAmount(new BigDecimal(orderParts[2]));

            return record;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Job 1: Process customers
        Job customerJob = Job.getInstance(conf, "Customer Processing");
        customerJob.setJarByClass(MultiTableJoinJob.class);

        PhoenixInputFormat.setInput(customerJob, CustomerRecord.class, "customers");
        customerJob.setMapperClass(CustomerMapper.class);
        customerJob.setNumReduceTasks(0);

        Path customerOutput = new Path("/tmp/customers");
        FileOutputFormat.setOutputPath(customerJob, customerOutput);

        // Job 2: Process orders
        Job orderJob = Job.getInstance(conf, "Order Processing");
        orderJob.setJarByClass(MultiTableJoinJob.class);

        PhoenixInputFormat.setInput(orderJob, OrderRecord.class, "orders");
        orderJob.setMapperClass(OrderMapper.class);
        orderJob.setNumReduceTasks(0);

        Path orderOutput = new Path("/tmp/orders");
        FileOutputFormat.setOutputPath(orderJob, orderOutput);

        // Wait for both jobs to complete
        boolean success = customerJob.waitForCompletion(true) && orderJob.waitForCompletion(true);
        if (!success) {
            return 1;
        }

        // Job 3: Join the results
        Job joinJob = Job.getInstance(conf, "Customer Order Join");
        joinJob.setJarByClass(MultiTableJoinJob.class);

        FileInputFormat.addInputPath(joinJob, customerOutput);
        FileInputFormat.addInputPath(joinJob, orderOutput);

        PhoenixOutputFormat.setOutput(joinJob, "customer_orders",
                                    "customer_id", "customer_name", "customer_email",
                                    "customer_segment", "order_id", "order_date", "order_amount");

        joinJob.setReducerClass(JoinReducer.class);
        joinJob.setOutputKeyClass(NullWritable.class);
        joinJob.setOutputValueClass(CustomerOrderRecord.class);
        joinJob.setOutputFormatClass(PhoenixOutputFormat.class);

        return joinJob.waitForCompletion(true) ? 0 : 1;
    }
}

Incremental Data Processing

// Incremental processing pattern for Phoenix MapReduce
public class IncrementalProcessingJob extends Configured implements Tool {

    public static class IncrementalMapper
            extends Mapper<NullWritable, TransactionRecord, NullWritable, ProcessedRecord> {

        private Timestamp lastProcessedTime;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get last processed timestamp from configuration
            String lastProcessedStr = context.getConfiguration().get("last.processed.timestamp");
            if (lastProcessedStr != null) {
                lastProcessedTime = Timestamp.valueOf(lastProcessedStr);
            } else {
                // Default to 24 hours ago if no timestamp provided
                lastProcessedTime = new Timestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000);
            }
        }

        @Override
        protected void map(NullWritable key, TransactionRecord transaction, Context context)
                throws IOException, InterruptedException {

            // Only process records newer than last processed time
            if (transaction.getTransactionDate().after(lastProcessedTime)) {
                ProcessedRecord processed = processTransaction(transaction);
                context.write(NullWritable.get(), processed);
            }
        }

        private ProcessedRecord processTransaction(TransactionRecord transaction) {
            ProcessedRecord processed = new ProcessedRecord();
            processed.setTransactionId(transaction.getTransactionId());
            processed.setCustomerId(transaction.getCustomerId());
            processed.setAmount(transaction.getAmount());

            // Add risk score calculation
            BigDecimal riskScore = calculateRiskScore(transaction);
            processed.setRiskScore(riskScore);

            // Add fraud flag
            boolean isFraud = detectFraud(transaction, riskScore);
            processed.setFraudFlag(isFraud);

            processed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));

            return processed;
        }

        private BigDecimal calculateRiskScore(TransactionRecord transaction) {
            // Simple risk scoring based on amount and time
            BigDecimal amount = transaction.getAmount();
            long currentTime = System.currentTimeMillis();
            long transactionTime = transaction.getTransactionDate().getTime();

            // Higher risk for large amounts and late night transactions
            BigDecimal baseScore = amount.divide(new BigDecimal("1000"));

            Calendar cal = Calendar.getInstance();
            cal.setTimeInMillis(transactionTime);
            int hour = cal.get(Calendar.HOUR_OF_DAY);

            if (hour < 6 || hour > 23) {
                baseScore = baseScore.multiply(new BigDecimal("1.5"));
            }

            return baseScore.min(new BigDecimal("10.0")); // Cap at 10.0
        }

        private boolean detectFraud(TransactionRecord transaction, BigDecimal riskScore) {
            // Simple fraud detection based on risk score
            return riskScore.compareTo(new BigDecimal("7.5")) > 0;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Get last processed timestamp
        String lastProcessedTimestamp = getLastProcessedTimestamp(conf);
        conf.set("last.processed.timestamp", lastProcessedTimestamp);

        Job job = Job.getInstance(conf, "Incremental Transaction Processing");
        job.setJarByClass(IncrementalProcessingJob.class);

        // Configure input with time-based filter
        String whereClause = "transaction_date > '" + lastProcessedTimestamp + "'";
        PhoenixInputFormat.setInput(job, TransactionRecord.class, "transactions", whereClause,
                                  "transaction_id", "customer_id", "amount", "transaction_date", "merchant");

        // Configure output
        PhoenixOutputFormat.setOutput(job, "processed_transactions",
                                    "transaction_id", "customer_id", "amount", "risk_score",
                                    "fraud_flag", "processed_timestamp");

        job.setMapperClass(IncrementalMapper.class);
        job.setNumReduceTasks(0); // Map-only job

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(ProcessedRecord.class);

        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        boolean success = job.waitForCompletion(true);

        if (success) {
            // Update last processed timestamp
            updateLastProcessedTimestamp(conf, new Timestamp(System.currentTimeMillis()));
        }

        return success ? 0 : 1;
    }

    private String getLastProcessedTimestamp(Configuration conf) throws SQLException {
        // Query the control table to get last processed timestamp
        String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");

        try (Connection conn = DriverManager.getConnection(url)) {
            PreparedStatement stmt = conn.prepareStatement(
                "SELECT last_processed_timestamp FROM processing_control WHERE job_name = ?"
            );
            stmt.setString(1, "incremental_transaction_processing");

            ResultSet rs = stmt.executeQuery();
            if (rs.next()) {
                return rs.getTimestamp("last_processed_timestamp").toString();
            } else {
                // Return default timestamp if no record exists
                return new Timestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000).toString();
            }
        }
    }

    private void updateLastProcessedTimestamp(Configuration conf, Timestamp timestamp) throws SQLException {
        String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");

        try (Connection conn = DriverManager.getConnection(url)) {
            PreparedStatement stmt = conn.prepareStatement(
                "UPSERT INTO processing_control (job_name, last_processed_timestamp) VALUES (?, ?)"
            );
            stmt.setString(1, "incremental_transaction_processing");
            stmt.setTimestamp(2, timestamp);
            stmt.executeUpdate();
            conn.commit();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new IncrementalProcessingJob(), args);
        System.exit(result);
    }
}

This comprehensive documentation covers Phoenix's MapReduce integration including input/output formats, bulk loading capabilities, and advanced processing patterns. The examples demonstrate practical usage scenarios for distributed data processing with Phoenix tables.

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-phoenix--phoenix-core

docs

configuration.md

exceptions.md

execution.md

expressions.md

index.md

jdbc.md

mapreduce.md

monitoring.md

query-compilation.md

schema-metadata.md

server.md

transactions.md

types.md

tile.json