Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support
Phoenix's server-side components run within HBase region servers as coprocessors, providing distributed query processing, metadata management, index maintenance, and caching operations. These components enable Phoenix's SQL capabilities while leveraging HBase's scalability.
import org.apache.phoenix.coprocessor.*;
import org.apache.phoenix.hbase.index.*;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

Base interface for all Phoenix coprocessors providing common functionality and lifecycle management.
public interface PhoenixCoprocessor {
// Lifecycle methods
void start(CoprocessorEnvironment env) throws IOException
void stop(CoprocessorEnvironment env) throws IOException
// Configuration
Configuration getConfiguration()
HRegionInfo getRegionInfo()
// Error handling
void handleException(Exception e) throws IOException
}

Server-side metadata operations coprocessor handling schema changes and metadata queries.
public class MetaDataEndpointImpl extends BaseEndpointCoprocessor
implements PhoenixMetaDataCoprocessorProtocol {
// Table operations
public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException
public MetaDataMutationResult dropTable(byte[] schemaName, byte[] tableName,
byte[] parentTableName, long timestamp) throws IOException
public MetaDataMutationResult addColumn(List<Mutation> tableMetadata) throws IOException
public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata) throws IOException
// Index operations
public MetaDataMutationResult createIndex(List<Mutation> tableMetadata,
byte[] physicalIndexName) throws IOException
public MetaDataMutationResult dropIndex(List<Mutation> tableMetadata,
byte[] indexName) throws IOException
// Schema operations
public MetaDataMutationResult createSchema(List<Mutation> schemaMetadata) throws IOException
public MetaDataMutationResult dropSchema(List<Mutation> schemaMetadata) throws IOException
// Function operations
public MetaDataMutationResult createFunction(List<Mutation> functionMetadata) throws IOException
public MetaDataMutationResult dropFunction(List<Mutation> functionMetadata) throws IOException
// Metadata queries
public MetaDataResponse getTable(byte[] tenantId, byte[] schemaName,
byte[] tableName, long timestamp) throws IOException
public MetaDataResponse getFunctions(byte[] tenantId, List<byte[]> functionNames,
long timestamp) throws IOException
public MetaDataResponse getSchema(byte[] tenantId, byte[] schemaName,
long timestamp) throws IOException
}

Usage:
// Server-side metadata operations are typically invoked through client connections
// but can be accessed programmatically in coprocessor contexts
// Example: Custom coprocessor extending MetaDataEndpointImpl
public class CustomMetaDataEndpoint extends MetaDataEndpointImpl {
    /**
     * Creates a table after validating every Put mutation in the metadata batch,
     * then reports whether the table was newly created or already existed.
     * Delegates the actual creation to the parent implementation.
     */
    @Override
    public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException {
        // Custom logic before table creation
        System.out.println("Creating table with " + tableMetadata.size() + " mutations");
        // Validate each Put mutation before delegating to the parent
        for (Mutation m : tableMetadata) {
            if (!(m instanceof Put)) {
                continue;
            }
            validateTableMetadata((Put) m);
        }
        // Delegate to the parent implementation
        MetaDataMutationResult outcome = super.createTable(tableMetadata);
        // Custom logic after table creation
        if (outcome.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS) {
            System.out.println("Table already exists");
        } else {
            System.out.println("Table created successfully");
        }
        return outcome;
    }

    /** Hook for custom table metadata validation (naming rules, quotas, etc.). */
    private void validateTableMetadata(Put put) throws IOException {
        byte[] tableName = put.getRow();
        // Validate table name patterns, quotas, etc.
    }
}

Server-side caching operations coprocessor for distributed cache management.
public class ServerCachingEndpointImpl extends BaseEndpointCoprocessor
implements ServerCachingProtocol {
// Cache operations
public boolean addServerCache(byte[] tenantId, byte[] cacheId, byte[] cachePtr,
byte[] txState, ServerCacheFactory cacheFactory) throws IOException
public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws IOException
// Cache queries
public ServerCache getServerCache(byte[] tenantId, byte[] cacheId) throws IOException
public long getSize(byte[] tenantId, byte[] cacheId) throws IOException
// Cache statistics
public ServerCacheStats getCacheStats() throws IOException
public void clearAllCache() throws IOException
}

Usage:
// Server cache usage is typically handled internally by Phoenix
// but can be managed in custom coprocessors
// Example: Custom caching coprocessor
public class CustomCachingEndpoint extends ServerCachingEndpointImpl {
    private static final Logger LOG = LoggerFactory.getLogger(CustomCachingEndpoint.class);

