Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Administrative functionality for managing Pinot clusters including schema management, table configuration, segment creation, and data ingestion workflows. The PinotAdminHook wraps the pinot-admin.sh command-line tool to provide programmatic access to Pinot administrative operations.
class PinotAdminHook(BaseHook):
    """
    Hook for Pinot administrative operations via the pinot-admin.sh CLI script.

    Class attributes (Airflow hook metadata):
        conn_name_attr: str = "conn_id"
        default_conn_name: str = "pinot_admin_default"
        conn_type: str = "pinot_admin"
        hook_name: str = "Pinot Admin"
    """
def __init__(
    self,
    conn_id: str = "pinot_admin_default",
    cmd_path: str = "pinot-admin.sh",
    pinot_admin_system_exit: bool = False
):
    """
    Initialize PinotAdminHook.

    Args:
        conn_id: Airflow connection ID for Pinot admin operations.
        cmd_path: Must remain "pinot-admin.sh" (legacy parameter, do not modify).
        pinot_admin_system_exit: If True, evaluate command results based on the
            exit status code; if False, evaluate based on the output message
            (scanning for error text).
    """
def get_conn(self):
    """
    Return the Airflow connection object for this hook's configured conn_id.

    Returns:
        Airflow connection object
"""

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook
# Initialize with default connection
admin_hook = PinotAdminHook()
# Initialize with custom connection
admin_hook = PinotAdminHook(
conn_id='my_pinot_admin',
pinot_admin_system_exit=True # Use exit codes for error detection
)
# Get connection details
conn = admin_hook.get_conn()

Add and manage Pinot schemas for table definitions.
def add_schema(self, schema_file: str, with_exec: bool = True):
    """
    Add a Pinot schema by running the AddSchema admin command.

    Args:
        schema_file: Path to the Pinot schema file (JSON format).
        with_exec: Whether to execute the schema addition immediately.
"""

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook
admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')
# Add schema from file
schema_file_path = '/path/to/my_table_schema.json'
admin_hook.add_schema(schema_file_path)
# Add schema without immediate execution (dry-run)
admin_hook.add_schema(schema_file_path, with_exec=False)

Example schema file format:
{
"schemaName": "my_table",
"dimensionFieldSpecs": [
{
"name": "customer_id",
"dataType": "STRING"
},
{
"name": "product_category",
"dataType": "STRING"
}
],
"metricFieldSpecs": [
{
"name": "revenue",
"dataType": "DOUBLE"
}
],
"timeFieldSpec": {
"incomingGranularitySpec": {
"timeType": "MILLISECONDS",
"dataType": "LONG",
"name": "timestamp"
}
}
}

Add and configure Pinot tables for data storage and querying.
def add_table(self, file_path: str, with_exec: bool = True):
    """
    Add a Pinot table by running the AddTable admin command.

    Args:
        file_path: Path to the Pinot table configuration file (JSON format).
        with_exec: Whether to execute the table creation immediately.
"""

admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')
# Add table configuration
table_config_path = '/path/to/my_table_config.json'
admin_hook.add_table(table_config_path)
# Add table without immediate execution
admin_hook.add_table(table_config_path, with_exec=False)

Example table configuration:
{
"tableName": "my_table",
"tableType": "OFFLINE",
"segmentsConfig": {
"timeColumnName": "timestamp",
"timeType": "MILLISECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "365"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"metadata": {
"customConfigs": {}
}
}

Create and upload segments for offline data ingestion.
def create_segment(
    self,
    generator_config_file: str | None = None,
    data_dir: str | None = None,
    segment_format: str | None = None,
    out_dir: str | None = None,
    overwrite: str | None = None,
    table_name: str | None = None,
    segment_name: str | None = None,
    time_column_name: str | None = None,
    schema_file: str | None = None,
    reader_config_file: str | None = None,
    enable_star_tree_index: str | None = None,
    star_tree_index_spec_file: str | None = None,
    hll_size: str | None = None,
    hll_columns: str | None = None,
    hll_suffix: str | None = None,
    num_threads: str | None = None,
    post_creation_verification: str | None = None,
    retry: str | None = None
):
    """
    Create a Pinot segment by running the CreateSegment admin command.

    All parameters are optional. Note that values are typed as strings
    (e.g. overwrite='true', num_threads='4') because, as shown in the usage
    examples, they are forwarded verbatim as pinot-admin.sh CLI arguments.

    Args:
        generator_config_file: Path to the segment generator configuration file.
        data_dir: Directory containing the input data files.
        segment_format: Output segment format.
        out_dir: Output directory for the generated segments.
        overwrite: Whether to overwrite existing segments (e.g. 'true').
        table_name: Name of the target table.
        segment_name: Name for the generated segment.
        time_column_name: Name of the time column.
        schema_file: Path to the schema file.
        reader_config_file: Path to the data reader configuration file.
        enable_star_tree_index: Enable star-tree indexing (e.g. 'true').
        star_tree_index_spec_file: Star-tree index specification file.
        hll_size: HyperLogLog size for approximate distinct counting.
        hll_columns: Columns to apply HLL to (comma-separated, per examples).
        hll_suffix: Suffix for generated HLL columns.
        num_threads: Number of threads for processing.
        post_creation_verification: Enable post-creation verification
            (e.g. 'true').
        retry: Number of retry attempts.
    """
