CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/maven-org-apache-phoenix--phoenix-core

Apache Phoenix Core library providing SQL-on-HBase functionality with JDBC connectivity, query compilation, and transaction support

Overview
Eval results
Files

docs/server.md

Server-Side Components

Phoenix's server-side components run within HBase region servers as coprocessors, providing distributed query processing, metadata management, index maintenance, and caching operations. These components enable Phoenix's SQL capabilities while leveraging HBase's scalability.

Core Imports

import org.apache.phoenix.coprocessor.*;
import org.apache.phoenix.hbase.index.*;
import org.apache.hadoop.hbase.coprocessor.*;
import org.apache.hadoop.hbase.client.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

Coprocessor Framework

PhoenixCoprocessor

Base interface for all Phoenix coprocessors providing common functionality and lifecycle management.

/**
 * Base interface for Phoenix coprocessors: lifecycle hooks, access to
 * configuration and region information, and a common exception handler.
 *
 * This is an API listing (signatures only), not compilable source.
 */
public interface PhoenixCoprocessor {
    // Lifecycle methods: both receive the coprocessor environment and may
    // fail with IOException.
    void start(CoprocessorEnvironment env) throws IOException
    void stop(CoprocessorEnvironment env) throws IOException

    // Configuration: accessors for the Configuration and the HRegionInfo
    // associated with this coprocessor.
    Configuration getConfiguration()
    HRegionInfo getRegionInfo()

    // Error handling: takes any Exception and is declared to throw IOException.
    void handleException(Exception e) throws IOException
}

MetaDataEndpointImpl

Server-side metadata operations coprocessor handling schema changes and metadata queries.

/**
 * Server-side metadata coprocessor handling schema changes and metadata
 * queries.
 *
 * This is an API listing (signatures only), not compilable source. Every
 * mutating operation returns a MetaDataMutationResult and every query returns
 * a MetaDataResponse; all are declared to throw IOException.
 */
public class MetaDataEndpointImpl extends BaseEndpointCoprocessor
                                  implements PhoenixMetaDataCoprocessorProtocol {
    // Table operations: create/alter operations take the metadata as a list of
    // Mutations; dropTable instead identifies the table by schema name, table
    // name, parent table name and a timestamp.
    public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException
    public MetaDataMutationResult dropTable(byte[] schemaName, byte[] tableName,
                                           byte[] parentTableName, long timestamp) throws IOException
    public MetaDataMutationResult addColumn(List<Mutation> tableMetadata) throws IOException
    public MetaDataMutationResult dropColumn(List<Mutation> tableMetadata) throws IOException

    // Index operations: metadata mutations plus the index name as raw bytes.
    public MetaDataMutationResult createIndex(List<Mutation> tableMetadata,
                                            byte[] physicalIndexName) throws IOException
    public MetaDataMutationResult dropIndex(List<Mutation> tableMetadata,
                                          byte[] indexName) throws IOException

    // Schema operations
    public MetaDataMutationResult createSchema(List<Mutation> schemaMetadata) throws IOException
    public MetaDataMutationResult dropSchema(List<Mutation> schemaMetadata) throws IOException

    // Function operations
    public MetaDataMutationResult createFunction(List<Mutation> functionMetadata) throws IOException
    public MetaDataMutationResult dropFunction(List<Mutation> functionMetadata) throws IOException

    // Metadata queries: each lookup is scoped by tenant id and takes a
    // timestamp argument.
    public MetaDataResponse getTable(byte[] tenantId, byte[] schemaName,
                                   byte[] tableName, long timestamp) throws IOException
    public MetaDataResponse getFunctions(byte[] tenantId, List<byte[]> functionNames,
                                       long timestamp) throws IOException
    public MetaDataResponse getSchema(byte[] tenantId, byte[] schemaName,
                                    long timestamp) throws IOException
}

Usage:

// Server-side metadata operations are typically invoked through client connections
// but can be accessed programmatically in coprocessor contexts

// Example: Custom coprocessor extending MetaDataEndpointImpl
/**
 * Example endpoint that wraps {@code MetaDataEndpointImpl.createTable} with
 * pre-validation of every {@code Put} in the request and console reporting of
 * the outcome.
 */