    /**
     * Adds a server-side cache entry after validating the payload size.
     *
     * @throws IOException if the payload is null or exceeds the configured maximum size
     */
    @Override
    public boolean addServerCache(byte[] tenantId, byte[] cacheId, byte[] cachePtr,
                                  byte[] txState, ServerCacheFactory cacheFactory) throws IOException {
        // Fail fast with a clear message instead of an NPE on cachePtr.length below
        if (cachePtr == null) {
            throw new IOException("Cache payload must not be null");
        }
        LOG.info("Adding server cache: tenant={}, cacheId={}, size={}",
            Bytes.toString(tenantId), Bytes.toString(cacheId), cachePtr.length);
        // Custom cache validation or preprocessing
        if (cachePtr.length > getMaxCacheSize()) {
            throw new IOException("Cache size exceeds maximum allowed size");
        }
        boolean result = super.addServerCache(tenantId, cacheId, cachePtr, txState, cacheFactory);
        if (result) {
            LOG.info("Server cache added successfully");
        } else {
            LOG.warn("Failed to add server cache");
        }
        return result;
    }

    /** Maximum allowed cache payload in bytes; configurable via phoenix.cache.maxSize. */
    private int getMaxCacheSize() {
        return getConfiguration().getInt("phoenix.cache.maxSize", 104857600); // 100MB default
    }
}

Interface for building index updates from primary table mutations.
public interface IndexBuilder {
// Index update building
Collection<Pair<Mutation, byte[]>> getIndexUpdate(Put put, IndexMetaData indexMetaData)
throws IOException
Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete, IndexMetaData indexMetaData)
throws IOException
Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException
// Builder lifecycle
void setup(RegionCoprocessorEnvironment env) throws IOException
void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp,
IndexMetaData context) throws IOException
void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp)
}

Phoenix-specific implementation of IndexBuilder with support for covered columns and functional indexes.
public class PhoenixIndexBuilder implements IndexBuilder {
// Index update generation
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Put put, IndexMetaData indexMetaData)
throws IOException
public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete, IndexMetaData indexMetaData)
throws IOException
// Phoenix-specific functionality
public Collection<Pair<Mutation, byte[]>> getIndexUpdatesForFilteredRows(
Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException
// Configuration
public void setup(RegionCoprocessorEnvironment env) throws IOException
public boolean isEnabled(Mutation m) throws IOException
}

Interface for writing index updates with support for different writing strategies.
public interface IndexWriter {
// Index writing
void write(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException
void write(Pair<Mutation, byte[]> indexUpdate, boolean allowLocalUpdates) throws IOException
// Writer lifecycle
void setup(RegionCoprocessorEnvironment env) throws IOException
void stop(String why) throws IOException
// Error handling
void handleFailure(Collection<Pair<Mutation, byte[]>> attempted,
Exception cause) throws IOException
}

Parallel index writer implementation for high-throughput index maintenance.
public class ParallelWriterIndexCommitter implements IndexWriter {
// Parallel writing configuration
public ParallelWriterIndexCommitter(int numThreads)
public ParallelWriterIndexCommitter(ThreadPoolExecutor pool)
// Index writing implementation
public void write(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException
public void write(Pair<Mutation, byte[]> indexUpdate, boolean allowLocalUpdates) throws IOException
// Performance monitoring
public IndexWriterStats getStats()
public void resetStats()
}

Usage:
// Custom index builder for specialized indexing logic
public class CustomIndexBuilder extends PhoenixIndexBuilder {
    /**
     * Builds the standard index updates for a Put, then post-processes any
     * update destined for a table whose name ends in "_CUSTOM_IDX".
     */
    @Override
    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Put put, IndexMetaData indexMetaData)
            throws IOException {
        Collection<Pair<Mutation, byte[]>> indexUpdates = super.getIndexUpdate(put, indexMetaData);
        // Post-process only updates targeting our custom index tables
        for (Pair<Mutation, byte[]> pair : indexUpdates) {
            byte[] targetTable = pair.getSecond();
            if (Bytes.toString(targetTable).endsWith("_CUSTOM_IDX")) {
                enhanceIndexMutation(pair.getFirst(), put);
            }
        }
        return indexUpdates;
    }

    /** Adds derived columns (a computed timestamp) to a custom index Put. */
    private void enhanceIndexMutation(Mutation indexMutation, Put originalPut) {
        if (!(indexMutation instanceof Put)) {
            return;
        }
        Put indexPut = (Put) indexMutation;
        // Record when this index row was generated
        indexPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("computed_ts"),
            Bytes.toBytes(System.currentTimeMillis()));
    }
}
// Custom index writer with retry logic
public class RetryableIndexWriter extends ParallelWriterIndexCommitter {
    private final int maxRetries;
    private final long retryDelayMs;

