or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration.md · exceptions.md · execution.md · expressions.md · index.md · jdbc.md · mapreduce.md · monitoring.md · query-compilation.md · schema-metadata.md · server.md · transactions.md · types.md

docs/mapreduce.md

0

# MapReduce Integration

1

2

Phoenix provides comprehensive MapReduce integration enabling distributed processing of large datasets stored in Phoenix tables. The integration includes input/output formats, bulk loading tools, and utilities for efficient data processing workflows.

3

4

## Core Imports

5

6

```java

7

import org.apache.phoenix.mapreduce.*;

8

import org.apache.phoenix.mapreduce.bulkload.*;

9

import org.apache.hadoop.mapreduce.*;

10

import org.apache.hadoop.mapreduce.lib.input.*;

11

import org.apache.hadoop.mapreduce.lib.output.*;

12

import org.apache.hadoop.io.*;

13

import org.apache.phoenix.util.ColumnInfo;

14

```

15

16

## Input/Output Formats

17

18

### PhoenixInputFormat

19

20

MapReduce InputFormat for reading data from Phoenix tables with SQL-based filtering and projection.

21

22

```java{ .api }

23

public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable, T> {

24

// Configuration methods

25

public static void setInput(Job job, Class<? extends DBWritable> inputClass,

26

String tableName, String conditions)

27

public static void setInput(Job job, Class<? extends DBWritable> inputClass,

28

String selectStatement)

29

public static void setInput(Job job, Class<? extends DBWritable> inputClass,

30

String tableName, String conditions, String... fieldNames)

31

32

// InputFormat implementation

33

public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException

34

public RecordReader<NullWritable, T> createRecordReader(InputSplit split, TaskAttemptContext context)

35

throws IOException, InterruptedException

36

37

// Phoenix-specific configuration

38

public static void setBatchSize(Configuration configuration, long batchSize)

39

public static void setSelectColumnList(Job job, String... columns)

40

public static void setSchemaType(Configuration configuration, SchemaType schemaType)

41

}

42

```

43

44

### PhoenixOutputFormat

45

46

MapReduce OutputFormat for writing data to Phoenix tables with automatic batching and transaction management.

47

48

```java{ .api }

49

public class PhoenixOutputFormat<T extends DBWritable> extends OutputFormat<NullWritable, T> {

50

// Configuration methods

51

public static void setOutput(Job job, String tableName, String... fieldNames)

52

public static void setOutput(Job job, String tableName, List<String> fieldNames)

53

54

// OutputFormat implementation

55

public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context)

56

throws IOException, InterruptedException

57

public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException

58

public OutputCommitter getOutputCommitter(TaskAttemptContext context)

59

throws IOException, InterruptedException

60

61

// Phoenix-specific configuration

62

public static void setBatchSize(Configuration configuration, long batchSize)

63

public static void setUpsertStatement(Job job, String upsertStatement)

64

}

65

```

66

67

**Usage:**

68

