# Asynchronous Client

The AsyncPyDruid client provides asynchronous query execution using Tornado's HTTP client, enabling non-blocking operations suitable for high-concurrency applications and async frameworks.

## Capabilities

### Client Initialization

Creates a new AsyncPyDruid client instance for asynchronous Druid queries.

```python { .api }
class AsyncPyDruid:
    def __init__(
        self,
        url: str,
        endpoint: str,
        defaults: dict = None,
        http_client: str = None
    ) -> None:
        """
        Initialize AsyncPyDruid client.

        Parameters:
        - url: URL of Broker node in the Druid cluster
        - endpoint: Endpoint that Broker listens for queries on (typically 'druid/v2/')
        - defaults: Default parameters for the Tornado HTTP client (optional)
        - http_client: Tornado HTTP client implementation to use (optional)
        """
```

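To make the relationship between `url` and `endpoint` concrete, here is a minimal sketch of how the two values could combine into the address that queries are posted to. It assumes the client joins them with a single slash; `query_url` is a hypothetical helper for illustration and is not part of pydruid.

```python
def query_url(url: str, endpoint: str) -> str:
    """Join a Broker base URL and a query endpoint with exactly one slash.

    Hypothetical helper: it illustrates the assumed joining rule only.
    """
    return url.rstrip("/") + "/" + endpoint.lstrip("/")


# The same values as the constructor call in the usage example
print(query_url("http://localhost:8082", "druid/v2/"))
```

Under that assumption, a trailing slash on `url` or a leading slash on `endpoint` makes no difference to the resulting address.
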
### Authentication and Configuration

Configure client authentication and proxy settings (inherited from base client).

```python { .api }
def set_basic_auth_credentials(self, username: str, password: str) -> None:
    """
    Set HTTP Basic Authentication credentials.

    Parameters:
    - username: Username for authentication
    - password: Password for authentication
    """

def set_proxies(self, proxies: dict) -> None:
    """
    Configure proxy settings for HTTP requests.

    Parameters:
    - proxies: Dictionary mapping protocol names to proxy URLs
    """
```

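For context on what HTTP Basic Authentication credentials turn into on the wire, the sketch below builds the standard `Authorization` header defined by the HTTP Basic scheme. `basic_auth_header` is a hypothetical helper written for illustration; it shows the scheme itself, not the client's internal code.

```python
import base64


def basic_auth_header(username: str, password: str) -> dict:
    """Build an HTTP Basic Authorization header (hypothetical helper)."""
    # The scheme encodes "username:password" as Base64
    raw = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(raw).decode("ascii")
    return {"Authorization": f"Basic {token}"}


print(basic_auth_header("Aladdin", "open sesame"))
```

Because Base64 is an encoding and not encryption, Basic credentials are only protected when the Broker URL uses HTTPS.
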
### Asynchronous TopN Queries

Execute TopN queries asynchronously.

```python { .api }
async def topn(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    aggregations: dict,
    dimension: str,
    metric: str,
    threshold: int,
    filter: 'Filter' = None,
    post_aggregations: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a TopN query asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity ('all', 'day', 'hour', 'minute', etc.)
    - intervals: ISO-8601 intervals ('2014-02-02/p4w' or list of intervals)
    - aggregations: Dict mapping aggregator names to aggregator specifications
    - dimension: Dimension to run the query against
    - metric: Metric to sort the dimension values by
    - threshold: Number of top items to return
    - filter: Filter to apply to the data (optional)
    - post_aggregations: Dict of post-aggregations to compute (optional)
    - context: Query context parameters (optional)

    Returns:
        Query object containing results and metadata
    """
```

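The `intervals` argument takes ISO-8601 interval strings. Besides the start/duration form in the docstring above, ISO-8601 also allows a start/end form. A minimal sketch for building such a string from Python dates follows; `day_interval` is a hypothetical helper and not part of pydruid.

```python
from datetime import date, timedelta


def day_interval(start: date, days: int) -> str:
    """Return an ISO-8601 'start/end' interval covering `days` days."""
    end = start + timedelta(days=days)
    return f"{start.isoformat()}/{end.isoformat()}"


print(day_interval(date(2014, 3, 3), 1))
```

The resulting string can be passed as `intervals`, or several of them can be collected into a list.
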
### Asynchronous Timeseries Queries

Execute timeseries queries asynchronously.

```python { .api }
async def timeseries(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    aggregations: dict,
    filter: 'Filter' = None,
    post_aggregations: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a timeseries query asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity for aggregation
    - intervals: ISO-8601 intervals to query
    - aggregations: Dict mapping aggregator names to aggregator specifications
    - filter: Filter to apply to the data (optional)
    - post_aggregations: Dict of post-aggregations to compute (optional)
    - context: Query context parameters (optional)

    Returns:
        Query object containing time-series results
    """
```

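To illustrate working with time-series results, the sketch below flattens entries shaped like Druid's timeseries response (a `timestamp` plus a nested `result` object) into flat rows. It assumes the returned object's `result` attribute holds that parsed JSON; `flatten_timeseries` is a hypothetical helper and the sample values are made up.

```python
def flatten_timeseries(result: list) -> list:
    """Merge each entry's timestamp with its aggregated values."""
    rows = []
    for entry in result:
        row = {"timestamp": entry["timestamp"]}
        row.update(entry["result"])
        rows.append(row)
    return rows


# Made-up sample in the shape of a timeseries response
sample = [
    {"timestamp": "2014-03-03T00:00:00.000Z", "result": {"count": 12.0}},
    {"timestamp": "2014-03-04T00:00:00.000Z", "result": {"count": 7.0}},
]
rows = flatten_timeseries(sample)
print(rows)
```
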
### Asynchronous GroupBy Queries

Execute groupBy queries asynchronously.

```python { .api }
async def groupby(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    dimensions: list,
    aggregations: dict,
    filter: 'Filter' = None,
    having: 'Having' = None,
    post_aggregations: dict = None,
    limit_spec: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a groupBy query asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity for grouping
    - intervals: ISO-8601 intervals to query
    - dimensions: List of dimensions to group by
    - aggregations: Dict mapping aggregator names to aggregator specifications
    - filter: Filter to apply to the data (optional)
    - having: Having clause for filtering grouped results (optional)
    - post_aggregations: Dict of post-aggregations to compute (optional)
    - limit_spec: Specification for limiting and ordering results (optional)
    - context: Query context parameters (optional)

    Returns:
        Query object containing grouped results
    """
```

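As an illustration of the `limit_spec` parameter, the sketch below builds a dict in the shape of Druid's default limitSpec (a row limit plus ordering columns). It assumes the client forwards this dict unchanged as the query's `limitSpec`; `default_limit_spec` is a hypothetical helper and not part of pydruid.

```python
def default_limit_spec(limit: int, order_by: str, descending: bool = True) -> dict:
    """Build a Druid 'default' limitSpec ordering by one column."""
    direction = "descending" if descending else "ascending"
    return {
        "type": "default",
        "limit": limit,
        "columns": [{"dimension": order_by, "direction": direction}],
    }


# Keep the five groups with the highest 'count'
spec = default_limit_spec(5, "count")
print(spec)
```
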
### Asynchronous Metadata Queries

Query metadata about datasources and segments asynchronously.

```python { .api }
async def segment_metadata(
    self,
    datasource: str,
    intervals: str | list = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a segment metadata query asynchronously.

    Parameters:
    - datasource: Data source to analyze
    - intervals: ISO-8601 intervals to analyze (optional, defaults to all)
    - context: Query context parameters (optional)

    Returns:
        Query object containing segment metadata
    """

async def time_boundary(
    self,
    datasource: str,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a time boundary query asynchronously.

    Parameters:
    - datasource: Data source to query
    - context: Query context parameters (optional)

    Returns:
        Query object containing time boundary information
    """
```

### Asynchronous Select Queries

Execute select queries for raw data access asynchronously.

Note: The AsyncPyDruid client does not support scan queries. Use the synchronous client for scan operations.

```python { .api }
async def select(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    dimensions: list = None,
    metrics: list = None,
    filter: 'Filter' = None,
    paging_spec: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a select query for raw data access asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity
    - intervals: ISO-8601 intervals to query
    - dimensions: List of dimensions to include (optional)
    - metrics: List of metrics to include (optional)
    - filter: Filter to apply (optional)
    - paging_spec: Paging specification for large result sets (optional)
    - context: Query context parameters (optional)

    Returns:
        Query object containing raw data
    """
```

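The `paging_spec` parameter is easiest to understand from its shape. The sketch below builds dicts in the form of Druid's select `pagingSpec` (a `pagingIdentifiers` map and a `threshold`), assuming the client forwards them unchanged. `first_page` and `next_page` are hypothetical helpers, and the sample response is made up. Depending on the Druid version, the returned offsets may need to be advanced before reuse, so treat this as an outline and check the Druid select query documentation.

```python
def first_page(threshold: int) -> dict:
    """Paging spec for the first request: no identifiers yet."""
    return {"pagingIdentifiers": {}, "threshold": threshold}


def next_page(result: list, threshold: int) -> dict:
    """Reuse the identifiers returned with the previous page."""
    identifiers = result[0]["result"]["pagingIdentifiers"]
    return {"pagingIdentifiers": dict(identifiers), "threshold": threshold}


# Made-up response fragment in the shape of a select result
previous = [{
    "timestamp": "2014-03-03T00:00:00.000Z",
    "result": {"pagingIdentifiers": {"segment_a": 4}, "events": []},
}]
print(first_page(5))
print(next_page(previous, 5))
```
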
## Usage Example

```python
from tornado import gen
from pydruid.async_client import AsyncPyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

client = AsyncPyDruid('http://localhost:8082', 'druid/v2/')

@gen.coroutine
def execute_async_query():
    top_mentions = yield client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars'),
        metric='count',
        threshold=10
    )

    # Process results
    print(top_mentions.result)

    # Export to pandas (if available)
    df = top_mentions.export_pandas()

    # Return results (Python 3.x: can use 'return top_mentions')
    raise gen.Return(top_mentions)

# Modern async/await syntax (Python 3.5+)
async def modern_async_query():
    top_mentions = await client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars'),
        metric='count',
        threshold=10
    )

    return top_mentions
```

## Performance Benefits

The asynchronous client can improve throughput for applications that:

- Execute multiple concurrent queries
- Serve multiple requests simultaneously
- Integrate with async web frameworks (Tornado, FastAPI, aiohttp)
- Need non-blocking I/O operations

The async client uses Tornado's non-blocking HTTP client, so the event loop can handle other tasks while waiting for Druid responses. In concurrent scenarios this generally gives better resource utilization and throughput than the synchronous client, which blocks on each request.
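
The concurrency benefit can be sketched without a running Druid cluster. The example below uses `StubClient`, a hypothetical stand-in whose `timeseries` coroutine sleeps instead of making an HTTP request, and issues three queries with `asyncio.gather`. It tracks how many calls are in flight at once to show that the waits overlap. With the real client, the same pattern should apply on Tornado 5 or later, where Tornado runs on the asyncio event loop, though that is an assumption worth verifying in your environment.

```python
import asyncio


class StubClient:
    """Hypothetical stand-in for AsyncPyDruid; it sleeps instead of calling Druid."""

    def __init__(self):
        self.in_flight = 0
        self.peak_in_flight = 0

    async def timeseries(self, datasource, **kwargs):
        self.in_flight += 1
        self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
        # Simulated network wait; the event loop is free to start other queries
        await asyncio.sleep(0.01)
        self.in_flight -= 1
        return {"datasource": datasource, "rows": []}


async def run_all(client, datasources):
    # Start every query first, then wait for all of them together
    return await asyncio.gather(
        *(client.timeseries(datasource=name) for name in datasources)
    )


client = StubClient()
results = asyncio.run(run_all(client, ["clicks", "views", "orders"]))
print([r["datasource"] for r in results], client.peak_in_flight)
```

`asyncio.gather` returns results in the order the queries were submitted, so each result can be matched back to its datasource by position.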