    /**
     * @param numThreads   size of the underlying parallel writer pool
     * @param maxRetries   total number of write attempts; must be >= 1
     * @param retryDelayMs delay between attempts in milliseconds; must be >= 0
     * @throws IllegalArgumentException if maxRetries < 1 or retryDelayMs < 0
     */
    public RetryableIndexWriter(int numThreads, int maxRetries, long retryDelayMs) {
        super(numThreads);
        // The original accepted maxRetries <= 0, which made write() skip the
        // actual write and throw with a null cause. Fail fast instead.
        if (maxRetries < 1) {
            throw new IllegalArgumentException("maxRetries must be >= 1, got " + maxRetries);
        }
        if (retryDelayMs < 0) {
            throw new IllegalArgumentException("retryDelayMs must be >= 0, got " + retryDelayMs);
        }
        this.maxRetries = maxRetries;
        this.retryDelayMs = retryDelayMs;
    }

    /**
     * Writes the index updates, retrying on IOException up to maxRetries total
     * attempts with a fixed delay between attempts.
     *
     * @throws IOException if all attempts fail (the last failure is the cause),
     *         or if the thread is interrupted while waiting to retry
     */
    @Override
    public void write(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
        IOException lastException = null;
        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                super.write(indexUpdates);
                return; // Success
            } catch (IOException e) {
                lastException = e;
                if (attempt < maxRetries) {
                    try {
                        Thread.sleep(retryDelayMs);
                    } catch (InterruptedException ie) {
                        // Restore interrupt status before propagating
                        Thread.currentThread().interrupt();
                        throw new IOException("Interrupted during retry", ie);
                    }
                }
            }
        }
        throw new IOException("Failed to write index updates after " + maxRetries + " attempts",
            lastException);
    }
}

Phoenix region observer that integrates with HBase region lifecycle for SQL operations.
public class PhoenixRegionObserver implements RegionObserver, RegionCoprocessor {
// Region lifecycle
public void start(CoprocessorEnvironment e) throws IOException
public void stop(CoprocessorEnvironment e) throws IOException
// Mutation interception
public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException
public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException
// Scan interception
public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
Scan scan, RegionScanner s) throws IOException
public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
InternalScanner s, List<Result> result,
int limit, boolean hasNext) throws IOException
// Region split/merge handling
public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
byte[] splitRow) throws IOException
public void postSplit(ObserverContext<RegionCoprocessorEnvironment> c,
Region l, Region r) throws IOException
}

Usage:
// Custom Phoenix region observer for monitoring and custom logic
public class CustomPhoenixRegionObserver extends PhoenixRegionObserver {
    private static final Logger LOG = LoggerFactory.getLogger(CustomPhoenixRegionObserver.class);
    private Timer mutationTimer;
    // Start time of the in-flight batch for the current handler thread.
    // The original implementation stashed the timestamp by mutating the region's
    // end key (getRegionInfo().setEndKey(...)), which corrupts region metadata;
    // a ThreadLocal records timing without touching HBase state.
    private final ThreadLocal<Long> batchStartTime = new ThreadLocal<>();

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        super.start(e);
        mutationTimer = new Timer("MutationTimer", true); // daemon timer; cancelled in stop()
        LOG.info("Custom Phoenix Region Observer started");
    }

    /**
     * Validates every Put in the batch and records the batch start time for
     * slow-operation logging in postBatchMutate.
     */
    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                               MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        batchStartTime.set(System.currentTimeMillis());
        // Log large batches
        if (miniBatchOp.size() > 1000) {
            LOG.info("Processing large batch of {} mutations", miniBatchOp.size());
        }
        // Custom validation or preprocessing
        for (int i = 0; i < miniBatchOp.size(); i++) {
            Mutation mutation = miniBatchOp.getOperation(i);
            if (mutation instanceof Put) {
                validatePutMutation((Put) mutation);
            }
        }
        super.preBatchMutate(c, miniBatchOp);
    }

    /** Logs batches that took longer than 5 seconds end-to-end. */
    @Override
    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        super.postBatchMutate(c, miniBatchOp);
        Long start = batchStartTime.get();
        batchStartTime.remove(); // don't leak state across requests on pooled handler threads
        if (start != null) {
            long duration = System.currentTimeMillis() - start;
            if (duration > 5000) { // Log slow operations
                LOG.warn("Slow batch mutation: {} mutations took {}ms",
                    miniBatchOp.size(), duration);
            }
        }
    }

    /** Rejects Puts containing any cell value larger than the configured maximum. */
    private void validatePutMutation(Put put) throws IOException {
        NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
        for (List<Cell> cells : familyMap.values()) {
            for (Cell cell : cells) {
                if (cell.getValueLength() > getMaxCellSize()) {
                    throw new IOException("Cell value too large: " + cell.getValueLength());
                }
            }
        }
    }

    /** Maximum cell value size in bytes; configurable via phoenix.custom.maxCellSize. */
    private int getMaxCellSize() {
        return getConfiguration().getInt("phoenix.custom.maxCellSize", 10485760); // 10MB
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        if (mutationTimer != null) {
            mutationTimer.cancel();
        }
        super.stop(e);
        LOG.info("Custom Phoenix Region Observer stopped");
    }
}

