# Database Query Operations

SQL-based querying capabilities for retrieving data from Pinot clusters through the broker API. The PinotDbApiHook extends Airflow's standard DbApiHook to provide Pinot-specific database connectivity using the pinotdb client library.

## Capabilities

### Connection Management

Establishes and manages connections to Pinot brokers for SQL query execution.

```python { .api }
class PinotDbApiHook(DbApiHook):
    """
    Interact with Pinot Broker Query API using standard SQL.

    Attributes:
        conn_name_attr: str = "pinot_broker_conn_id"
        default_conn_name: str = "pinot_broker_default"
        conn_type: str = "pinot"
        hook_name: str = "Pinot Broker"
        supports_autocommit: bool = False
    """

    # Inherits __init__ from DbApiHook - no custom constructor

    def get_conn(self):
        """
        Establish a connection to the Pinot broker through the pinotdb DB-API client.

        Returns:
            Pinot database connection object
        """

    def get_uri(self) -> str:
        """
        Get the connection URI for the Pinot broker.

        Returns:
            Connection URI (e.g. http://localhost:9000/query/sql)
        """
```
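
The example URI in the `get_uri` docstring is built from the broker host, the port and the query endpoint. The sketch below shows one way to assemble such a URI. It assumes the scheme and endpoint default to `http` and `/query/sql`, as listed under Extra Configuration Options. `build_broker_uri` is a hypothetical helper and is not part of the provider. The hook's own implementation may differ, for example in which connection field supplies the scheme or whether credentials are embedded in the URI.

```python
from __future__ import annotations


def build_broker_uri(
    host: str,
    port: int | None = None,
    endpoint: str = "/query/sql",
    scheme: str = "http",
) -> str:
    """Hypothetical helper: join a broker address and query endpoint into a URI."""
    # Append the port only when one is configured.
    netloc = host if port is None else f"{host}:{port}"
    # Accept "query/sql" and "/query/sql" alike by normalizing the leading slash.
    path = "/" + endpoint.lstrip("/")
    return f"{scheme}://{netloc}{path}"


print(build_broker_uri("localhost", 9000))  # http://localhost:9000/query/sql
```

Normalizing the endpoint in one place avoids doubled or missing slashes, whether or not the Extra value is written with a leading `/`.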

### Usage Example - Connection Management

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

# Initialize with the default connection ID (pinot_broker_default)
hook = PinotDbApiHook()

# Or point the hook at a specific Airflow connection
custom_hook = PinotDbApiHook(pinot_broker_conn_id="my_pinot_broker")

# Get connection URI
uri = hook.get_uri()
print(f"Connecting to: {uri}")

# Get raw connection object
conn = hook.get_conn()
```

### Query Execution

Execute SQL queries against Pinot clusters and retrieve either all result rows or only the first row.

```python { .api }
def get_records(
    self,
    sql: str | list[str],
    parameters: Iterable | Mapping[str, Any] | None = None,
    **kwargs,
):
    """
    Execute the SQL and return the resulting records.

    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with
        **kwargs: Additional parameters

    Returns:
        List of tuples containing query results
    """

def get_first(
    self,
    sql: str | list[str],
    parameters: Iterable | Mapping[str, Any] | None = None,
):
    """
    Execute the SQL and return the first resulting row.

    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with

    Returns:
        Tuple containing the first row of results, or None if no results
    """
```
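
Both methods follow the DB-API (PEP 249) cursor pattern. `get_records` roughly corresponds to `fetchall()` and `get_first` to `fetchone()`, which is why `get_first` returns `None` when the query yields no rows. The sketch below illustrates the difference using the standard library's `sqlite3` module as a stand-in for the pinotdb connection. `records` and `first` are hypothetical helpers, and the table and rows are invented.

```python
from __future__ import annotations

import sqlite3
from contextlib import closing
from typing import Any


def records(conn: sqlite3.Connection, sql: str) -> list[tuple[Any, ...]]:
    """Hypothetical analogue of get_records: every row via fetchall()."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return cur.fetchall()


def first(conn: sqlite3.Connection, sql: str) -> tuple[Any, ...] | None:
    """Hypothetical analogue of get_first: one row via fetchone(), or None."""
    with closing(conn.cursor()) as cur:
        cur.execute(sql)
        return cur.fetchone()


# In-memory SQLite database standing in for a Pinot broker (invented data).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (customer_id INTEGER, total_amount REAL)")
conn.executemany(
    "INSERT INTO orders VALUES (?, ?)",
    [(1, 10.0), (1, 5.0), (2, 7.5)],
)

query = (
    "SELECT customer_id, SUM(total_amount) AS total_revenue FROM orders "
    "GROUP BY customer_id ORDER BY total_revenue DESC"
)
print(records(conn, query))  # every row, highest revenue first
print(first(conn, query))  # only the top row
print(first(conn, "SELECT * FROM orders WHERE customer_id = 99"))  # None
```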

### Usage Example - Query Execution

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

hook = PinotDbApiHook()

# Execute query and get all records
sql = """
SELECT
    customer_id,
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue
FROM orders
WHERE order_date >= '2023-01-01'
GROUP BY customer_id
ORDER BY total_revenue DESC
LIMIT 100
"""

results = hook.get_records(sql)
for row in results:
    customer_id, order_count, total_revenue = row
    print(f"Customer {customer_id}: {order_count} orders, ${total_revenue}")

# Get only the first result (this executes the query again)
top_customer = hook.get_first(sql)
if top_customer:
    customer_id, order_count, total_revenue = top_customer
    print(f"Top customer: {customer_id} with ${total_revenue}")

# Query with parameters. The `?` placeholder is illustrative: the placeholder
# style must match pinotdb's DB-API paramstyle, and some provider versions may
# not pass `parameters` through to the cursor, so verify both first.
parametrized_sql = "SELECT * FROM orders WHERE customer_id = ? AND order_date >= ?"
results = hook.get_records(parametrized_sql, parameters=[12345, '2023-06-01'])
```
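
`get_records` returns positional rows (a list of tuples), so the example unpacks each row in SELECT order. If named access is more convenient, a small helper can zip the column names from your own SELECT list onto each row. `rows_to_dicts` is hypothetical, and the sample rows are invented stand-ins for `hook.get_records(sql)` output.

```python
from __future__ import annotations

from typing import Any, Sequence


def rows_to_dicts(
    columns: Sequence[str], rows: Sequence[Sequence[Any]]
) -> list[dict[str, Any]]:
    """Hypothetical helper: attach column names to positional result rows."""
    result = []
    for row in rows:
        # Fail loudly if the SELECT list and the column names drift apart.
        if len(row) != len(columns):
            raise ValueError(f"expected {len(columns)} values, got {len(row)}")
        result.append(dict(zip(columns, row)))
    return result


columns = ["customer_id", "order_count", "total_revenue"]
sample_rows = [(12345, 3, 250.0), (67890, 1, 40.5)]  # invented sample data
for record in rows_to_dicts(columns, sample_rows):
    print(record["customer_id"], record["total_revenue"])
```

The length check raises an error when the SELECT list is edited without updating the column names. Without it, `zip` would silently truncate the mismatched rows.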

### Unsupported Operations

Pinot is an analytical store that this hook accesses read-only, so the following DbApiHook operations are not supported and raise NotImplementedError:

```python { .api }
def set_autocommit(self, conn: Connection, autocommit: Any):
    """Raises NotImplementedError - autocommit not supported"""

def insert_rows(
    self,
    table: str,
    rows: str,
    target_fields: str | None = None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs: Any,
):
    """Raises NotImplementedError - insert operations not supported"""
```
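
Code that may receive either a Pinot hook or a writable hook can catch `NotImplementedError` instead of failing the whole task. The sketch uses two hypothetical stand-in classes so that it runs without Airflow. `ReadOnlyHook` mimics the documented behavior and `InMemoryHook` accepts writes. With the real provider installed, the same `try_insert` logic would apply to a `PinotDbApiHook`.

```python
from __future__ import annotations

from typing import Any


class ReadOnlyHook:
    """Hypothetical stand-in mirroring the documented unsupported operations."""

    supports_autocommit = False

    def set_autocommit(self, conn: Any, autocommit: Any) -> None:
        raise NotImplementedError("autocommit is not supported")

    def insert_rows(self, table: str, rows: Any, **kwargs: Any) -> None:
        raise NotImplementedError("insert operations are not supported")


class InMemoryHook:
    """Hypothetical writable stand-in, for comparison."""

    def __init__(self) -> None:
        self.rows: list[tuple] = []

    def insert_rows(self, table: str, rows: Any, **kwargs: Any) -> None:
        self.rows.extend(rows)


def try_insert(hook: Any, table: str, rows: list[tuple]) -> bool:
    """Return False instead of raising when the hook cannot write."""
    try:
        hook.insert_rows(table, rows)
    except NotImplementedError:
        return False
    return True


print(try_insert(ReadOnlyHook(), "orders", [(1, 10.0)]))  # False
print(try_insert(InMemoryHook(), "orders", [(1, 10.0)]))  # True
```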

## Connection Configuration

### Airflow Connection Setup

The PinotDbApiHook uses Airflow connections with the following configuration:

- **Connection Type**: `pinot`
- **Host**: Pinot broker hostname
- **Port**: Pinot broker port (typically 8099)
- **Login**: Username (optional, for authenticated clusters)
- **Password**: Password (optional, for authenticated clusters)
- **Extra**: JSON configuration with additional options

### Extra Configuration Options

```python
{
    "endpoint": "/query/sql",  # API endpoint (default: /query/sql)
    "schema": "http"  # Protocol scheme (default: http)
}
```
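
Both keys are optional. The sketch below shows how the documented defaults fill in any key missing from the Extra JSON. `resolve_extra` is a hypothetical helper built on the standard `json` module. It is not the provider's own parsing code.

```python
from __future__ import annotations

import json

# Defaults as documented for the Extra field.
DEFAULTS = {"endpoint": "/query/sql", "schema": "http"}


def resolve_extra(extra: str | None) -> dict[str, str]:
    """Hypothetical helper: merge a connection's Extra JSON over the defaults."""
    parsed = json.loads(extra) if extra else {}
    if not isinstance(parsed, dict):
        raise ValueError("Extra must be a JSON object")
    # Keep only the documented keys, falling back to each default.
    return {key: parsed.get(key, default) for key, default in DEFAULTS.items()}


print(resolve_extra('{"schema": "https"}'))
# {'endpoint': '/query/sql', 'schema': 'https'}
```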

### Usage Example - Connection Configuration

```python
# Connection configuration in Airflow UI or via code
from airflow import settings
from airflow.models import Connection

# Create connection programmatically
conn = Connection(
    conn_id='my_pinot_broker',
    conn_type='pinot',
    host='pinot-broker.example.com',
    port=8099,
    login='pinot_user',  # Optional
    password='pinot_password',  # Optional
    extra='{"endpoint": "/query/sql", "schema": "https"}',
)

# Add to the Airflow metadata database
session = settings.Session()
session.add(conn)
session.commit()
session.close()
```
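
Airflow can also read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>`, and Airflow 2.3 and later accept a JSON value there. The sketch builds such a variable for the same `my_pinot_broker` connection using the standard `json` module. `pinot_conn_env` is a hypothetical helper. Check the connections documentation for your Airflow version before relying on the JSON form.

```python
from __future__ import annotations

import json


def pinot_conn_env(
    conn_id: str, host: str, port: int, extra: dict[str, str]
) -> tuple[str, str]:
    """Hypothetical helper: return (variable name, JSON value) for a Pinot connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps(
        {
            "conn_type": "pinot",
            "host": host,
            "port": port,
            # The protocol scheme belongs inside "extra"; a top-level "schema"
            # key would set the Connection's own schema field instead.
            "extra": extra,
        }
    )
    return name, value


name, value = pinot_conn_env(
    "my_pinot_broker",
    "pinot-broker.example.com",
    8099,
    {"endpoint": "/query/sql", "schema": "https"},
)
# Printed as a shell export line; the JSON contains no single quotes.
print(f"export {name}='{value}'")
```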

## Error Handling

The hook inherits standard database error handling from DbApiHook. A failed connection lookup raises an AirflowException subclass; an undefined connection ID is one example. Errors that occur during query execution may surface as exceptions from the pinotdb client rather than as AirflowException. Common error scenarios include:

- Connection timeout or failure
- Invalid SQL syntax
- Authentication failures
- Network connectivity issues

```python
from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

try:
    hook = PinotDbApiHook()
    results = hook.get_records("SELECT * FROM non_existent_table")
except AirflowException as e:
    # e.g. the connection ID is not defined in Airflow
    print(f"Airflow error: {e}")
except Exception as e:
    # Client-side errors (bad SQL, HTTP or auth failures) may not be
    # wrapped in AirflowException, so catch them separately
    print(f"Query failed: {e}")
```