```java

69

// Configure Phoenix input format

70

public class PhoenixMapReduceJob {

71

public static void configureInputJob(Job job) throws IOException {

72

// Set input configuration

73

PhoenixInputFormat.setInput(job,

74

EmployeeRecord.class,

75

"employees",

76

"department = 'ENGINEERING' AND salary > 50000",

77

"id", "name", "salary", "department");

78

79

// Optional: Configure batch size for better performance

80

PhoenixInputFormat.setBatchSize(job.getConfiguration(), 5000);

81

82

// Set mapper class

83

job.setMapperClass(EmployeeProcessingMapper.class);

84

job.setMapOutputKeyClass(Text.class);

85

job.setMapOutputValueClass(IntWritable.class);

86

}

87

88

public static void configureOutputJob(Job job) throws IOException {

89

// Set output configuration

90

PhoenixOutputFormat.setOutput(job, "employee_summary", "department", "avg_salary", "employee_count");

91

92

// Optional: Configure batch size

93

PhoenixOutputFormat.setBatchSize(job.getConfiguration(), 1000);

94

95

// Set reducer class

96

job.setReducerClass(SummaryReducer.class);

97

job.setOutputKeyClass(NullWritable.class);

98

job.setOutputValueClass(SummaryRecord.class);

99

}

100

}

101

102

// DBWritable implementation for input records

103

public class EmployeeRecord implements DBWritable, Writable {

104

private long id;

105

private String name;

106

private BigDecimal salary;

107

private String department;

108

109

// DBWritable implementation

110

@Override

111

public void readFields(ResultSet resultSet) throws SQLException {

112

id = resultSet.getLong("id");

113

name = resultSet.getString("name");

114

salary = resultSet.getBigDecimal("salary");

115

department = resultSet.getString("department");

116

}

117

118

@Override

119

public void write(PreparedStatement statement) throws SQLException {

120

statement.setLong(1, id);

121

statement.setString(2, name);

122

statement.setBigDecimal(3, salary);

123

statement.setString(4, department);

124

}

125

126

// Writable implementation

127

@Override

128

public void write(DataOutput out) throws IOException {

129

out.writeLong(id);

130

Text.writeString(out, name != null ? name : "");

131

out.writeUTF(salary != null ? salary.toString() : "0");

132

Text.writeString(out, department != null ? department : "");

133

}

134

135

@Override

136

public void readFields(DataInput in) throws IOException {

137

id = in.readLong();

138

name = Text.readString(in);

139

salary = new BigDecimal(in.readUTF());

140

department = Text.readString(in);

141

}

142

143

// Getters and setters

144

public long getId() { return id; }

145

public void setId(long id) { this.id = id; }

146

public String getName() { return name; }

147

public void setName(String name) { this.name = name; }

148

public BigDecimal getSalary() { return salary; }

149

public void setSalary(BigDecimal salary) { this.salary = salary; }

150

public String getDepartment() { return department; }

151

public void setDepartment(String department) { this.department = department; }

152

}

153

154

// DBWritable implementation for output records

155

public class SummaryRecord implements DBWritable, Writable {

156

private String department;

157

private BigDecimal avgSalary;

158

private int employeeCount;

159

160

@Override

161

public void readFields(ResultSet resultSet) throws SQLException {

162

department = resultSet.getString("department");

163

avgSalary = resultSet.getBigDecimal("avg_salary");

164

employeeCount = resultSet.getInt("employee_count");

165

}

166

167

@Override

168

public void write(PreparedStatement statement) throws SQLException {

169

statement.setString(1, department);

170

statement.setBigDecimal(2, avgSalary);

171

statement.setInt(3, employeeCount);

172

}

173

174

@Override

175

public void write(DataOutput out) throws IOException {

176

Text.writeString(out, department != null ? department : "");

177

out.writeUTF(avgSalary != null ? avgSalary.toString() : "0");

178

out.writeInt(employeeCount);

179

}

180

181

@Override

182

public void readFields(DataInput in) throws IOException {

183

department = Text.readString(in);

184

avgSalary = new BigDecimal(in.readUTF());

185

employeeCount = in.readInt();

186

}

187

188

// Getters and setters

189

public String getDepartment() { return department; }

190

public void setDepartment(String department) { this.department = department; }

191

public BigDecimal getAvgSalary() { return avgSalary; }

192

public void setAvgSalary(BigDecimal avgSalary) { this.avgSalary = avgSalary; }

193

public int getEmployeeCount() { return employeeCount; }

194

public void setEmployeeCount(int employeeCount) { this.employeeCount = employeeCount; }

195

}

196

```

197

198

## Bulk Loading Tools

199

200

### BulkLoadTool

201

202

Tool for efficient bulk loading of large datasets into Phoenix tables.

203

204

```java{ .api }

205

public class BulkLoadTool extends Configured implements Tool {

206

// Main execution method

207

public int run(String[] args) throws Exception

208

209

// Configuration options

210

public static class Options {

211

public String getInputPath()

212

public String getTableName()

213

public String getZkQuorum()

214

public char getFieldDelimiter()

215

public char getQuoteChar()

216

public char getEscapeChar()

217

public String getArrayElementSeparator()

218

public boolean isStrict()

219

public List<ColumnInfo> getColumns()

220

}

221

222

// Bulk loading methods

223

public static void bulkLoad(Configuration conf, String inputPath, String tableName,

224

List<ColumnInfo> columnInfos) throws Exception

225

public static Job createBulkLoadJob(Configuration conf, String inputPath, String tableName,

226

List<ColumnInfo> columnInfos) throws IOException

227

}

228

```