Client interface for server-side aggregation operations.
public class AggregateClient {
// Aggregation execution
public <T> T aggregate(Scan scan, RowProcessor<T> rowProcessor) throws IOException
public Result[] getRows(Scan scan, int numberOfRows) throws IOException
// Aggregate functions
public long rowCount(Scan scan) throws IOException
public <R, S> R median(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
byte[] colFamily, byte[] qualifier) throws IOException
public <R, S> S sum(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
byte[] colFamily, byte[] qualifier) throws IOException
public <R, S> R max(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
byte[] colFamily, byte[] qualifier) throws IOException
public <R, S> R min(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
byte[] colFamily, byte[] qualifier) throws IOException
}

Server-side scanner for hash join operations.
public class HashJoinRegionScanner extends BaseRegionScanner {
public HashJoinRegionScanner(RegionScanner scanner, List<Expression> expressions,
Expression postJoinFilterExpression,
List<HashJoinInfo> joinInfos, TupleProjector projector)
// Scanner implementation
public boolean next(List<Cell> results) throws IOException
public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException
public void close() throws IOException
// Join-specific methods
public long getMaxResultSize()
public RegionInfo getRegionInfo()
}

Usage:
// Server-side aggregation example
public class ServerAggregationExample {
    /**
     * Runs a server-side count/sum/average aggregation over the "salary"
     * qualifier of column family "cf" in the given table.
     *
     * @throws IOException  on HBase access failure
     * @throws SQLException if the connection cannot be unwrapped to a PhoenixConnection
     */
    public void performServerAggregation(Connection connection, String tableName) throws IOException, SQLException {
        PhoenixConnection phoenixConn = connection.unwrap(PhoenixConnection.class);
        ConnectionQueryServices queryServices = phoenixConn.getQueryServices();
        // Create scan for aggregation
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));
        // Custom row processor for aggregation
        RowProcessor<AggregationResult> processor = new RowProcessor<AggregationResult>() {
            private long count = 0;
            private long sum = 0;
            @Override
            public void process(Result result) {
                count++;
                for (Cell cell : result.rawCells()) {
                    if (Bytes.equals(CellUtil.cloneQualifier(cell), Bytes.toBytes("salary"))) {
                        sum += Bytes.toLong(CellUtil.cloneValue(cell));
                    }
                }
            }
            @Override
            public AggregationResult getResult() {
                // Guard against an empty scan: the original divided by zero,
                // yielding a NaN average when no rows were processed.
                double average = count == 0 ? 0.0 : sum / (double) count;
                return new AggregationResult(count, sum, average);
            }
        };
        // Execute server-side aggregation
        AggregateClient client = new AggregateClient();
        Table table = queryServices.getTable(Bytes.toBytes(tableName));
        AggregationResult result = client.aggregate(scan, processor);
        System.out.println("Count: " + result.getCount());
        System.out.println("Sum: " + result.getSum());
        System.out.println("Average: " + result.getAverage());
    }
}

// Template for developing custom Phoenix coprocessors
public abstract class CustomPhoenixCoprocessor implements RegionObserver, RegionCoprocessor {
    protected Configuration configuration;
    protected RegionCoprocessorEnvironment environment;

    /**
     * Captures the region environment and configuration, then invokes the
     * subclass start hook. Non-region environments are ignored.
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (!(env instanceof RegionCoprocessorEnvironment)) {
            return; // only region-level deployments are handled
        }
        environment = (RegionCoprocessorEnvironment) env;
        configuration = env.getConfiguration();
        onStart();
    }

    /** Invokes the subclass shutdown hook. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        onStop();
    }

    /** Called once the region environment and configuration are available. */
    protected abstract void onStart() throws IOException;

    /** Called when the coprocessor is being stopped. */
    protected abstract void onStop() throws IOException;

    /** Opens an HBase Table handle for the given table name. */
    protected Table getPhoenixTable(String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        return environment.getConnection().getTable(name);
    }

