
tessl/pypi-apache-airflow-providers-apache-pinot

Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.


Database Query Operations

SQL-based querying capabilities for retrieving data from Pinot clusters through the broker API. The PinotDbApiHook extends Airflow's standard DbApiHook to provide Pinot-specific database connectivity using the pinotdb client library.

Capabilities

Connection Management

Establishes and manages connections to Pinot brokers for SQL query execution.

class PinotDbApiHook(DbApiHook):
    """
    Interact with Pinot Broker Query API using standard SQL.
    
    Attributes:
        conn_name_attr: str = "pinot_broker_conn_id"
        default_conn_name: str = "pinot_broker_default"
        conn_type: str = "pinot"
        hook_name: str = "Pinot Broker"
        supports_autocommit: bool = False
    """
    
    # Inherits __init__ from DbApiHook - no custom constructor

    def get_conn(self):
        """
        Establish a connection to pinot broker through pinot dbapi.
        
        Returns:
            Pinot database connection object
        """

    def get_uri(self) -> str:
        """
        Get the connection uri for pinot broker.
        
        Returns:
            Connection URI (e.g., http://localhost:9000/query/sql)
        """

Usage Example - Connection Management

from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

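# Initialize with the default connection ID (pinot_broker_default)
hook = PinotDbApiHook()

# Or pass a different connection ID through the inherited DbApiHook constructor
# hook = PinotDbApiHook(pinot_broker_conn_id="my_pinot_broker")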

# Get connection URI
uri = hook.get_uri()
print(f"Connecting to: {uri}")

# Get raw connection object
conn = hook.get_conn()
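If the connection has a login and password, the URI returned by get_uri() may embed them in its user-info part (an assumption worth checking for your provider version), so printing it as in the example above could expose the password. A minimal standard-library sketch of masking the password before logging; redact_uri_password is a hypothetical helper, not part of the provider.

```python
from urllib.parse import urlsplit, urlunsplit


def redact_uri_password(uri: str, mask: str = "***") -> str:
    """Return uri with the password in its user-info section replaced by mask."""
    parts = urlsplit(uri)
    if parts.password is None:
        return uri  # no password present, nothing to hide
    # netloc looks like "user:password@host:port"; keep everything after the last "@"
    userinfo, _, hostport = parts.netloc.rpartition("@")
    username = userinfo.split(":", 1)[0]
    return urlunsplit(parts._replace(netloc=f"{username}:{mask}@{hostport}"))


print(redact_uri_password("http://pinot_user:s3cret@pinot-broker.example.com:8099/query/sql"))
# → http://pinot_user:***@pinot-broker.example.com:8099/query/sql
```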

Query Execution

Execute SQL queries against Pinot and retrieve either all result rows or only the first row.

def get_records(
    self, 
    sql: str | list[str], 
    parameters: Iterable | Mapping[str, Any] | None = None, 
    **kwargs
):
    """
    Execute the SQL and return all resulting records.
    
    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with
        **kwargs: Additional parameters
        
    Returns:
        List of tuples containing query results
    """

def get_first(
    self, 
    sql: str | list[str], 
    parameters: Iterable | Mapping[str, Any] | None = None
):
    """
    Execute the SQL and return the first resulting row.
    
    Args:
        sql: SQL statement(s) to execute  
        parameters: Parameters to render the SQL query with
        
    Returns:
        Tuple containing the first row of results, or None if no results
    """

Usage Example - Query Execution

from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

hook = PinotDbApiHook()

# Execute query and get all records
sql = """
SELECT 
    customer_id, 
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue
FROM orders 
WHERE order_date >= '2023-01-01'
GROUP BY customer_id
ORDER BY total_revenue DESC
LIMIT 100
"""

results = hook.get_records(sql)
for row in results:
    customer_id, order_count, total_revenue = row
    print(f"Customer {customer_id}: {order_count} orders, ${total_revenue}")

# Get only the first result
top_customer = hook.get_first(sql)
if top_customer:
    customer_id, order_count, total_revenue = top_customer
    print(f"Top customer: {customer_id} with ${total_revenue}")

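# Query with parameters. The signature accepts parameters, but whether they are
# forwarded to the pinotdb cursor depends on the provider version; check the hook
# source before relying on this. pinotdb expects pyformat placeholders (%(name)s).
parametrized_sql = "SELECT * FROM orders WHERE customer_id = %(customer_id)s AND order_date >= %(start_date)s"
results = hook.get_records(parametrized_sql, parameters={"customer_id": 12345, "start_date": "2023-06-01"})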
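get_records and get_first are assumed here to follow the usual DB-API cursor pattern: execute the statement, then call fetchall() or fetchone(). The sketch below illustrates those return shapes using the standard-library sqlite3 module purely as a local stand-in for a pinotdb connection, so it runs without a Pinot cluster. The two functions are hypothetical illustrations, not the provider's implementation, and the rows are toy data.

```python
import sqlite3
from contextlib import closing


def get_records(conn, sql):
    """Stand-in for PinotDbApiHook.get_records: execute, then fetch every row."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return cur.fetchall()  # list of tuples


def get_first(conn, sql):
    """Stand-in for PinotDbApiHook.get_first: execute, then fetch one row or None."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return cur.fetchone()  # a tuple, or None when there are no rows


# In-memory SQLite database used only as a DB-API stand-in for a Pinot broker
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, total_amount REAL)")
conn.executemany("INSERT INTO orders VALUES (?, ?)", [(1, 10.0), (1, 5.0), (2, 7.5)])

sql = """
SELECT customer_id, COUNT(*) AS order_count, SUM(total_amount) AS total_revenue
FROM orders
GROUP BY customer_id
ORDER BY total_revenue DESC
"""
rows = get_records(conn, sql)  # [(1, 2, 15.0), (2, 1, 7.5)]
first = get_first(conn, sql)   # (1, 2, 15.0)
empty = get_first(conn, "SELECT * FROM orders WHERE customer_id = 999")  # None
```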

Unsupported Operations

Pinot's broker query API is read-only, so the write-related DbApiHook methods are not supported and raise NotImplementedError:

def set_autocommit(self, conn: Connection, autocommit: Any):
    """Raises NotImplementedError - autocommit not supported"""

def insert_rows(
    self,
    table: str,
    rows: str,
    target_fields: str | None = None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs: Any
):
    """Raises NotImplementedError - insert operations not supported"""

Connection Configuration

Airflow Connection Setup

The PinotDbApiHook uses Airflow connections with the following configuration:

  • Connection Type: pinot
  • Host: Pinot broker hostname
  • Port: Pinot broker port (typically 8099)
  • Login: Username (optional, for authenticated clusters)
  • Password: Password (optional, for authenticated clusters)
  • Extra: JSON configuration with additional options

Extra Configuration Options

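The Extra field takes a JSON object with two optional keys: endpoint, the broker query API path (default: /query/sql), and schema, the protocol scheme (default: http). This schema extra holds the protocol and is separate from the connection's own Schema field.

{
    "endpoint": "/query/sql",
    "schema": "http"
}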
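When the hook opens a connection, these extras are read alongside the standard connection fields. A minimal sketch of that mapping, assuming the hook forwards host, port, login, password and the two extras to pinotdb.connect() with the defaults listed above; pinotdb_connect_kwargs is a hypothetical helper, and the keyword names username, path and scheme are assumptions to verify against the provider and pinotdb source.

```python
from __future__ import annotations

import json
from typing import Any


def pinotdb_connect_kwargs(
    host: str,
    port: int | None,
    login: str | None,
    password: str | None,
    extra: str | None,
) -> dict[str, Any]:
    """Hypothetical mapping of Airflow connection fields to pinotdb.connect() kwargs."""
    extras = json.loads(extra) if extra else {}
    return {
        "host": host,
        "port": port,
        "username": login,
        "password": password,
        # Extras fall back to the documented defaults when absent
        "path": extras.get("endpoint", "/query/sql"),
        "scheme": extras.get("schema", "http"),
    }


kwargs = pinotdb_connect_kwargs(
    "pinot-broker.example.com", 8099, "pinot_user", "pinot_password", '{"schema": "https"}'
)
```

Here only schema is overridden, so path keeps its /query/sql default while scheme becomes https.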

Usage Example - Connection Configuration

# Connection configuration in Airflow UI or via code
from airflow.models import Connection
from airflow import settings

# Create connection programmatically
conn = Connection(
    conn_id='my_pinot_broker',
    conn_type='pinot',
    host='pinot-broker.example.com',
    port=8099,
    login='pinot_user',        # Optional
    password='pinot_password', # Optional
    extra='{"endpoint": "/query/sql", "schema": "https"}'
)

# Add to Airflow
session = settings.Session()
session.add(conn)
session.commit()
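Instead of writing to the metadata database, Airflow can also read a connection from an environment variable named AIRFLOW_CONN_ followed by the upper-cased connection ID, and since Airflow 2.3 the value may be a JSON document. A sketch that builds such a value for the same connection; the credentials are placeholders, and in a real deployment you would set the variable in the scheduler and worker environment rather than from Python.

```python
import json
import os

# Placeholder connection values; replace with your own
conn_json = json.dumps(
    {
        "conn_type": "pinot",
        "host": "pinot-broker.example.com",
        "port": 8099,
        "login": "pinot_user",
        "password": "pinot_password",
        "extra": {"endpoint": "/query/sql", "schema": "https"},
    }
)

# Airflow resolves conn_id "my_pinot_broker" from AIRFLOW_CONN_MY_PINOT_BROKER
os.environ["AIRFLOW_CONN_MY_PINOT_BROKER"] = conn_json
```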

Error Handling

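Connection problems surface as AirflowException subclasses; for example, AirflowNotFoundException is raised when the connection ID is not defined. Errors raised while a query runs (invalid SQL, unknown tables) do not appear to be wrapped by the hook and are likely to arrive as exceptions from the pinotdb client instead, so handle them separately. Common error scenarios include:

  • Connection timeout or failure
  • Invalid SQL syntax
  • Authentication failures
  • Network connectivity issues

from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

try:
    hook = PinotDbApiHook()
    results = hook.get_records("SELECT * FROM non_existent_table")
except AirflowException as e:
    # Airflow-level failures, e.g. the connection ID is not defined
    print(f"Airflow error: {e}")
except Exception as e:
    # Query failures from the pinotdb client (exact classes depend on the pinotdb version)
    print(f"Query failed: {e}")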
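For the transient failures listed above (timeouts, network issues), the simplest option is usually Airflow's task-level retries and retry_delay operator arguments. If you only want to retry the query call inside a task, a small exponential-backoff wrapper is one way to do it. call_with_retries is a hypothetical helper, and the exception types to retry on are an assumption you must supply, since the classes pinotdb raises for network errors may differ between versions.

```python
from __future__ import annotations

import time
from typing import Callable, TypeVar

T = TypeVar("T")


def call_with_retries(
    fn: Callable[[], T],
    *,
    retry_on: tuple[type[BaseException], ...],
    attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    """Call fn(), retrying on retry_on exceptions with exponential backoff."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except retry_on:
            if attempt == attempts:
                raise  # out of attempts: re-raise the last error
            # Sleep base_delay, 2 * base_delay, 4 * base_delay, ... between attempts
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Hypothetical usage inside a task:
# rows = call_with_retries(
#     lambda: hook.get_records("SELECT COUNT(*) FROM orders"),
#     retry_on=(ConnectionError,),  # replace with the types your client actually raises
# )
```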

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-apache-pinot
