Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.
Comprehensive functionality for executing Flux queries against InfluxDB and processing results in various formats including streaming, materialized tables, CSV, pandas DataFrames, and raw HTTP responses. The querying system supports both synchronous and asynchronous operations with flexible result processing.
Main API for executing Flux queries against InfluxDB with support for multiple result formats and query profiling.
class QueryApi:
def __init__(
self,
influxdb_client,
query_options: QueryOptions = QueryOptions()
): ...
def query(
self,
query: str,
org: str = None,
params: dict = None
) -> TableList:
"""
Execute Flux query and return materialized results as TableList.
Parameters:
- query (str): Flux query string
- org (str, optional): Organization name or ID
- params (dict, optional): Query parameters for parameterized queries
Returns:
TableList: Materialized query results
"""
def query_stream(
self,
query: str,
org: str = None,
params: dict = None
) -> Generator[FluxRecord, None, None]:
"""
Execute Flux query and return streaming results as generator.
Parameters:
- query (str): Flux query string
- org (str, optional): Organization name or ID
- params (dict, optional): Query parameters
Returns:
Generator[FluxRecord, None, None]: Streaming query results
"""
def query_csv(
self,
query: str,
org: str = None,
dialect: Dialect = None,
params: dict = None
) -> CSVIterator:
"""
Execute Flux query and return results as CSV iterator.
Parameters:
- query (str): Flux query string
- org (str, optional): Organization name or ID
- dialect (Dialect, optional): CSV format configuration
- params (dict, optional): Query parameters
Returns:
CSVIterator: CSV formatted query results
"""
def query_raw(
self,
query: str,
org: str = None,
dialect: Dialect = None,
params: dict = None
) -> str:
"""
Execute Flux query and return the raw unprocessed response as a string.
Parameters:
- query (str): Flux query string
- org (str, optional): Organization name or ID
- dialect (Dialect, optional): Response format configuration
- params (dict, optional): Query parameters
Returns:
str: Raw unprocessed query response
"""
def query_data_frame(
self,
query: str,
org: str = None,
data_frame_index: List[str] = None,
params: dict = None,
use_extension_dtypes: bool = False
):
"""
Execute Flux query and return results as pandas DataFrame.
Parameters:
- query (str): Flux query string
- org (str, optional): Organization name or ID
- data_frame_index (List[str], optional): Columns to use as DataFrame index
- params (dict, optional): Query parameters
- use_extension_dtypes (bool): Use pandas extension data types
Returns:
pandas.DataFrame: Query results as DataFrame
"""Basic query execution:
from influxdb_client import InfluxDBClient
client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")
query_api = client.query_api()
# Basic Flux query
query = '''
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.location == "room1")
'''
# Get materialized results
tables = query_api.query(query)
for table in tables:
for record in table.records:
print(f"Time: {record.get_time()}, Value: {record.get_value()}")Streaming query results:
# Stream results for large datasets
query = '''
from(bucket: "large_dataset")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "metrics")
'''
for record in query_api.query_stream(query):
# Process records one at a time without loading all into memory
process_record(record)
Parameterized queries:
# Use parameters for dynamic queries
parameterized_query = '''
from(bucket: params.bucket_name)
|> range(start: params.start_time)
|> filter(fn: (r) => r._measurement == params.measurement)
|> filter(fn: (r) => r.location == params.location_filter)
'''
query_params = {
"bucket_name": "sensors",
"start_time": "-2h",
"measurement": "temperature",
"location_filter": "datacenter1"
}
results = query_api.query(parameterized_query, params=query_params)
DataFrame integration:
import pandas as pd
# Get results as pandas DataFrame
query = '''
from(bucket: "analytics")
|> range(start: -1d)
|> filter(fn: (r) => r._measurement == "sales")
|> aggregateWindow(every: 1h, fn: sum)
'''
df = query_api.query_data_frame(
query,
data_frame_index=["_time", "store_id"],
use_extension_dtypes=True
)
# Now use standard pandas operations
monthly_avg = df.groupby(df.index.month).mean()
print(monthly_avg)
Asynchronous version of QueryApi for non-blocking query operations.
class QueryApiAsync:
def __init__(
self,
influxdb_client,
query_options: QueryOptions = QueryOptions()
): ...
async def query(
self,
query: str,
org: str = None,
params: dict = None
) -> TableList:
"""
Asynchronously execute Flux query and return materialized results.
"""
async def query_stream(
self,
query: str,
org: str = None,
params: dict = None
) -> AsyncGenerator[FluxRecord, None]:
"""
Asynchronously execute Flux query and return streaming results.
"""
async def query_csv(
self,
query: str,
org: str = None,
dialect: Dialect = None,
params: dict = None
) -> CSVIterator:
"""
Asynchronously execute Flux query and return CSV results.
"""
async def query_raw(
self,
query: str,
org: str = None,
dialect: Dialect = None,
params: dict = None
) -> str:
"""
Asynchronously execute Flux query and return raw response.
"""
async def query_data_frame(
self,
query: str,
org: str = None,
data_frame_index: List[str] = None,
params: dict = None,
use_extension_dtypes: bool = False
):
"""
Asynchronously execute Flux query and return DataFrame.
"""import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
async def query_data():
async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
query_api = client.query_api()
query = '''
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
'''
# Async materialized query
tables = await query_api.query(query)
for table in tables:
for record in table.records:
print(f"Value: {record.get_value()}")
# Async streaming query
async for record in query_api.query_stream(query):
print(f"Streaming value: {record.get_value()}")
asyncio.run(query_data())
Represents individual records in query results with methods to access time series data fields.
class FluxRecord:
def __init__(self, table: int, values: dict = None): ...
def get_start(self) -> datetime:
"""
Get the start time of the record's time range.
Returns:
datetime: Start time
"""
def get_stop(self) -> datetime:
"""
Get the stop time of the record's time range.
Returns:
datetime: Stop time
"""
def get_time(self) -> datetime:
"""
Get the timestamp of the record.
Returns:
datetime: Record timestamp
"""
def get_value(self) -> Any:
"""
Get the value field of the record.
Returns:
Any: Record value (int, float, str, bool)
"""
def get_field(self) -> str:
"""
Get the field name of the record.
Returns:
str: Field name
"""
def get_measurement(self) -> str:
"""
Get the measurement name of the record.
Returns:
str: Measurement name
"""
def values(self) -> dict:
"""
Get all column values as dictionary.
Returns:
dict: All record values keyed by column name
"""
def __getitem__(self, key: str) -> Any:
"""
Get value by column name using dict-like access.
Parameters:
- key (str): Column name
Returns:
Any: Column value
"""
def __str__(self) -> str: ...
def __repr__(self) -> str: ...
Accessing record data:
for table in query_api.query(flux_query):
for record in table.records:
# Access standard time series fields
timestamp = record.get_time()
value = record.get_value()
field_name = record.get_field()
measurement = record.get_measurement()
# Access custom tags and fields
location = record["location"] # Tag value
sensor_id = record["sensor_id"] # Tag value
# Check if column exists
if "quality" in record:
quality = record["quality"]
# Get all values as dictionary
all_values = record.values()
print(f"Record: {all_values}")Processing different data types:
for record in query_api.query_stream(query):
value = record.get_value()
field = record.get_field()
# Handle different field types
if field == "temperature":
temperature = float(value)
print(f"Temperature: {temperature}°C")
elif field == "status":
status = str(value)
print(f"Status: {status}")
elif field == "count":
count = int(value)
print(f"Count: {count}")
elif field == "enabled":
enabled = bool(value)
print(f"Enabled: {enabled}")Container for multiple flux tables that extends Python list with additional utility methods.
class TableList(list):
def __init__(self): ...
def to_json(self, indent: int = None) -> str:
"""
Convert all tables to JSON representation.
Parameters:
- indent (int, optional): JSON indentation for pretty printing
Returns:
str: JSON string representation
"""
def to_values(self, columns: List[str] = None) -> List[List[Any]]:
"""
Convert all table records to nested list of values.
Parameters:
- columns (List[str], optional): Specific columns to include
Returns:
List[List[Any]]: Nested list with all record values
"""Processing multiple tables:
tables = query_api.query('''
from(bucket: "sensors")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> group(columns: ["location"])
''')
# Iterate through all tables
for i, table in enumerate(tables):
print(f"Table {i}: {len(table.records)} records")
# Process records in each table
for record in table.records:
location = record["location"]
temp = record.get_value()
print(f" {location}: {temp}°C")Export to JSON:
# Convert results to JSON
json_output = tables.to_json(indent=2)
print(json_output)
# Save to file
with open("query_results.json", "w") as f:
f.write(json_output)
Convert to values list:
# Get all values as nested list
all_values = tables.to_values()
print(f"Total records: {len(all_values)}")
# Get specific columns only
temp_values = tables.to_values(columns=["_time", "_value", "location"])
for time, value, location in temp_values:
print(f"{time}: {location} = {value}")Iterator for processing CSV-formatted query results with support for custom dialects.
class CSVIterator:
def __init__(self, response: HTTPResponse, dialect: Dialect = None): ...
def to_values(self) -> List[List[str]]:
"""
Convert CSV results to list of string lists.
Returns:
List[List[str]]: All CSV rows as nested string lists
"""
def __iter__(self) -> 'CSVIterator': ...
def __next__(self) -> List[str]:
"""
Get next CSV row as list of strings.
Returns:
List[str]: Next CSV row
"""Processing CSV results:
from influxdb_client import Dialect
# Configure CSV dialect
csv_dialect = Dialect(
header=True,
delimiter=",",
comment_prefix="#",
annotations=["datatype", "group", "default"]
)
csv_results = query_api.query_csv(query, dialect=csv_dialect)
# Iterate through CSV rows
for row in csv_results:
# row is a list of string values
if len(row) >= 4:
timestamp = row[0]
measurement = row[1]
field = row[2]
value = row[3]
print(f"{timestamp}: {measurement}.{field} = {value}")Convert CSV to list:
csv_results = query_api.query_csv(query)
all_rows = csv_results.to_values()
# Process as regular list
headers = all_rows[0] if all_rows else []
data_rows = all_rows[1:] if len(all_rows) > 1 else []
print(f"Headers: {headers}")
for row in data_rows[:5]: # First 5 data rows
print(f"Data: {row}")Configuration class for query behavior including profiling and custom callbacks.
class QueryOptions:
def __init__(
self,
profilers: List[str] = None,
profiler_callback: Callable = None
):
"""
Configure query execution options.
Parameters:
- profilers (List[str]): List of profiler names to enable
- profiler_callback (Callable): Callback function for profiler results
Available profilers:
- "query": Query execution profiling
- "operator": Individual operator profiling
"""Query profiling:
from influxdb_client import QueryOptions
def profiler_callback(profiler_name, profiler_result):
print(f"Profiler {profiler_name}: {profiler_result}")
# Configure query profiling
query_options = QueryOptions(
profilers=["query", "operator"],
profiler_callback=profiler_callback
)
# Use with QueryApi
query_api = client.query_api(query_options=query_options)
# Profiler results will be sent to callback during query execution
results = query_api.query('''
from(bucket: "performance_test")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu")
|> mean()
''')
Configuration class for CSV output formatting when using query_csv method.
class Dialect:
def __init__(
self,
header: bool = True,
delimiter: str = ",",
comment_prefix: str = "#",
annotations: List[str] = None,
date_time_format: str = "RFC3339"
):
"""
Configure CSV output format.
Parameters:
- header (bool): Include column headers
- delimiter (str): CSV field delimiter
- comment_prefix (str): Prefix for comment lines
- annotations (List[str]): Metadata annotations to include
- date_time_format (str): DateTime format ("RFC3339" or "RFC3339Nano")
"""from influxdb_client import Dialect
# Custom CSV format
custom_dialect = Dialect(
header=True,
delimiter=";",
comment_prefix="//",
annotations=["datatype", "group"],
date_time_format="RFC3339Nano"
)
# Use with CSV query
csv_iterator = query_api.query_csv(query, dialect=custom_dialect)
# Core result types
class FluxTable:
"""Represents a single result table from Flux query."""
columns: List[FluxColumn]
records: List[FluxRecord]
class FluxColumn:
"""Represents a column in a Flux table."""
index: int
label: str
data_type: str
group: bool
default_value: str
# Query parameter types
QueryParams = Dict[str, Any] # Parameters for parameterized queries
# Response types from various query methods
from typing import Generator, AsyncGenerator
from urllib3 import HTTPResponse
import pandas
QueryResult = TableList
QueryStreamResult = Generator[FluxRecord, None, None]
QueryAsyncStreamResult = AsyncGenerator[FluxRecord, None]
QueryCSVResult = CSVIterator
QueryRawResult = str  # query_raw returns the raw response body as a str (matches the signatures above)
QueryDataFrameResult = pandas.DataFrame
# Exception types
class FluxQueryError(InfluxDBError):
"""Raised when Flux queries fail."""
pass
class FluxParseError(FluxQueryError):
"""Raised when Flux query parsing fails."""
pass
class FluxRuntimeError(FluxQueryError):
"""Raised when Flux query execution fails."""
pass
Install with Tessl CLI
npx tessl i tessl/pypi-influxdb-client
docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10