public class CustomMetaDataEndpoint extends MetaDataEndpointImpl {
    /**
     * Validates the incoming Puts, delegates to the parent implementation, and
     * prints whether the table already existed.
     *
     * @param tableMetadata mutations describing the table to create
     * @return the result produced by the parent implementation, unchanged
     * @throws IOException if validation or the parent call fails
     */
    @Override
    public MetaDataMutationResult createTable(List<Mutation> tableMetadata) throws IOException {
        System.out.println("Creating table with " + tableMetadata.size() + " mutations");

        // Only Put mutations are validated; everything else passes through untouched.
        for (Mutation candidate : tableMetadata) {
            if (!(candidate instanceof Put)) {
                continue;
            }
            validateTableMetadata((Put) candidate);
        }

        MetaDataMutationResult outcome = super.createTable(tableMetadata);

        // Any code other than TABLE_ALREADY_EXISTS is reported as a successful creation.
        boolean alreadyPresent = outcome.getMutationCode() == MutationCode.TABLE_ALREADY_EXISTS;
        System.out.println(alreadyPresent ? "Table already exists" : "Table created successfully");

        return outcome;
    }

    /**
     * Extension point for custom checks on a metadata Put (name patterns,
     * quotas, etc.). Currently only reads the row key.
     */
    private void validateTableMetadata(Put put) throws IOException {
        // The example treats the Put's row key as the table name.
        byte[] rowKey = put.getRow();
    }
}

ServerCachingEndpointImpl

Server-side caching operations coprocessor for distributed cache management.

/**
 * Server-side caching coprocessor for distributed cache management.
 *
 * This is an API listing (signatures only), not compilable source. Caches are
 * addressed by a (tenantId, cacheId) pair of byte arrays; every method is
 * declared to throw IOException.
 */
public class ServerCachingEndpointImpl extends BaseEndpointCoprocessor
                                       implements ServerCachingProtocol {
    // Cache operations: both return a boolean; the usage example further down
    // treats a true result from addServerCache as success.
    public boolean addServerCache(byte[] tenantId, byte[] cacheId, byte[] cachePtr,
                                byte[] txState, ServerCacheFactory cacheFactory) throws IOException
    public boolean removeServerCache(byte[] tenantId, byte[] cacheId) throws IOException

    // Cache queries: look up a single cache entry or its size.
    public ServerCache getServerCache(byte[] tenantId, byte[] cacheId) throws IOException
    public long getSize(byte[] tenantId, byte[] cacheId) throws IOException

    // Cache statistics and bulk clearing (not scoped to a tenant or cache id).
    public ServerCacheStats getCacheStats() throws IOException
    public void clearAllCache() throws IOException
}

Usage:

// Server cache usage is typically handled internally by Phoenix
// but can be managed in custom coprocessors

// Example: Custom caching coprocessor
/**
 * Example caching endpoint that logs every add request and rejects payloads
 * larger than the configured {@code phoenix.cache.maxSize}.
 */
public class CustomCachingEndpoint extends ServerCachingEndpointImpl {
    private static final Logger LOG = LoggerFactory.getLogger(CustomCachingEndpoint.class);

    /**
     * Logs the request, enforces the size limit, then delegates to the parent.
     *
     * @return the parent's result, unchanged
     * @throws IOException if the payload exceeds the limit or the parent call fails
     */
    @Override
    public boolean addServerCache(byte[] tenantId, byte[] cacheId, byte[] cachePtr,
                                 byte[] txState, ServerCacheFactory cacheFactory) throws IOException {
        LOG.info("Adding server cache: tenant={}, cacheId={}, size={}",
                Bytes.toString(tenantId), Bytes.toString(cacheId), cachePtr.length);

        // Reject oversized payloads before they reach the parent implementation.
        int sizeLimit = getMaxCacheSize();
        if (cachePtr.length > sizeLimit) {
            throw new IOException("Cache size exceeds maximum allowed size");
        }

        boolean added = super.addServerCache(tenantId, cacheId, cachePtr, txState, cacheFactory);
        if (!added) {
            LOG.warn("Failed to add server cache");
            return false;
        }

        LOG.info("Server cache added successfully");
        return true;
    }

    /** Reads the limit from configuration, falling back to 104857600 bytes (100MB). */
    private int getMaxCacheSize() {
        return getConfiguration().getInt("phoenix.cache.maxSize", 104857600);
    }
}

Index Management

IndexBuilder

Interface for building index updates from primary table mutations.

/**
 * Interface for building index updates from primary table mutations.
 *
 * This is an API listing (signatures only), not compilable source. Each
 * update is a Pair of (Mutation, byte[]); the usage example further down
 * treats the byte[] as the index table name.
 */
