tessl/pypi-apache-airflow-providers-apache-pinot

Apache Airflow provider package enabling integration with Apache Pinot for real-time analytics queries and administrative operations.

- **Workspace**: tessl
- **Visibility**: Public
- **Describes**: pypipkg:pypi/apache-airflow-providers-apache-pinot@4.8.x

To install, run `npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-pinot@4.8.0`


# Apache Airflow Providers Apache Pinot


Apache Airflow provider package for Apache Pinot, a real-time distributed OLAP datastore designed for low-latency analytics. The provider supports both SQL queries against Pinot brokers and administrative operations through the `pinot-admin.sh` command-line tool.


## Package Information


- **Package Name**: apache-airflow-providers-apache-pinot
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-apache-pinot`


## Core Imports


```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook, PinotAdminHook
```


Version access:


```python
from airflow.providers.apache.pinot import __version__
```


## Basic Usage


### Query Operations


```python
from airflow.providers.apache.pinot.hooks.pinot import PinotDbApiHook

# Initialize database hook (uses default connection ID)
hook = PinotDbApiHook()

# Execute SQL queries
sql = "SELECT COUNT(*) FROM my_table WHERE timestamp > '2023-01-01'"
results = hook.get_records(sql)

# Get first record only
first_result = hook.get_first(sql)

# Get connection URI
uri = hook.get_uri()
```
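
The calls above need a reachable Pinot broker. As a minimal sketch of how a task callable might wrap them so the logic can be exercised without one, the function below accepts any object that exposes `get_records`. `count_rows`, `SupportsGetRecords`, and `FakeHook` are hypothetical names introduced for illustration, not part of the provider, and the sketch assumes `get_records` returns a list of row sequences.

```python
from typing import Any, Protocol, Sequence


class SupportsGetRecords(Protocol):
    """Structural type for anything with a DbApiHook-style get_records method."""

    def get_records(self, sql: str) -> Sequence[Sequence[Any]]: ...


def count_rows(hook: SupportsGetRecords, table: str) -> int:
    """Return COUNT(*) for `table`, or 0 if the query yields no rows."""
    # The table name is interpolated into the SQL text, so pass trusted names only.
    records = hook.get_records(f"SELECT COUNT(*) FROM {table}")
    return int(records[0][0]) if records else 0


class FakeHook:
    """Hypothetical stand-in for PinotDbApiHook, used only to run the sketch locally."""

    def __init__(self, rows):
        self.rows = rows
        self.seen_sql = []

    def get_records(self, sql):
        # Remember the SQL that was issued and return the canned rows.
        self.seen_sql.append(sql)
        return self.rows


fake = FakeHook(rows=[(42,)])
print(count_rows(fake, "my_table"))
```

In a DAG, the same function would receive a real `PinotDbApiHook()` in place of the stand-in.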


### Administrative Operations


```python
from airflow.providers.apache.pinot.hooks.pinot import PinotAdminHook

# Initialize admin hook
admin_hook = PinotAdminHook(conn_id='pinot_admin_default')

# Add schema
admin_hook.add_schema('path/to/schema.json')

# Add table
admin_hook.add_table('path/to/table_config.json')

# Create and upload segment
admin_hook.create_segment(
    table_name='my_table',
    data_dir='/path/to/data',
    out_dir='/path/to/segments'
)
admin_hook.upload_segment('/path/to/segments/my_segment')
```
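
The four calls above form a sequence that is often reused for every new table. A rough sketch of bundling them into one helper follows; `bootstrap_table` and `RecordingAdminHook` are hypothetical names, and the stub only records method names so the ordering can be checked without `pinot-admin.sh` installed.

```python
from __future__ import annotations

from typing import Any


def bootstrap_table(
    admin_hook: Any,
    schema_file: str,
    table_config: str,
    table_name: str,
    data_dir: str,
    out_dir: str,
    segment_dir: str,
) -> None:
    """Call the four admin methods in the same order as the example above."""
    admin_hook.add_schema(schema_file)
    admin_hook.add_table(table_config)
    admin_hook.create_segment(table_name=table_name, data_dir=data_dir, out_dir=out_dir)
    admin_hook.upload_segment(segment_dir)


class RecordingAdminHook:
    """Hypothetical stand-in that records method names instead of invoking pinot-admin.sh."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def __getattr__(self, name: str):
        # Any method looked up on the stub records its own name and does nothing else.
        def record(*args: Any, **kwargs: Any) -> None:
            self.calls.append(name)

        return record


stub = RecordingAdminHook()
bootstrap_table(
    stub,
    schema_file="path/to/schema.json",
    table_config="path/to/table_config.json",
    table_name="my_table",
    data_dir="/path/to/data",
    out_dir="/path/to/segments",
    segment_dir="/path/to/segments/my_segment",
)
print(stub.calls)
```

Passing a real `PinotAdminHook` instead of the stub would run the same steps against a cluster.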


## Architecture


The provider is built around two main hook classes that handle different aspects of Pinot integration:


- **PinotDbApiHook**: Extends Apache Airflow's `DbApiHook` to provide SQL query capabilities against Pinot brokers using the `pinotdb` client library
- **PinotAdminHook**: Wraps the `pinot-admin.sh` command-line tool for administrative operations like schema management, table creation, and segment operations


Both hooks integrate with Airflow's connection management system, supporting authentication and configuration through Airflow connections.


## Capabilities


### Database Query Operations


SQL-based querying of Pinot clusters through the broker API. Queries are submitted as SQL strings, and results can be retrieved either as a full list of records or as the first record only.


```python { .api }
class PinotDbApiHook(DbApiHook):
    def get_conn(self): ...
    def get_uri(self) -> str: ...
    def get_records(self, sql: str | list[str], parameters=None, **kwargs): ...
    def get_first(self, sql: str | list[str], parameters=None): ...
```


[Database Query Operations](./database-operations.md)


### Administrative Operations


Administrative functionality for managing Pinot clusters including schema management, table configuration, segment creation, and data ingestion workflows.


```python { .api }
class PinotAdminHook(BaseHook):
    def add_schema(self, schema_file: str, with_exec: bool = True): ...
    def add_table(self, file_path: str, with_exec: bool = True): ...
    def create_segment(self, **kwargs): ...
    def upload_segment(self, segment_dir: str, table_name: str | None = None): ...
    def run_cli(self, cmd: list[str], verbose: bool = True) -> str: ...
```


[Administrative Operations](./admin-operations.md)
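
Because `run_cli` takes the command as a list of strings, each admin method presumably assembles a `pinot-admin.sh` sub-command before handing it over. A rough sketch of that assembly for a schema upload is shown here, assuming the `AddSchema` sub-command with its `-controllerHost`, `-controllerPort`, `-schemaFile`, and `-exec` options. `build_add_schema_cmd` is a hypothetical helper, the host and port are placeholders, and the provider's own implementation may differ.

```python
from __future__ import annotations


def build_add_schema_cmd(
    host: str, port: int, schema_file: str, with_exec: bool = True
) -> list[str]:
    """Assemble an argument list for `pinot-admin.sh AddSchema` (illustrative only)."""
    cmd = [
        "AddSchema",
        "-controllerHost", host,
        "-controllerPort", str(port),  # CLI arguments must be strings
        "-schemaFile", schema_file,
    ]
    if with_exec:
        # Assumption: without -exec the tool only shows what it would do.
        cmd.append("-exec")
    return cmd


cmd = build_add_schema_cmd("localhost", 9000, "path/to/schema.json")
print(" ".join(["pinot-admin.sh", *cmd]))
```

Keeping the arguments as a list rather than a single shell string avoids quoting problems when paths contain spaces.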


## Connection Types


The provider registers two Airflow connection types:


- **pinot**: For database query operations via `PinotDbApiHook`
- **pinot_admin**: For administrative operations via `PinotAdminHook`
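
One way to supply both connections is through Airflow's `AIRFLOW_CONN_<CONN_ID>` environment variables, which accept a JSON-serialised connection. The sketch below only builds those values with the standard library. `connection_env` is a hypothetical helper, the host names and ports are placeholders, and the connection ID `pinot_broker_default` is assumed to be the query hook's default.

```python
from __future__ import annotations

import json


def connection_env(conn_id: str, conn_type: str, host: str, port: int) -> tuple[str, str]:
    """Return the (variable name, JSON value) pair describing one connection."""
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    value = json.dumps({"conn_type": conn_type, "host": host, "port": port})
    return name, value


env = dict(
    [
        # Placeholder endpoints; replace with the real broker and controller.
        connection_env("pinot_broker_default", "pinot", "broker.example.com", 8099),
        connection_env("pinot_admin_default", "pinot_admin", "controller.example.com", 9000),
    ]
)
for name, value in env.items():
    print(f"{name}={value}")
```

The printed lines can be exported in the scheduler and worker environments, or the same fields can be entered through the Airflow UI or CLI.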


## Version Compatibility


```python { .api }
from airflow.providers.apache.pinot.version_compat import AIRFLOW_V_3_1_PLUS, BaseHook

def get_base_airflow_version_tuple() -> tuple[int, int, int]: ...
```


The provider maintains compatibility with Airflow 2.10+ and includes version-specific compatibility handling for different Airflow releases.
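
A flag such as `AIRFLOW_V_3_1_PLUS` is typically derived by comparing a version tuple against a threshold. A minimal sketch of that comparison using only the standard library follows; `parse_base_version` is a hypothetical helper, and the provider's actual implementation may parse versions differently.

```python
from __future__ import annotations

import re


def parse_base_version(version: str) -> tuple[int, int, int]:
    """Extract (major, minor, micro), ignoring suffixes such as 'rc1' or '.dev0'."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", version)
    if match is None:
        raise ValueError(f"unrecognised version string: {version!r}")
    major, minor, micro = (int(part) for part in match.groups())
    return major, minor, micro


for candidate in ("2.10.5", "3.0.2", "3.1.0rc1"):
    # Tuples compare element by element, so this is a plain ordering check.
    is_3_1_plus = parse_base_version(candidate) >= (3, 1, 0)
    print(candidate, is_3_1_plus)
```

Comparing tuples rather than strings avoids the pitfall where `"2.10"` sorts before `"2.9"` lexicographically.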


## Types


```python { .api }
# Connection imports
from collections.abc import Iterable, Mapping
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
    from airflow.models import Connection
```