229

230

**Usage:**

231

```java

232

// Bulk load CSV data into Phoenix table

233

public class BulkLoadExample {

234

public static void performBulkLoad() throws Exception {

235

Configuration conf = HBaseConfiguration.create();

236

conf.set("hbase.zookeeper.quorum", "localhost:2181");

237

238

// Define column information

239

List<ColumnInfo> columnInfos = Arrays.asList(

240

new ColumnInfo("id", Types.BIGINT),

241

new ColumnInfo("name", Types.VARCHAR),

242

new ColumnInfo("email", Types.VARCHAR),

243

new ColumnInfo("salary", Types.DECIMAL),

244

new ColumnInfo("department", Types.VARCHAR),

245

new ColumnInfo("hire_date", Types.DATE)

246

);

247

248

// Configure decimal precision

249

ColumnInfo salaryColumn = columnInfos.get(3);

250

salaryColumn.setPrecision(10);

251

salaryColumn.setScale(2);

252

253

// Perform bulk load

254

String inputPath = "hdfs://namenode:port/path/to/csv/files";

255

String tableName = "employees";

256

257

BulkLoadTool.bulkLoad(conf, inputPath, tableName, columnInfos);

258

System.out.println("Bulk load completed successfully");

259

}

260

261

public static void performCustomBulkLoad() throws Exception {

262

Configuration conf = HBaseConfiguration.create();

263

264

// Create bulk load job with custom configuration

265

List<ColumnInfo> columnInfos = createColumnInfos();

266

Job job = BulkLoadTool.createBulkLoadJob(conf, "input/path", "target_table", columnInfos);

267

268

// Customize job settings

269

job.setJobName("Custom Phoenix Bulk Load");

270

job.getConfiguration().set("phoenix.bulk.load.delimiter", "|");

271

job.getConfiguration().set("phoenix.bulk.load.quote", "\"");

272

job.getConfiguration().setBoolean("phoenix.bulk.load.strict", true);

273

274

// Submit and wait for completion

275

boolean success = job.waitForCompletion(true);

276

if (success) {

277

System.out.println("Bulk load job completed successfully");

278

} else {

279

System.err.println("Bulk load job failed");

280

}

281

}

282

283

private static List<ColumnInfo> createColumnInfos() {

284

return Arrays.asList(

285

new ColumnInfo("transaction_id", Types.BIGINT),

286

new ColumnInfo("customer_id", Types.BIGINT),

287

new ColumnInfo("amount", Types.DECIMAL),

288

new ColumnInfo("transaction_date", Types.TIMESTAMP),

289

new ColumnInfo("merchant", Types.VARCHAR),

290

new ColumnInfo("category", Types.VARCHAR)

291

);

292

}

293

}

294

295

// Command line bulk load

296

public class BulkLoadRunner {

297

public static void main(String[] args) throws Exception {

298

Configuration conf = HBaseConfiguration.create();

299

300

// Set up tool options

301

BulkLoadTool tool = new BulkLoadTool();

302

tool.setConf(conf);

303

304

// Command line arguments: input_path table_name zk_quorum

305

String[] bulkLoadArgs = {

306

"hdfs://namenode:port/data/transactions.csv",

307

"transactions",

308

"zk1,zk2,zk3:2181"

309

};

310

311

int result = tool.run(bulkLoadArgs);

312

System.exit(result);

313

}

314

}

315

```

316

317

## MapReduce Job Examples

318

319

### Data Processing Job

320

321