public interface IndexBuilder {
    // Index update building: one overload per mutation type (Put, Delete),
    // plus a variant that works from a collection of filtered KeyValues.
    Collection<Pair<Mutation, byte[]>> getIndexUpdate(Put put, IndexMetaData indexMetaData)
        throws IOException
    Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete, IndexMetaData indexMetaData)
        throws IOException
    Collection<Pair<Mutation, byte[]>> getIndexUpdateForFilteredRows(
        Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException

    // Builder lifecycle: setup with the region environment, then per-batch
    // start/complete callbacks. Note that batchCompleted declares no checked
    // exception, unlike the other lifecycle methods.
    void setup(RegionCoprocessorEnvironment env) throws IOException
    void batchStarted(MiniBatchOperationInProgress<Mutation> miniBatchOp,
                     IndexMetaData context) throws IOException
    void batchCompleted(MiniBatchOperationInProgress<Mutation> miniBatchOp)
}

PhoenixIndexBuilder

Phoenix-specific implementation of IndexBuilder with support for covered columns and functional indexes.

/**
 * Phoenix-specific implementation of IndexBuilder with support for covered
 * columns and functional indexes.
 *
 * This is an API listing (signatures only), not compilable source.
 */
public class PhoenixIndexBuilder implements IndexBuilder {
    // Index update generation: same signatures as the IndexBuilder overloads.
    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Put put, IndexMetaData indexMetaData)
        throws IOException
    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Delete delete, IndexMetaData indexMetaData)
        throws IOException

    // Phoenix-specific functionality
    // NOTE(review): the IndexBuilder listing names this method
    // getIndexUpdateForFilteredRows (singular "Update"); the name below differs
    // and would not satisfy that interface as written — confirm which spelling
    // is intended.
    public Collection<Pair<Mutation, byte[]>> getIndexUpdatesForFilteredRows(
        Collection<KeyValue> filtered, IndexMetaData indexMetaData) throws IOException

    // Configuration: setup with the region environment, and a per-mutation
    // enabled check.
    public void setup(RegionCoprocessorEnvironment env) throws IOException
    public boolean isEnabled(Mutation m) throws IOException
}

IndexWriter

Interface for writing index updates with support for different writing strategies.

/**
 * Interface for writing index updates with support for different writing
 * strategies.
 *
 * This is an API listing (signatures only), not compilable source. Updates
 * use the same Pair of (Mutation, byte[]) shape that IndexBuilder produces.
 */
public interface IndexWriter {
    // Index writing: a bulk overload, and a single-update overload that takes
    // an allowLocalUpdates flag.
    void write(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException
    void write(Pair<Mutation, byte[]> indexUpdate, boolean allowLocalUpdates) throws IOException

    // Writer lifecycle: stop receives a free-form reason string.
    void setup(RegionCoprocessorEnvironment env) throws IOException
    void stop(String why) throws IOException

    // Error handling: receives the updates that were attempted and the
    // exception that caused the failure.
    void handleFailure(Collection<Pair<Mutation, byte[]>> attempted,
                      Exception cause) throws IOException
}

ParallelWriterIndexCommitter

Parallel index writer implementation for high-throughput index maintenance.

/**
 * Parallel index writer implementation for high-throughput index maintenance.
 *
 * This is an API listing (signatures only), not compilable source.
 */
public class ParallelWriterIndexCommitter implements IndexWriter {
    // Parallel writing configuration: construct from a thread count or from a
    // caller-supplied ThreadPoolExecutor.
    public ParallelWriterIndexCommitter(int numThreads)
    public ParallelWriterIndexCommitter(ThreadPoolExecutor pool)

    // Index writing implementation: the two IndexWriter write overloads.
    public void write(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException
    public void write(Pair<Mutation, byte[]> indexUpdate, boolean allowLocalUpdates) throws IOException

    // Performance monitoring: read and reset the writer statistics.
    public IndexWriterStats getStats()
    public void resetStats()
}

Usage:

// Custom index builder for specialized indexing logic
/**
 * Example index builder that post-processes the updates produced by
 * {@code PhoenixIndexBuilder} for indexes whose table name ends in
 * {@code _CUSTOM_IDX}.
 */
public class CustomIndexBuilder extends PhoenixIndexBuilder {
    /**
     * Builds the standard index updates, then enhances those that target a
     * custom index table.
     *
     * @return the same collection the parent produced, with matching entries
     *         mutated in place
     */
    @Override
    public Collection<Pair<Mutation, byte[]>> getIndexUpdate(Put put, IndexMetaData indexMetaData)
            throws IOException {
        Collection<Pair<Mutation, byte[]>> indexUpdates = super.getIndexUpdate(put, indexMetaData);

        for (Pair<Mutation, byte[]> entry : indexUpdates) {
            // The second element of each pair is the target index table name.
            String targetTable = Bytes.toString(entry.getSecond());
            if (targetTable.endsWith("_CUSTOM_IDX")) {
                enhanceIndexMutation(entry.getFirst(), put);
            }
        }

        return indexUpdates;
    }

