# Querying Data

Comprehensive functionality for executing Flux queries against InfluxDB and processing results in various formats including streaming, materialized tables, CSV, pandas DataFrames, and raw query responses. The querying system supports both synchronous and asynchronous operations with flexible result processing.

## Capabilities

### QueryApi

Main API for executing Flux queries against InfluxDB with support for multiple result formats and query profiling.

```python { .api }
class QueryApi:
    def __init__(
        self,
        influxdb_client,
        query_options: QueryOptions = QueryOptions()
    ): ...

    def query(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> TableList:
        """
        Execute Flux query and return materialized results as TableList.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - params (dict, optional): Query parameters for parameterized queries

        Returns:
        TableList: Materialized query results
        """

    def query_stream(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> Generator[FluxRecord, None, None]:
        """
        Execute Flux query and return streaming results as generator.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - params (dict, optional): Query parameters

        Returns:
        Generator[FluxRecord, None, None]: Streaming query results
        """

    def query_csv(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> CSVIterator:
        """
        Execute Flux query and return results as CSV iterator.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - dialect (Dialect, optional): CSV format configuration
        - params (dict, optional): Query parameters

        Returns:
        CSVIterator: CSV formatted query results
        """

    def query_raw(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> str:
        """
        Execute Flux query and return the raw, unprocessed response.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - dialect (Dialect, optional): Response format configuration
        - params (dict, optional): Query parameters

        Returns:
        str: Raw query response body as a string
        """

    def query_data_frame(
        self,
        query: str,
        org: str = None,
        data_frame_index: List[str] = None,
        params: dict = None,
        use_extension_dtypes: bool = False
    ):
        """
        Execute Flux query and return results as pandas DataFrame.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - data_frame_index (List[str], optional): Columns to use as DataFrame index
        - params (dict, optional): Query parameters
        - use_extension_dtypes (bool): Use pandas extension data types

        Returns:
        pandas.DataFrame: Query results as DataFrame
        """
```

#### QueryApi Usage Examples

**Basic query execution:**
```python
from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")
query_api = client.query_api()

# Basic Flux query
query = '''
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
'''

# Get materialized results
tables = query_api.query(query)
for table in tables:
    for record in table.records:
        print(f"Time: {record.get_time()}, Value: {record.get_value()}")
```

**Streaming query results:**
```python
# Stream results for large datasets
query = '''
from(bucket: "large_dataset")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "metrics")
'''

for record in query_api.query_stream(query):
    # Process records one at a time without loading all into memory
    process_record(record)
```

**Parameterized queries:**
```python
# Use parameters for dynamic queries
parameterized_query = '''
from(bucket: params.bucket_name)
  |> range(start: params.start_time)
  |> filter(fn: (r) => r._measurement == params.measurement)
  |> filter(fn: (r) => r.location == params.location_filter)
'''

query_params = {
    "bucket_name": "sensors",
    "start_time": "-2h",
    "measurement": "temperature",
    "location_filter": "datacenter1"
}

results = query_api.query(parameterized_query, params=query_params)
```

**DataFrame integration:**
```python
import pandas as pd

# Get results as pandas DataFrame
query = '''
from(bucket: "analytics")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "sales")
  |> aggregateWindow(every: 1h, fn: sum)
'''

df = query_api.query_data_frame(
    query,
    data_frame_index=["_time", "store_id"],
    use_extension_dtypes=True
)

# Now use standard pandas operations
monthly_avg = df.groupby(df.index.month).mean()
print(monthly_avg)
```

### QueryApiAsync

Asynchronous version of QueryApi for non-blocking query operations.

```python { .api }
class QueryApiAsync:
    def __init__(
        self,
        influxdb_client,
        query_options: QueryOptions = QueryOptions()
    ): ...

    async def query(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> TableList:
        """
        Asynchronously execute Flux query and return materialized results.
        """

    async def query_stream(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> AsyncGenerator[FluxRecord, None]:
        """
        Asynchronously execute Flux query and return streaming results.
        """

    async def query_csv(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> CSVIterator:
        """
        Asynchronously execute Flux query and return CSV results.
        """

    async def query_raw(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> str:
        """
        Asynchronously execute Flux query and return the raw response as a string.
        """

    async def query_data_frame(
        self,
        query: str,
        org: str = None,
        data_frame_index: List[str] = None,
        params: dict = None,
        use_extension_dtypes: bool = False
    ):
        """
        Asynchronously execute Flux query and return DataFrame.
        """
```

#### Async Query Usage Example

