Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support
Phoenix provides comprehensive MapReduce integration enabling distributed processing of large datasets stored in Phoenix tables. The integration includes input/output formats, bulk loading tools, and utilities for efficient data processing workflows.
import org.apache.phoenix.mapreduce.*;
import org.apache.phoenix.mapreduce.bulkload.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.io.*;
import org.apache.phoenix.util.ColumnInfo;

MapReduce InputFormat for reading data from Phoenix tables with SQL-based filtering and projection.
// MapReduce InputFormat that reads rows from a Phoenix table as DBWritable values
// (keys are NullWritable). Input is defined either by table name plus an optional
// WHERE clause, or by a complete SELECT statement.
public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable, T> {
// Configuration methods
// Reads every column of tableName; conditions is an optional WHERE-clause body.
public static void setInput(Job job, Class<? extends DBWritable> inputClass,
String tableName, String conditions)
// Reads the result of an arbitrary SELECT statement.
public static void setInput(Job job, Class<? extends DBWritable> inputClass,
String selectStatement)
// Reads only the listed fieldNames from tableName, filtered by conditions.
public static void setInput(Job job, Class<? extends DBWritable> inputClass,
String tableName, String conditions, String... fieldNames)
// InputFormat implementation
// Computes the input splits for the configured query.
public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
// Creates the reader that maps each scanned row onto an instance of T.
public RecordReader<NullWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context)
throws IOException, InterruptedException
// Phoenix-specific configuration
// Number of rows fetched per round trip while scanning.
public static void setBatchSize(Configuration configuration, long batchSize)
// Restricts the projection to the given columns.
public static void setSelectColumnList(Job job, String... columns)
// Chooses between table-based and query-based schema resolution.
public static void setSchemaType(Configuration configuration, SchemaType schemaType)
}MapReduce OutputFormat for writing data to Phoenix tables with automatic batching and transaction management.
// MapReduce OutputFormat that writes DBWritable values into a Phoenix table via
// batched UPSERTs (keys are NullWritable and are ignored).
public class PhoenixOutputFormat<T extends DBWritable> extends OutputFormat<NullWritable, T> {
// Configuration methods
// Targets tableName, upserting the listed fieldNames in order.
public static void setOutput(Job job, String tableName, String... fieldNames)
// List-based overload of the varargs variant above.
public static void setOutput(Job job, String tableName, List<String> fieldNames)
// OutputFormat implementation
// Creates the writer that buffers records and flushes them as UPSERT batches.
public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context)
throws IOException, InterruptedException
// Validates the output configuration before the job is submitted.
public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
// Supplies the committer coordinating task/job commit for this output.
public OutputCommitter getOutputCommitter(TaskAttemptContext context)
throws IOException, InterruptedException
// Phoenix-specific configuration
// Number of records buffered before a batch is committed.
public static void setBatchSize(Configuration configuration, long batchSize)
// Overrides the auto-generated UPSERT statement.
public static void setUpsertStatement(Job job, String upsertStatement)
}Usage:
// Configure Phoenix input format
// Demonstrates wiring PhoenixInputFormat / PhoenixOutputFormat into a Hadoop Job.
public class PhoenixMapReduceJob {