def upload_segment(self, segment_dir: str, table_name: str | None = None):
    """
    Upload a segment by running the UploadSegment admin command.

    Args:
        segment_dir: Directory containing the segment to upload.
        table_name: Target table name (optional).
"""

from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook
admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')
# Create segment from data directory
admin_hook.create_segment(
table_name='my_table',
data_dir='/path/to/input/data',
out_dir='/path/to/output/segments',
schema_file='/path/to/schema.json',
overwrite='true',
num_threads='4'
)
# Upload the created segment
segment_path = '/path/to/output/segments/my_table_segment'
admin_hook.upload_segment(segment_path, table_name='my_table')
# Advanced segment creation with indexing
admin_hook.create_segment(
table_name='analytics_table',
data_dir='/data/analytics',
out_dir='/segments/analytics',
schema_file='/config/analytics_schema.json',
enable_star_tree_index='true',
star_tree_index_spec_file='/config/star_tree_spec.json',
hll_columns='user_id,session_id',
hll_size='12',
post_creation_verification='true',
retry='3'
)

Direct access to pinot-admin.sh command execution for advanced operations.
def run_cli(self, cmd: list[str], verbose: bool = True) -> str:
    """
    Run a command through the pinot-admin.sh script.

    Args:
        cmd: List of command arguments to pass to pinot-admin.sh
            (e.g. ['DeleteTable', '-tableName', 'old_table', '-exec']).
        verbose: Whether to log the command output.

    Returns:
        The command output as a string.

    Raises:
        AirflowException: If the command fails or, when
            pinot_admin_system_exit is False, if the output contains
            "Error" or "Exception" strings.
"""

admin_hook = PinotAdminHook(conn_id='pinot_admin_conn')
# Execute custom pinot-admin command
custom_command = ['ShowClusterInfo', '-clusterName', 'PinotCluster']
output = admin_hook.run_cli(custom_command)
print(f"Cluster info: {output}")
# Execute with minimal logging
output = admin_hook.run_cli(['ListTables'], verbose=False)
# Example: Delete a table (advanced usage)
delete_command = ['DeleteTable', '-tableName', 'old_table', '-exec']
try:
result = admin_hook.run_cli(delete_command)
print(f"Table deletion result: {result}")
except AirflowException as e:
print(f"Failed to delete table: {e}")

The PinotAdminHook uses Airflow connections with the following configuration:
Connection type: pinot_admin

Extra field (JSON) — "pinot_admin_system_exit" selects output parsing (false) vs exit codes (true) for error detection:
{
    "pinot_admin_system_exit": false
}

from airflow.models import Connection
from airflow import settings
# Create admin connection
admin_conn = Connection(
conn_id='my_pinot_admin',
conn_type='pinot_admin',
host='pinot-controller.example.com',
port=9000,
login='admin_user',
password='admin_password',
extra='{"pinot_admin_system_exit": true}'
)
session = settings.Session()
session.add(admin_conn)
session.commit()

The hook evaluates command success based on the pinot_admin_system_exit setting:
- pinot_admin_system_exit=True: uses command exit codes for error detection.
- pinot_admin_system_exit=False: checks the output for "Error" or "Exception" strings.

from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook
try:
admin_hook = PinotAdminHook(pinot_admin_system_exit=True)
admin_hook.add_schema('/path/to/invalid_schema.json')
except AirflowException as e:
print(f"Schema addition failed: {e}")

Requirement: the pinot-admin.sh script must be available in the system PATH.

Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-apache-pinot