    /**
     * Adds a {@code cf:computed_ts} column holding the current wall-clock time
     * to the index mutation. Non-Put mutations are left unchanged.
     */
    private void enhanceIndexMutation(Mutation indexMutation, Put originalPut) {
        if (!(indexMutation instanceof Put)) {
            return;
        }
        Put target = (Put) indexMutation;
        target.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("computed_ts"),
                         Bytes.toBytes(System.currentTimeMillis()));
    }
}

// Custom index writer with retry logic
/**
 * Index writer that retries a failed bulk write a bounded number of times,
 * sleeping for a fixed delay between attempts.
 */
public class RetryableIndexWriter extends ParallelWriterIndexCommitter {
    /** Total number of write attempts; always at least 1. */
    private final int maxRetries;
    /** Pause between attempts in milliseconds; never negative. */
    private final long retryDelayMs;

    /**
     * @param numThreads   thread count passed to the parent committer
     * @param maxRetries   total number of attempts; values below 1 are treated
     *                     as 1 so the write is always attempted at least once
     * @param retryDelayMs pause between attempts in milliseconds; negative
     *                     values are treated as 0
     */
    public RetryableIndexWriter(int numThreads, int maxRetries, long retryDelayMs) {
        super(numThreads);
        // Without clamping, maxRetries <= 0 would skip the write entirely and
        // report a failure with no cause; a negative delay would make
        // Thread.sleep throw IllegalArgumentException.
        this.maxRetries = Math.max(1, maxRetries);
        this.retryDelayMs = Math.max(0L, retryDelayMs);
    }

    /**
     * Writes the index updates, retrying on {@link IOException}.
     *
     * @param indexUpdates updates to write
     * @throws IOException if every attempt fails (the last failure is the
     *                     cause), or if the thread is interrupted while
     *                     waiting to retry
     */
    @Override
    public void write(Collection<Pair<Mutation, byte[]>> indexUpdates) throws IOException {
        IOException lastException = null;

        for (int attempt = 1; attempt <= maxRetries; attempt++) {
            try {
                super.write(indexUpdates);
                return; // Success
            } catch (IOException e) {
                lastException = e;
                // No point sleeping after the final attempt.
                if (attempt < maxRetries) {
                    pauseBeforeRetry();
                }
            }
        }

        throw new IOException("Failed to write index updates after " + maxRetries + " attempts",
                            lastException);
    }

    /** Sleeps for the configured delay, restoring the interrupt flag if interrupted. */
    private void pauseBeforeRetry() throws IOException {
        try {
            Thread.sleep(retryDelayMs);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IOException("Interrupted during retry", ie);
        }
    }
}

Region Observer Integration

PhoenixRegionObserver

Phoenix region observer that integrates with HBase region lifecycle for SQL operations.

/**
 * Phoenix region observer that integrates with the HBase region lifecycle for
 * SQL operations.
 *
 * This is an API listing (signatures only), not compilable source. All hooks
 * receive the ObserverContext for the region and are declared to throw
 * IOException.
 */
public class PhoenixRegionObserver implements RegionObserver, RegionCoprocessor {
    // Region lifecycle
    public void start(CoprocessorEnvironment e) throws IOException
    public void stop(CoprocessorEnvironment e) throws IOException

    // Mutation interception: hooks around a mini-batch of mutations.
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException
    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                               MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException

    // Scan interception: preScannerOpen takes and returns a RegionScanner;
    // preScannerNext takes the result list, a limit and the current hasNext
    // flag, and returns a boolean.
    public RegionScanner preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c,
                                       Scan scan, RegionScanner s) throws IOException
    public boolean preScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
                                 InternalScanner s, List<Result> result,
                                 int limit, boolean hasNext) throws IOException

    // Region split/merge handling: preSplit receives the split row; postSplit
    // receives the two Region arguments l and r.
    public void preSplit(ObserverContext<RegionCoprocessorEnvironment> c,
                        byte[] splitRow) throws IOException
    public void postSplit(ObserverContext<RegionCoprocessorEnvironment> c,
                         Region l, Region r) throws IOException
}

Usage:

// Custom Phoenix region observer for monitoring and custom logic
/**
 * Example region observer that validates cell sizes before a batch is applied
 * and logs batches that are unusually large or slow.
 *
 * <p>The batch start time is kept in a {@link ThreadLocal} rather than in any
 * region state: region metadata such as the end key must never be overwritten
 * for bookkeeping, and it is shared by every batch running on the region.
 */
