# PyDruid

A Python connector for Apache Druid that provides synchronous and asynchronous clients for building, executing, and analyzing Druid queries. PyDruid integrates Druid with the SciPy stack by parsing query results into pandas DataFrame objects. It supports Druid's native query types (timeseries, topN, groupBy, search, and others), implements the Python DB API 2.0 specification with a SQLAlchemy dialect, and can export results to TSV files or pandas DataFrames.

## Package Information

- **Package Name**: pydruid
- **Language**: Python
- **Installation**: `pip install pydruid`
- **Optional Dependencies**: `pip install "pydruid[async,pandas,sqlalchemy,cli]"` (the quotes keep shells such as zsh from expanding the brackets)

## Core Imports

```python
from pydruid.client import PyDruid
from pydruid.async_client import AsyncPyDruid
```

For query building utilities:

```python
from pydruid.utils.aggregators import doublesum, count, longsum
from pydruid.utils.filters import Dimension, Filter
from pydruid.utils.postaggregator import Field
```

For the DB API:

```python
from pydruid.db import connect
```

For SQLAlchemy (the `druid` dialect is registered when pydruid is installed):

```python
from sqlalchemy import create_engine
```

## Basic Usage

```python
from pydruid.client import PyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

# Create a client pointing at a Druid broker
client = PyDruid('http://localhost:8082', 'druid/v2/')

# Execute a timeseries query: daily tweet counts for one hashtag over four weeks
ts = client.timeseries(
    datasource='twitterstream',
    granularity='day',
    intervals='2014-02-02/p4w',
    aggregations={'tweet_count': doublesum('count')},
    filter=Dimension('first_hashtag') == 'sochi2014'
)

# Export the returned Query object to a pandas DataFrame (requires pandas)
df = ts.export_pandas()
print(df)

# Or export it to a TSV file
ts.export_tsv('results.tsv')
```
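For reference, the timeseries call above corresponds roughly to the native JSON body below, which the client POSTs to the broker's `druid/v2/` endpoint. This is a hand-written sketch using only the standard library; it does not call pydruid, and the exact key order and any extra fields pydruid adds (such as a query `context`) may differ.

```python
import json

# Hand-written approximation of the native query produced by the
# timeseries() call above; pydruid itself is not imported here.
timeseries_query = {
    "queryType": "timeseries",
    "dataSource": "twitterstream",
    "granularity": "day",
    "intervals": "2014-02-02/p4w",
    # doublesum('count') becomes a doubleSum aggregator named after its dict key
    "aggregations": [
        {"type": "doubleSum", "name": "tweet_count", "fieldName": "count"}
    ],
    # Dimension('first_hashtag') == 'sochi2014' becomes a selector filter
    "filter": {"type": "selector", "dimension": "first_hashtag", "value": "sochi2014"},
}

body = json.dumps(timeseries_query, indent=2)
print(body)
```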

## Architecture

PyDruid provides three main interaction patterns:

- **Native Druid queries**: Direct query construction with the PyDruid/AsyncPyDruid clients, using utility classes for aggregations, filters, and post-aggregations
- **Database API**: A standard Python DB API 2.0 interface for SQL-based interaction with Druid
- **SQLAlchemy integration**: A SQLAlchemy dialect that enables Core and ORM usage patterns

The utility modules (aggregators, filters, dimensions, having, postaggregator) provide declarative query building that generates the JSON structures of Druid's native query format.
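The declarative style rests on Python operator overloading: comparing a dimension object yields a filter object, and `&`, `|`, and `~` combine filters into Druid's `and`, `or`, and `not` filter JSON. The sketch below rebuilds that idea from scratch with hypothetical `Dim` and `Flt` classes to show the mechanism; it is not pydruid's source, and pydruid's own `Dimension` and `Filter` classes support more filter types.

```python
from __future__ import annotations


class Flt:
    """Hypothetical filter node wrapping a Druid filter JSON dict."""

    def __init__(self, spec: dict) -> None:
        self.spec = spec

    def __and__(self, other: Flt) -> Flt:
        return Flt({"type": "and", "fields": [self.spec, other.spec]})

    def __or__(self, other: Flt) -> Flt:
        return Flt({"type": "or", "fields": [self.spec, other.spec]})

    def __invert__(self) -> Flt:
        return Flt({"type": "not", "field": self.spec})


class Dim:
    """Hypothetical dimension reference; == builds a selector filter."""

    def __init__(self, name: str) -> None:
        self.name = name

    def __eq__(self, value: object) -> Flt:  # type: ignore[override]
        return Flt({"type": "selector", "dimension": self.name, "value": value})


f = (Dim("first_hashtag") == "sochi2014") & ~(Dim("lang") == "ru")
print(f.spec)
```

Note the parentheses around each comparison: in Python, `&` and `|` bind more tightly than `==`, so unparenthesized expressions would combine the wrong operands.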

## Capabilities

### Synchronous Client

The primary client for executing Druid queries synchronously, with support for the native query types, HTTP basic authentication, proxy configuration, and result export.

```python { .api }
class PyDruid:
    # cafile: optional CA bundle used to verify HTTPS certificates
    def __init__(self, url: str, endpoint: str, cafile: str = None) -> None: ...
    def set_basic_auth_credentials(self, username: str, password: str) -> None: ...
    def set_proxies(self, proxies: dict) -> None: ...
    def topn(self, **kwargs) -> Query: ...
    def timeseries(self, **kwargs) -> Query: ...
    def groupby(self, **kwargs) -> Query: ...
    def search(self, **kwargs) -> Query: ...
    def subquery(self, **kwargs) -> dict: ...
```

[Synchronous Client](./synchronous-client.md)
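Under the hood, a synchronous query is a JSON POST to the broker URL joined with the endpoint. The standard-library sketch below builds (but does not send) such a request, including an HTTP basic `Authorization` header of the kind `set_basic_auth_credentials` is meant to supply. The helper `build_druid_request` is hypothetical, and the exact headers pydruid sets are an assumption; real code should use the client itself.

```python
from __future__ import annotations

import base64
import json
import urllib.request


def build_druid_request(
    url: str,
    endpoint: str,
    query: dict,
    username: str | None = None,
    password: str | None = None,
) -> urllib.request.Request:
    """Hypothetical helper: build (but do not send) a native-query POST request."""
    headers = {"Content-Type": "application/json"}
    if username is not None and password is not None:
        # HTTP basic auth sends base64("username:password")
        credentials = f"{username}:{password}".encode("utf-8")
        headers["Authorization"] = "Basic " + base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        url.rstrip("/") + "/" + endpoint.lstrip("/"),
        data=json.dumps(query).encode("utf-8"),
        headers=headers,
        method="POST",
    )


req = build_druid_request(
    "http://localhost:8082",
    "druid/v2/",
    {"queryType": "timeBoundary", "dataSource": "twitterstream"},
    username="alice",
    password="secret",
)
print(req.get_method(), req.full_url)
# Sending it requires a running broker: urllib.request.urlopen(req)
```

With pydruid, the equivalent is to call `client.set_basic_auth_credentials('alice', 'secret')` once before running queries.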

### Asynchronous Client

A Tornado-based asynchronous client for non-blocking query execution, suited to high-concurrency applications and async frameworks. Its query methods are coroutines and must be awaited.

```python { .api }
class AsyncPyDruid:
    def __init__(self, url: str, endpoint: str, defaults: dict = None, http_client: str = None) -> None: ...
    async def topn(self, **kwargs) -> Query: ...
    async def timeseries(self, **kwargs) -> Query: ...
    async def groupby(self, **kwargs) -> Query: ...
```

[Asynchronous Client](./asynchronous-client.md)
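Because the query methods are coroutines, several queries can run concurrently. This sketch fans out one timeseries query per datasource with `asyncio.gather`. It assumes a pydruid/Tornado combination whose coroutines can be awaited from asyncio (Tornado 5 and later run on the asyncio event loop). `daily_counts` and `StubClient` are hypothetical; the stub stands in for `AsyncPyDruid` so the example runs without a Druid cluster.

```python
import asyncio


async def daily_counts(client, datasources, intervals):
    """Run one timeseries query per datasource concurrently; return the results."""
    coros = [
        client.timeseries(
            datasource=ds,
            granularity="day",
            intervals=intervals,
            # Raw aggregator spec; pydruid's doublesum("count") returns an equivalent dict
            aggregations={"rows": {"type": "doubleSum", "fieldName": "count"}},
        )
        for ds in datasources
    ]
    # gather returns results in the same order as `datasources`
    return await asyncio.gather(*coros)


class StubClient:
    """Hypothetical stand-in for AsyncPyDruid that echoes the datasource."""

    async def timeseries(self, **kwargs):
        await asyncio.sleep(0.01)  # simulate network latency
        return {"datasource": kwargs["datasource"], "result": []}


results = asyncio.run(
    daily_counts(StubClient(), ["twitterstream", "wikipedia"], "2014-02-02/p4w")
)
print([r["datasource"] for r in results])
```

Against a real cluster, pass `AsyncPyDruid('http://localhost:8082', 'druid/v2/')` instead of the stub; each result is then a `Query` object.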

### Query Building Utilities

Utilities for constructing Druid queries, including aggregations, filters, dimensions, having clauses, and post-aggregations.

```python { .api }
# Aggregations
def doublesum(raw_metric: str) -> dict: ...
def count(raw_metric: str) -> dict: ...
def longsum(raw_metric: str) -> dict: ...

# Filters
class Dimension:
    def __init__(self, dim: str) -> None: ...
    def __eq__(self, other) -> Filter: ...

class Filter:
    def __and__(self, other: 'Filter') -> 'Filter': ...
    def __or__(self, other: 'Filter') -> 'Filter': ...
```

[Query Building Utilities](./query-utilities.md)
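The aggregation helpers return small dicts without a `name`; the output name comes from the key in the `aggregations` mapping passed to a query method. The stdlib sketch below shows that naming step. `build_aggregations` is a hypothetical helper, and the `doublesum`/`longsum` functions here mirror the shape of pydruid's helpers rather than importing them.

```python
def doublesum(raw_metric: str) -> dict:
    # Mirrors the shape returned by pydruid.utils.aggregators.doublesum
    return {"type": "doubleSum", "fieldName": raw_metric}


def longsum(raw_metric: str) -> dict:
    # Mirrors the shape returned by pydruid.utils.aggregators.longsum
    return {"type": "longSum", "fieldName": raw_metric}


def build_aggregations(aggregations: dict) -> list:
    """Hypothetical: turn {output_name: spec} into Druid's aggregator list."""
    return [dict(spec, name=name) for name, spec in aggregations.items()]


aggs = build_aggregations({
    "tweet_count": doublesum("count"),
    "total_length": longsum("tweet_length"),
})
print(aggs)
```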

### Database API

A Python DB API 2.0 compliant interface for running SQL against Druid's SQL endpoint, following standard database connectivity patterns.

```python { .api }
def connect(
    host: str = "localhost",
    port: int = 8082,
    path: str = "/druid/v2/sql/",
    scheme: str = "http",
    user: str = None,
    password: str = None,
    context: dict = None,
    header: bool = False,
    ssl_verify_cert: bool = True,
    ssl_client_cert: str = None,
    proxies: dict = None,
    jwt: str = None
) -> Connection: ...
```

[Database API](./database-api.md)
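Because `pydruid.db` follows DB API 2.0, code written against the standard cursor interface works with it unchanged. The hypothetical helper `fetch_as_dicts` below reads column names from `cursor.description` and pages through rows with `fetchmany`. It is exercised with the standard library's `sqlite3` driver so it runs offline; with Druid you would pass a connection from `connect(host='localhost', port=8082)` instead. Placeholder styles for query parameters differ between drivers, so the sketch avoids parameters.

```python
import sqlite3


def fetch_as_dicts(conn, sql: str, batch_size: int = 100) -> list:
    """Run `sql` on any DB API 2.0 connection and return the rows as dicts."""
    cursor = conn.cursor()
    try:
        cursor.execute(sql)
        # description holds one 7-item sequence per column; item 0 is the name
        columns = [col[0] for col in (cursor.description or [])]
        rows = []
        while True:
            batch = cursor.fetchmany(batch_size)
            if not batch:
                break
            rows.extend(dict(zip(columns, row)) for row in batch)
        return rows
    finally:
        cursor.close()


# Offline demo: sqlite3 is also a DB API 2.0 driver
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE tweets (hashtag TEXT, n INTEGER)")
conn.executemany("INSERT INTO tweets VALUES (?, ?)", [("sochi2014", 3), ("olympics", 5)])
top = fetch_as_dicts(conn, "SELECT hashtag, n FROM tweets ORDER BY n DESC", batch_size=1)
conn.close()
print(top)
```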
153
154
### SQLAlchemy Integration
155
156
Full SQLAlchemy dialect support enabling both Core and ORM usage patterns with Druid as a backend database.
157
158
```python { .api }
159
class DruidDialect:
160
name = "druid"
161
def dbapi(self) -> type: ...
162
def create_connect_args(self, url) -> tuple: ...
163
```
164
165
[SQLAlchemy Integration](./sqlalchemy-integration.md)
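A dialect's `create_connect_args` turns the engine URL into arguments for the DB API `connect()` call. The stdlib sketch below approximates that mapping for URLs such as `druid://localhost:8082/druid/v2/sql/`. The function `druid_url_to_connect_kwargs` is hypothetical, and its details (a `druid+https` scheme selecting HTTPS, a `header=true` query parameter, the fallback port 8082) are assumptions modelled on the `connect()` signature in the Database API section, not a copy of pydruid's code.

```python
from urllib.parse import parse_qs, urlsplit


def druid_url_to_connect_kwargs(url: str) -> dict:
    """Hypothetical approximation of DruidDialect.create_connect_args."""
    parts = urlsplit(url)
    # Assumed naming: 'druid' -> http, 'druid+https' -> https
    scheme = "https" if parts.scheme.endswith("+https") else "http"
    query = parse_qs(parts.query)
    return {
        "host": parts.hostname or "localhost",
        "port": parts.port or 8082,
        "path": parts.path or "/druid/v2/sql/",
        "scheme": scheme,
        "user": parts.username,
        "password": parts.password,
        "header": query.get("header", ["false"])[0] == "true",
    }


print(druid_url_to_connect_kwargs(
    "druid+https://bob:pw@broker.example.com:8282/druid/v2/sql/?header=true"
))
```

In practice you only call `create_engine(...)` with the URL, and SQLAlchemy invokes the dialect for you.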
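
### Command Line Interface

An interactive command-line tool for executing SQL queries against Druid, with syntax highlighting and autocompletion. It is installed as the `pydruid` command (with the `cli` extra) and takes the SQL endpoint URL as its argument, e.g. `pydruid http://localhost:8082/druid/v2/sql/`.

```python { .api }
def main() -> None: ...
```
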
[Command Line Interface](./command-line-interface.md)

## Types

```python { .api }
class Query:
    """
    Query result wrapper with export capabilities.

    All query methods return Query objects that provide access to results
    and export functionality. Query objects wrap the raw result list and
    implement the MutableSequence interface.
    """
    result: list = None       # Query result parsed into a list of dicts
    result_json: str = None   # Raw JSON response from Druid
    query_type: str = None    # Type of query executed (topN, timeseries, etc.)
    query_dict: dict = None   # JSON representation of the query

    def __init__(self, query_dict: dict, query_type: str) -> None:
        """Initialize the Query object with query metadata."""

    def parse(self, data: str) -> None:
        """
        Parse the raw JSON response into the result list.

        Parameters:
        - data: Raw JSON response from Druid

        Raises:
            IOError: If parsing fails
        """

    def export_tsv(self, dest_path: str) -> None:
        """
        Export query results to a TSV file.

        Parameters:
        - dest_path: File path to write the TSV data to
        """

    def export_pandas(self) -> 'pandas.DataFrame':
        """
        Export query results to a pandas DataFrame.

        Returns:
            pandas DataFrame with query results

        Requires:
            pandas package to be installed

        Note:
            This is the preferred export method. The client-level
            PyDruid.export_pandas(), which exports the client's last
            query, is deprecated in its favor.
        """

    # MutableSequence interface methods
    def __len__(self) -> int: ...
    def __getitem__(self, index: int): ...
    def __setitem__(self, index: int, value): ...
    def __delitem__(self, index: int): ...
    def insert(self, index: int, value): ...

class Connection:
    """DB API 2.0 connection object."""
    def close(self) -> None: ...
    def commit(self) -> None: ...
    def cursor(self) -> 'Cursor': ...
    def execute(self, operation: str, parameters: dict = None) -> 'Cursor': ...

class Cursor:
    """DB API 2.0 cursor object."""
    rowcount: int
    description: list
    arraysize: int

    def close(self) -> None: ...
    def execute(self, operation: str, parameters: dict = None) -> None: ...
    def fetchone(self) -> tuple: ...
    def fetchmany(self, size: int = None) -> list: ...
    def fetchall(self) -> list: ...
```
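Timeseries results arrive as a list of `{"timestamp": ..., "result": {...}}` objects, so export means flattening that shape into rows. The sketch below does this with the standard `csv` module to approximate what `export_tsv` does for timeseries queries. The function `timeseries_to_tsv`, its column order (timestamp first, then metrics sorted by name), and the sample rows are illustrative assumptions, not pydruid's exact output.

```python
import csv
import io


def timeseries_to_tsv(result: list) -> str:
    """Hypothetical: flatten Druid timeseries results into TSV text."""
    metrics = sorted({key for row in result for key in row["result"]})
    buf = io.StringIO()
    writer = csv.writer(buf, delimiter="\t", lineterminator="\n")
    writer.writerow(["timestamp", *metrics])
    for row in result:
        # Missing metrics in a bucket become empty cells
        writer.writerow([row["timestamp"], *(row["result"].get(m, "") for m in metrics)])
    return buf.getvalue()


# Made-up sample rows in Druid's timeseries result shape
sample = [
    {"timestamp": "2014-02-02T00:00:00.000Z", "result": {"tweet_count": 120.0}},
    {"timestamp": "2014-02-03T00:00:00.000Z", "result": {"tweet_count": 95.0}},
]
print(timeseries_to_tsv(sample), end="")
```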