
# Asynchronous Client

The AsyncPyDruid client provides asynchronous query execution using Tornado's HTTP client, enabling non-blocking operations suitable for high-concurrency applications and async frameworks.

## Capabilities

### Client Initialization

Creates a new AsyncPyDruid client instance for asynchronous Druid queries.

```python { .api }
class AsyncPyDruid:
    def __init__(
        self,
        url: str,
        endpoint: str,
        defaults: dict = None,
        http_client: str = None
    ) -> None:
        """
        Initialize AsyncPyDruid client.

        Parameters:
        - url: URL of Broker node in the Druid cluster
        - endpoint: Endpoint that Broker listens for queries on (typically 'druid/v2/')
        - defaults: Default parameters for the Tornado HTTP client (optional)
        - http_client: Tornado HTTP client implementation to use (optional)
        """
```

### Authentication and Configuration

Configure client authentication and proxy settings (inherited from the base client).

```python { .api }
def set_basic_auth_credentials(self, username: str, password: str) -> None:
    """
    Set HTTP Basic Authentication credentials.

    Parameters:
    - username: Username for authentication
    - password: Password for authentication
    """

def set_proxies(self, proxies: dict) -> None:
    """
    Configure proxy settings for HTTP requests.

    Parameters:
    - proxies: Dictionary mapping protocol names to proxy URLs
    """
```

### Asynchronous TopN Queries

Execute TopN queries asynchronously.

```python { .api }
async def topn(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    aggregations: dict,
    dimension: str,
    metric: str,
    threshold: int,
    filter: 'Filter' = None,
    post_aggregations: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a TopN query asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity ('all', 'day', 'hour', 'minute', etc.)
    - intervals: ISO-8601 intervals ('2014-02-02/p4w' or list of intervals)
    - aggregations: Dict mapping aggregator names to aggregator specifications
    - dimension: Dimension to run the query against
    - metric: Metric to sort the dimension values by
    - threshold: Number of top items to return
    - filter: Filter to apply to the data (optional)
    - post_aggregations: Dict of post-aggregations to compute (optional)
    - context: Query context parameters (optional)

    Returns:
    Query object containing results and metadata
    """
```

### Asynchronous Timeseries Queries

Execute timeseries queries asynchronously.

```python { .api }
async def timeseries(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    aggregations: dict,
    filter: 'Filter' = None,
    post_aggregations: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a timeseries query asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity for aggregation
    - intervals: ISO-8601 intervals to query
    - aggregations: Dict mapping aggregator names to aggregator specifications
    - filter: Filter to apply to the data (optional)
    - post_aggregations: Dict of post-aggregations to compute (optional)
    - context: Query context parameters (optional)

    Returns:
    Query object containing time-series results
    """
```

### Asynchronous GroupBy Queries

Execute groupBy queries asynchronously.

```python { .api }
async def groupby(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    dimensions: list,
    aggregations: dict,
    filter: 'Filter' = None,
    having: 'Having' = None,
    post_aggregations: dict = None,
    limit_spec: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a groupBy query asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity for grouping
    - intervals: ISO-8601 intervals to query
    - dimensions: List of dimensions to group by
    - aggregations: Dict mapping aggregator names to aggregator specifications
    - filter: Filter to apply to the data (optional)
    - having: Having clause for filtering grouped results (optional)
    - post_aggregations: Dict of post-aggregations to compute (optional)
    - limit_spec: Specification for limiting and ordering results (optional)
    - context: Query context parameters (optional)

    Returns:
    Query object containing grouped results
    """
```

### Asynchronous Metadata Queries

Query metadata about datasources and segments asynchronously.

```python { .api }
async def segment_metadata(
    self,
    datasource: str,
    intervals: str | list = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a segment metadata query asynchronously.

    Parameters:
    - datasource: Data source to analyze
    - intervals: ISO-8601 intervals to analyze (optional, defaults to all)
    - context: Query context parameters (optional)

    Returns:
    Query object containing segment metadata
    """

async def time_boundary(
    self,
    datasource: str,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a time boundary query asynchronously.

    Parameters:
    - datasource: Data source to query
    - context: Query context parameters (optional)

    Returns:
    Query object containing time boundary information
    """
```

### Asynchronous Select Queries

Execute select queries for raw data access asynchronously.

Note: The AsyncPyDruid client does not support scan queries. Use the synchronous client for scan operations.

```python { .api }
async def select(
    self,
    datasource: str,
    granularity: str,
    intervals: str | list,
    dimensions: list = None,
    metrics: list = None,
    filter: 'Filter' = None,
    paging_spec: dict = None,
    context: dict = None,
    **kwargs
) -> Query:
    """
    Execute a select query for raw data access asynchronously.

    Parameters:
    - datasource: Data source to query
    - granularity: Time granularity
    - intervals: ISO-8601 intervals to query
    - dimensions: List of dimensions to include (optional)
    - metrics: List of metrics to include (optional)
    - filter: Filter to apply (optional)
    - paging_spec: Paging specification for large result sets (optional)
    - context: Query context parameters (optional)

    Returns:
    Query object containing raw data
    """
```

## Usage Example

```python
from tornado import gen

from pydruid.async_client import AsyncPyDruid
from pydruid.utils.aggregators import doublesum
from pydruid.utils.filters import Dimension

client = AsyncPyDruid('http://localhost:8082', 'druid/v2/')

@gen.coroutine
def execute_async_query():
    top_mentions = yield client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars'),
        metric='count',
        threshold=10
    )

    # Process results
    print(top_mentions.result)

    # Export to pandas (if available)
    df = top_mentions.export_pandas()

    # Return results (Python 3.x: can use 'return top_mentions')
    raise gen.Return(top_mentions)

# Modern async/await syntax (Python 3.5+)
async def modern_async_query():
    top_mentions = await client.topn(
        datasource='twitterstream',
        granularity='all',
        intervals='2014-03-03/p1d',
        aggregations={'count': doublesum('count')},
        dimension='user_mention_name',
        filter=(Dimension('user_lang') == 'en') & (Dimension('first_hashtag') == 'oscars'),
        metric='count',
        threshold=10
    )

    return top_mentions
```

## Performance Benefits

The asynchronous client provides significant performance benefits for applications that:

- Execute multiple concurrent queries
- Serve multiple requests simultaneously
- Integrate with async web frameworks (Tornado, FastAPI, aiohttp)
- Need non-blocking I/O operations

The async client uses Tornado's optimized HTTP client and allows the event loop to handle other tasks while waiting for Druid responses, resulting in much better resource utilization and throughput compared to the synchronous client in concurrent scenarios.
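The concurrency win can be illustrated without a Druid cluster. Below, a stand-in coroutine (not part of pydruid) simulates a ~100 ms Druid round trip; five "queries" gathered on one event loop finish in roughly the time of one, whereas a blocking client would take the sum of the waits:

```python
import asyncio
import time

async def fake_query(n):
    # Stand-in for an awaited AsyncPyDruid call: ~100 ms of network wait.
    await asyncio.sleep(0.1)
    return n

async def main():
    start = time.perf_counter()
    # Run five simulated queries concurrently; order of results is preserved.
    results = await asyncio.gather(*(fake_query(i) for i in range(5)))
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)   # [0, 1, 2, 3, 4]
print(elapsed)   # well under 0.5 s: the five waits overlap on one event loop
```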