```java

322

// Complete MapReduce job for Phoenix data processing

323

public class PhoenixDataProcessingJob extends Configured implements Tool {

324

325

// Mapper class

326

public static class DataProcessingMapper

327

extends Mapper<NullWritable, EmployeeRecord, Text, LongWritable> {

328

329

@Override

330

protected void map(NullWritable key, EmployeeRecord employee, Context context)

331

throws IOException, InterruptedException {

332

333

// Process employee data

334

String department = employee.getDepartment();

335

BigDecimal salary = employee.getSalary();

336

337

// Emit department and salary

338

if (department != null && salary != null) {

339

context.write(new Text(department), new LongWritable(salary.longValue()));

340

341

// Additional processing based on salary ranges

342

if (salary.compareTo(new BigDecimal("100000")) > 0) {

343

context.write(new Text("HIGH_EARNERS"), new LongWritable(1));

344

}

345

}

346

}

347

}

348

349

// Reducer class

350

public static class SalaryAggregationReducer

351

extends Reducer<Text, LongWritable, NullWritable, SummaryRecord> {

352

353

@Override

354

protected void reduce(Text key, Iterable<LongWritable> values, Context context)

355

throws IOException, InterruptedException {

356

357

long sum = 0;

358

int count = 0;

359

360

for (LongWritable value : values) {

361

sum += value.get();

362

count++;

363

}

364

365

// Create summary record

366

SummaryRecord summary = new SummaryRecord();

367

summary.setDepartment(key.toString());

368

summary.setAvgSalary(new BigDecimal(sum / count));

369

summary.setEmployeeCount(count);

370

371

context.write(NullWritable.get(), summary);

372

}

373

}

374

375

@Override

376

public int run(String[] args) throws Exception {

377

Configuration conf = getConf();

378

Job job = Job.getInstance(conf, "Phoenix Data Processing");

379

380

job.setJarByClass(PhoenixDataProcessingJob.class);

381

382

// Configure input

383

PhoenixInputFormat.setInput(job, EmployeeRecord.class,

384

"employees",

385

"status = 'ACTIVE'",

386

"id", "name", "salary", "department");

387

388

// Configure output

389

PhoenixOutputFormat.setOutput(job, "department_summary",

390

"department", "avg_salary", "employee_count");

391

392

// Configure mapper and reducer

393

job.setMapperClass(DataProcessingMapper.class);

394

job.setReducerClass(SalaryAggregationReducer.class);

395

396

job.setMapOutputKeyClass(Text.class);

397

job.setMapOutputValueClass(LongWritable.class);

398

399

job.setOutputKeyClass(NullWritable.class);

400

job.setOutputValueClass(SummaryRecord.class);

401

402

// Configure input and output formats

403

job.setInputFormatClass(PhoenixInputFormat.class);

404

job.setOutputFormatClass(PhoenixOutputFormat.class);

405

406

return job.waitForCompletion(true) ? 0 : 1;

407

}

408

409

public static void main(String[] args) throws Exception {

410

Configuration conf = HBaseConfiguration.create();

411

int result = ToolRunner.run(conf, new PhoenixDataProcessingJob(), args);

412

System.exit(result);

413

}

414

}

415

```

416

417

### ETL Pipeline Job

418

419