    // Points the job at the "employees" table, projecting four columns and
    // filtering rows server-side with a WHERE clause.
    public static void configureInputJob(Job job) throws IOException {
        // Mapper wiring first, then the Phoenix source.
        job.setMapperClass(EmployeeProcessingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        PhoenixInputFormat.setInput(job,
                EmployeeRecord.class,
                "employees",
                "department = 'ENGINEERING' AND salary > 50000",
                "id", "name", "salary", "department");
        // Larger scan batches reduce round trips to the region servers.
        PhoenixInputFormat.setBatchSize(job.getConfiguration(), 5000);
    }

    // Routes reducer output into the "employee_summary" table.
    public static void configureOutputJob(Job job) throws IOException {
        // Reducer wiring first, then the Phoenix sink.
        job.setReducerClass(SummaryReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SummaryRecord.class);
        PhoenixOutputFormat.setOutput(job, "employee_summary", "department", "avg_salary", "employee_count");
        // Upserts are flushed to Phoenix in groups of this size.
        PhoenixOutputFormat.setBatchSize(job.getConfiguration(), 1000);
    }
}
// DBWritable implementation for input records
// Row object for the "employees" table. Implements DBWritable for JDBC binding
// and Writable for Hadoop's binary serialization.
//
// The Writable format writes a presence flag before each nullable field so that
// null name/salary/department survive a round trip; the previous format encoded
// a null salary as "0", silently turning it into BigDecimal zero on read.
public class EmployeeRecord implements DBWritable, Writable {
    private long id;
    private String name;
    private BigDecimal salary;
    private String department;

    // DBWritable: populate fields from the current row of a Phoenix ResultSet.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getLong("id");
        name = resultSet.getString("name");
        salary = resultSet.getBigDecimal("salary");
        department = resultSet.getString("department");
    }

    // DBWritable: bind fields to the UPSERT statement (JDBC parameters are 1-based).
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, id);
        statement.setString(2, name);
        statement.setBigDecimal(3, salary);
        statement.setString(4, department);
    }

    // Writable: binary serialization used between tasks.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        writeNullableString(out, name);
        writeNullableDecimal(out, salary);
        writeNullableString(out, department);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = readNullableString(in);
        salary = readNullableDecimal(in);
        department = readNullableString(in);
    }

    // Writes a presence flag followed by the value when non-null.
    private static void writeNullableString(DataOutput out, String value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            Text.writeString(out, value);
        }
    }

    private static String readNullableString(DataInput in) throws IOException {
        return in.readBoolean() ? Text.readString(in) : null;
    }

    // BigDecimal round-trips losslessly through its String representation.
    private static void writeNullableDecimal(DataOutput out, BigDecimal value) throws IOException {
        out.writeBoolean(value != null);
        if (value != null) {
            out.writeUTF(value.toString());
        }
    }

    private static BigDecimal readNullableDecimal(DataInput in) throws IOException {
        return in.readBoolean() ? new BigDecimal(in.readUTF()) : null;
    }

    // Getters and setters
    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public BigDecimal getSalary() { return salary; }
    public void setSalary(BigDecimal salary) { this.salary = salary; }
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
}
// DBWritable implementation for output records
// Output row object for per-department salary summaries.
//
// The Writable format writes a presence flag before each nullable field so that
// null department/avgSalary survive a round trip; the previous format encoded a
// null avgSalary as "0", silently turning it into BigDecimal zero on read.
public class SummaryRecord implements DBWritable, Writable {
    private String department;
    private BigDecimal avgSalary;
    private int employeeCount;

