# MapReduce Integration

Phoenix provides comprehensive MapReduce integration, enabling distributed processing of large datasets stored in Phoenix tables. The integration includes input/output formats, bulk loading tools, and utilities for efficient data processing workflows.

## Core Imports

```java
import org.apache.phoenix.mapreduce.*;
import org.apache.phoenix.mapreduce.bulkload.*;
import org.apache.phoenix.util.ColumnInfo;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.input.*;
import org.apache.hadoop.mapreduce.lib.output.*;
import org.apache.hadoop.util.*;

// Also used throughout the examples below
import java.io.*;
import java.math.*;
import java.sql.*;
import java.sql.Date; // explicit import resolves the java.util.Date ambiguity
import java.util.*;
```

## Input/Output Formats

### PhoenixInputFormat

MapReduce InputFormat for reading data from Phoenix tables with SQL-based filtering and projection.

```java{ .api }
public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable, T> {
    // Configuration methods
    public static void setInput(Job job, Class<? extends DBWritable> inputClass,
                                String tableName, String conditions)
    public static void setInput(Job job, Class<? extends DBWritable> inputClass,
                                String selectStatement)
    public static void setInput(Job job, Class<? extends DBWritable> inputClass,
                                String tableName, String conditions, String... fieldNames)

    // InputFormat implementation
    public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException
    public RecordReader<NullWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context)
        throws IOException, InterruptedException

    // Phoenix-specific configuration
    public static void setBatchSize(Configuration configuration, long batchSize)
    public static void setSelectColumnList(Job job, String... columns)
    public static void setSchemaType(Configuration configuration, SchemaType schemaType)
}
```

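The Phoenix-specific setters are plain static configuration calls that can be combined with `setInput`. A minimal sketch of wiring them together; the table and column names are illustrative, and `EmployeeRecord` is the `DBWritable` defined in the usage example below:

```java
// Hypothetical input setup combining the configuration methods above.
Configuration conf = HBaseConfiguration.create();
Job job = Job.getInstance(conf, "Phoenix Input Example");
PhoenixInputFormat.setInput(job, EmployeeRecord.class, "employees", "status = 'ACTIVE'");

// Restrict the projection and tune how many rows each batch fetches.
PhoenixInputFormat.setSelectColumnList(job, "id", "name", "salary");
PhoenixInputFormat.setBatchSize(job.getConfiguration(), 1000);
```
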
### PhoenixOutputFormat

MapReduce OutputFormat for writing data to Phoenix tables with automatic batching and transaction management.

```java{ .api }
public class PhoenixOutputFormat<T extends DBWritable> extends OutputFormat<NullWritable, T> {
    // Configuration methods
    public static void setOutput(Job job, String tableName, String... fieldNames)
    public static void setOutput(Job job, String tableName, List<String> fieldNames)

    // OutputFormat implementation
    public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context)
        throws IOException, InterruptedException
    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
    public OutputCommitter getOutputCommitter(TaskAttemptContext context)
        throws IOException, InterruptedException

    // Phoenix-specific configuration
    public static void setBatchSize(Configuration configuration, long batchSize)
    public static void setUpsertStatement(Job job, String upsertStatement)
}
```

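When the generated UPSERT is not what you want, the statement can be supplied explicitly via `setUpsertStatement`. A sketch, assuming the `employee_summary` table from the usage example below; the parameter order must match the record's `write(PreparedStatement)` implementation:

```java
// Hypothetical custom upsert statement for the output table.
Job job = Job.getInstance(HBaseConfiguration.create(), "Phoenix Output Example");
PhoenixOutputFormat.setOutput(job, "employee_summary",
    "department", "avg_salary", "employee_count");
PhoenixOutputFormat.setUpsertStatement(job,
    "UPSERT INTO employee_summary (department, avg_salary, employee_count) VALUES (?, ?, ?)");
```
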
**Usage:**
```java
// Configure Phoenix input format
public class PhoenixMapReduceJob {
    public static void configureInputJob(Job job) throws IOException {
        // Set input configuration
        PhoenixInputFormat.setInput(job,
            EmployeeRecord.class,
            "employees",
            "department = 'ENGINEERING' AND salary > 50000",
            "id", "name", "salary", "department");

        // Optional: Configure batch size for better performance
        PhoenixInputFormat.setBatchSize(job.getConfiguration(), 5000);

        // Set mapper class
        job.setMapperClass(EmployeeProcessingMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
    }

    public static void configureOutputJob(Job job) throws IOException {
        // Set output configuration
        PhoenixOutputFormat.setOutput(job, "employee_summary", "department", "avg_salary", "employee_count");

        // Optional: Configure batch size
        PhoenixOutputFormat.setBatchSize(job.getConfiguration(), 1000);

        // Set reducer class
        job.setReducerClass(SummaryReducer.class);
        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SummaryRecord.class);
    }
}

// DBWritable implementation for input records
public class EmployeeRecord implements DBWritable, Writable {
    private long id;
    private String name;
    private BigDecimal salary;
    private String department;

    // DBWritable implementation
    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        id = resultSet.getLong("id");
        name = resultSet.getString("name");
        salary = resultSet.getBigDecimal("salary");
        department = resultSet.getString("department");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, id);
        statement.setString(2, name);
        statement.setBigDecimal(3, salary);
        statement.setString(4, department);
    }

    // Writable implementation
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(id);
        Text.writeString(out, name != null ? name : "");
        out.writeUTF(salary != null ? salary.toString() : "0");
        Text.writeString(out, department != null ? department : "");
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        id = in.readLong();
        name = Text.readString(in);
        salary = new BigDecimal(in.readUTF());
        department = Text.readString(in);
    }

    // Getters and setters
    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public BigDecimal getSalary() { return salary; }
    public void setSalary(BigDecimal salary) { this.salary = salary; }
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
}

// DBWritable implementation for output records
public class SummaryRecord implements DBWritable, Writable {
    private String department;
    private BigDecimal avgSalary;
    private int employeeCount;

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        department = resultSet.getString("department");
        avgSalary = resultSet.getBigDecimal("avg_salary");
        employeeCount = resultSet.getInt("employee_count");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setString(1, department);
        statement.setBigDecimal(2, avgSalary);
        statement.setInt(3, employeeCount);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        Text.writeString(out, department != null ? department : "");
        out.writeUTF(avgSalary != null ? avgSalary.toString() : "0");
        out.writeInt(employeeCount);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        department = Text.readString(in);
        avgSalary = new BigDecimal(in.readUTF());
        employeeCount = in.readInt();
    }

    // Getters and setters
    public String getDepartment() { return department; }
    public void setDepartment(String department) { this.department = department; }
    public BigDecimal getAvgSalary() { return avgSalary; }
    public void setAvgSalary(BigDecimal avgSalary) { this.avgSalary = avgSalary; }
    public int getEmployeeCount() { return employeeCount; }
    public void setEmployeeCount(int employeeCount) { this.employeeCount = employeeCount; }
}
```

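The example assumes source and target tables shaped to match the record classes above. A sketch of creating them through the Phoenix JDBC driver; the connection URL and column types are assumptions inferred from the record fields:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class SchemaSetup {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
             Statement stmt = conn.createStatement()) {
            // Source table read by PhoenixInputFormat
            stmt.execute("CREATE TABLE IF NOT EXISTS employees ("
                + "id BIGINT PRIMARY KEY, name VARCHAR, "
                + "salary DECIMAL(10,2), department VARCHAR)");
            // Target table written by PhoenixOutputFormat
            stmt.execute("CREATE TABLE IF NOT EXISTS employee_summary ("
                + "department VARCHAR PRIMARY KEY, "
                + "avg_salary DECIMAL(10,2), employee_count INTEGER)");
            conn.commit();
        }
    }
}
```
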
## Bulk Loading Tools

### BulkLoadTool

Tool for efficient bulk loading of large datasets into Phoenix tables.

```java{ .api }
public class BulkLoadTool extends Configured implements Tool {
    // Main execution method
    public int run(String[] args) throws Exception

    // Configuration options
    public static class Options {
        public String getInputPath()
        public String getTableName()
        public String getZkQuorum()
        public char getFieldDelimiter()
        public char getQuoteChar()
        public char getEscapeChar()
        public String getArrayElementSeparator()
        public boolean isStrict()
        public List<ColumnInfo> getColumns()
    }

    // Bulk loading methods
    public static void bulkLoad(Configuration conf, String inputPath, String tableName,
                                List<ColumnInfo> columnInfos) throws Exception
    public static Job createBulkLoadJob(Configuration conf, String inputPath, String tableName,
                                        List<ColumnInfo> columnInfos) throws IOException
}
```

**Usage:**
```java
// Bulk load CSV data into Phoenix table
public class BulkLoadExample {
    public static void performBulkLoad() throws Exception {
        Configuration conf = HBaseConfiguration.create();
        conf.set("hbase.zookeeper.quorum", "localhost:2181");

        // Define column information
        List<ColumnInfo> columnInfos = Arrays.asList(
            new ColumnInfo("id", Types.BIGINT),
            new ColumnInfo("name", Types.VARCHAR),
            new ColumnInfo("email", Types.VARCHAR),
            new ColumnInfo("salary", Types.DECIMAL),
            new ColumnInfo("department", Types.VARCHAR),
            new ColumnInfo("hire_date", Types.DATE)
        );

        // Configure decimal precision
        ColumnInfo salaryColumn = columnInfos.get(3);
        salaryColumn.setPrecision(10);
        salaryColumn.setScale(2);

        // Perform bulk load
        String inputPath = "hdfs://namenode:port/path/to/csv/files";
        String tableName = "employees";

        BulkLoadTool.bulkLoad(conf, inputPath, tableName, columnInfos);
        System.out.println("Bulk load completed successfully");
    }

    public static void performCustomBulkLoad() throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Create bulk load job with custom configuration
        List<ColumnInfo> columnInfos = createColumnInfos();
        Job job = BulkLoadTool.createBulkLoadJob(conf, "input/path", "target_table", columnInfos);

        // Customize job settings
        job.setJobName("Custom Phoenix Bulk Load");
        job.getConfiguration().set("phoenix.bulk.load.delimiter", "|");
        job.getConfiguration().set("phoenix.bulk.load.quote", "\"");
        job.getConfiguration().setBoolean("phoenix.bulk.load.strict", true);

        // Submit and wait for completion
        boolean success = job.waitForCompletion(true);
        if (success) {
            System.out.println("Bulk load job completed successfully");
        } else {
            System.err.println("Bulk load job failed");
        }
    }

    private static List<ColumnInfo> createColumnInfos() {
        return Arrays.asList(
            new ColumnInfo("transaction_id", Types.BIGINT),
            new ColumnInfo("customer_id", Types.BIGINT),
            new ColumnInfo("amount", Types.DECIMAL),
            new ColumnInfo("transaction_date", Types.TIMESTAMP),
            new ColumnInfo("merchant", Types.VARCHAR),
            new ColumnInfo("category", Types.VARCHAR)
        );
    }
}

// Command line bulk load
public class BulkLoadRunner {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();

        // Set up tool options
        BulkLoadTool tool = new BulkLoadTool();
        tool.setConf(conf);

        // Command line arguments: input_path table_name zk_quorum
        String[] bulkLoadArgs = {
            "hdfs://namenode:port/data/transactions.csv",
            "transactions",
            "zk1,zk2,zk3:2181"
        };

        int result = tool.run(bulkLoadArgs);
        System.exit(result);
    }
}
```

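The bulk load examples expect delimited input whose field order matches the `ColumnInfo` list. A sketch that writes a couple of rows in that shape for local testing; the file name and values are made up:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;

public class SampleCsvWriter {
    public static void main(String[] args) throws IOException {
        // Field order: id, name, email, salary, department, hire_date
        List<String> rows = List.of(
            "1,Alice,alice@example.com,75000.00,ENGINEERING,2020-01-15",
            "2,Bob,bob@example.com,62000.00,SALES,2019-06-01");
        Files.write(Paths.get("employees.csv"), rows);
    }
}
```
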
## MapReduce Job Examples

### Data Processing Job

```java
// Complete MapReduce job for Phoenix data processing
public class PhoenixDataProcessingJob extends Configured implements Tool {

    // Mapper class
    public static class DataProcessingMapper
            extends Mapper<NullWritable, EmployeeRecord, Text, LongWritable> {

        @Override
        protected void map(NullWritable key, EmployeeRecord employee, Context context)
                throws IOException, InterruptedException {

            // Process employee data
            String department = employee.getDepartment();
            BigDecimal salary = employee.getSalary();

            // Emit department and salary
            if (department != null && salary != null) {
                context.write(new Text(department), new LongWritable(salary.longValue()));

                // Additional processing based on salary ranges; note that the
                // synthetic HIGH_EARNERS key aggregates counts, not salaries
                if (salary.compareTo(new BigDecimal("100000")) > 0) {
                    context.write(new Text("HIGH_EARNERS"), new LongWritable(1));
                }
            }
        }
    }

    // Reducer class
    public static class SalaryAggregationReducer
            extends Reducer<Text, LongWritable, NullWritable, SummaryRecord> {

        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context)
                throws IOException, InterruptedException {

            long sum = 0;
            int count = 0;

            for (LongWritable value : values) {
                sum += value.get();
                count++;
            }

            // Create summary record; divide as BigDecimal to avoid the
            // truncation of integer division
            SummaryRecord summary = new SummaryRecord();
            summary.setDepartment(key.toString());
            summary.setAvgSalary(BigDecimal.valueOf(sum)
                .divide(BigDecimal.valueOf(count), 2, RoundingMode.HALF_UP));
            summary.setEmployeeCount(count);

            context.write(NullWritable.get(), summary);
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Phoenix Data Processing");

        job.setJarByClass(PhoenixDataProcessingJob.class);

        // Configure input
        PhoenixInputFormat.setInput(job, EmployeeRecord.class,
            "employees",
            "status = 'ACTIVE'",
            "id", "name", "salary", "department");

        // Configure output
        PhoenixOutputFormat.setOutput(job, "department_summary",
            "department", "avg_salary", "employee_count");

        // Configure mapper and reducer
        job.setMapperClass(DataProcessingMapper.class);
        job.setReducerClass(SalaryAggregationReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(SummaryRecord.class);

        // Configure input and output formats
        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new PhoenixDataProcessingJob(), args);
        System.exit(result);
    }
}
```

### ETL Pipeline Job

```java
// ETL (Extract, Transform, Load) pipeline using Phoenix MapReduce
public class PhoenixETLPipeline extends Configured implements Tool {

    // Mapper for data transformation
    public static class ETLMapper
            extends Mapper<NullWritable, TransactionRecord, NullWritable, TransformedRecord> {

        private static final BigDecimal CURRENCY_CONVERSION_RATE = new BigDecimal("1.25");

        @Override
        protected void map(NullWritable key, TransactionRecord transaction, Context context)
                throws IOException, InterruptedException {

            // Transform the data
            TransformedRecord transformed = new TransformedRecord();
            transformed.setTransactionId(transaction.getTransactionId());
            transformed.setCustomerId(transaction.getCustomerId());

            // Convert currency
            BigDecimal originalAmount = transaction.getAmount();
            BigDecimal convertedAmount = originalAmount.multiply(CURRENCY_CONVERSION_RATE);
            transformed.setConvertedAmount(convertedAmount);

            // Categorize transaction
            String category = categorizeTransaction(transaction.getMerchant(), originalAmount);
            transformed.setCategory(category);

            // Add processing timestamp
            transformed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));

            // Emit transformed record
            context.write(NullWritable.get(), transformed);
        }

        private String categorizeTransaction(String merchant, BigDecimal amount) {
            // Normalize once; this also guards against a null merchant column
            String name = merchant != null ? merchant.toLowerCase() : "";
            if (name.contains("grocery") || name.contains("supermarket")) {
                return "GROCERY";
            } else if (name.contains("gas") || name.contains("fuel")) {
                return "FUEL";
            } else if (amount.compareTo(new BigDecimal("500")) > 0) {
                return "LARGE_PURCHASE";
            } else {
                return "OTHER";
            }
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();
        Job job = Job.getInstance(conf, "Phoenix ETL Pipeline");

        job.setJarByClass(PhoenixETLPipeline.class);

        // Configure input from source table
        PhoenixInputFormat.setInput(job, TransactionRecord.class,
            "raw_transactions",
            "processed_flag IS NULL OR processed_flag = false",
            "transaction_id", "customer_id", "amount",
            "transaction_date", "merchant");

        // Configure output to transformed table
        PhoenixOutputFormat.setOutput(job, "transformed_transactions",
            "transaction_id", "customer_id", "converted_amount",
            "category", "processed_timestamp");

        // This is a map-only job (no reducer needed)
        job.setMapperClass(ETLMapper.class);
        job.setNumReduceTasks(0);

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(TransformedRecord.class);

        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        // Set batch sizes for better performance
        PhoenixInputFormat.setBatchSize(conf, 10000);
        PhoenixOutputFormat.setBatchSize(conf, 5000);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new PhoenixETLPipeline(), args);
        System.exit(result);
    }
}

// Supporting record classes
public class TransactionRecord implements DBWritable, Writable {
    private long transactionId;
    private long customerId;
    private BigDecimal amount;
    private Timestamp transactionDate;
    private String merchant;

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        transactionId = resultSet.getLong("transaction_id");
        customerId = resultSet.getLong("customer_id");
        amount = resultSet.getBigDecimal("amount");
        transactionDate = resultSet.getTimestamp("transaction_date");
        merchant = resultSet.getString("merchant");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, transactionId);
        statement.setLong(2, customerId);
        statement.setBigDecimal(3, amount);
        statement.setTimestamp(4, transactionDate);
        statement.setString(5, merchant);
    }

    // Writable implementation and getters/setters omitted for brevity...
}

public class TransformedRecord implements DBWritable, Writable {
    private long transactionId;
    private long customerId;
    private BigDecimal convertedAmount;
    private String category;
    private Timestamp processedTimestamp;

    @Override
    public void readFields(ResultSet resultSet) throws SQLException {
        transactionId = resultSet.getLong("transaction_id");
        customerId = resultSet.getLong("customer_id");
        convertedAmount = resultSet.getBigDecimal("converted_amount");
        category = resultSet.getString("category");
        processedTimestamp = resultSet.getTimestamp("processed_timestamp");
    }

    @Override
    public void write(PreparedStatement statement) throws SQLException {
        statement.setLong(1, transactionId);
        statement.setLong(2, customerId);
        statement.setBigDecimal(3, convertedAmount);
        statement.setString(4, category);
        statement.setTimestamp(5, processedTimestamp);
    }

    // Writable implementation and getters/setters omitted for brevity...
}
```

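The omitted `Writable` methods follow the same pattern as `EmployeeRecord`. A sketch for `TransactionRecord`; serializing the timestamp as epoch millis is a choice made here for compactness, not a requirement:

```java
// Inside TransactionRecord: Hadoop serialization between map and reduce.
@Override
public void write(DataOutput out) throws IOException {
    out.writeLong(transactionId);
    out.writeLong(customerId);
    out.writeUTF(amount != null ? amount.toString() : "0");
    out.writeLong(transactionDate != null ? transactionDate.getTime() : 0L);
    Text.writeString(out, merchant != null ? merchant : "");
}

@Override
public void readFields(DataInput in) throws IOException {
    transactionId = in.readLong();
    customerId = in.readLong();
    amount = new BigDecimal(in.readUTF());
    transactionDate = new Timestamp(in.readLong());
    merchant = Text.readString(in);
}
```
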
## Advanced MapReduce Patterns

### Multi-Table Join Job

```java
// MapReduce job performing joins across multiple Phoenix tables
public class MultiTableJoinJob extends Configured implements Tool {

    // Mapper for customer data
    public static class CustomerMapper
            extends Mapper<NullWritable, CustomerRecord, LongWritable, Text> {

        @Override
        protected void map(NullWritable key, CustomerRecord customer, Context context)
                throws IOException, InterruptedException {

            // Emit customer ID as key with customer data as value
            String customerData = "CUSTOMER:" + customer.getName() + "," +
                customer.getEmail() + "," + customer.getSegment();
            context.write(new LongWritable(customer.getCustomerId()), new Text(customerData));
        }
    }

    // Mapper for order data
    public static class OrderMapper
            extends Mapper<NullWritable, OrderRecord, LongWritable, Text> {

        @Override
        protected void map(NullWritable key, OrderRecord order, Context context)
                throws IOException, InterruptedException {

            // Emit customer ID as key with order data as value
            String orderData = "ORDER:" + order.getOrderId() + "," +
                order.getOrderDate() + "," + order.getTotalAmount();
            context.write(new LongWritable(order.getCustomerId()), new Text(orderData));
        }
    }

    // Reducer to perform the join
    public static class JoinReducer
            extends Reducer<LongWritable, Text, NullWritable, CustomerOrderRecord> {

        @Override
        protected void reduce(LongWritable customerId, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {

            List<String> customerData = new ArrayList<>();
            List<String> orderData = new ArrayList<>();

            // Separate customer and order data
            for (Text value : values) {
                String valueStr = value.toString();
                if (valueStr.startsWith("CUSTOMER:")) {
                    customerData.add(valueStr.substring(9));
                } else if (valueStr.startsWith("ORDER:")) {
                    orderData.add(valueStr.substring(6));
                }
            }

            // Perform join - emit a record for each customer-order combination
            for (String customer : customerData) {
                for (String order : orderData) {
                    CustomerOrderRecord joined = createJoinedRecord(customerId.get(), customer, order);
                    context.write(NullWritable.get(), joined);
                }
            }
        }

        private CustomerOrderRecord createJoinedRecord(long customerId, String customerData, String orderData) {
            String[] customerParts = customerData.split(",");
            String[] orderParts = orderData.split(",");

            CustomerOrderRecord record = new CustomerOrderRecord();
            record.setCustomerId(customerId);
            record.setCustomerName(customerParts[0]);
            record.setCustomerEmail(customerParts[1]);
            record.setCustomerSegment(customerParts[2]);
            record.setOrderId(Long.parseLong(orderParts[0]));
            record.setOrderDate(Date.valueOf(orderParts[1]));
            record.setOrderAmount(new BigDecimal(orderParts[2]));

            return record;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Job 1: Process customers
        // (the column lists assume the schema implied by the record classes)
        Job customerJob = Job.getInstance(conf, "Customer Processing");
        customerJob.setJarByClass(MultiTableJoinJob.class);

        PhoenixInputFormat.setInput(customerJob, CustomerRecord.class,
            "SELECT customer_id, name, email, segment FROM customers");
        customerJob.setMapperClass(CustomerMapper.class);
        customerJob.setNumReduceTasks(0);
        customerJob.setMapOutputKeyClass(LongWritable.class);
        customerJob.setMapOutputValueClass(Text.class);
        // Sequence files preserve the LongWritable/Text pairs for the join job
        customerJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        Path customerOutput = new Path("/tmp/customers");
        FileOutputFormat.setOutputPath(customerJob, customerOutput);

        // Job 2: Process orders
        Job orderJob = Job.getInstance(conf, "Order Processing");
        orderJob.setJarByClass(MultiTableJoinJob.class);

        PhoenixInputFormat.setInput(orderJob, OrderRecord.class,
            "SELECT order_id, customer_id, order_date, total_amount FROM orders");
        orderJob.setMapperClass(OrderMapper.class);
        orderJob.setNumReduceTasks(0);
        orderJob.setMapOutputKeyClass(LongWritable.class);
        orderJob.setMapOutputValueClass(Text.class);
        orderJob.setOutputFormatClass(SequenceFileOutputFormat.class);

        Path orderOutput = new Path("/tmp/orders");
        FileOutputFormat.setOutputPath(orderJob, orderOutput);

        // Wait for both jobs to complete
        boolean success = customerJob.waitForCompletion(true) && orderJob.waitForCompletion(true);
        if (!success) {
            return 1;
        }

        // Job 3: Join the results (the default identity mapper passes the
        // LongWritable/Text pairs through to the reducer)
        Job joinJob = Job.getInstance(conf, "Customer Order Join");
        joinJob.setJarByClass(MultiTableJoinJob.class);

        joinJob.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(joinJob, customerOutput);
        FileInputFormat.addInputPath(joinJob, orderOutput);
        joinJob.setMapOutputKeyClass(LongWritable.class);
        joinJob.setMapOutputValueClass(Text.class);

        PhoenixOutputFormat.setOutput(joinJob, "customer_orders",
            "customer_id", "customer_name", "customer_email",
            "customer_segment", "order_id", "order_date", "order_amount");

        joinJob.setReducerClass(JoinReducer.class);
        joinJob.setOutputKeyClass(NullWritable.class);
        joinJob.setOutputValueClass(CustomerOrderRecord.class);
        joinJob.setOutputFormatClass(PhoenixOutputFormat.class);

        return joinJob.waitForCompletion(true) ? 0 : 1;
    }
}
```

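`FileOutputFormat` refuses to run if an output directory already exists, so the intermediate paths should be cleared before the jobs are submitted. A sketch using the HDFS client API, matching the paths used above:

```java
// Inside run(), before launching Job 1: delete stale intermediate output.
FileSystem fs = FileSystem.get(conf);
fs.delete(new Path("/tmp/customers"), true);
fs.delete(new Path("/tmp/orders"), true);
```
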
### Incremental Data Processing

```java
// Incremental processing pattern for Phoenix MapReduce
public class IncrementalProcessingJob extends Configured implements Tool {

    public static class IncrementalMapper
            extends Mapper<NullWritable, TransactionRecord, NullWritable, ProcessedRecord> {

        private Timestamp lastProcessedTime;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Get last processed timestamp from configuration
            String lastProcessedStr = context.getConfiguration().get("last.processed.timestamp");
            if (lastProcessedStr != null) {
                lastProcessedTime = Timestamp.valueOf(lastProcessedStr);
            } else {
                // Default to 24 hours ago if no timestamp provided
                lastProcessedTime = new Timestamp(System.currentTimeMillis() - 24L * 60 * 60 * 1000);
            }
        }

        @Override
        protected void map(NullWritable key, TransactionRecord transaction, Context context)
                throws IOException, InterruptedException {

            // Only process records newer than the last processed time
            if (transaction.getTransactionDate().after(lastProcessedTime)) {
                ProcessedRecord processed = processTransaction(transaction);
                context.write(NullWritable.get(), processed);
            }
        }

        private ProcessedRecord processTransaction(TransactionRecord transaction) {
            ProcessedRecord processed = new ProcessedRecord();
            processed.setTransactionId(transaction.getTransactionId());
            processed.setCustomerId(transaction.getCustomerId());
            processed.setAmount(transaction.getAmount());

            // Add risk score calculation
            BigDecimal riskScore = calculateRiskScore(transaction);
            processed.setRiskScore(riskScore);

            // Add fraud flag
            boolean isFraud = detectFraud(transaction, riskScore);
            processed.setFraudFlag(isFraud);

            processed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));

            return processed;
        }

        private BigDecimal calculateRiskScore(TransactionRecord transaction) {
            // Simple risk scoring based on amount and time of day
            BigDecimal amount = transaction.getAmount();
            long transactionTime = transaction.getTransactionDate().getTime();

            // Higher risk for large amounts and late-night transactions; an
            // explicit scale avoids ArithmeticException on non-terminating results
            BigDecimal baseScore = amount.divide(new BigDecimal("1000"), 2, RoundingMode.HALF_UP);

            Calendar cal = Calendar.getInstance();
            cal.setTimeInMillis(transactionTime);
            int hour = cal.get(Calendar.HOUR_OF_DAY);

            // HOUR_OF_DAY ranges 0-23, so treat 11 PM through 6 AM as late night
            if (hour < 6 || hour >= 23) {
                baseScore = baseScore.multiply(new BigDecimal("1.5"));
            }

            return baseScore.min(new BigDecimal("10.0")); // Cap at 10.0
        }

        private boolean detectFraud(TransactionRecord transaction, BigDecimal riskScore) {
            // Simple fraud detection based on risk score
            return riskScore.compareTo(new BigDecimal("7.5")) > 0;
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = getConf();

        // Get last processed timestamp
        String lastProcessedTimestamp = getLastProcessedTimestamp(conf);
        conf.set("last.processed.timestamp", lastProcessedTimestamp);

        // Capture the job start time so rows written while the job runs
        // are picked up by the next run rather than skipped
        Timestamp jobStartTime = new Timestamp(System.currentTimeMillis());

        Job job = Job.getInstance(conf, "Incremental Transaction Processing");
        job.setJarByClass(IncrementalProcessingJob.class);

        // Configure input with a time-based filter
        String whereClause = "transaction_date > '" + lastProcessedTimestamp + "'";
        PhoenixInputFormat.setInput(job, TransactionRecord.class, "transactions", whereClause,
            "transaction_id", "customer_id", "amount", "transaction_date", "merchant");

        // Configure output
        PhoenixOutputFormat.setOutput(job, "processed_transactions",
            "transaction_id", "customer_id", "amount", "risk_score",
            "fraud_flag", "processed_timestamp");

        job.setMapperClass(IncrementalMapper.class);
        job.setNumReduceTasks(0); // Map-only job

        job.setOutputKeyClass(NullWritable.class);
        job.setOutputValueClass(ProcessedRecord.class);

        job.setInputFormatClass(PhoenixInputFormat.class);
        job.setOutputFormatClass(PhoenixOutputFormat.class);

        boolean success = job.waitForCompletion(true);

        if (success) {
            // Advance the watermark to the job start time
            updateLastProcessedTimestamp(conf, jobStartTime);
        }

        return success ? 0 : 1;
    }

    private String getLastProcessedTimestamp(Configuration conf) throws SQLException {
        // Query the control table to get the last processed timestamp
        String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");

        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement stmt = conn.prepareStatement(
                 "SELECT last_processed_timestamp FROM processing_control WHERE job_name = ?")) {
            stmt.setString(1, "incremental_transaction_processing");

            try (ResultSet rs = stmt.executeQuery()) {
                if (rs.next()) {
                    return rs.getTimestamp("last_processed_timestamp").toString();
                }
                // Return a default timestamp if no record exists
                return new Timestamp(System.currentTimeMillis() - 24L * 60 * 60 * 1000).toString();
            }
        }
    }

    private void updateLastProcessedTimestamp(Configuration conf, Timestamp timestamp) throws SQLException {
        String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");

        try (Connection conn = DriverManager.getConnection(url);
             PreparedStatement stmt = conn.prepareStatement(
                 "UPSERT INTO processing_control (job_name, last_processed_timestamp) VALUES (?, ?)")) {
            stmt.setString(1, "incremental_transaction_processing");
            stmt.setTimestamp(2, timestamp);
            stmt.executeUpdate();
            conn.commit();
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        int result = ToolRunner.run(conf, new IncrementalProcessingJob(), args);
        System.exit(result);
    }
}
```

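The job reads and upserts a `processing_control` row; a sketch of the one-time setup it assumes, with column shapes inferred from the SQL in the job:

```java
// One-time creation of the control table used for the watermark.
try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost:2181");
     Statement stmt = conn.createStatement()) {
    stmt.execute("CREATE TABLE IF NOT EXISTS processing_control ("
        + "job_name VARCHAR PRIMARY KEY, "
        + "last_processed_timestamp TIMESTAMP)");
    conn.commit();
}
```
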
868
869
This comprehensive documentation covers Phoenix's MapReduce integration including input/output formats, bulk loading capabilities, and advanced processing patterns. The examples demonstrate practical usage scenarios for distributed data processing with Phoenix tables.