CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-influxdb-client

Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.

82

1.18x
Overview
Eval results
Files

docs/querying-data.md

Querying Data

Comprehensive functionality for executing Flux queries against InfluxDB and processing results in various formats including streaming, materialized tables, CSV, pandas DataFrames, and raw HTTP responses. The querying system supports both synchronous and asynchronous operations with flexible result processing.

Capabilities

QueryApi

Main API for executing Flux queries against InfluxDB with support for multiple result formats and query profiling.

class QueryApi:
    """
    Synchronous API for executing Flux queries against InfluxDB.

    Every ``query*`` method runs a Flux query; they differ only in how the
    result is handed back (materialized tables, record stream, CSV rows,
    raw HTTP response, or pandas DataFrame).
    """

    def __init__(
        self,
        influxdb_client,
        query_options: QueryOptions = QueryOptions()
    ): ...

    def query(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> TableList:
        """
        Execute Flux query and return materialized results as TableList.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - params (dict, optional): Query parameters for parameterized queries

        Returns:
        TableList: Materialized query results
        """

    def query_stream(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> Generator[FluxRecord, None, None]:
        """
        Execute Flux query and return streaming results as generator.

        Records are yielded one at a time, so the full result never has to
        fit in memory.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - params (dict, optional): Query parameters

        Returns:
        Generator[FluxRecord, None, None]: Streaming query results
        """

    def query_csv(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> CSVIterator:
        """
        Execute Flux query and return results as CSV iterator.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - dialect (Dialect, optional): CSV format configuration
        - params (dict, optional): Query parameters

        Returns:
        CSVIterator: CSV formatted query results
        """

    def query_raw(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> HTTPResponse:
        """
        Execute Flux query and return raw HTTP response.

        The response body is not parsed; the caller reads and decodes it.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - dialect (Dialect, optional): Response format configuration
        - params (dict, optional): Query parameters

        Returns:
        HTTPResponse: Raw HTTP response object
        """

    def query_data_frame(
        self,
        query: str,
        org: str = None,
        data_frame_index: List[str] = None,
        params: dict = None,
        use_extension_dtypes: bool = False
    ):
        """
        Execute Flux query and return results as pandas DataFrame.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - data_frame_index (List[str], optional): Columns to use as DataFrame index
        - params (dict, optional): Query parameters
        - use_extension_dtypes (bool): Use pandas extension data types

        Returns:
        pandas.DataFrame: Query results as DataFrame

        NOTE(review): the upstream client may return a list of DataFrames
        when the result contains tables with different schemas - confirm
        before relying on a single-frame return.
        """

QueryApi Usage Examples

Basic query execution:

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")
query_api = client.query_api()

# Flux source: the last hour of "temperature" readings for room1
query = '''
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == "room1")
'''

# Materialize the result, then walk every record of every returned table
tables = query_api.query(query)
all_records = (record for table in tables for record in table.records)
for record in all_records:
    print(f"Time: {record.get_time()}, Value: {record.get_value()}")

Streaming query results:

# Stream results for large datasets
query = '''
from(bucket: "large_dataset")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "metrics")
'''

for record in query_api.query_stream(query):
    # Process records one at a time without loading all into memory
    process_record(record)

Parameterized queries:

# Use parameters for dynamic queries
# NOTE(review): upstream documents parameterized queries as an InfluxDB Cloud
# feature - confirm that your server accepts them.
from datetime import timedelta

parameterized_query = '''
from(bucket: params.bucket_name)
  |> range(start: params.start_time)
  |> filter(fn: (r) => r._measurement == params.measurement)
  |> filter(fn: (r) => r.location == params.location_filter)
'''

query_params = {
    "bucket_name": "sensors",
    # A timedelta is sent as a Flux duration literal. A plain "-2h" string
    # would be sent as a string, which range(start:) does not accept.
    "start_time": timedelta(hours=-2),
    "measurement": "temperature",
    "location_filter": "datacenter1"
}

results = query_api.query(parameterized_query, params=query_params)

DataFrame integration:

import pandas as pd

# Get results as pandas DataFrame
query = '''
from(bucket: "analytics") 
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "sales")
  |> aggregateWindow(every: 1h, fn: sum)
'''

df = query_api.query_data_frame(
    query,
    data_frame_index=["_time", "store_id"],
    use_extension_dtypes=True
)

# Now use standard pandas operations.
# The frame is indexed by (_time, store_id), so the timestamp has to be read
# from the "_time" index level rather than from df.index directly. The query
# spans one day in hourly windows, hence the grouping by hour of day.
hour_of_day = df.index.get_level_values("_time").hour
# Average only the numeric _value column; the other columns are metadata.
hourly_avg = df["_value"].groupby(hour_of_day).mean()
print(hourly_avg)

QueryApiAsync

Asynchronous version of QueryApi for non-blocking query operations.

class QueryApiAsync:
    """
    Asynchronous counterpart of QueryApi for non-blocking query operations.

    Every method is a coroutine and must be awaited.
    """

    def __init__(
        self,
        influxdb_client,
        query_options: QueryOptions = QueryOptions()
    ): ...

    async def query(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> TableList:
        """
        Asynchronously execute Flux query and return materialized results.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - params (dict, optional): Query parameters

        Returns:
        TableList: Materialized query results
        """

    async def query_stream(
        self,
        query: str,
        org: str = None,
        params: dict = None
    ) -> AsyncGenerator[FluxRecord, None]:
        """
        Asynchronously execute Flux query and return streaming results.

        Because this is a coroutine, await the call first and then iterate
        the returned generator with ``async for``.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - params (dict, optional): Query parameters

        Returns:
        AsyncGenerator[FluxRecord, None]: Streaming query results
        """

    async def query_csv(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> CSVIterator:
        """
        Asynchronously execute Flux query and return CSV results.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - dialect (Dialect, optional): CSV format configuration
        - params (dict, optional): Query parameters

        Returns:
        CSVIterator: CSV formatted query results
        """

    async def query_raw(
        self,
        query: str,
        org: str = None,
        dialect: Dialect = None,
        params: dict = None
    ) -> str:
        """
        Asynchronously execute Flux query and return raw response.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - dialect (Dialect, optional): Response format configuration
        - params (dict, optional): Query parameters

        Returns:
        str: Raw response body
        """

    async def query_data_frame(
        self,
        query: str,
        org: str = None,
        data_frame_index: List[str] = None,
        params: dict = None,
        use_extension_dtypes: bool = False
    ):
        """
        Asynchronously execute Flux query and return DataFrame.

        Parameters:
        - query (str): Flux query string
        - org (str, optional): Organization name or ID
        - data_frame_index (List[str], optional): Columns to use as DataFrame index
        - params (dict, optional): Query parameters
        - use_extension_dtypes (bool): Use pandas extension data types
        """

Async Query Usage Example

import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def query_data():
    """Run one Flux query twice: once materialized, once as a record stream."""
    # Queries need an organization; set a client-wide default here instead of
    # passing org= on every call.
    async with InfluxDBClientAsync(url="http://localhost:8086", token="token", org="org") as client:
        query_api = client.query_api()
        
        query = '''
        from(bucket: "sensors")
          |> range(start: -1h)
          |> filter(fn: (r) => r._measurement == "temperature")
        '''
        
        # Async materialized query
        tables = await query_api.query(query)
        for table in tables:
            for record in table.records:
                print(f"Value: {record.get_value()}")
        
        # Async streaming query: query_stream is a coroutine, so await it to
        # obtain the async generator before iterating with `async for`.
        records = await query_api.query_stream(query)
        async for record in records:
            print(f"Streaming value: {record.get_value()}")
            
asyncio.run(query_data())

FluxRecord

Represents individual records in query results with methods to access time series data fields.

class FluxRecord:
    """
    One record (row) of a Flux query result.

    Offers accessor methods for the standard time series columns and
    dict-like access by column name for everything else.
    """
    def __init__(self, table: int, values: dict = None): ...
    
    def get_start(self) -> datetime:
        """
        Get the start time of the record's time range.
        
        Returns:
        datetime: Start time
        """
        
    def get_stop(self) -> datetime:
        """
        Get the stop time of the record's time range.
        
        Returns:
        datetime: Stop time
        """
        
    def get_time(self) -> datetime:
        """
        Get the timestamp of the record.
        
        Returns:
        datetime: Record timestamp
        """
        
    def get_value(self) -> Any:
        """
        Get the value field of the record.
        
        Returns:
        Any: Record value (int, float, str, bool)
        """
        
    def get_field(self) -> str:
        """
        Get the field name of the record.
        
        Returns:
        str: Field name
        """
        
    def get_measurement(self) -> str:
        """
        Get the measurement name of the record.
        
        Returns:
        str: Measurement name
        """
        
    # NOTE(review): the constructor takes a `values` dict, and the upstream
    # client appears to expose it as a plain attribute (`record.values`, no
    # call parentheses) rather than as a method - confirm and adjust this stub.
    def values(self) -> dict:
        """
        Get all column values as dictionary.
        
        Returns:
        dict: All record values keyed by column name
        """
        
    def __getitem__(self, key: str) -> Any:
        """
        Get value by column name using dict-like access.
        
        Parameters:
        - key (str): Column name
        
        Returns:
        Any: Column value
        """
        
    def __str__(self) -> str: ...
    def __repr__(self) -> str: ...

FluxRecord Usage Examples

Accessing record data:

for table in query_api.query(flux_query):
    for record in table.records:
        # Access standard time series fields
        timestamp = record.get_time()
        value = record.get_value()
        field_name = record.get_field()
        measurement = record.get_measurement()
        
        # Access custom tags and fields
        location = record["location"]  # Tag value
        sensor_id = record["sensor_id"]  # Tag value
        
        # Check if column exists. Test membership on the underlying dict:
        # FluxRecord only offers key lookup, so `"quality" in record` would
        # fall back to integer indexing and raise KeyError.
        if "quality" in record.values:
            quality = record["quality"]
            
        # Get all values as dictionary. `values` is a plain dict attribute on
        # the record, so it is read without call parentheses.
        all_values = record.values
        print(f"Record: {all_values}")

Processing different data types:

# One renderer per known field name: each converts the raw value to the
# expected Python type and builds the line to print.
renderers = {
    "temperature": lambda raw: f"Temperature: {float(raw)}°C",
    "status": lambda raw: f"Status: {str(raw)}",
    "count": lambda raw: f"Count: {int(raw)}",
    "enabled": lambda raw: f"Enabled: {bool(raw)}",
}

for record in query_api.query_stream(query):
    raw_value = record.get_value()
    render = renderers.get(record.get_field())

    # Fields without a renderer are skipped silently
    if render is not None:
        print(render(raw_value))

TableList

Container for multiple flux tables that extends Python list with additional utility methods.

class TableList(list):
    """
    Container for multiple Flux tables.

    A ``list`` subclass, so it supports normal list iteration and indexing,
    plus helpers to export every contained record at once.
    """
    def __init__(self): ...
    
    def to_json(self, indent: int = None) -> str:
        """
        Convert all tables to JSON representation.
        
        Parameters:
        - indent (int, optional): JSON indentation for pretty printing
        
        Returns:
        str: JSON string representation

        NOTE(review): the upstream client appears to declare this as
        to_json(columns=None, **kwargs) - pass indent by keyword
        (to_json(indent=2)) and confirm the signature.
        """
        
    def to_values(self, columns: List[str] = None) -> List[List[Any]]:
        """
        Convert all table records to nested list of values.
        
        Parameters:
        - columns (List[str], optional): Specific columns to include
        
        Returns:
        List[List[Any]]: Nested list with all record values
        """

TableList Usage Examples

Processing multiple tables:

tables = query_api.query('''
from(bucket: "sensors")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> group(columns: ["location"])
''')

# Iterate through all tables
for i, table in enumerate(tables):
    print(f"Table {i}: {len(table.records)} records")
    
    # Process records in each table
    for record in table.records:
        location = record["location"]
        temp = record.get_value()
        print(f"  {location}: {temp}°C")

Export to JSON:

# Convert results to JSON
json_output = tables.to_json(indent=2)
print(json_output)

# Save to file
with open("query_results.json", "w") as f:
    f.write(json_output)

Convert to values list:

# Get all values as nested list
all_values = tables.to_values()
print(f"Total records: {len(all_values)}")

# Get specific columns only
temp_values = tables.to_values(columns=["_time", "_value", "location"])
for time, value, location in temp_values:
    print(f"{time}: {location} = {value}")

CSVIterator

Iterator for processing CSV-formatted query results with support for custom dialects.

class CSVIterator:
    """
    Iterator over CSV-formatted query results.

    Each iteration step yields one CSV row as a list of strings.
    """
    # NOTE(review): confirm that the upstream constructor accepts `dialect`;
    # it may take only the HTTP response.
    def __init__(self, response: HTTPResponse, dialect: Dialect = None): ...
    
    def to_values(self) -> List[List[str]]:
        """
        Convert CSV results to list of string lists.
        
        Returns:
        List[List[str]]: All CSV rows as nested string lists
        """
        
    def __iter__(self) -> 'CSVIterator': ...
    
    def __next__(self) -> List[str]:
        """
        Get next CSV row as list of strings.
        
        Returns:
        List[str]: Next CSV row
        """

CSVIterator Usage Examples

Processing CSV results:

from influxdb_client import Dialect

# Configure CSV dialect
csv_dialect = Dialect(
    header=True,
    delimiter=",",
    comment_prefix="#",
    annotations=["datatype", "group", "default"]
)

csv_results = query_api.query_csv(query, dialect=csv_dialect)

# Iterate through CSV rows. Every row is a list of strings, but column
# positions depend on the query, so cells are looked up by header name
# instead of by fixed index.
columns = None
for row in csv_results:
    if not row:
        continue
    if row[0].startswith("#"):
        # Annotation row (#datatype, #group, #default): a new table schema
        # begins, so the next plain row is its header.
        columns = None
        continue
    if columns is None:
        columns = row
        continue

    cells = dict(zip(columns, row))
    timestamp = cells.get("_time")
    measurement = cells.get("_measurement")
    field = cells.get("_field")
    value = cells.get("_value")
    print(f"{timestamp}: {measurement}.{field} = {value}")

Convert CSV to list:

csv_results = query_api.query_csv(query)
all_rows = csv_results.to_values()

# NOTE(review): the default dialect is assumed to emit annotation rows
# (#datatype, #group, #default) ahead of the header - confirm. They are
# dropped here so the first remaining row really is the header.
table_rows = [row for row in all_rows if row and not row[0].startswith("#")]

# Process as regular list
headers = table_rows[0] if table_rows else []
data_rows = table_rows[1:]

print(f"Headers: {headers}")
for row in data_rows[:5]:  # First 5 data rows
    print(f"Data: {row}")

QueryOptions

Configuration class for query behavior including profiling and custom callbacks.

class QueryOptions:
    """Configuration for query behavior: which profilers to enable and where to send their results."""
    def __init__(
        self,
        profilers: List[str] = None,
        profiler_callback: Callable = None
    ):
        """
        Configure query execution options.
        
        Parameters:
        - profilers (List[str]): List of profiler names to enable
        - profiler_callback (Callable): Callback function for profiler results
          (NOTE(review): presumably invoked with one profiler record per
          call - confirm the expected callback signature upstream)
        
        Available profilers:
        - "query": Query execution profiling
        - "operator": Individual operator profiling
        """

QueryOptions Usage Examples

Query profiling:

from influxdb_client import QueryOptions

def profiler_callback(flux_record):
    """
    Print one profiler record.

    The client invokes the callback with a single argument: the profiler
    result as a FluxRecord. Its column values are available as the
    ``values`` dictionary.

    Parameters:
    - flux_record (FluxRecord): Profiler record produced during query execution
    """
    print(f"Profiler record: {flux_record.values}")

# Configure query profiling
query_options = QueryOptions(
    profilers=["query", "operator"],
    profiler_callback=profiler_callback
)

# Use with QueryApi
query_api = client.query_api(query_options=query_options)

# Profiler results will be sent to callback during query execution
results = query_api.query('''
from(bucket: "performance_test")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu")
  |> mean()
''')

Dialect

Configuration class for CSV output formatting, passed as the dialect argument of the query_csv and query_raw methods.

class Dialect:
    """CSV output format settings for query results (header, delimiter, annotations, timestamp format)."""
    def __init__(
        self,
        header: bool = True,
        delimiter: str = ",",
        comment_prefix: str = "#",
        annotations: List[str] = None,
        date_time_format: str = "RFC3339"
    ):
        """
        Configure CSV output format.
        
        Parameters:
        - header (bool): Include column headers
        - delimiter (str): CSV field delimiter
        - comment_prefix (str): Prefix for comment lines
        - annotations (List[str]): Metadata annotations to include
        - date_time_format (str): DateTime format ("RFC3339" or "RFC3339Nano")
        """

Dialect Usage Example

from influxdb_client import Dialect

# Custom CSV format
custom_dialect = Dialect(
    header=True,
    delimiter=";",
    comment_prefix="//",
    annotations=["datatype", "group"],
    date_time_format="RFC3339Nano"
)

# Use with CSV query
csv_iterator = query_api.query_csv(query, dialect=custom_dialect)

Types

# Imports come first so every name used below is already defined
from typing import Any, AsyncGenerator, Dict, Generator, List
from urllib3 import HTTPResponse
import pandas

# Core result types
# FluxColumn is declared before FluxTable because FluxTable's annotations refer to it.
class FluxColumn:
    """Represents a column in a Flux table."""
    index: int
    label: str
    data_type: str
    group: bool
    default_value: str

class FluxTable:
    """Represents a single result table from Flux query."""
    columns: List[FluxColumn]
    records: List[FluxRecord]

# Query parameter types  
QueryParams = Dict[str, Any]  # Parameters for parameterized queries

# Response types from various query methods
QueryResult = TableList
QueryStreamResult = Generator[FluxRecord, None, None]
QueryAsyncStreamResult = AsyncGenerator[FluxRecord, None]
QueryCSVResult = CSVIterator
QueryRawResult = HTTPResponse
QueryDataFrameResult = pandas.DataFrame

# Exception types
# NOTE(review): confirm these class names against the installed
# influxdb-client version before catching them in application code.
class FluxQueryError(InfluxDBError):
    """Raised when Flux queries fail."""
    pass

class FluxParseError(FluxQueryError):
    """Raised when Flux query parsing fails.""" 
    pass

class FluxRuntimeError(FluxQueryError):
    """Raised when Flux query execution fails."""
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb-client

docs

advanced-operations.md

client-management.md

index.md

querying-data.md

resource-management.md

writing-data.md

tile.json