public class CustomPhoenixRegionObserver extends PhoenixRegionObserver {
    private static final Logger LOG = LoggerFactory.getLogger(CustomPhoenixRegionObserver.class);

    /** Batches with more mutations than this are logged before processing. */
    private static final int LARGE_BATCH_THRESHOLD = 1000;
    /** Batches that take longer than this many milliseconds are logged as slow. */
    private static final long SLOW_BATCH_THRESHOLD_MS = 5000;

    /**
     * Start time (System.nanoTime) of the batch currently being processed by
     * the calling thread. Assumes the pre and post hooks of one batch run on
     * the same thread — TODO confirm for the HBase version in use.
     */
    private final ThreadLocal<Long> batchStartNanos = new ThreadLocal<>();

    // Created in start() and cancelled in stop(); no tasks are scheduled on it
    // in this example.
    private Timer mutationTimer;

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        super.start(e);
        mutationTimer = new Timer("MutationTimer", true);
        LOG.info("Custom Phoenix Region Observer started");
    }

    /**
     * Logs large batches, validates every Put in the batch, records the start
     * time for the calling thread, then delegates to the parent.
     *
     * @throws IOException if a cell exceeds the configured maximum size or the
     *                     parent hook fails
     */
    @Override
    public void preBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                              MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        long startNanos = System.nanoTime();

        // Log large batches
        if (miniBatchOp.size() > LARGE_BATCH_THRESHOLD) {
            LOG.info("Processing large batch of {} mutations", miniBatchOp.size());
        }

        // Custom validation or preprocessing
        for (int i = 0; i < miniBatchOp.size(); i++) {
            Mutation mutation = miniBatchOp.getOperation(i);
            if (mutation instanceof Put) {
                validatePutMutation((Put) mutation);
            }
        }

        // Store timing information per thread; region state is left untouched.
        batchStartNanos.set(startNanos);

        super.preBatchMutate(c, miniBatchOp);
    }

    /**
     * Delegates to the parent, then logs the batch if it took longer than the
     * slow-batch threshold.
     */
    @Override
    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                               MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        // Read and clear first so a failure in the parent hook cannot leave a
        // stale start time behind on a pooled thread.
        Long startNanos = batchStartNanos.get();
        batchStartNanos.remove();

        super.postBatchMutate(c, miniBatchOp);

        if (startNanos == null) {
            return; // No matching preBatchMutate was recorded on this thread.
        }

        // Calculate and log timing
        long duration = (System.nanoTime() - startNanos) / 1_000_000L;
        if (duration > SLOW_BATCH_THRESHOLD_MS) { // Log slow operations
            LOG.warn("Slow batch mutation: {} mutations took {}ms",
                    miniBatchOp.size(), duration);
        }
    }

    /** Rejects the Put if any of its cells has a value longer than the configured maximum. */
    private void validatePutMutation(Put put) throws IOException {
        int maxCellSize = getMaxCellSize();
        NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
        for (List<Cell> cells : familyMap.values()) {
            for (Cell cell : cells) {
                if (cell.getValueLength() > maxCellSize) {
                    throw new IOException("Cell value too large: " + cell.getValueLength());
                }
            }
        }
    }

    /** Reads the limit from configuration, falling back to 10485760 bytes (10MB). */
    private int getMaxCellSize() {
        return getConfiguration().getInt("phoenix.custom.maxCellSize", 10485760);
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        if (mutationTimer != null) {
            mutationTimer.cancel();
        }
        super.stop(e);
        LOG.info("Custom Phoenix Region Observer stopped");
    }
}

Server-Side Query Processing

AggregateClient

Client class for running server-side aggregation operations.

/**
 * Client for server-side aggregation operations.
 *
 * This is an API listing (signatures only), not compilable source. Every
 * method takes the Scan that selects the rows to aggregate and is declared to
 * throw IOException.
 */
public class AggregateClient {
    // Aggregation execution: run a caller-supplied RowProcessor over the scan,
    // or fetch a number of rows as Results.
    public <T> T aggregate(Scan scan, RowProcessor<T> rowProcessor) throws IOException
    public Result[] getRows(Scan scan, int numberOfRows) throws IOException