    /** Formats and prints an informational message to stdout. */
    protected void logInfo(String message, Object... args) {
        System.out.println(String.format("[CustomCoprocessor] " + message, args));
    }

    /** Prints an error message to stderr, plus the cause's stack trace if present. */
    protected void logError(String message, Throwable t) {
        System.err.println("[CustomCoprocessor] ERROR: " + message);
        if (t != null) {
            t.printStackTrace();
        }
    }
}
// Example implementation
public class AuditTrailCoprocessor extends CustomPhoenixCoprocessor {
    private Table auditTable;

    @Override
    protected void onStart() throws IOException {
        auditTable = getPhoenixTable("AUDIT_TRAIL");
        logInfo("Audit trail coprocessor started");
    }

    @Override
    protected void onStop() throws IOException {
        if (auditTable != null) {
            auditTable.close();
        }
        logInfo("Audit trail coprocessor stopped");
    }

    /**
     * Records one audit row per mutation in the batch. All audit rows are sent
     * in a single batched put instead of one RPC per mutation. Audit failures
     * are logged but never propagated, so the original operation is unaffected.
     */
    @Override
    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                                MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        try {
            // Collect all audit rows, then issue a single batched put
            List<Put> auditEntries = new ArrayList<>(miniBatchOp.size());
            for (int i = 0; i < miniBatchOp.size(); i++) {
                auditEntries.add(buildAuditEntry(miniBatchOp.getOperation(i)));
            }
            if (!auditEntries.isEmpty()) {
                auditTable.put(auditEntries);
            }
        } catch (Exception e) {
            logError("Failed to record audit entry", e);
            // Don't fail the original operation
        }
    }

    /** Builds the audit Put (operation type, affected row key, timestamp) for one mutation. */
    private Put buildAuditEntry(Mutation mutation) {
        Put auditPut = new Put(generateAuditRowKey());
        auditPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("operation_type"),
            Bytes.toBytes(mutation.getClass().getSimpleName()));
        auditPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("row_key"),
            mutation.getRow());
        auditPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("timestamp"),
            Bytes.toBytes(System.currentTimeMillis()));
        return auditPut;
    }

    /** Generates a unique (random UUID) row key for an audit entry. */
    private byte[] generateAuditRowKey() {
        return Bytes.toBytes(UUID.randomUUID().toString());
    }
}

// Programmatic coprocessor registration
public class CoprocessorManager {
    /** Attaches the custom coprocessors to the given table and applies the change. */
    public static void registerCustomCoprocessors(Admin admin, String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        TableDescriptorBuilder builder =
            TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(name));
        // Add custom coprocessors
        builder.setCoprocessor(CustomPhoenixRegionObserver.class.getName());
        builder.setCoprocessor(AuditTrailCoprocessor.class.getName());
        // Apply the modified descriptor
        admin.modifyTable(builder.build());
        System.out.println("Custom coprocessors registered for table: " + tableName);
    }

    /** Detaches the custom coprocessors from the given table and applies the change. */
    public static void unregisterCoprocessors(Admin admin, String tableName) throws IOException {
        TableName name = TableName.valueOf(tableName);
        TableDescriptorBuilder builder =
            TableDescriptorBuilder.newBuilder(admin.getTableDescriptor(name));
        // Remove coprocessors
        builder.removeCoprocessor(CustomPhoenixRegionObserver.class.getName());
        builder.removeCoprocessor(AuditTrailCoprocessor.class.getName());
        admin.modifyTable(builder.build());
        System.out.println("Coprocessors unregistered for table: " + tableName);
    }
}
// Usage
ConnectionQueryServices queryServices = connection.getQueryServices();
Admin admin = queryServices.getAdmin();
CoprocessorManager.registerCustomCoprocessors(admin, "users");

<!-- hbase-site.xml configuration for Phoenix coprocessors -->
<configuration>
<!-- Enable Phoenix coprocessors -->
<property>
<name>hbase.coprocessor.region.classes</name>
<value>org.apache.phoenix.coprocessor.PhoenixRegionObserver</value>
</property>
<!-- Phoenix index coprocessors -->
<property>
<name>hbase.coprocessor.wal.classes</name>
<value>org.apache.phoenix.hbase.index.wal.IndexedWALEditCodec</value>
</property>
<!-- Custom coprocessor configuration -->
<property>
<name>phoenix.custom.maxCellSize</name>
<value>10485760</value>
</property>
<property>
<name>phoenix.cache.maxSize</name>
<value>104857600</value>
</property>
</configuration>

Install with Tessl CLI:
npx tessl i tessl/maven-org-apache-phoenix--phoenix-core