```python
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def query_data():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
        query_api = client.query_api()

        query = '''
        from(bucket: "sensors")
          |> range(start: -1h)
          |> filter(fn: (r) => r._measurement == "temperature")
        '''

        # Async materialized query
        tables = await query_api.query(query)
        for table in tables:
            for record in table.records:
                print(f"Value: {record.get_value()}")

        # Async streaming query
        async for record in query_api.query_stream(query):
            print(f"Streaming value: {record.get_value()}")

asyncio.run(query_data())
```

### FluxRecord

Represents individual records in query results with methods to access time series data fields.

```python { .api }
class FluxRecord:
    def __init__(self, table: int, values: dict = None): ...

    def get_start(self) -> datetime:
        """
        Get the start time of the record's time range.

        Returns:
        datetime: Start time
        """

    def get_stop(self) -> datetime:
        """
        Get the stop time of the record's time range.

        Returns:
        datetime: Stop time
        """

    def get_time(self) -> datetime:
        """
        Get the timestamp of the record.

        Returns:
        datetime: Record timestamp
        """

    def get_value(self) -> Any:
        """
        Get the value field of the record.

        Returns:
        Any: Record value (int, float, str, bool)
        """

    def get_field(self) -> str:
        """
        Get the field name of the record.

        Returns:
        str: Field name
        """

    def get_measurement(self) -> str:
        """
        Get the measurement name of the record.

        Returns:
        str: Measurement name
        """

    def values(self) -> dict:
        """
        Get all column values as dictionary.

        Returns:
        dict: All record values keyed by column name
        """

    def __getitem__(self, key: str) -> Any:
        """
        Get value by column name using dict-like access.

        Parameters:
        - key (str): Column name

        Returns:
        Any: Column value
        """

    def __str__(self) -> str: ...
    def __repr__(self) -> str: ...
```

#### FluxRecord Usage Examples

**Accessing record data:**
```python
for table in query_api.query(flux_query):
    for record in table.records:
        # Access standard time series fields
        timestamp = record.get_time()
        value = record.get_value()
        field_name = record.get_field()
        measurement = record.get_measurement()

        # Access custom tags and fields
        location = record["location"]    # Tag value
        sensor_id = record["sensor_id"]  # Tag value

        # Check if column exists
        if "quality" in record:
            quality = record["quality"]

        # Get all values as dictionary
        all_values = record.values()
        print(f"Record: {all_values}")
```

**Processing different data types:**
```python
for record in query_api.query_stream(query):
    value = record.get_value()
    field = record.get_field()

    # Handle different field types
    if field == "temperature":
        temperature = float(value)
        print(f"Temperature: {temperature}°C")
    elif field == "status":
        status = str(value)
        print(f"Status: {status}")
    elif field == "count":
        count = int(value)
        print(f"Count: {count}")
    elif field == "enabled":
        enabled = bool(value)
        print(f"Enabled: {enabled}")
```

### TableList

Container for multiple flux tables that extends Python list with additional utility methods.

```python { .api }
class TableList(list):
    def __init__(self): ...

    def to_json(self, indent: int = None) -> str:
        """
        Convert all tables to JSON representation.

        Parameters:
        - indent (int, optional): JSON indentation for pretty printing

        Returns:
        str: JSON string representation
        """

    def to_values(self, columns: List[str] = None) -> List[List[Any]]:
        """
        Convert all table records to nested list of values.

        Parameters:
        - columns (List[str], optional): Specific columns to include

        Returns:
        List[List[Any]]: Nested list with all record values
        """
```

#### TableList Usage Examples

**Processing multiple tables:**
```python
tables = query_api.query('''
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
''')

# Iterate through all tables
for i, table in enumerate(tables):
    print(f"Table {i}: {len(table.records)} records")

    # Process records in each table
    for record in table.records:
        location = record["location"]
        temp = record.get_value()
        print(f"  {location}: {temp}°C")
```

**Export to JSON:**
```python
# Convert results to JSON
json_output = tables.to_json(indent=2)
print(json_output)

# Save to file
with open("query_results.json", "w") as f:
    f.write(json_output)
```

**Convert to values list:**
```python
# Get all values as nested list
all_values = tables.to_values()
print(f"Total records: {len(all_values)}")

# Get specific columns only
temp_values = tables.to_values(columns=["_time", "_value", "location"])
for time, value, location in temp_values:
    print(f"{time}: {location} = {value}")
```

### CSVIterator

Iterator for processing CSV-formatted query results with support for custom dialects.

