tessl/pypi-apache-airflow-providers-apache-pinot

Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.

docs/admin-operations.md

Administrative Operations

Administrative functionality for managing Pinot clusters including schema management, table configuration, segment creation, and data ingestion workflows. The PinotAdminHook wraps the pinot-admin.sh command-line tool to provide programmatic access to Pinot administrative operations.

Capabilities

Hook Initialization

class PinotAdminHook(BaseHook):
    """
    Hook for Pinot administrative operations via pinot-admin.sh script.
    
    Attributes:
        conn_name_attr: str = "conn_id"
        default_conn_name: str = "pinot_admin_default"
        conn_type: str = "pinot_admin"
        hook_name: str = "Pinot Admin"
    """
    
    def __init__(
        self,
        conn_id: str = "pinot_admin_default",
        cmd_path: str = "pinot-admin.sh",
        pinot_admin_system_exit: bool = False
    ):
        """
        Initialize PinotAdminHook.
        
        Args:
            conn_id: Airflow connection ID for Pinot admin operations
            cmd_path: Must remain "pinot-admin.sh" (legacy parameter, do not modify)
            pinot_admin_system_exit: If true, evaluate result based on status code;
                                   if false, evaluate based on output message
        """

    def get_conn(self):
        """
        Get connection object.
        
        Returns:
            Airflow connection object
        """

Usage Example - Hook Initialization

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

# Initialize with default connection
admin_hook = PinotAdminHook()

# Initialize with custom connection
admin_hook = PinotAdminHook(
    conn_id='my_pinot_admin',
    pinot_admin_system_exit=True  # Use exit codes for error detection
)

# Get connection details
conn = admin_hook.get_conn()

Schema Management

Add and manage Pinot schemas for table definitions.

def add_schema(self, schema_file: str, with_exec: bool = True):
    """
    Add Pinot schema by running AddSchema command.
    
    Args:
        schema_file: Path to Pinot schema file (JSON format)
        with_exec: Whether to execute the schema addition immediately
    """

Usage Example - Schema Management

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')

# Add schema from file
schema_file_path = '/path/to/my_table_schema.json'
admin_hook.add_schema(schema_file_path)

# Add schema without immediate execution (dry-run)
admin_hook.add_schema(schema_file_path, with_exec=False)

Example schema file format:

{
  "schemaName": "my_table",
  "dimensionFieldSpecs": [
    {
      "name": "customer_id",
      "dataType": "STRING"
    },
    {
      "name": "product_category", 
      "dataType": "STRING"
    }
  ],
  "metricFieldSpecs": [
    {
      "name": "revenue",
      "dataType": "DOUBLE"
    }
  ],
  "timeFieldSpec": {
    "incomingGranularitySpec": {
      "timeType": "MILLISECONDS",
      "dataType": "LONG",
      "name": "timestamp"
    }
  }
}
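Rather than maintaining the schema JSON by hand, the file can be generated in Python before being handed to `add_schema`. The sketch below builds the same schema shown above with the standard library only; the hook call at the end is commented out because it requires a configured Airflow connection.

```python
import json
import tempfile

# Same schema as the example file above, built as a Python dict.
schema = {
    "schemaName": "my_table",
    "dimensionFieldSpecs": [
        {"name": "customer_id", "dataType": "STRING"},
        {"name": "product_category", "dataType": "STRING"},
    ],
    "metricFieldSpecs": [
        {"name": "revenue", "dataType": "DOUBLE"},
    ],
    "timeFieldSpec": {
        "incomingGranularitySpec": {
            "timeType": "MILLISECONDS",
            "dataType": "LONG",
            "name": "timestamp",
        }
    },
}

# Write the schema to a temporary file and keep its path.
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    json.dump(schema, f, indent=2)
    schema_path = f.name

# admin_hook.add_schema(schema_path)  # requires a configured PinotAdminHook
```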

Table Management

Add and configure Pinot tables for data storage and querying.

def add_table(self, file_path: str, with_exec: bool = True):
    """
    Add Pinot table by running AddTable command.
    
    Args:
        file_path: Path to Pinot table configuration file (JSON format)
        with_exec: Whether to execute the table creation immediately
    """

Usage Example - Table Management

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')

# Add table configuration
table_config_path = '/path/to/my_table_config.json'
admin_hook.add_table(table_config_path)

# Add table without immediate execution
admin_hook.add_table(table_config_path, with_exec=False)

Example table configuration:

{
  "tableName": "my_table",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "timestamp",
    "timeType": "MILLISECONDS",
    "retentionTimeUnit": "DAYS",
    "retentionTimeValue": "365"
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "metadata": {
    "customConfigs": {}
  }
}

Segment Operations

Create and upload segments for offline data ingestion.

def create_segment(
    self,
    generator_config_file: str | None = None,
    data_dir: str | None = None,
    segment_format: str | None = None,
    out_dir: str | None = None,
    overwrite: str | None = None,
    table_name: str | None = None,
    segment_name: str | None = None,
    time_column_name: str | None = None,
    schema_file: str | None = None,
    reader_config_file: str | None = None,
    enable_star_tree_index: str | None = None,
    star_tree_index_spec_file: str | None = None,
    hll_size: str | None = None,
    hll_columns: str | None = None,
    hll_suffix: str | None = None,
    num_threads: str | None = None,
    post_creation_verification: str | None = None,
    retry: str | None = None
):
    """
    Create Pinot segment by running CreateSegment command.
    
    Args:
        generator_config_file: Path to segment generator configuration file
        data_dir: Directory containing input data files
        segment_format: Output segment format
        out_dir: Output directory for generated segments
        overwrite: Whether to overwrite existing segments
        table_name: Name of the target table
        segment_name: Name for the generated segment
        time_column_name: Name of the time column
        schema_file: Path to schema file
        reader_config_file: Path to data reader configuration
        enable_star_tree_index: Enable star tree indexing
        star_tree_index_spec_file: Star tree index specification file
        hll_size: HyperLogLog size for approximate counting
        hll_columns: Columns to apply HLL to
        hll_suffix: Suffix for HLL columns
        num_threads: Number of threads for processing
        post_creation_verification: Enable post-creation verification
        retry: Number of retry attempts
    """

def upload_segment(self, segment_dir: str, table_name: str | None = None):
    """
    Upload segment by running UploadSegment command.
    
    Args:
        segment_dir: Directory containing the segment to upload
        table_name: Target table name (optional)
    """

Usage Example - Segment Operations

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')

# Create segment from data directory
admin_hook.create_segment(
    table_name='my_table',
    data_dir='/path/to/input/data',
    out_dir='/path/to/output/segments',
    schema_file='/path/to/schema.json',
    overwrite='true',
    num_threads='4'
)

# Upload the created segment
segment_path = '/path/to/output/segments/my_table_segment'
admin_hook.upload_segment(segment_path, table_name='my_table')

# Advanced segment creation with indexing
admin_hook.create_segment(
    table_name='analytics_table',
    data_dir='/data/analytics',
    out_dir='/segments/analytics',
    schema_file='/config/analytics_schema.json',
    enable_star_tree_index='true',
    star_tree_index_spec_file='/config/star_tree_spec.json',
    hll_columns='user_id,session_id',
    hll_size='12',
    post_creation_verification='true',
    retry='3'
)
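Note that `create_segment` accepts its options as strings (e.g. `overwrite='true'`, `num_threads='4'`), because every option is forwarded verbatim to the `pinot-admin.sh` command line. A small hypothetical helper (not part of the provider) can make that conversion explicit when the values originate as Python booleans or integers:

```python
def to_cli_value(value) -> str:
    # Hypothetical helper: create_segment forwards each option to
    # pinot-admin.sh as text, so booleans and numbers must be rendered
    # as the lowercase strings the CLI expects.
    if isinstance(value, bool):
        return "true" if value else "false"
    return str(value)

# admin_hook.create_segment(
#     table_name='my_table',
#     overwrite=to_cli_value(True),   # passes 'true'
#     num_threads=to_cli_value(4),    # passes '4'
# )
```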

Command Execution

Direct access to pinot-admin.sh command execution for advanced operations.

def run_cli(self, cmd: list[str], verbose: bool = True) -> str:
    """
    Run command with pinot-admin.sh.
    
    Args:
        cmd: List of command arguments to pass to pinot-admin.sh
        verbose: Whether to log command output
        
    Returns:
        Command output as string
        
    Raises:
        AirflowException: If command fails or contains error messages
    """

Usage Example - Command Execution

from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')

# Execute custom pinot-admin command
custom_command = ['ShowClusterInfo', '-clusterName', 'PinotCluster']
output = admin_hook.run_cli(custom_command)
print(f"Cluster info: {output}")

# Execute with minimal logging
output = admin_hook.run_cli(['ListTables'], verbose=False)

# Example: Delete a table (advanced usage)
delete_command = ['DeleteTable', '-tableName', 'old_table', '-exec']
try:
    result = admin_hook.run_cli(delete_command)
    print(f"Table deletion result: {result}")
except AirflowException as e:
    print(f"Failed to delete table: {e}")
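Since `run_cli` takes a flat list of arguments, building that list from keyword options can reduce quoting mistakes. The helper below is a hypothetical convenience, not part of the provider; it simply renders keyword options as the `-flagName value` pairs `pinot-admin.sh` expects:

```python
def build_admin_command(subcommand: str, **options: str) -> list[str]:
    # Hypothetical helper: turn keyword options into the
    # "-flagName value" argument pairs used by pinot-admin.sh.
    cmd = [subcommand]
    for flag, value in options.items():
        cmd.extend([f"-{flag}", value])
    return cmd

cmd = build_admin_command("DeleteTable", tableName="old_table")
cmd.append("-exec")
# cmd is now ['DeleteTable', '-tableName', 'old_table', '-exec'],
# ready to pass to admin_hook.run_cli(cmd)
```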

Connection Configuration

Airflow Connection Setup

The PinotAdminHook uses Airflow connections with the following configuration:

  • Connection Type: pinot_admin
  • Host: Pinot controller hostname
  • Port: Pinot controller port (typically 9000)
  • Login: Username (optional, for authenticated clusters)
  • Password: Password (optional, for authenticated clusters)
  • Extra: JSON configuration with additional options

Extra Configuration Options

{
    "pinot_admin_system_exit": false
}

When false (the default), command success is judged by parsing output for error strings; when true, by the process exit code.

Usage Example - Connection Configuration

from airflow.models import Connection
from airflow import settings

# Create admin connection
admin_conn = Connection(
    conn_id='my_pinot_admin',
    conn_type='pinot_admin',
    host='pinot-controller.example.com',
    port=9000,
    login='admin_user',
    password='admin_password',
    extra='{"pinot_admin_system_exit": true}'
)

session = settings.Session()
session.add(admin_conn)
session.commit()

Error Handling

The hook evaluates command success based on the pinot_admin_system_exit setting:

  • When pinot_admin_system_exit=True: Uses command exit codes for error detection
  • When pinot_admin_system_exit=False: Checks output for "Error" or "Exception" strings

Usage Example - Error Handling

from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

try:
    admin_hook = PinotAdminHook(pinot_admin_system_exit=True)
    admin_hook.add_schema('/path/to/invalid_schema.json')
except AirflowException as e:
    print(f"Schema addition failed: {e}")
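The output-based check can be illustrated with a small standalone sketch. This is not the hook's actual implementation, only a mirror of the documented behavior when `pinot_admin_system_exit=False`:

```python
def output_indicates_failure(output: str) -> bool:
    # Illustrative sketch of the documented output-based check:
    # with pinot_admin_system_exit=False, the hook scans the command
    # output for error markers instead of relying on the exit code.
    return "Error" in output or "Exception" in output
```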

Prerequisites

  • The pinot-admin.sh script must be available in the system PATH
  • Appropriate permissions to execute Pinot administrative commands
  • Network connectivity to Pinot controller
  • Valid Airflow connection configuration
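The first prerequisite can be verified programmatically before a DAG run attempts any admin command. A minimal sketch using the standard library:

```python
import shutil

def admin_script_available(cmd_path: str = "pinot-admin.sh") -> bool:
    # shutil.which resolves the script against the current PATH,
    # matching how the hook invokes pinot-admin.sh.
    return shutil.which(cmd_path) is not None
```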

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-pinot