    // DBWritable: populate fields from the current ResultSet row.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        department = resultSet.getString("department");
        avgSalary = resultSet.getBigDecimal("avg_salary");
        employeeCount = resultSet.getInt("employee_count");
    }

    // DBWritable: bind fields to the UPSERT statement (1-based parameters).
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, department);
        statement.setBigDecimal(2, avgSalary);
        statement.setInt(3, employeeCount);
    }

    // Writable: binary serialization used between tasks.
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeBoolean(department != null);
        if (department != null) {
            Text.writeString(out, department);
        }
        out.writeBoolean(avgSalary != null);
        if (avgSalary != null) {
            out.writeUTF(avgSalary.toString());
        }
        out.writeInt(employeeCount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        department = in.readBoolean() ? Text.readString(in) : null;
        avgSalary = in.readBoolean() ? new BigDecimal(in.readUTF()) : null;
        employeeCount = in.readInt();
    }

    // Getters and setters
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
    public BigDecimal getAvgSalary() { return avgSalary; }
    public void setAvgSalary(BigDecimal avgSalary) { this.avgSalary = avgSalary; }
    public int getEmployeeCount() { return employeeCount; }
    public void setEmployeeCount(int employeeCount) { this.employeeCount = employeeCount; }
}
Tool for efficient bulk loading of large datasets into Phoenix tables.
// Command-line Tool that bulk-loads delimited (CSV) files into a Phoenix table.
public class BulkLoadTool extends Configured implements Tool {
// Main execution method
// Parses args, builds and runs the load job, and returns its exit code.
public int run(String[] args) throws Exception
// Configuration options
// Parsed command-line options describing the input location and its CSV dialect.
public static class Options {
public String getInputPath()
public String getTableName()
public String getZkQuorum()
public char getFieldDelimiter()
public char getQuoteChar()
public char getEscapeChar()
// Separator for elements inside array-typed columns.
public String getArrayElementSeparator()
// Whether malformed rows abort the load instead of being skipped.
public boolean isStrict()
public List<ColumnInfo> getColumns()
}
// Bulk loading methods
// Runs a complete bulk load synchronously using the given column layout.
public static void bulkLoad(Configuration conf, String inputPath, String tableName,
List<ColumnInfo> columnInfos) throws Exception
// Builds the bulk-load Job without submitting it, for further customization.
public static Job createBulkLoadJob(Configuration conf, String inputPath, String tableName,
List<ColumnInfo> columnInfos) throws IOException
}Usage:
// Bulk load CSV data into Phoenix table
// End-to-end examples of the Phoenix bulk-load API.
public class BulkLoadExample {

    // Loads employee CSV files using the one-shot bulkLoad() helper.
    public static void performBulkLoad() throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("hbase.zookeeper.quorum", "localhost:2181");

        // Column layout of the target table, in file order.
        List<ColumnInfo> columns = Arrays.asList(
                new ColumnInfo("id", Types.BIGINT),
                new ColumnInfo("name", Types.VARCHAR),
                new ColumnInfo("email", Types.VARCHAR),
                new ColumnInfo("salary", Types.DECIMAL),
                new ColumnInfo("department", Types.VARCHAR),
                new ColumnInfo("hire_date", Types.DATE)
        );

        // The salary column carries an explicit DECIMAL(10, 2) shape.
        ColumnInfo salary = columns.get(3);
        salary.setPrecision(10);
        salary.setScale(2);