    // Aggregate functions: each column-based function takes a ColumnInterpreter
    // plus the column family and qualifier to aggregate. sum returns S; median,
    // max and min return R.
    // NOTE(review): P, Q and W appear as type arguments of ColumnInterpreter
    // but are not declared by these method signatures or by the class header
    // shown here — confirm the intended type-parameter declaration.
    public long rowCount(Scan scan) throws IOException
    public <R, S> R median(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
                          byte[] colFamily, byte[] qualifier) throws IOException
    public <R, S> S sum(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
                       byte[] colFamily, byte[] qualifier) throws IOException
    public <R, S> R max(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
                       byte[] colFamily, byte[] qualifier) throws IOException
    public <R, S> R min(Scan scan, ColumnInterpreter<R, S, P, Q, W> ci,
                       byte[] colFamily, byte[] qualifier) throws IOException
}

HashJoinRegionScanner

Server-side scanner for hash join operations.

/**
 * Server-side scanner for hash join operations.
 *
 * This is an API listing (signatures only), not compilable source.
 */
public class HashJoinRegionScanner extends BaseRegionScanner {
    // Wraps an existing RegionScanner together with the join expressions, a
    // post-join filter expression, the per-join HashJoinInfo list and a
    // TupleProjector.
    public HashJoinRegionScanner(RegionScanner scanner, List<Expression> expressions,
                                Expression postJoinFilterExpression,
                                List<HashJoinInfo> joinInfos, TupleProjector projector)

    // Scanner implementation: both next overloads fill the supplied list and
    // return a boolean.
    public boolean next(List<Cell> results) throws IOException
    public boolean next(List<Cell> results, ScannerContext scannerContext) throws IOException
    public void close() throws IOException

    // Join-specific methods
    public long getMaxResultSize()
    public RegionInfo getRegionInfo()
}

Usage:

// Server-side aggregation example
/**
 * Example showing how to compute a count, sum and average of the
 * {@code cf:salary} column with a custom {@code RowProcessor}.
 */
public class ServerAggregationExample {
    /**
     * Aggregates the {@code salary} qualifier over the {@code cf} family and
     * prints the count, sum and average.
     *
     * @param connection JDBC connection that can be unwrapped to a PhoenixConnection
     * @param tableName  name of the table to open
     * @throws IOException  if the aggregation or closing the table fails
     * @throws SQLException if the connection cannot be unwrapped or the table
     *                      cannot be obtained
     */
    public void performServerAggregation(Connection connection, String tableName) throws IOException, SQLException {
        PhoenixConnection phoenixConn = connection.unwrap(PhoenixConnection.class);
        ConnectionQueryServices queryServices = phoenixConn.getQueryServices();

        // Create scan for aggregation
        Scan scan = new Scan();
        scan.addFamily(Bytes.toBytes("cf"));

        // Encoded once here instead of once per cell inside process().
        final byte[] salaryQualifier = Bytes.toBytes("salary");

        // Custom row processor for aggregation
        RowProcessor<AggregationResult> processor = new RowProcessor<AggregationResult>() {
            private long count = 0;
            private long sum = 0;

            @Override
            public void process(Result result) {
                count++;
                for (Cell cell : result.rawCells()) {
                    if (Bytes.equals(CellUtil.cloneQualifier(cell), salaryQualifier)) {
                        sum += Bytes.toLong(CellUtil.cloneValue(cell));
                    }
                }
            }

            @Override
            public AggregationResult getResult() {
                // Report 0.0 rather than NaN when no rows were processed.
                double average = count == 0 ? 0.0 : sum / (double) count;
                return new AggregationResult(count, sum, average);
            }
        };

        // Execute server-side aggregation. The table handle is closed when the
        // block exits, including on failure.
        AggregateClient client = new AggregateClient();
        try (Table table = queryServices.getTable(Bytes.toBytes(tableName))) {
            AggregationResult result = client.aggregate(scan, processor);

            System.out.println("Count: " + result.getCount());
            System.out.println("Sum: " + result.getSum());
            System.out.println("Average: " + result.getAverage());
        }
    }
}

Custom Coprocessor Development

Custom Phoenix Coprocessor Template

// Template for developing custom Phoenix coprocessors
/**
 * Template base class for custom region coprocessors. Captures the region
 * environment and configuration on start and exposes {@link #onStart()} /
 * {@link #onStop()} hooks plus small table and logging helpers to subclasses.
 */
public abstract class CustomPhoenixCoprocessor implements RegionObserver, RegionCoprocessor {
    protected Configuration configuration;
    protected RegionCoprocessorEnvironment environment;

    /**
     * Exposes this instance as the region observer. Without this override the
     * observer hooks implemented by subclasses (for example postBatchMutate)
     * are not registered with the region.
     */
    @Override
    public Optional<RegionObserver> getRegionObserver() {
        return Optional.of(this);
    }