```java

420

// ETL (Extract, Transform, Load) pipeline using Phoenix MapReduce

421

public class PhoenixETLPipeline extends Configured implements Tool {

422

423

// Mapper for data transformation

424

public static class ETLMapper

425

extends Mapper<NullWritable, TransactionRecord, NullWritable, TransformedRecord> {

426

427

private static final BigDecimal CURRENCY_CONVERSION_RATE = new BigDecimal("1.25");

428

429

@Override

430

protected void map(NullWritable key, TransactionRecord transaction, Context context)

431

throws IOException, InterruptedException {

432

433

// Transform the data

434

TransformedRecord transformed = new TransformedRecord();

435

transformed.setTransactionId(transaction.getTransactionId());

436

transformed.setCustomerId(transaction.getCustomerId());

437

438

// Convert currency

439

BigDecimal originalAmount = transaction.getAmount();

440

BigDecimal convertedAmount = originalAmount.multiply(CURRENCY_CONVERSION_RATE);

441

transformed.setConvertedAmount(convertedAmount);

442

443

// Categorize transaction

444

String category = categorizeTransaction(transaction.getMerchant(), originalAmount);

445

transformed.setCategory(category);

446

447

// Add processing timestamp

448

transformed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));

449

450

// Emit transformed record

451

context.write(NullWritable.get(), transformed);

452

}

453

454

private String categorizeTransaction(String merchant, BigDecimal amount) {

455

if (merchant.toLowerCase().contains("grocery") ||

456

merchant.toLowerCase().contains("supermarket")) {

457

return "GROCERY";

458

} else if (merchant.toLowerCase().contains("gas") ||

459

merchant.toLowerCase().contains("fuel")) {

460

return "FUEL";

461

} else if (amount.compareTo(new BigDecimal("500")) > 0) {

462

return "LARGE_PURCHASE";

463

} else {

464

return "OTHER";

465

}

466

}

467

}

468

469

@Override

470

public int run(String[] args) throws Exception {

471

Configuration conf = getConf();

472

Job job = Job.getInstance(conf, "Phoenix ETL Pipeline");

473

474

job.setJarByClass(PhoenixETLPipeline.class);

475

476

// Configure input from source table

477

PhoenixInputFormat.setInput(job, TransactionRecord.class,

478

"raw_transactions",

479

"processed_flag IS NULL OR processed_flag = false",

480

"transaction_id", "customer_id", "amount",

481

"transaction_date", "merchant");

482

483

// Configure output to transformed table

484

PhoenixOutputFormat.setOutput(job, "transformed_transactions",

485

"transaction_id", "customer_id", "converted_amount",

486

"category", "processed_timestamp");

487

488

// This is a map-only job (no reducer needed)

489

job.setMapperClass(ETLMapper.class);

490

job.setNumReduceTasks(0);

491

492

job.setOutputKeyClass(NullWritable.class);

493

job.setOutputValueClass(TransformedRecord.class);

494

495

job.setInputFormatClass(PhoenixInputFormat.class);

496

job.setOutputFormatClass(PhoenixOutputFormat.class);

497

498

// Set batch size for better performance

499

PhoenixInputFormat.setBatchSize(conf, 10000);

500

PhoenixOutputFormat.setBatchSize(conf, 5000);

501

502

return job.waitForCompletion(true) ? 0 : 1;

503

}

504

505

public static void main(String[] args) throws Exception {

506

Configuration conf = HBaseConfiguration.create();

507

int result = ToolRunner.run(conf, new PhoenixETLPipeline(), args);

508

System.exit(result);

509

}

510

}

511

512

// Supporting record classes

513

public class TransactionRecord implements DBWritable, Writable {

514

private long transactionId;

515

private long customerId;

516

private BigDecimal amount;

517

private Timestamp transactionDate;

518

private String merchant;

519

520

@Override

521

public void readFields(ResultSet resultSet) throws SQLException {

522

transactionId = resultSet.getLong("transaction_id");

523

customerId = resultSet.getLong("customer_id");

524

amount = resultSet.getBigDecimal("amount");

525

transactionDate = resultSet.getTimestamp("transaction_date");

526

merchant = resultSet.getString("merchant");

527

}

528

529

@Override

530

public void write(PreparedStatement statement) throws SQLException {

531

statement.setLong(1, transactionId);

532

statement.setLong(2, customerId);

533

statement.setBigDecimal(3, amount);

534

statement.setTimestamp(4, transactionDate);

535

statement.setString(5, merchant);

536

}

537

538

// Writable implementation and getters/setters omitted for brevity...

539

}

540

541

public class TransformedRecord implements DBWritable, Writable {

542

private long transactionId;

543

private long customerId;

544

private BigDecimal convertedAmount;

545

private String category;

546

private Timestamp processedTimestamp;

547

548

@Override

549

public void readFields(ResultSet resultSet) throws SQLException {

550

transactionId = resultSet.getLong("transaction_id");

551

customerId = resultSet.getLong("customer_id");

552

convertedAmount = resultSet.getBigDecimal("converted_amount");

553

category = resultSet.getString("category");

554

processedTimestamp = resultSet.getTimestamp("processed_timestamp");

555

}

556

557

@Override

558

public void write(PreparedStatement statement) throws SQLException {

559

statement.setLong(1, transactionId);

560

statement.setLong(2, customerId);

561

statement.setBigDecimal(3, convertedAmount);

562

statement.setString(4, category);

563

statement.setTimestamp(5, processedTimestamp);

564

}

565

566

// Writable implementation and getters/setters omitted for brevity...

567

}

568

```

