Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.
Apache Airflow provider package that enables integration with Apache Pinot, a real-time distributed OLAP datastore designed for ultra-low latency analytics. The provider offers comprehensive connectivity for both SQL-based queries and administrative operations.
Install the provider with pip:

```
pip install apache-airflow-providers-apache-pinot
```

Both hooks are importable from a single module:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook, PinotAdminHook
```

Version access:

```python
from airflow.providers.apache.pinot import __version__
```

Basic query usage with `PinotDbApiHook`:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

# Initialize database hook (uses default connection ID)
hook = PinotDbApiHook()
# Execute SQL queries
sql = "SELECT COUNT(*) FROM my_table WHERE timestamp > '2023-01-01'"
results = hook.get_records(sql)
# Get first record only
first_result = hook.get_first(sql)
# Get connection URI
uri = hook.get_uri()
```

Administrative operations with `PinotAdminHook`:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

# Initialize admin hook
admin_hook = PinotAdminHook(conn_id='pinot_admin_default')
# Add schema
admin_hook.add_schema('path/to/schema.json')
# Add table
admin_hook.add_table('path/to/table_config.json')
# Create and upload segment
admin_hook.create_segment(
table_name='my_table',
data_dir='/path/to/data',
out_dir='/path/to/segments'
)
admin_hook.upload_segment('/path/to/segments/my_segment')
```

The provider is built around two main hook classes that handle different aspects of Pinot integration.
Both hooks integrate with Airflow's connection management system, supporting authentication and configuration through Airflow connections.
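The connection each hook reads is selected at construction time. A minimal sketch, assuming the provider's default connection IDs (`pinot_broker_default` and `pinot_admin_default`) and the hooks' usual keyword names:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook, PinotDbApiHook

# PinotDbApiHook selects its broker connection via the pinot_broker_conn_id
# keyword; omitting it falls back to the hook's default connection ID.
db_hook = PinotDbApiHook(pinot_broker_conn_id="pinot_broker_default")

# PinotAdminHook selects its controller connection via conn_id.
admin_hook = PinotAdminHook(conn_id="pinot_admin_default")
```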
`PinotDbApiHook` provides SQL-based querying for retrieving data from Pinot clusters through the broker API. It supports standard SQL queries with result retrieval in various formats:

```python
class PinotDbApiHook(DbApiHook):
    def get_conn(self): ...
    def get_uri(self) -> str: ...
    def get_records(self, sql: str | list[str], parameters=None, **kwargs): ...
    def get_first(self, sql: str | list[str], parameters=None): ...
```

`PinotAdminHook` provides administrative functionality for managing Pinot clusters, including schema management, table configuration, segment creation, and data ingestion workflows:
```python
class PinotAdminHook(BaseHook):
    def add_schema(self, schema_file: str, with_exec: bool = True): ...
    def add_table(self, file_path: str, with_exec: bool = True): ...
    def create_segment(self, **kwargs): ...
    def upload_segment(self, segment_dir: str, table_name: str | None = None): ...
    def run_cli(self, cmd: list[str], verbose: bool = True) -> str: ...
```

The provider registers two Airflow connection types, one per hook: a broker connection type (`pinot`, default connection ID `pinot_broker_default`) used by `PinotDbApiHook`, and an admin connection type (`pinot_admin`, default connection ID `pinot_admin_default`) used by `PinotAdminHook`.
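As a sketch of how these connections might be supplied without touching the metadata database, Airflow resolves `AIRFLOW_CONN_<CONN_ID>` environment variables (JSON form shown); the host and port values below are illustrative assumptions, not provider defaults:

```python
import json
import os

# Broker connection consumed by PinotDbApiHook (values are illustrative).
os.environ["AIRFLOW_CONN_PINOT_BROKER_DEFAULT"] = json.dumps(
    {"conn_type": "pinot", "host": "localhost", "port": 8099}
)

# Controller connection consumed by PinotAdminHook (values are illustrative).
os.environ["AIRFLOW_CONN_PINOT_ADMIN_DEFAULT"] = json.dumps(
    {"conn_type": "pinot_admin", "host": "localhost", "port": 9000}
)
```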
A `version_compat` module centralizes version-specific imports and checks:

```python
from airflow.providers.apache.pinot.version_compat import AIRFLOW_V_3_1_PLUS, BaseHook

def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...
```

The provider maintains compatibility with Airflow 2.10+ and includes version-specific compatibility handling for different Airflow releases.
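A minimal sketch of how such a version gate is typically derived; the body below is an assumption about the module's internals, not its published API:

```python
# Sketch only: derive a comparable version tuple from the running Airflow.
from packaging.version import Version

from airflow import __version__ as AIRFLOW_VERSION


def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    v = Version(AIRFLOW_VERSION)
    return (v.major, v.minor, v.micro)


AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
```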
Typing-related imports in the hook module defer the `Connection` import to type-checking time, so `airflow.models` is not imported at runtime:

```python
# Connection imports
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from airflow.models import Connection
```
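Putting the pieces together, a minimal DAG sketch wiring both hooks; the schema/table file paths and the table name are illustrative assumptions:

```python
import pendulum

from airflow.decorators import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def pinot_example():
    @task
    def register_table():
        from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

        admin = PinotAdminHook(conn_id="pinot_admin_default")
        admin.add_schema("include/pinot/schema.json")       # illustrative path
        admin.add_table("include/pinot/table_config.json")  # illustrative path

    @task
    def row_count():
        from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

        broker = PinotDbApiHook()
        return broker.get_first("SELECT COUNT(*) FROM my_table")

    register_table() >> row_count()


pinot_example()
```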