        BulkLoadTool.bulkLoad(hbaseConf,
                "hdfs://namenode:port/path/to/csv/files",
                "employees",
                columns);
        System.out.println("Bulk load completed successfully");
    }

    // Builds the job explicitly to tweak its configuration before submission.
    public static void performCustomBulkLoad() throws Exception {
        Configuration hbaseConf = HBaseConfiguration.create();
        Job job = BulkLoadTool.createBulkLoadJob(hbaseConf, "input/path", "target_table", createColumnInfos());
        job.setJobName("Custom Phoenix Bulk Load");

        // Tune the CSV dialect and strictness on the job's own configuration.
        Configuration jobConf = job.getConfiguration();
        jobConf.set("phoenix.bulk.load.delimiter", "|");
        jobConf.set("phoenix.bulk.load.quote", "\"");
        jobConf.setBoolean("phoenix.bulk.load.strict", true);

        if (job.waitForCompletion(true)) {
            System.out.println("Bulk load job completed successfully");
        } else {
            System.err.println("Bulk load job failed");
        }
    }

    // Column layout for the transactions table.
    private static List<ColumnInfo> createColumnInfos() {
        return Arrays.asList(
                new ColumnInfo("transaction_id", Types.BIGINT),
                new ColumnInfo("customer_id", Types.BIGINT),
                new ColumnInfo("amount", Types.DECIMAL),
                new ColumnInfo("transaction_date", Types.TIMESTAMP),
                new ColumnInfo("merchant", Types.VARCHAR),
                new ColumnInfo("category", Types.VARCHAR)
        );
    }
}
// Command line bulk load
// Minimal driver that invokes BulkLoadTool the way the command line would.
public class BulkLoadRunner {
    public static void main(String[] args) throws Exception {
        BulkLoadTool loader = new BulkLoadTool();
        loader.setConf(HBaseConfiguration.create());
        // Arguments mirror the CLI contract: input_path table_name zk_quorum
        System.exit(loader.run(new String[] {
                "hdfs://namenode:port/data/transactions.csv",
                "transactions",
                "zk1,zk2,zk3:2181"
        }));
    }
}
// Complete MapReduce job for Phoenix data processing
public class PhoenixDataProcessingJob extends Configured implements Tool {
// Mapper class
public static class DataProcessingMapper
extends Mapper<NullWritable, EmployeeRecord, Text, LongWritable> {
@Override
protected void map(NullWritable key, EmployeeRecord employee, Context context)
throws IOException, InterruptedException {
// Process employee data
String department = employee.getDepartment();
BigDecimal salary = employee.getSalary();
// Emit department and salary
if (department != null && salary != null) {
context.write(new Text(department), new LongWritable(salary.longValue()));
// Additional processing based on salary ranges
if (salary.compareTo(new BigDecimal("100000")) > 0) {
context.write(new Text("HIGH_EARNERS"), new LongWritable(1));
}
}
}
}
// Reducer class
public static class SalaryAggregationReducer
extends Reducer<Text, LongWritable, NullWritable, SummaryRecord> {
@Override
protected void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
long sum = 0;
int count = 0;
for (LongWritable value : values) {
sum += value.get();
count++;
}
// Create summary record
SummaryRecord summary = new SummaryRecord();
summary.setDepartment(key.toString());
summary.setAvgSalary(new BigDecimal(sum / count));
summary.setEmployeeCount(count);
context.write(NullWritable.get(), summary);
}
}
@Override
public int run(String[] args) throws Exception {
Configuration conf = getConf();
Job job = Job.getInstance(conf, "Phoenix Data Processing");
job.setJarByClass(PhoenixDataProcessingJob.class);
// Configure input
PhoenixInputFormat.setInput(job, EmployeeRecord.class,
"employees",
"status = 'ACTIVE'",
"id", "name", "salary", "department");
// Configure output
PhoenixOutputFormat.setOutput(job, "department_summary",
"department", "avg_salary", "employee_count");
// Configure mapper and reducer
job.setMapperClass(DataProcessingMapper.class);
job.setReducerClass(SalaryAggregationReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(NullWritable.class);
job.setOutputValueClass(SummaryRecord.class);
// Configure input and output formats
job.setInputFormatClass(PhoenixInputFormat.class);
job.setOutputFormatClass(PhoenixOutputFormat.class);
return job.waitForCompletion(true) ? 0 : 1;
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
int result = ToolRunner.run(conf, new PhoenixDataProcessingJob(), args);
System.exit(result);
}
}// ETL (Extract, Transform, Load) pipeline using Phoenix MapReduce
// ETL pipeline: reads unprocessed rows from raw_transactions, converts the
// currency, categorizes each transaction, and upserts into transformed_transactions.
public class PhoenixETLPipeline extends Configured implements Tool {