```python { .api }
class CSVIterator:
    def __init__(self, response: HTTPResponse, dialect: Dialect = None): ...

    def to_values(self) -> List[List[str]]:
        """
        Convert CSV results to list of string lists.

        Returns:
        List[List[str]]: All CSV rows as nested string lists
        """

    def __iter__(self) -> 'CSVIterator': ...

    def __next__(self) -> List[str]:
        """
        Get next CSV row as list of strings.

        Returns:
        List[str]: Next CSV row
        """
```

#### CSVIterator Usage Examples

**Processing CSV results:**
```python
from influxdb_client import Dialect

# Configure CSV dialect
csv_dialect = Dialect(
    header=True,
    delimiter=",",
    comment_prefix="#",
    annotations=["datatype", "group", "default"]
)

csv_results = query_api.query_csv(query, dialect=csv_dialect)

# Iterate through CSV rows
for row in csv_results:
    # row is a list of string values
    if len(row) >= 4:
        timestamp = row[0]
        measurement = row[1]
        field = row[2]
        value = row[3]
        print(f"{timestamp}: {measurement}.{field} = {value}")
```

**Convert CSV to list:**
```python
csv_results = query_api.query_csv(query)
all_rows = csv_results.to_values()

# Process as regular list
headers = all_rows[0] if all_rows else []
data_rows = all_rows[1:] if len(all_rows) > 1 else []

print(f"Headers: {headers}")
for row in data_rows[:5]:  # First 5 data rows
    print(f"Data: {row}")
```

### QueryOptions

Configuration class for query behavior including profiling and custom callbacks.

```python { .api }
class QueryOptions:
    def __init__(
        self,
        profilers: List[str] = None,
        profiler_callback: Callable = None
    ):
        """
        Configure query execution options.

        Parameters:
        - profilers (List[str]): List of profiler names to enable
        - profiler_callback (Callable): Callback function for profiler results

        Available profilers:
        - "query": Query execution profiling
        - "operator": Individual operator profiling
        """
```

#### QueryOptions Usage Examples

**Query profiling:**
```python
from influxdb_client import QueryOptions

def profiler_callback(profiler_name, profiler_result):
    print(f"Profiler {profiler_name}: {profiler_result}")

# Configure query profiling
query_options = QueryOptions(
    profilers=["query", "operator"],
    profiler_callback=profiler_callback
)

# Use with QueryApi
query_api = client.query_api(query_options=query_options)

# Profiler results will be sent to callback during query execution
results = query_api.query('''
from(bucket: "performance_test")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> mean()
''')
```

### Dialect

Configuration class for CSV output formatting when using query_csv method.

```python { .api }
class Dialect:
    def __init__(
        self,
        header: bool = True,
        delimiter: str = ",",
        comment_prefix: str = "#",
        annotations: List[str] = None,
        date_time_format: str = "RFC3339"
    ):
        """
        Configure CSV output format.

        Parameters:
        - header (bool): Include column headers
        - delimiter (str): CSV field delimiter
        - comment_prefix (str): Prefix for comment lines
        - annotations (List[str]): Metadata annotations to include
        - date_time_format (str): DateTime format ("RFC3339" or "RFC3339Nano")
        """
```

#### Dialect Usage Example

```python
from influxdb_client import Dialect

# Custom CSV format
custom_dialect = Dialect(
    header=True,
    delimiter=";",
    comment_prefix="//",
    annotations=["datatype", "group"],
    date_time_format="RFC3339Nano"
)

# Use with CSV query
csv_iterator = query_api.query_csv(query, dialect=custom_dialect)
```

## Types

```python { .api }
# Core result types
class FluxTable:
    """Represents a single result table from Flux query."""
    columns: List[FluxColumn]
    records: List[FluxRecord]

class FluxColumn:
    """Represents a column in a Flux table."""
    index: int
    label: str
    data_type: str
    group: bool
    default_value: str

# Query parameter types
QueryParams = Dict[str, Any]  # Parameters for parameterized queries

# Response types from various query methods
from typing import Generator, AsyncGenerator
from urllib3 import HTTPResponse
import pandas

QueryResult = TableList
QueryStreamResult = Generator[FluxRecord, None, None]
QueryAsyncStreamResult = AsyncGenerator[FluxRecord, None]
QueryCSVResult = CSVIterator
QueryRawResult = str  # query_raw returns the raw response body as a string
QueryDataFrameResult = pandas.DataFrame

# Exception types
class FluxQueryError(InfluxDBError):
    """Raised when Flux queries fail."""
    pass

class FluxParseError(FluxQueryError):
    """Raised when Flux query parsing fails."""
    pass

class FluxRuntimeError(FluxQueryError):
    """Raised when Flux query execution fails."""
    pass
```