Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.
—
Quality: Pending — a best-practices review has not yet been completed.
Impact: Pending — no eval scenarios have been run.
SQL-based querying capabilities for retrieving data from Pinot clusters through the broker API. The PinotDbApiHook extends Airflow's standard DbApiHook to provide Pinot-specific database connectivity using the pinotdb client library.
Establishes and manages connections to Pinot brokers for SQL query execution.
class PinotDbApiHook(DbApiHook):
    """
    Interact with the Pinot Broker Query API using standard SQL.

    Pinot is a read-only analytical store, so this hook only supports
    querying through the broker endpoint; autocommit and insert
    operations are unsupported (see ``set_autocommit`` / ``insert_rows``).
    """

    # Connection attributes consumed by Airflow's connection framework.
    conn_name_attr = "pinot_broker_conn_id"
    default_conn_name = "pinot_broker_default"
    conn_type = "pinot"
    hook_name = "Pinot Broker"
    # Pinot is read-only; autocommit has no meaning for it.
    supports_autocommit = False

    # Inherits __init__ from DbApiHook -- no custom constructor.

    def get_conn(self):
        """
        Establish a connection to the Pinot broker through pinot dbapi.

        Returns:
            Pinot database connection object
        """

    def get_uri(self) -> str:
        """
        Get the connection URI for the Pinot broker.

        Returns:
            Connection URI (e.g. http://localhost:9000/query/sql)
        """


from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook
# Example: initialize the hook and open a broker connection.

# Initialize with the default connection (pinot_broker_default).
hook = PinotDbApiHook()

# Get the connection URI, e.g. http://localhost:9000/query/sql.
uri = hook.get_uri()
print(f"Connecting to: {uri}")

# Get the raw DB-API connection object.
conn = hook.get_conn()

# Execute SQL queries against Pinot clusters and retrieve results in
# various formats (see get_records / get_first below).
def get_records(
    self,
    sql: str | list[str],
    parameters: Iterable | Mapping[str, Any] | None = None,
    **kwargs,
):
    """
    Execute the sql and return a set of records.

    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with
        **kwargs: Additional parameters forwarded to the underlying
            DB-API helper

    Returns:
        List of tuples containing query results
    """
def get_first(
    self,
    sql: str | list[str],
    parameters: Iterable | Mapping[str, Any] | None = None,
):
    """
    Execute the sql and return the first resulting row.

    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with

    Returns:
        Tuple containing the first row of results, or None if no results
    """


from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook
# Example: run analytical queries and fetch results.
hook = PinotDbApiHook()

# Aggregate per-customer order counts and revenue for 2023.
sql = """
    SELECT
        customer_id,
        COUNT(*) as order_count,
        SUM(total_amount) as total_revenue
    FROM orders
    WHERE order_date >= '2023-01-01'
    GROUP BY customer_id
    ORDER BY total_revenue DESC
    LIMIT 100
"""

# Execute the query and get all records.
results = hook.get_records(sql)
for row in results:
    customer_id, order_count, total_revenue = row
    print(f"Customer {customer_id}: {order_count} orders, ${total_revenue}")

# Get only the first (top-revenue) result.
top_customer = hook.get_first(sql)
if top_customer:
    customer_id, order_count, total_revenue = top_customer
    print(f"Top customer: {customer_id} with ${total_revenue}")

# Query with parameters (if supported by the underlying connection).
parametrized_sql = "SELECT * FROM orders WHERE customer_id = ? AND order_date >= ?"
results = hook.get_records(parametrized_sql, parameters=[12345, '2023-06-01'])

# The following operations are not supported for Pinot (a read-only
# analytical database):
def set_autocommit(self, conn: Connection, autocommit: Any):
    """Raises NotImplementedError -- Pinot is read-only; autocommit is not supported."""

def insert_rows(
    self,
    table: str,
    # NOTE(review): `rows` is annotated `str` here, but the upstream
    # DbApiHook contract passes an iterable of row tuples -- TODO confirm.
    rows: str,
    target_fields: str | None = None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs: Any,
):
    """Raises NotImplementedError -- insert operations are not supported by Pinot."""


# The PinotDbApiHook uses Airflow connections; the expected connection
# configuration is described below.
Connection type: `pinot`. The connection's `extra` field accepts:

{
    "endpoint": "/query/sql",  # API endpoint (default: /query/sql)
    "schema": "http"           # Protocol scheme (default: http)
}

# Connection configuration in Airflow UI or via code
from airflow.models import Connection
from airflow import settings

# Create the connection programmatically.
conn = Connection(
    conn_id='my_pinot_broker',
    conn_type='pinot',
    host='pinot-broker.example.com',
    port=8099,
    login='pinot_user',  # Optional
    password='pinot_password',  # Optional
    extra='{"endpoint": "/query/sql", "schema": "https"}',
)

# Persist the connection in the Airflow metadata database.
session = settings.Session()
session.add(conn)
session.commit()

# The hook inherits standard database error handling from DbApiHook and
# may raise AirflowException for connection or query failures. Common
# error scenarios include:
from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

try:
    hook = PinotDbApiHook()
    # Querying a missing table surfaces as an AirflowException.
    results = hook.get_records("SELECT * FROM non_existent_table")
except AirflowException as e:
    print(f"Query failed: {e}")

# Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-apache-pinot