    // Map-only transformation step.
    public static class ETLMapper
            extends Mapper<NullWritable, TransactionRecord, NullWritable, TransformedRecord> {
        // Fixed conversion rate applied to every amount.
        private static final BigDecimal CURRENCY_CONVERSION_RATE = new BigDecimal("1.25");

        @Override
        protected void map(NullWritable key, TransactionRecord transaction, Context context)
                throws IOException, InterruptedException {
            TransformedRecord transformed = new TransformedRecord();
            transformed.setTransactionId(transaction.getTransactionId());
            transformed.setCustomerId(transaction.getCustomerId());
            // Convert currency.
            BigDecimal originalAmount = transaction.getAmount();
            transformed.setConvertedAmount(originalAmount.multiply(CURRENCY_CONVERSION_RATE));
            // Categorize on the ORIGINAL amount, not the converted one.
            transformed.setCategory(categorizeTransaction(transaction.getMerchant(), originalAmount));
            // Stamp with processing time.
            transformed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));
            context.write(NullWritable.get(), transformed);
        }

        // Buckets a transaction by merchant keywords, then by amount.
        private String categorizeTransaction(String merchant, BigDecimal amount) {
            // Lower-case once, locale-independently, and tolerate null merchants
            // (the original called toLowerCase() repeatedly and would NPE on null).
            String normalized = merchant == null ? "" : merchant.toLowerCase(java.util.Locale.ROOT);
            if (normalized.contains("grocery") || normalized.contains("supermarket")) {
                return "GROCERY";
            }
            if (normalized.contains("gas") || normalized.contains("fuel")) {
                return "FUEL";
            }
            if (amount.compareTo(new BigDecimal("500")) > 0) {
                return "LARGE_PURCHASE";
            }
            return "OTHER";
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Phoenix ETL Pipeline");
        job.setJarByClass(PhoenixETLPipeline.class);
        // Source: rows not yet flagged as processed.
        PhoenixInputFormat.setInput(job, TransactionRecord.class,
                "raw_transactions",
                "processed_flag IS NULL OR processed_flag = false",
                "transaction_id", "customer_id", "amount",
                "transaction_date", "merchant");
        // Sink: the transformed table.
        PhoenixOutputFormat.setOutput(job, "transformed_transactions",
                "transaction_id", "customer_id", "converted_amount",
                "category", "processed_timestamp");
        // Map-only job: no reducer needed.
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(TransformedRecord.class);
        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        // Batch sizes must be set on the job's OWN configuration: Job.getInstance
        // copies `conf`, so the original calls against `conf` never reached the job.
        PhoenixInputFormat.setBatchSize(job.getConfiguration(), 10000);
        PhoenixOutputFormat.setBatchSize(job.getConfiguration(), 5000);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new PhoenixETLPipeline(), args);
        System.exit(result);
    }
}
// Supporting record classes
// Input-side row object for raw transactions (DBWritable for JDBC binding,
// Writable for Hadoop serialization).
public class TransactionRecord implements DBWritable, Writable {
    private long transactionId;
    private long customerId;
    private BigDecimal amount;
    private Timestamp transactionDate;
    private String merchant;

    // Populate the record from the current ResultSet row.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.transactionId = resultSet.getLong("transaction_id");
        this.customerId = resultSet.getLong("customer_id");
        this.amount = resultSet.getBigDecimal("amount");
        this.transactionDate = resultSet.getTimestamp("transaction_date");
        this.merchant = resultSet.getString("merchant");
    }

    // Bind the record to an UPSERT statement; parameter indexes are 1-based.
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, this.transactionId);
        statement.setLong(2, this.customerId);
        statement.setBigDecimal(3, this.amount);
        statement.setTimestamp(4, this.transactionDate);
        statement.setString(5, this.merchant);
    }

    // Writable implementation and getters/setters omitted for brevity...
}
// Output-side row object for transformed transactions (DBWritable for JDBC
// binding, Writable for Hadoop serialization).
public class TransformedRecord implements DBWritable, Writable {
    private long transactionId;
    private long customerId;
    private BigDecimal convertedAmount;
    private String category;
    private Timestamp processedTimestamp;

    // Populate the record from the current ResultSet row.
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        this.transactionId = resultSet.getLong("transaction_id");
        this.customerId = resultSet.getLong("customer_id");
        this.convertedAmount = resultSet.getBigDecimal("converted_amount");
        this.category = resultSet.getString("category");
        this.processedTimestamp = resultSet.getTimestamp("processed_timestamp");
    }

    // Bind the record to an UPSERT statement; parameter indexes are 1-based.
    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, this.transactionId);
        statement.setLong(2, this.customerId);
        statement.setBigDecimal(3, this.convertedAmount);
        statement.setString(4, this.category);
        statement.setTimestamp(5, this.processedTimestamp);
    }

    // Writable implementation and getters/setters omitted for brevity...
}
// MapReduce job performing joins across multiple Phoenix tables
// Joins the Phoenix "customers" and "orders" tables on customer_id using a
// classic reduce-side join staged through intermediate HDFS files.
public class MultiTableJoinJob extends Configured implements Tool {