569

570

## Advanced MapReduce Patterns

571

572

### Multi-Table Join Job

573

574

```java

575

// MapReduce job performing joins across multiple Phoenix tables

576

public class MultiTableJoinJob extends Configured implements Tool {

577

578

// Mapper for customer data

579

public static class CustomerMapper

580

extends Mapper<NullWritable, CustomerRecord, LongWritable, Text> {

581

582

@Override

583

protected void map(NullWritable key, CustomerRecord customer, Context context)

584

throws IOException, InterruptedException {

585

586

// Emit customer ID as key with customer data as value

587

String customerData = "CUSTOMER:" + customer.getName() + "," +

588

customer.getEmail() + "," + customer.getSegment();

589

context.write(new LongWritable(customer.getCustomerId()), new Text(customerData));

590

}

591

}

592

593

// Mapper for order data

594

public static class OrderMapper

595

extends Mapper<NullWritable, OrderRecord, LongWritable, Text> {

596

597

@Override

598

protected void map(NullWritable key, OrderRecord order, Context context)

599

throws IOException, InterruptedException {

600

601

// Emit customer ID as key with order data as value

602

String orderData = "ORDER:" + order.getOrderId() + "," +

603

order.getOrderDate() + "," + order.getTotalAmount();

604

context.write(new LongWritable(order.getCustomerId()), new Text(orderData));

605

}

606

}

607

608

// Reducer to perform the join

609

public static class JoinReducer

610

extends Reducer<LongWritable, Text, NullWritable, CustomerOrderRecord> {

611

612

@Override

613

protected void reduce(LongWritable customerId, Iterable<Text> values, Context context)

614

throws IOException, InterruptedException {

615

616

List<String> customerData = new ArrayList<>();

617

List<String> orderData = new ArrayList<>();

618

619

// Separate customer and order data

620

for (Text value : values) {

621

String valueStr = value.toString();

622

if (valueStr.startsWith("CUSTOMER:")) {

623

customerData.add(valueStr.substring(9));

624

} else if (valueStr.startsWith("ORDER:")) {

625

orderData.add(valueStr.substring(6));

626

}

627

}

628

629

// Perform join - emit record for each customer-order combination

630

for (String customer : customerData) {

631

for (String order : orderData) {

632

CustomerOrderRecord joined = createJoinedRecord(customerId.get(), customer, order);

633

context.write(NullWritable.get(), joined);

634

}

635

}

636

}

637

638

private CustomerOrderRecord createJoinedRecord(long customerId, String customerData, String orderData) {

639

String[] customerParts = customerData.split(",");

640

String[] orderParts = orderData.split(",");

641

642

CustomerOrderRecord record = new CustomerOrderRecord();

643

record.setCustomerId(customerId);

644

record.setCustomerName(customerParts[0]);

645

record.setCustomerEmail(customerParts[1]);

646

record.setCustomerSegment(customerParts[2]);

647

record.setOrderId(Long.parseLong(orderParts[0]));

648

record.setOrderDate(Date.valueOf(orderParts[1]));

649

record.setOrderAmount(new BigDecimal(orderParts[2]));

650

651

return record;

652

}

653

}

654

655

@Override

656

public int run(String[] args) throws Exception {

657

Configuration conf = getConf();

658

659

// Job 1: Process customers

660

Job customerJob = Job.getInstance(conf, "Customer Processing");

661

customerJob.setJarByClass(MultiTableJoinJob.class);

662

663

PhoenixInputFormat.setInput(customerJob, CustomerRecord.class, "customers");

664

customerJob.setMapperClass(CustomerMapper.class);

665

customerJob.setNumReduceTasks(0);

666

667

Path customerOutput = new Path("/tmp/customers");

668

FileOutputFormat.setOutputPath(customerJob, customerOutput);

669

670

// Job 2: Process orders

671

Job orderJob = Job.getInstance(conf, "Order Processing");

672

orderJob.setJarByClass(MultiTableJoinJob.class);

673

674

PhoenixInputFormat.setInput(orderJob, OrderRecord.class, "orders");

675

orderJob.setMapperClass(OrderMapper.class);

676

orderJob.setNumReduceTasks(0);

677

678

Path orderOutput = new Path("/tmp/orders");

679

FileOutputFormat.setOutputPath(orderJob, orderOutput);

680

681

// Wait for both jobs to complete

682

boolean success = customerJob.waitForCompletion(true) && orderJob.waitForCompletion(true);

683

if (!success) {

684

return 1;

685

}

686

687

// Job 3: Join the results

688

Job joinJob = Job.getInstance(conf, "Customer Order Join");

689

joinJob.setJarByClass(MultiTableJoinJob.class);

690

691

FileInputFormat.addInputPath(joinJob, customerOutput);

692

FileInputFormat.addInputPath(joinJob, orderOutput);

693

694

PhoenixOutputFormat.setOutput(joinJob, "customer_orders",

695

"customer_id", "customer_name", "customer_email",

696

"customer_segment", "order_id", "order_date", "order_amount");

697

698

joinJob.setReducerClass(JoinReducer.class);

699

joinJob.setOutputKeyClass(NullWritable.class);

700

joinJob.setOutputValueClass(CustomerOrderRecord.class);

701

joinJob.setOutputFormatClass(PhoenixOutputFormat.class);

702

703

return joinJob.waitForCompletion(true) ? 0 : 1;

704

}

705

}

706

```