    /**
     * Stores the region environment and configuration, then calls {@link #onStart()}.
     *
     * @throws IOException if the coprocessor is loaded outside a table region
     *                     or {@code onStart} fails
     */
    @Override
    public void start(CoprocessorEnvironment env) throws IOException {
        if (!(env instanceof RegionCoprocessorEnvironment)) {
            // Fail fast: continuing would leave 'environment' null and surface
            // later as a NullPointerException in getPhoenixTable.
            throw new CoprocessorException("Must be loaded on a table region!");
        }
        this.environment = (RegionCoprocessorEnvironment) env;
        this.configuration = env.getConfiguration();
        onStart();
    }

    /** Calls {@link #onStop()}. */
    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        onStop();
    }

    // Template methods for subclasses
    protected abstract void onStart() throws IOException;
    protected abstract void onStop() throws IOException;

    // Utility methods

    /**
     * Opens a table handle through the region environment's connection.
     * The caller is responsible for closing the returned Table.
     */
    protected Table getPhoenixTable(String tableName) throws IOException {
        return environment.getConnection().getTable(TableName.valueOf(tableName));
    }

    /**
     * Prints an informational message to standard output.
     *
     * @param message format string when {@code args} are supplied; otherwise
     *                printed verbatim, so a literal '%' is safe
     * @param args    optional {@link String#format} arguments
     */
    protected void logInfo(String message, Object... args) {
        String body = (args == null || args.length == 0) ? message : String.format(message, args);
        System.out.println("[CustomCoprocessor] " + body);
    }

    /** Prints an error message, and the stack trace when {@code t} is non-null, to standard error. */
    protected void logError(String message, Throwable t) {
        System.err.println("[CustomCoprocessor] ERROR: " + message);
        if (t != null) {
            t.printStackTrace();
        }
    }
}

// Example implementation
/**
 * Example coprocessor that records one audit row in the {@code AUDIT_TRAIL}
 * table for every mutation in a completed batch. Audit failures are logged and
 * never fail the original operation.
 *
 * <p>A table handle is opened per batch instead of being shared in a field:
 * HBase {@code Table} instances are lightweight but not thread-safe, and this
 * hook can run on several handler threads at once.
 */
public class AuditTrailCoprocessor extends CustomPhoenixCoprocessor {
    private static final String AUDIT_TABLE_NAME = "AUDIT_TRAIL";
    private static final byte[] AUDIT_FAMILY = Bytes.toBytes("cf");
    private static final byte[] OPERATION_TYPE = Bytes.toBytes("operation_type");
    private static final byte[] ROW_KEY = Bytes.toBytes("row_key");
    private static final byte[] TIMESTAMP = Bytes.toBytes("timestamp");

    @Override
    protected void onStart() throws IOException {
        logInfo("Audit trail coprocessor started");
    }

    @Override
    protected void onStop() throws IOException {
        logInfo("Audit trail coprocessor stopped");
    }

    /**
     * Builds one audit Put per mutation in the batch and writes them to the
     * audit table in a single call.
     */
    @Override
    public void postBatchMutate(ObserverContext<RegionCoprocessorEnvironment> c,
                               MiniBatchOperationInProgress<Mutation> miniBatchOp) throws IOException {
        int batchSize = miniBatchOp.size();
        if (batchSize == 0) {
            return; // Nothing to audit; avoid opening the table.
        }

        try {
            List<Put> auditPuts = new ArrayList<>(batchSize);
            for (int i = 0; i < batchSize; i++) {
                auditPuts.add(buildAuditEntry(miniBatchOp.getOperation(i)));
            }

            // One batched write instead of one round trip per mutation.
            try (Table auditTable = getPhoenixTable(AUDIT_TABLE_NAME)) {
                auditTable.put(auditPuts);
            }
        } catch (Exception e) {
            // Deliberately broad: auditing is best-effort.
            logError("Failed to record audit entry", e);
            // Don't fail the original operation
        }
    }

    /** Creates the audit row for one mutation: operation type, original row key and current time. */
    private Put buildAuditEntry(Mutation mutation) {
        Put auditPut = new Put(generateAuditRowKey());
        auditPut.addColumn(AUDIT_FAMILY, OPERATION_TYPE,
                          Bytes.toBytes(mutation.getClass().getSimpleName()));
        auditPut.addColumn(AUDIT_FAMILY, ROW_KEY, mutation.getRow());
        auditPut.addColumn(AUDIT_FAMILY, TIMESTAMP,
                          Bytes.toBytes(System.currentTimeMillis()));
        return auditPut;
    }