    // Tags each customer row with a "CUSTOMER:" prefix, keyed by customer id.
    public static class CustomerMapper
            extends Mapper<NullWritable, CustomerRecord, LongWritable, Text> {
        @Override
        protected void map(NullWritable key, CustomerRecord customer, Context context)
                throws IOException, InterruptedException {
            String customerData = "CUSTOMER:" + customer.getName() + "," +
                    customer.getEmail() + "," + customer.getSegment();
            context.write(new LongWritable(customer.getCustomerId()), new Text(customerData));
        }
    }

    // Tags each order row with an "ORDER:" prefix, keyed by customer id.
    public static class OrderMapper
            extends Mapper<NullWritable, OrderRecord, LongWritable, Text> {
        @Override
        protected void map(NullWritable key, OrderRecord order, Context context)
                throws IOException, InterruptedException {
            String orderData = "ORDER:" + order.getOrderId() + "," +
                    order.getOrderDate() + "," + order.getTotalAmount();
            context.write(new LongWritable(order.getCustomerId()), new Text(orderData));
        }
    }

    // Reduce-side join: buffers customer and order payloads per customer id and
    // emits one joined record per (customer, order) pair.
    public static class JoinReducer
            extends Reducer<LongWritable, Text, NullWritable, CustomerOrderRecord> {
        @Override
        protected void reduce(LongWritable customerId, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            List<String> customerData = new ArrayList<>();
            List<String> orderData = new ArrayList<>();
            // Split the tagged values back into their source tables.
            for (Text value : values) {
                String valueStr = value.toString();
                if (valueStr.startsWith("CUSTOMER:")) {
                    customerData.add(valueStr.substring(9));
                } else if (valueStr.startsWith("ORDER:")) {
                    orderData.add(valueStr.substring(6));
                }
            }
            // Cross product: one output row per customer/order combination.
            for (String customer : customerData) {
                for (String order : orderData) {
                    context.write(NullWritable.get(), createJoinedRecord(customerId.get(), customer, order));
                }
            }
        }

        // Rebuilds a CustomerOrderRecord from the comma-joined payloads.
        // NOTE(review): assumes no field contains a comma and that order dates are
        // formatted as yyyy-[m]m-[d]d (Date.valueOf) — confirm against source data.
        private CustomerOrderRecord createJoinedRecord(long customerId, String customerData, String orderData) {
            String[] customerParts = customerData.split(",");
            String[] orderParts = orderData.split(",");
            CustomerOrderRecord record = new CustomerOrderRecord();
            record.setCustomerId(customerId);
            record.setCustomerName(customerParts[0]);
            record.setCustomerEmail(customerParts[1]);
            record.setCustomerSegment(customerParts[2]);
            record.setOrderId(Long.parseLong(orderParts[0]));
            record.setOrderDate(Date.valueOf(orderParts[1]));
            record.setOrderAmount(new BigDecimal(orderParts[2]));
            return record;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        // Job 1: stage customers keyed by id into a SequenceFile so the join job
        // can read back typed (LongWritable, Text) pairs. The original code used
        // the default TextOutputFormat/TextInputFormat, which fed the join job
        // byte offsets as keys and broke the grouping on customer id.
        Job customerJob = Job.getInstance(conf, "Customer Processing");
        customerJob.setJarByClass(MultiTableJoinJob.class);
        PhoenixInputFormat.setInput(customerJob, CustomerRecord.class, "customers");
        customerJob.setMapperClass(CustomerMapper.class);
        customerJob.setNumReduceTasks(0);
        customerJob.setOutputKeyClass(LongWritable.class);
        customerJob.setOutputValueClass(Text.class);
        customerJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        Path customerOutput = new Path("/tmp/customers");
        FileOutputFormat.setOutputPath(customerJob, customerOutput);
        // Job 2: same staging for orders.
        Job orderJob = Job.getInstance(conf, "Order Processing");
        orderJob.setJarByClass(MultiTableJoinJob.class);
        PhoenixInputFormat.setInput(orderJob, OrderRecord.class, "orders");
        orderJob.setMapperClass(OrderMapper.class);
        orderJob.setNumReduceTasks(0);
        orderJob.setOutputKeyClass(LongWritable.class);
        orderJob.setOutputValueClass(Text.class);
        orderJob.setOutputFormatClass(SequenceFileOutputFormat.class);
        Path orderOutput = new Path("/tmp/orders");
        FileOutputFormat.setOutputPath(orderJob, orderOutput);
        // Run both staging jobs (sequentially) before joining.
        boolean success = customerJob.waitForCompletion(true) && orderJob.waitForCompletion(true);
        if (!success) {
            return 1;
        }
        // Job 3: join the staged files. The default (identity) mapper forwards the
        // typed (customer id, tagged text) pairs straight to JoinReducer.
        Job joinJob = Job.getInstance(conf, "Customer Order Join");
        joinJob.setJarByClass(MultiTableJoinJob.class);
        joinJob.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(joinJob, customerOutput);
        FileInputFormat.addInputPath(joinJob, orderOutput);
        PhoenixOutputFormat.setOutput(joinJob, "customer_orders",
                "customer_id", "customer_name", "customer_email",
                "customer_segment", "order_id", "order_date", "order_amount");
        joinJob.setReducerClass(JoinReducer.class);
        joinJob.setMapOutputKeyClass(LongWritable.class);
        joinJob.setMapOutputValueClass(Text.class);
        joinJob.setOutputKeyClass(NullWritable.class);
        joinJob.setOutputValueClass(CustomerOrderRecord.class);
        joinJob.setOutputFormatClass(PhoenixOutputFormat.class);
        return joinJob.waitForCompletion(true) ? 0 : 1;
    }
}
// Incremental processing pattern for Phoenix MapReduce
// Incremental processing: only transactions newer than a persisted watermark are
// scored; the watermark lives in the Phoenix "processing_control" table.
public class IncrementalProcessingJob extends Configured implements Tool {

