Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.
```bash
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-pinot@4.8.0
```

Apache Airflow provider package that enables integration with Apache Pinot, a real-time distributed OLAP datastore designed for ultra-low-latency analytics. The provider offers connectivity for both SQL-based queries and administrative operations.
```bash
pip install apache-airflow-providers-apache-pinot
```

Core imports:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook, PinotAdminHook
```

Version access:

```python
from airflow.providers.apache.pinot import __version__
```

Querying Pinot through the broker with `PinotDbApiHook`:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook
# Initialize database hook (uses default connection ID)
hook = PinotDbApiHook()
# Execute SQL queries
sql = "SELECT COUNT(*) FROM my_table WHERE timestamp > '2023-01-01'"
results = hook.get_records(sql)
# Get first record only
first_result = hook.get_first(sql)
# Get connection URI
uri = hook.get_uri()
```
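As a usage sketch, the same kind of query can run inside a TaskFlow DAG. The DAG ID, schedule, and table name below are illustrative; the hook resolves its default broker connection when no ID is passed:

```python
import pendulum
from airflow.decorators import dag, task
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def pinot_query_example():
    @task
    def count_rows() -> int:
        hook = PinotDbApiHook()
        # get_first returns the first row of the result set
        row = hook.get_first("SELECT COUNT(*) FROM my_table")
        return row[0]

    count_rows()

pinot_query_example()
```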
Administrative operations with `PinotAdminHook`:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

# Initialize admin hook
admin_hook = PinotAdminHook(conn_id='pinot_admin_default')
# Add schema
admin_hook.add_schema('path/to/schema.json')
# Add table
admin_hook.add_table('path/to/table_config.json')
# Create and upload segment
admin_hook.create_segment(
    table_name='my_table',
    data_dir='/path/to/data',
    out_dir='/path/to/segments'
)
admin_hook.upload_segment('/path/to/segments/my_segment')
```

The provider is built around two main hook classes that handle different aspects of Pinot integration: `PinotDbApiHook` for SQL queries through the broker and `PinotAdminHook` for cluster administration.
Both hooks integrate with Airflow's connection management system, supporting authentication and configuration through Airflow connections.
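A minimal sketch of defining both connections programmatically. The hosts, ports, and the `endpoint` extra are placeholder values (assumptions, not provider defaults), and URI-style environment variables are just one of the registration mechanisms Airflow supports:

```python
import os

from airflow.models import Connection

# Broker connection consumed by PinotDbApiHook (host/port are placeholders)
broker = Connection(
    conn_id='pinot_broker_default',
    conn_type='pinot',
    host='pinot-broker',
    port=8099,
    extra='{"endpoint": "query/sql"}',
)

# Controller connection consumed by PinotAdminHook
admin = Connection(
    conn_id='pinot_admin_default',
    conn_type='pinot_admin',
    host='pinot-controller',
    port=9000,
)

# Register the connections via Airflow's URI-style environment variables
os.environ['AIRFLOW_CONN_PINOT_BROKER_DEFAULT'] = broker.get_uri()
os.environ['AIRFLOW_CONN_PINOT_ADMIN_DEFAULT'] = admin.get_uri()
```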
`PinotDbApiHook` provides SQL-based querying for retrieving data from Pinot clusters through the broker API. It supports standard SQL queries with result retrieval in various formats.
```python
class PinotDbApiHook(DbApiHook):
    def get_conn(self): ...
    def get_uri(self) -> str: ...
    def get_records(self, sql: str | list[str], parameters=None, **kwargs): ...
    def get_first(self, sql: str | list[str], parameters=None): ...
```
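A short sketch of the two result helpers in practice; `baseballStats` (Pinot's quickstart table) stands in for a real table:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

hook = PinotDbApiHook()

# get_records returns the full result set as a list of rows
rows = hook.get_records("SELECT playerName, totalHomeRuns FROM baseballStats LIMIT 5")
for player, home_runs in rows:
    print(player, home_runs)

# get_first returns only the first row, handy for scalar aggregates
(count,) = hook.get_first("SELECT COUNT(*) FROM baseballStats")
print(count)
```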
`PinotAdminHook` provides administrative functionality for managing Pinot clusters, including schema management, table configuration, segment creation, and data ingestion workflows.

```python
class PinotAdminHook(BaseHook):
    def add_schema(self, schema_file: str, with_exec: bool = True): ...
    def add_table(self, file_path: str, with_exec: bool = True): ...
    def create_segment(self, **kwargs): ...
    def upload_segment(self, segment_dir: str, table_name: str | None = None): ...
    def run_cli(self, cmd: list[str], verbose: bool = True) -> str: ...
```
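`run_cli` is the lower-level entry point the other methods build on, invoking the `pinot-admin.sh` CLI against the configured controller. A hedged sketch; the `AddTable` subcommand and flags follow Pinot's own CLI conventions, not anything this provider defines:

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

admin_hook = PinotAdminHook(conn_id='pinot_admin_default')

# Roughly mirrors add_table(..., with_exec=True); the built-in methods
# additionally inject controller host/port flags from the connection
output = admin_hook.run_cli(
    ['AddTable', '-filePath', '/path/to/table_config.json', '-exec'],
    verbose=True,
)
print(output)
```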
The provider registers two Airflow connection types:

- `pinot`, used by `PinotDbApiHook` (default connection ID `pinot_broker_default`)
- `pinot_admin`, used by `PinotAdminHook` (default connection ID `pinot_admin_default`)

Version compatibility helpers:

```python
from airflow.providers.apache.pinot.version_compat import AIRFLOW_V_3_1_PLUS, BaseHook

def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...
```

The provider maintains compatibility with Airflow 2.10+ and includes version-specific compatibility handling for different Airflow releases.
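A sketch of how such a compatibility shim typically works, following the pattern common across Airflow providers; this is an approximation, not the provider's verbatim source:

```python
from airflow import __version__ as airflow_version
from packaging.version import Version

def get_base_airflow_version_tuple() -> tuple[int, int, int]:
    # Reduce the running Airflow version (including rc/dev suffixes)
    # to a comparable (major, minor, micro) tuple
    version = Version(airflow_version)
    return version.major, version.minor, version.micro

# Flag consumed by the provider to gate version-specific code paths
AIRFLOW_V_3_1_PLUS = get_base_airflow_version_tuple() >= (3, 1, 0)
```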
Connection-related imports used by the hooks (with `Iterable` and `Mapping` coming from `collections.abc` rather than the deprecated `typing` aliases):

```python
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from airflow.models import Connection
```
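The `TYPE_CHECKING` guard keeps `Connection` available for static type hints without importing it at runtime. A minimal sketch of the pattern; `connection_summary` is a hypothetical helper:

```python
from __future__ import annotations

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from airflow.models import Connection  # imported only by type checkers

def connection_summary(conn: Connection) -> str:
    # With postponed annotation evaluation, the hint is never executed at runtime
    return f"{conn.conn_type}://{conn.host}:{conn.port}"
```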