707

708

### Incremental Data Processing

709

710

```java

711

// Incremental processing pattern for Phoenix MapReduce

712

public class IncrementalProcessingJob extends Configured implements Tool {

713

714

public static class IncrementalMapper

715

extends Mapper<NullWritable, TransactionRecord, NullWritable, ProcessedRecord> {

716

717

private Timestamp lastProcessedTime;

718

719

@Override

720

protected void setup(Context context) throws IOException, InterruptedException {

721

// Get last processed timestamp from configuration

722

String lastProcessedStr = context.getConfiguration().get("last.processed.timestamp");

723

if (lastProcessedStr != null) {

724

lastProcessedTime = Timestamp.valueOf(lastProcessedStr);

725

} else {

726

// Default to 24 hours ago if no timestamp provided

727

lastProcessedTime = new Timestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000);

728

}

729

}

730

731

@Override

732

protected void map(NullWritable key, TransactionRecord transaction, Context context)

733

throws IOException, InterruptedException {

734

735

// Only process records newer than last processed time

736

if (transaction.getTransactionDate().after(lastProcessedTime)) {

737

ProcessedRecord processed = processTransaction(transaction);

738

context.write(NullWritable.get(), processed);

739

}

740

}

741

742

private ProcessedRecord processTransaction(TransactionRecord transaction) {

743

ProcessedRecord processed = new ProcessedRecord();

744

processed.setTransactionId(transaction.getTransactionId());

745

processed.setCustomerId(transaction.getCustomerId());

746

processed.setAmount(transaction.getAmount());

747

748

// Add risk score calculation

749

BigDecimal riskScore = calculateRiskScore(transaction);

750

processed.setRiskScore(riskScore);

751

752

// Add fraud flag

753

boolean isFraud = detectFraud(transaction, riskScore);

754

processed.setFraudFlag(isFraud);

755

756

processed.setProcessedTimestamp(new Timestamp(System.currentTimeMillis()));

757

758

return processed;

759

}

760

761

private BigDecimal calculateRiskScore(TransactionRecord transaction) {

762

// Simple risk scoring based on amount and time

763

BigDecimal amount = transaction.getAmount();

764

long currentTime = System.currentTimeMillis();

765

long transactionTime = transaction.getTransactionDate().getTime();

766

767

// Higher risk for large amounts and late night transactions

768

BigDecimal baseScore = amount.divide(new BigDecimal("1000"));

769

770

Calendar cal = Calendar.getInstance();

771

cal.setTimeInMillis(transactionTime);

772

int hour = cal.get(Calendar.HOUR_OF_DAY);

773

774

if (hour < 6 || hour > 23) {

775

baseScore = baseScore.multiply(new BigDecimal("1.5"));

776

}

777

778

return baseScore.min(new BigDecimal("10.0")); // Cap at 10.0

779

}

780

781

private boolean detectFraud(TransactionRecord transaction, BigDecimal riskScore) {

782

// Simple fraud detection based on risk score

783

return riskScore.compareTo(new BigDecimal("7.5")) > 0;

784

}

785

}

786

787

@Override

788

public int run(String[] args) throws Exception {

789

Configuration conf = getConf();

790

791

// Get last processed timestamp

792

String lastProcessedTimestamp = getLastProcessedTimestamp(conf);

793

conf.set("last.processed.timestamp", lastProcessedTimestamp);

794

795

Job job = Job.getInstance(conf, "Incremental Transaction Processing");

796

job.setJarByClass(IncrementalProcessingJob.class);

797

798

// Configure input with time-based filter

799

String whereClause = "transaction_date > '" + lastProcessedTimestamp + "'";

800

PhoenixInputFormat.setInput(job, TransactionRecord.class, "transactions", whereClause,

801

"transaction_id", "customer_id", "amount", "transaction_date", "merchant");

802

803

// Configure output

804

PhoenixOutputFormat.setOutput(job, "processed_transactions",

805

"transaction_id", "customer_id", "amount", "risk_score",

806

"fraud_flag", "processed_timestamp");

807

808

job.setMapperClass(IncrementalMapper.class);

809

job.setNumReduceTasks(0); // Map-only job

810

811

job.setOutputKeyClass(NullWritable.class);

812

job.setOutputValueClass(ProcessedRecord.class);

813

814

job.setInputFormatClass(PhoenixInputFormat.class);

815

job.setOutputFormatClass(PhoenixOutputFormat.class);

816

817

boolean success = job.waitForCompletion(true);

818

819

if (success) {

820

// Update last processed timestamp

821

updateLastProcessedTimestamp(conf, new Timestamp(System.currentTimeMillis()));

822

}

823

824

return success ? 0 : 1;

825

}

826

827

private String getLastProcessedTimestamp(Configuration conf) throws SQLException {

828

// Query the control table to get last processed timestamp

829

String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");

830

831

try (Connection conn = DriverManager.getConnection(url)) {

832

PreparedStatement stmt = conn.prepareStatement(

833

"SELECT last_processed_timestamp FROM processing_control WHERE job_name = ?"

834

);

835

stmt.setString(1, "incremental_transaction_processing");

836

837

ResultSet rs = stmt.executeQuery();

838

if (rs.next()) {

839

return rs.getTimestamp("last_processed_timestamp").toString();

840

} else {

841

// Return default timestamp if no record exists

842

return new Timestamp(System.currentTimeMillis() - 24 * 60 * 60 * 1000).toString();

843

}

844

}

845

}

846

847

private void updateLastProcessedTimestamp(Configuration conf, Timestamp timestamp) throws SQLException {

848

String url = conf.get("phoenix.connection.url", "jdbc:phoenix:localhost:2181");

849

850

try (Connection conn = DriverManager.getConnection(url)) {

851

PreparedStatement stmt = conn.prepareStatement(

852

"UPSERT INTO processing_control (job_name, last_processed_timestamp) VALUES (?, ?)"

853

);

854

stmt.setString(1, "incremental_transaction_processing");

855

stmt.setTimestamp(2, timestamp);

856

stmt.executeUpdate();

857

conn.commit();

858

}

859

}

860

861

public static void main(String[] args) throws Exception {

862

Configuration conf = HBaseConfiguration.create();

863

int result = ToolRunner.run(conf, new IncrementalProcessingJob(), args);

864

System.exit(result);

865

}

866

}

867

```

868

869

This comprehensive documentation covers Phoenix's MapReduce integration including input/output formats, bulk loading capabilities, and advanced processing patterns. The examples demonstrate practical usage scenarios for distributed data processing with Phoenix tables.