    public static class IncrementalMapper
            extends Mapper<NullWritable, TransactionRecord, NullWritable, ProcessedRecord> {
        private Timestamp lastProcessedTime;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // The driver publishes the watermark through the job configuration.
            String lastProcessedStr = context.getConfiguration().get("last.processed.timestamp");
            if (lastProcessedStr != null) {
                lastProcessedTime = Timestamp.valueOf(lastProcessedStr);
            } else {
                // Fall back to a 24-hour lookback when no watermark is configured.
                lastProcessedTime = new Timestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000);
            }
        }

        @Override
        protected void map(NullWritable key, TransactionRecord transaction, Context context)
                throws IOException, InterruptedException {
            // Double filter: the WHERE clause prunes server-side, this guards client-side.
            if (transaction.getTransactionDate().after(lastProcessedTime)) {
                ProcessedRecord processed = processTransaction(transaction);
                context.write(NullWritable.get(), processed);
            }
        }

        // Scores one transaction and flags potential fraud.
        private ProcessedRecord processTransaction(TransactionRecord transaction) {
            ProcessedRecord processed = new ProcessedRecord();
            processed.setTransactionId(transaction.getTransactionId());
            processed.setCustomerId(transaction.getCustomerId());
            processed.setAmount(transaction.getAmount());
            BigDecimal riskScore = calculateRiskScore(transaction);
            processed.setRiskScore(riskScore);
            processed.setFraudFlag(detectFraud(transaction, riskScore));
            processed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));
            return processed;
        }

        // Simple heuristic: amount-proportional score, boosted 1.5x for
        // late-night transactions, capped at 10.0.
        private BigDecimal calculateRiskScore(TransactionRecord transaction) {
            BigDecimal amount = transaction.getAmount();
            long transactionTime = transaction.getTransactionDate().getTime();
            BigDecimal baseScore = amount.divide(new BigDecimal("1000"));
            Calendar cal = Calendar.getInstance();
            cal.setTimeInMillis(transactionTime);
            int hour = cal.get(Calendar.HOUR_OF_DAY);
            if (hour < 6 || hour > 23) {
                baseScore = baseScore.multiply(new BigDecimal("1.5"));
            }
            return baseScore.min(new BigDecimal("10.0")); // Cap at 10.0
        }

        // Fraud is flagged above a fixed risk threshold.
        private boolean detectFraud(TransactionRecord transaction, BigDecimal riskScore) {
            return riskScore.compareTo(new BigDecimal("7.5")) > 0;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        String lastProcessedTimestamp = getLastProcessedTimestamp(conf);
        conf.set("last.processed.timestamp", lastProcessedTimestamp);
        // Capture the next watermark BEFORE the scan starts: rows written while
        // the job runs fall after this instant and will be picked up by the next
        // run. (Recording completion time, as before, silently skipped them.)
        Timestamp newWatermark = new Timestamp(System.currentTimeMillis());
        Job job = Job.getInstance(conf, "Incremental Transaction Processing");
        job.setJarByClass(IncrementalProcessingJob.class);
        // Server-side time filter; the timestamp value is generated internally,
        // not user input, so string concatenation is acceptable here.
        String whereClause = "transaction_date > '" + lastProcessedTimestamp + "'";
        PhoenixInputFormat.setInput(job, TransactionRecord.class, "transactions", whereClause,
                "transaction_id", "customer_id", "amount", "transaction_date", "merchant");
        PhoenixOutputFormat.setOutput(job, "processed_transactions",
                "transaction_id", "customer_id", "amount", "risk_score",
                "fraud_flag", "processed_timestamp");
        job.setMapperClass(IncrementalMapper.class);
        job.setNumReduceTasks(0); // Map-only job
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(ProcessedRecord.class);
        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);
        boolean success = job.waitForCompletion(true);
        if (success) {
            updateLastProcessedTimestamp(conf, newWatermark);
        }
        return success ? 0 : 1;
    }

    // Reads the persisted watermark. Statement and ResultSet are closed via
    // try-with-resources (the original leaked both on the shared connection).
    private String getLastProcessedTimestamp(Configuration conf) throws SQLException {
        String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement stmt = conn.prepareStatement(
                     "SELECT last_processed_timestamp FROM processing_control WHERE job_name = ?")) {
            stmt.setString(1, "incremental_transaction_processing");
            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    return rs.getTimestamp("last_processed_timestamp").toString();
                }
                // No control row yet: default to a 24-hour lookback.
                return new Timestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000).toString();
            }
        }
    }

    // Persists the new watermark via UPSERT; resources closed on all paths.
    private void updateLastProcessedTimestamp(Configuration conf, Timestamp timestamp) throws SQLException {
        String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");
        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement stmt = conn.prepareStatement(
                     "UPSERT INTO processing_control (job_name, last_processed_timestamp) VALUES (?, ?)")) {
            stmt.setString(1, "incremental_transaction_processing");
            stmt.setTimestamp(2, timestamp);
            stmt.executeUpdate();
            conn.commit();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new IncrementalProcessingJob(), args);
        System.exit(result);
    }
}
This comprehensive documentation covers Phoenix's MapReduce integration including input/output formats, bulk loading capabilities, and advanced processing patterns. The examples demonstrate practical usage scenarios for distributed data processing with Phoenix tables.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-phoenix--phoenix-core