    /** Generates a random UUID string as the audit row key. */
    private byte[] generateAuditRowKey() {
        return Bytes.toBytes(UUID.randomUUID().toString());
    }
}

Deployment and Configuration

Coprocessor Registration

// Programmatic coprocessor registration
/**
 * Helper for attaching the example coprocessors to a table and detaching them
 * again through the HBase {@code Admin} API.
 */
public class CoprocessorManager {
    /**
     * Attaches {@code CustomPhoenixRegionObserver} and
     * {@code AuditTrailCoprocessor} to the table. Coprocessors that are already
     * attached are left as they are, so the call can be repeated safely.
     *
     * @param admin     admin handle; not closed by this method
     * @param tableName name of the table to modify
     * @throws IOException if the descriptor cannot be read or the table cannot be modified
     */
    public static void registerCustomCoprocessors(Admin admin, String tableName) throws IOException {
        // getDescriptor is the non-deprecated counterpart of getTableDescriptor
        // in the HBase 2 API that TableDescriptorBuilder belongs to.
        TableDescriptor tableDescriptor = admin.getDescriptor(TableName.valueOf(tableName));
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);

        // Add custom coprocessors
        addIfAbsent(tableDescriptor, builder, CustomPhoenixRegionObserver.class.getName());
        addIfAbsent(tableDescriptor, builder, AuditTrailCoprocessor.class.getName());

        // Update table
        admin.modifyTable(builder.build());
        System.out.println("Custom coprocessors registered for table: " + tableName);
    }

    /**
     * Detaches {@code CustomPhoenixRegionObserver} and
     * {@code AuditTrailCoprocessor} from the table.
     *
     * @param admin     admin handle; not closed by this method
     * @param tableName name of the table to modify
     * @throws IOException if the descriptor cannot be read or the table cannot be modified
     */
    public static void unregisterCoprocessors(Admin admin, String tableName) throws IOException {
        TableDescriptor tableDescriptor = admin.getDescriptor(TableName.valueOf(tableName));
        TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableDescriptor);

        // Remove coprocessors
        builder.removeCoprocessor(CustomPhoenixRegionObserver.class.getName());
        builder.removeCoprocessor(AuditTrailCoprocessor.class.getName());

        admin.modifyTable(builder.build());
        System.out.println("Coprocessors unregistered for table: " + tableName);
    }

    /** Adds the coprocessor unless the current descriptor already lists it; setCoprocessor rejects duplicates. */
    private static void addIfAbsent(TableDescriptor current, TableDescriptorBuilder builder,
                                    String coprocessorClassName) throws IOException {
        if (!current.hasCoprocessor(coprocessorClassName)) {
            builder.setCoprocessor(coprocessorClassName);
        }
    }
}

// Usage: the Admin handle is closed when the block exits, even if
// registration fails.
ConnectionQueryServices queryServices = connection.getQueryServices();
try (Admin admin = queryServices.getAdmin()) {
    CoprocessorManager.registerCustomCoprocessors(admin, "users");
}

Configuration Properties

<!-- hbase-site.xml configuration for Phoenix coprocessors -->
<configuration>
    <!-- Enable Phoenix coprocessors -->
    <property>
        <name>hbase.coprocessor.region.classes</name>
        <value>org.apache.phoenix.coprocessor.PhoenixRegionObserver</value>
    </property>

    <!-- Phoenix index WAL codec. IndexedWALEditCodec is a WAL edit codec, not a
         WAL coprocessor, so it is configured through hbase.regionserver.wal.codec
         rather than hbase.coprocessor.wal.classes. -->
    <property>
        <name>hbase.regionserver.wal.codec</name>
        <value>org.apache.hadoop.hbase.regionserver.wal.IndexedWALEditCodec</value>
    </property>

    <!-- Custom coprocessor configuration -->
    <!-- Maximum cell value size in bytes (10MB), read by CustomPhoenixRegionObserver -->
    <property>
        <name>phoenix.custom.maxCellSize</name>
        <value>10485760</value>
    </property>

    <!-- Maximum server cache payload in bytes (100MB), read by CustomCachingEndpoint -->
    <property>
        <name>phoenix.cache.maxSize</name>
        <value>104857600</value>
    </property>
</configuration>

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-phoenix--phoenix-core

docs

configuration.md

exceptions.md

execution.md

expressions.md

index.md

jdbc.md

mapreduce.md

monitoring.md

query-compilation.md

schema-metadata.md

server.md

transactions.md

types.md

tile.json