# Database Query Operations

SQL-based querying capabilities for retrieving data from Pinot clusters through the broker API. The PinotDbApiHook extends Airflow's standard DbApiHook to provide Pinot-specific database connectivity using the pinotdb client library.

## Capabilities

### Connection Management

Establishes and manages connections to Pinot brokers for SQL query execution.

```python { .api }
class PinotDbApiHook(DbApiHook):
    """
    Interact with the Pinot Broker Query API using standard SQL.

    Attributes:
        conn_name_attr: str = "pinot_broker_conn_id"
        default_conn_name: str = "pinot_broker_default"
        conn_type: str = "pinot"
        hook_name: str = "Pinot Broker"
        supports_autocommit: bool = False
    """

    # Inherits __init__ from DbApiHook - no custom constructor

    def get_conn(self):
        """
        Establish a connection to the Pinot broker through the Pinot DB-API.

        Returns:
            Pinot database connection object
        """

    def get_uri(self) -> str:
        """
        Get the connection URI for the Pinot broker.

        Returns:
            Connection URI (e.g. http://localhost:9000/query/sql)
        """
```

### Usage Example - Connection Management

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

# Initialize with default connection
hook = PinotDbApiHook()

# Get connection URI
uri = hook.get_uri()
print(f"Connecting to: {uri}")

# Get raw connection object
conn = hook.get_conn()
```
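
To make the shape of the returned URI concrete, the sketch below assembles a broker query URL from the same pieces an Airflow connection holds (scheme, host, port, endpoint). `build_broker_uri` is a hypothetical helper written for illustration only; it is not part of the provider, and the hook's own assembly logic may differ in detail.

```python
def build_broker_uri(
    host: str,
    port: int,
    scheme: str = "http",
    endpoint: str = "/query/sql",
) -> str:
    """Join connection fields into a broker query URL (hypothetical helper)."""
    # Normalize the endpoint so exactly one slash separates it from host:port.
    path = "/" + endpoint.lstrip("/")
    return f"{scheme}://{host}:{port}{path}"


# Example with the host and port used elsewhere in this document.
print(build_broker_uri("pinot-broker.example.com", 8099, scheme="https"))
```

Normalizing the leading slash means an endpoint written as `query/sql` or `/query/sql` produces the same URL.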

### Query Execution

Execute SQL queries against Pinot clusters and retrieve either all result rows or only the first row.

```python { .api }
def get_records(
    self,
    sql: str | list[str],
    parameters: Iterable | Mapping[str, Any] | None = None,
    **kwargs
):
    """
    Execute the SQL and return a set of records.

    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with
        **kwargs: Additional parameters

    Returns:
        List of tuples containing query results
    """

def get_first(
    self,
    sql: str | list[str],
    parameters: Iterable | Mapping[str, Any] | None = None
):
    """
    Execute the SQL and return the first resulting row.

    Args:
        sql: SQL statement(s) to execute
        parameters: Parameters to render the SQL query with

    Returns:
        Tuple containing the first row of results, or None if no results
    """
```

### Usage Example - Query Execution

```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

hook = PinotDbApiHook()

# Execute query and get all records
sql = """
SELECT
    customer_id,
    COUNT(*) as order_count,
    SUM(total_amount) as total_revenue
FROM orders
WHERE order_date >= '2023-01-01'
GROUP BY customer_id
ORDER BY total_revenue DESC
LIMIT 100
"""

results = hook.get_records(sql)
for row in results:
    customer_id, order_count, total_revenue = row
    print(f"Customer {customer_id}: {order_count} orders, ${total_revenue}")

# Get only the first result
top_customer = hook.get_first(sql)
if top_customer:
    customer_id, order_count, total_revenue = top_customer
    print(f"Top customer: {customer_id} with ${total_revenue}")

# Query with parameters. This assumes the underlying pinotdb client uses the
# "pyformat" placeholder style with a mapping of named parameters; verify this
# against your installed pinotdb version before relying on it.
parametrized_sql = (
    "SELECT * FROM orders "
    "WHERE customer_id = %(customer_id)s AND order_date >= %(start_date)s"
)
results = hook.get_records(
    parametrized_sql,
    parameters={"customer_id": 12345, "start_date": "2023-06-01"},
)
```
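
Because `get_records` returns plain tuples, positional unpacking breaks silently when the `SELECT` list changes. One possible way to make downstream code more robust is to pair each row with column names. The sketch below assumes the caller knows the column order; `rows_to_dicts` is a hypothetical helper, not part of the hook, and the sample rows are made up for illustration.

```python
from typing import Any, Iterable, Sequence


def rows_to_dicts(
    columns: Sequence[str],
    rows: Iterable[Sequence[Any]],
) -> list[dict[str, Any]]:
    """Pair each result tuple with column names (hypothetical helper)."""
    records = []
    for row in rows:
        # Fail loudly if the query and the expected column list drift apart.
        if len(row) != len(columns):
            raise ValueError(
                f"expected {len(columns)} values per row, got {len(row)}"
            )
        records.append(dict(zip(columns, row)))
    return records


# Illustrative rows shaped like the aggregate query above (not real data).
sample_rows = [(101, 3, 250.0), (102, 1, 99.5)]
records = rows_to_dicts(
    ["customer_id", "order_count", "total_revenue"], sample_rows
)
print(records[0]["total_revenue"])
```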

### Unsupported Operations

The hook queries Pinot through a read-only interface, so the following write-oriented operations are not supported and raise `NotImplementedError`:

```python { .api }
def set_autocommit(self, conn: Connection, autocommit: Any):
    """Raises NotImplementedError - autocommit not supported"""

def insert_rows(
    self,
    table: str,
    rows: str,
    target_fields: str | None = None,
    commit_every: int = 1000,
    replace: bool = False,
    **kwargs: Any
):
    """Raises NotImplementedError - insert operations not supported"""
```
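
Generic code that works with several `DbApiHook` subclasses may call `insert_rows` without knowing the target is Pinot. A minimal sketch of guarding such a call is shown below. `ReadOnlyHookStub` is a hypothetical stand-in that only mimics the documented `NotImplementedError` behavior so the example runs without Airflow; `try_insert` is likewise an illustrative name.

```python
class ReadOnlyHookStub:
    """Hypothetical stand-in that mimics the documented behavior; not the real hook."""

    def insert_rows(self, table, rows, **kwargs):
        raise NotImplementedError("insert operations not supported")


def try_insert(hook, table, rows) -> bool:
    """Attempt an insert and report whether the hook supports it."""
    try:
        hook.insert_rows(table, rows)
    except NotImplementedError:
        # Read-only hooks land here; the caller can pick another ingestion path.
        return False
    return True


if not try_insert(ReadOnlyHookStub(), "orders", []):
    print("Hook is read-only; skipping insert")
```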

## Connection Configuration

### Airflow Connection Setup

The PinotDbApiHook uses Airflow connections with the following configuration:

- **Connection Type**: `pinot`
- **Host**: Pinot broker hostname
- **Port**: Pinot broker port (typically 8099)
- **Login**: Username (optional, for authenticated clusters)
- **Password**: Password (optional, for authenticated clusters)
- **Extra**: JSON configuration with additional options

### Extra Configuration Options

```python
{
    "endpoint": "/query/sql",  # API endpoint (default: /query/sql)
    "schema": "http"           # Protocol scheme (default: http)
}
```
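
The defaults listed above can be illustrated with a small parser that reads the Extra JSON string and falls back when a key is missing. This is only a sketch of the fallback behavior the comments describe; `parse_extra` is a hypothetical helper and not the provider's own code.

```python
import json
from typing import Optional


def parse_extra(extra: Optional[str]) -> dict:
    """Read the Extra JSON string and apply the documented defaults (hypothetical helper)."""
    options = json.loads(extra) if extra else {}
    return {
        # Both keys are optional; missing keys fall back to the defaults above.
        "endpoint": options.get("endpoint", "/query/sql"),
        "schema": options.get("schema", "http"),
    }


print(parse_extra('{"schema": "https"}'))
print(parse_extra(None))
```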

### Usage Example - Connection Configuration

```python
# Connection configuration in Airflow UI or via code
from airflow.models import Connection
from airflow import settings

# Create connection programmatically
conn = Connection(
    conn_id='my_pinot_broker',
    conn_type='pinot',
    host='pinot-broker.example.com',
    port=8099,
    login='pinot_user',          # Optional
    password='pinot_password',   # Optional
    extra='{"endpoint": "/query/sql", "schema": "https"}'
)

# Add to Airflow
session = settings.Session()
session.add(conn)
session.commit()
```
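
Airflow can also read connections from environment variables named `AIRFLOW_CONN_<CONN_ID>` whose value is a connection URI, with Extra options passed as query parameters. The sketch below builds such a URI for the same connection using only the standard library. `airflow_conn_uri` is a hypothetical helper; treat the exact URI layout as an assumption and check it against the Airflow documentation for your version.

```python
from typing import Optional
from urllib.parse import quote, urlencode


def airflow_conn_uri(
    conn_type: str,
    host: str,
    port: int,
    login: Optional[str] = None,
    password: Optional[str] = None,
    extra: Optional[dict] = None,
) -> str:
    """Build a connection URI string (hypothetical helper)."""
    auth = ""
    if login:
        # Percent-encode credentials so special characters survive URI parsing.
        auth = quote(login, safe="")
        if password:
            auth += ":" + quote(password, safe="")
        auth += "@"
    query = "?" + urlencode(extra) if extra else ""
    return f"{conn_type}://{auth}{host}:{port}/{query}"


uri = airflow_conn_uri(
    "pinot",
    "pinot-broker.example.com",
    8099,
    login="pinot_user",
    password="pinot_password",
    extra={"endpoint": "/query/sql", "schema": "https"},
)
# The value could then be exported as AIRFLOW_CONN_MY_PINOT_BROKER.
print(uri)
```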

## Error Handling

The hook inherits standard database error handling from DbApiHook and may raise AirflowException for connection or query failures. Common error scenarios include:

- Connection timeout or failure
- Invalid SQL syntax
- Authentication failures
- Network connectivity issues

```python
from airflow.exceptions import AirflowException
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

try:
    hook = PinotDbApiHook()
    results = hook.get_records("SELECT * FROM non_existent_table")
except AirflowException as e:
    print(f"Query failed: {e}")
```
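
Of the scenarios listed above, timeouts and network issues are often transient, while invalid SQL and authentication failures are not worth retrying. A minimal sketch of a retry wrapper with exponential backoff follows. `run_with_retries` and its parameters are hypothetical, and which exception types count as transient is an assumption the caller has to supply through `retry_on`.

```python
import time
from typing import Callable, Tuple, Type, TypeVar

T = TypeVar("T")


def run_with_retries(
    query_fn: Callable[[], T],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: Tuple[Type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Call query_fn, retrying on the given exception types (hypothetical helper)."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            return query_fn()
        except retry_on:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            time.sleep(base_delay * 2 ** (attempt - 1))
    raise AssertionError("unreachable")


# Stand-in for a hook call that fails twice before succeeding.
calls = {"count": 0}


def flaky_query():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("broker unreachable")
    return [("ok",)]


print(run_with_retries(flaky_query, attempts=3, base_delay=0.0))
```

In a DAG, `query_fn` could be something like `lambda: hook.get_records(sql)`; Airflow's task-level retries are an alternative when the whole task is safe to rerun.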