tessl/pypi-influxdb-client

Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.

  • Workspace: tessl
  • Visibility: Public
  • Describes: pkg:pypi/influxdb-client@1.49.x

To install, run

npx @tessl/cli install tessl/pypi-influxdb-client@1.49.0

InfluxDB Client

A Python client library for InfluxDB 2.x with synchronous and asynchronous APIs for writing, querying, and managing time series data. It offers high-level convenience APIs for common tasks and low-level service classes for direct access to the InfluxDB HTTP API.

Package Information

  • Package Name: influxdb-client
  • Language: Python
  • Installation: pip install influxdb-client
  • Version: 1.49.0

Core Imports

from influxdb_client import InfluxDBClient, WriteApi, QueryApi, Point, WritePrecision

For asynchronous operations:

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write_api_async import WriteApiAsync
from influxdb_client.client.query_api_async import QueryApiAsync

Basic Usage

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime, timezone

# Initialize client
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="your-org")

# Write data
write_api = client.write_api(write_options=SYNCHRONOUS)
# Use a timezone-aware UTC timestamp (datetime.utcnow() is deprecated since Python 3.12)
point = Point("measurement_name") \
    .tag("location", "us-west") \
    .field("temperature", 25.3) \
    .time(datetime.now(timezone.utc), WritePrecision.NS)

write_api.write(bucket="your-bucket", record=point)

# Query data
query_api = client.query_api()
query = '''
from(bucket: "your-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "measurement_name")
'''
tables = query_api.query(query)

for table in tables:
    for record in table.records:
        print(f"Time: {record.get_time()}, Value: {record.get_value()}")

client.close()
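
The point built above is sent to the server as a line-protocol record. The following is a minimal standard-library sketch of that text format, not the library's own serializer: the `to_line_protocol` function here is a hypothetical helper, it skips the escaping rules for spaces, commas, and quotes, and the fixed timestamp is chosen only to make the output reproducible.

```python
from datetime import datetime, timezone


def to_line_protocol(measurement, tags, fields, timestamp_ns):
    """Format one point as a line-protocol record (simplified: no escaping)."""
    # Tags follow the measurement, comma-separated, in sorted key order.
    tag_part = "".join(f",{key}={value}" for key, value in sorted(tags.items()))

    def format_field(value):
        # bool is checked before int because bool is a subclass of int.
        if isinstance(value, bool):
            return "true" if value else "false"
        if isinstance(value, int):
            return f"{value}i"  # integer fields carry an "i" suffix
        if isinstance(value, float):
            return repr(value)
        return f'"{value}"'  # string fields are double-quoted

    field_part = ",".join(
        f"{key}={format_field(value)}" for key, value in sorted(fields.items())
    )
    return f"{measurement}{tag_part} {field_part} {timestamp_ns}"


# Fixed timestamp (assumption, for a reproducible example): 2024-01-01T00:00:00Z.
moment = datetime(2024, 1, 1, tzinfo=timezone.utc)
timestamp_ns = int(moment.timestamp()) * 1_000_000_000

line = to_line_protocol(
    "measurement_name", {"location": "us-west"}, {"temperature": 25.3}, timestamp_ns
)
print(line)
# measurement_name,location=us-west temperature=25.3 1704067200000000000
```

For real workloads, `Point.to_line_protocol()` is the safer route, since it applies the full escaping rules that this sketch omits.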

Architecture

The InfluxDB client library is organized around two API layers, supported by shared domain models and a configuration system:

  • High-level APIs: Convenient wrapper classes (WriteApi, QueryApi, etc.) that handle common operations with sensible defaults
  • Low-level Services: Direct API access classes for advanced usage and custom configurations
  • Sync/Async Support: Synchronous and asynchronous implementations of the client and its write, query, and delete APIs
  • Domain Models: Auto-generated classes representing all InfluxDB API entities
  • Configuration System: Flexible configuration through objects, files, or environment variables

The library provides both batched and immediate writing modes, streaming and materialized query results, and comprehensive resource management capabilities for buckets, users, organizations, and more.
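
As an illustration of file-based configuration, the sketch below writes and re-reads a `config.ini` using only the standard library. The `[influx2]` section name and its keys reflect the layout that `InfluxDBClient.from_config_file` is understood to read; treat them as assumptions to verify against the library documentation, and the values as placeholders.

```python
import configparser
import os
import tempfile

# Build a config file in the layout expected by from_config_file.
# Section and key names are assumptions; values are placeholders.
config = configparser.ConfigParser()
config["influx2"] = {
    "url": "http://localhost:8086",
    "org": "your-org",
    "token": "your-token",
    "timeout": "10000",  # milliseconds, matching the client's default
}

config_path = os.path.join(tempfile.mkdtemp(), "config.ini")
with open(config_path, "w") as handle:
    config.write(handle)

# Read the file back to confirm it round-trips as expected.
loaded = configparser.ConfigParser()
loaded.read(config_path)
settings = dict(loaded["influx2"])
print(settings)

# With the library installed, the same file could then be loaded with:
#   client = InfluxDBClient.from_config_file(config_path)
```

Keeping the token in a file or environment variable rather than in source code also makes it easier to rotate credentials without a code change.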

Capabilities

Writing Data

Write time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels.

class WriteApi:
    def write(
        self,
        bucket: str,
        org: str = None,
        record: Union[Point, str, List[Point], List[str], bytes, Any] = None,
        write_precision: WritePrecision = WritePrecision.NS,
        **kwargs
    ) -> None: ...

class Point:
    def __init__(self, measurement_name: str): ...
    def tag(self, key: str, value: str) -> 'Point': ...
    def field(self, key: str, value: Any) -> 'Point': ...
    def time(self, timestamp: Any, write_precision: WritePrecision = WritePrecision.NS) -> 'Point': ...
    def to_line_protocol(self) -> str: ...

class WriteOptions:
    def __init__(
        self,
        write_type: WriteType = WriteType.batching,
        batch_size: int = 1000,
        flush_interval: int = 1000,
        jitter_interval: int = 0,
        retry_interval: int = 5000,
        max_retries: int = 5,
        max_retry_delay: int = 125000,
        max_retry_time: int = 180000,
        exponential_base: int = 2
    ): ...
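
To give a feel for how the retry parameters of `WriteOptions` relate to each other, here is a deterministic exponential backoff schedule computed from the defaults listed above. This is one plausible reading of the parameters, not the library's exact algorithm: the real retry logic also applies randomization, and the `retry_delays` helper is hypothetical.

```python
def retry_delays(retry_interval=5000, max_retries=5, max_retry_delay=125000,
                 max_retry_time=180000, exponential_base=2):
    """Return a deterministic backoff schedule in milliseconds (jitter omitted)."""
    delays = []
    elapsed = 0
    for attempt in range(max_retries):
        # Grow the delay geometrically, but never beyond max_retry_delay.
        delay = min(retry_interval * exponential_base ** attempt, max_retry_delay)
        # Stop once the cumulative wait would exceed max_retry_time.
        if elapsed + delay > max_retry_time:
            break
        delays.append(delay)
        elapsed += delay
    return delays


schedule = retry_delays()
print(schedule)       # [5000, 10000, 20000, 40000, 80000]
print(sum(schedule))  # 155000
```

Under this reading, the default settings allow five retries spread over roughly 155 seconds, which stays inside the 180-second `max_retry_time` budget.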

Querying Data

Execute Flux queries against InfluxDB and process results in various formats including streaming, materialized tables, CSV, and pandas DataFrames.

class QueryApi:
    def query(self, query: str, org: str = None, params: dict = None) -> TableList: ...
    def query_stream(self, query: str, org: str = None, params: dict = None) -> Generator[FluxRecord, Any, None]: ...
    def query_csv(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> CSVIterator: ...
    def query_raw(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> HTTPResponse: ...
    def query_data_frame(
        self,
        query: str,
        org: str = None,
        data_frame_index: List[str] = None,
        params: dict = None,
        use_extension_dtypes: bool = False
    ): ...

class FluxRecord:
    def get_time(self) -> datetime: ...
    def get_value(self) -> Any: ...
    def get_field(self) -> str: ...
    def get_measurement(self) -> str: ...
    def values(self) -> dict: ...

class TableList(list):
    def to_json(self, indent: int = None) -> str: ...
    def to_values(self, columns: List[str] = None) -> List[List[Any]]: ...
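
The nested table and record structure returned by `query` can be flattened with a small helper. The sketch below relies only on the `records` attribute and the `get_time`, `get_field`, and `get_value` accessors shown above; `records_to_rows`, `StubRecord`, and `StubTable` are hypothetical names, and the stubs exist only so the example runs without a server.

```python
from datetime import datetime, timezone


def records_to_rows(tables):
    """Flatten tables of records into a list of plain dictionaries."""
    rows = []
    for table in tables:
        for record in table.records:
            rows.append({
                "time": record.get_time(),
                "field": record.get_field(),
                "value": record.get_value(),
            })
    return rows


class StubRecord:
    """Hypothetical stand-in exposing the FluxRecord accessors used above."""

    def __init__(self, time, field, value):
        self._time, self._field, self._value = time, field, value

    def get_time(self):
        return self._time

    def get_field(self):
        return self._field

    def get_value(self):
        return self._value


class StubTable:
    """Hypothetical stand-in for one table in a query result."""

    def __init__(self, records):
        self.records = records


sample_time = datetime(2024, 1, 1, tzinfo=timezone.utc)
tables = [StubTable([StubRecord(sample_time, "temperature", 25.3)])]
rows = records_to_rows(tables)
print(rows)
```

With a live client, the result of `query_api.query(query)` could be passed to `records_to_rows` in place of the stub tables.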

Client Management

Configure and manage InfluxDB client instances with support for both synchronous and asynchronous operations, connection pooling, authentication, and health monitoring.

class InfluxDBClient:
    def __init__(
        self,
        url: str,
        token: str = None,
        debug: bool = None,
        timeout: int = 10000,
        enable_gzip: bool = False,
        org: str = None,
        **kwargs
    ): ...
    def write_api(self, write_options: WriteOptions = None, point_settings: PointSettings = None) -> WriteApi: ...
    def query_api(self, query_options: QueryOptions = None) -> QueryApi: ...
    def delete_api(self) -> DeleteApi: ...
    def ping(self) -> bool: ...
    def version(self) -> str: ...
    def close(self) -> None: ...
    @classmethod
    def from_config_file(cls, config_file: str = "config.ini", **kwargs) -> 'InfluxDBClient': ...
    @classmethod
    def from_env_properties(cls, **kwargs) -> 'InfluxDBClient': ...

class InfluxDBClientAsync:
    def __init__(
        self,
        url: str,
        token: str = None,
        org: str = None,
        debug: bool = None,
        timeout: int = 10000,
        enable_gzip: bool = False,
        **kwargs
    ): ...
    def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync: ...
    def write_api(self, point_settings: PointSettings = PointSettings()) -> WriteApiAsync: ...
    def delete_api(self) -> DeleteApiAsync: ...
    async def ping(self) -> bool: ...
    async def close(self) -> None: ...
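
A common pattern with the synchronous client is a connectivity check that always releases the connection. The sketch below uses only the `ping`, `version`, and `close` methods listed above; `check_connection` and `StubClient` are hypothetical, and the stub with its placeholder version string stands in for a real `InfluxDBClient` so the example runs offline.

```python
def check_connection(client):
    """Ping the server, read its version if reachable, and always close the client."""
    try:
        reachable = client.ping()
        version = client.version() if reachable else None
        return {"reachable": reachable, "version": version}
    finally:
        # Release pooled connections even if ping() or version() raises.
        client.close()


class StubClient:
    """Hypothetical stand-in exposing ping(), version(), and close()."""

    def __init__(self, reachable):
        self.reachable = reachable
        self.closed = False

    def ping(self):
        return self.reachable

    def version(self):
        return "2.7.0"  # placeholder value, not a real server response

    def close(self):
        self.closed = True


stub = StubClient(reachable=True)
status = check_connection(stub)
print(status, stub.closed)
```

With the library installed, `check_connection(InfluxDBClient(url=..., token=..., org=...))` would perform the same check against a real server.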

Resource Management

Manage InfluxDB resources including buckets, organizations, users, authorizations, tasks, and labels through dedicated API classes.

class BucketsApi:
    def create_bucket(
        self,
        bucket: Bucket = None,
        bucket_name: str = None,
        org_id: str = None,
        retention_rules: List[RetentionRule] = None,
        description: str = None,
        org: str = None
    ) -> Bucket: ...
    def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket: ...
    def find_buckets(self, org: str = None, **kwargs) -> Buckets: ...  # the list itself is on the result's .buckets attribute
    def update_bucket(self, bucket: Bucket) -> Bucket: ...
    def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket: ...

class AuthorizationsApi:
    def create_authorization(
        self,
        org_id: str = None,
        permissions: List[Permission] = None,
        **kwargs
    ) -> Authorization: ...
    def find_authorizations(self, **kwargs) -> List[Authorization]: ...
    def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization: ...

class OrganizationsApi:
    def create_organization(self, org: Organization = None, **kwargs) -> Organization: ...
    def find_organizations(self, **kwargs) -> List[Organization]: ...
    def find_organization_by_id(self, org_id: str) -> Organization: ...

class UsersApi:
    def create_user(self, user: User = None, **kwargs) -> UserResponse: ...
    def find_users(self, **kwargs) -> List[UserResponse]: ...
    def me(self) -> UserResponse: ...

class TasksApi:
    def create_task(self, task: Task = None, **kwargs) -> Task: ...
    def find_tasks(self, **kwargs) -> List[Task]: ...
    def run_manually(self, task_id: str, **kwargs) -> Run: ...
    def find_runs(self, task_id: str, **kwargs) -> List[Run]: ...

class LabelsApi:
    def create_label(self, label: Label = None, **kwargs) -> LabelResponse: ...
    def find_labels(self, **kwargs) -> List[LabelResponse]: ...
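
A typical use of `BucketsApi` is a "create if missing" helper. The sketch below combines `find_bucket_by_name` and `create_bucket` as listed above. It assumes `find_bucket_by_name` returns `None` when no bucket matches, which the signatures do not state, and `ensure_bucket` and `StubBucketsApi` are hypothetical names used so the example runs without a server.

```python
def ensure_bucket(buckets_api, bucket_name, org):
    """Return (bucket, created) for bucket_name, creating the bucket if absent."""
    existing = buckets_api.find_bucket_by_name(bucket_name)
    if existing is not None:
        return existing, False
    created = buckets_api.create_bucket(bucket_name=bucket_name, org=org)
    return created, True


class StubBucketsApi:
    """Hypothetical in-memory stand-in for the two BucketsApi calls used above."""

    def __init__(self):
        self.buckets = {}

    def find_bucket_by_name(self, bucket_name, org=None):
        # Assumption: a missing bucket is reported as None.
        return self.buckets.get(bucket_name)

    def create_bucket(self, bucket_name=None, org=None, **kwargs):
        bucket = {"name": bucket_name, "org": org}
        self.buckets[bucket_name] = bucket
        return bucket


api = StubBucketsApi()
first, created_first = ensure_bucket(api, "your-bucket", "your-org")
second, created_second = ensure_bucket(api, "your-bucket", "your-org")
print(created_first, created_second)  # True False
```

The helper is not atomic: two processes calling it at the same moment could both try to create the bucket, so callers may still need to handle a creation error.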

Advanced Operations

Perform advanced operations including data deletion, invokable scripts, and direct access to low-level service APIs for maximum control.

class DeleteApi:
    def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str = "",
        bucket: str = None,
        org: str = None
    ) -> None: ...

class InvokableScriptsApi:
    def create_script(self, script_create_request: ScriptCreateRequest) -> Script: ...
    def find_scripts(self, **kwargs) -> Scripts: ...
    def invoke_script(
        self,
        script_id: str,
        params: dict = None
    ) -> str: ...
    def update_script(
        self,
        script_id: str,
        script_update_request: ScriptUpdateRequest
    ) -> Script: ...
    def delete_script(self, script_id: str) -> None: ...
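
Deletion requires a time window plus an optional predicate. The sketch below shows one way to assemble the arguments for `DeleteApi.delete` using the parameters listed above. `delete_recent` and `StubDeleteApi` are hypothetical, the predicate uses the `_measurement="..."` form of the InfluxDB delete predicate syntax, and the stub only records the call so the example runs offline.

```python
from datetime import datetime, timedelta, timezone


def delete_recent(delete_api, bucket, org, measurement, now, window=timedelta(hours=1)):
    """Delete points of one measurement written within `window` before `now`."""
    start = now - window
    predicate = f'_measurement="{measurement}"'
    delete_api.delete(start=start, stop=now, predicate=predicate, bucket=bucket, org=org)
    return start, predicate


class StubDeleteApi:
    """Hypothetical stand-in that records delete() calls instead of sending them."""

    def __init__(self):
        self.calls = []

    def delete(self, start, stop, predicate="", bucket=None, org=None):
        self.calls.append(
            {"start": start, "stop": stop, "predicate": predicate, "bucket": bucket, "org": org}
        )


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
stub_api = StubDeleteApi()
start, predicate = delete_recent(stub_api, "your-bucket", "your-org", "measurement_name", now)
print(start.isoformat(), predicate)
# 2024-01-01T11:00:00+00:00 _measurement="measurement_name"
```

Passing `now` explicitly keeps the window reproducible in tests; in production code it would usually be `datetime.now(timezone.utc)`.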

Types

class WritePrecision(Enum):
    NS = "ns"  # nanoseconds
    US = "us"  # microseconds
    MS = "ms"  # milliseconds
    S = "s"    # seconds

class WriteType(Enum):
    batching = 1
    asynchronous = 2
    synchronous = 3

class PointSettings:
    def __init__(self, default_tags: dict = None): ...

class QueryOptions:
    def __init__(
        self,
        profilers: List[str] = None,
        profiler_callback: Callable = None
    ): ...

class Configuration:
    def __init__(self): ...
    # HTTP client configuration attributes
    host: str
    api_key: dict
    username: str
    password: str
    debug: bool
    verify_ssl: bool
    timeout: int

# Domain model classes (300+ available)
class Bucket:
    id: str
    name: str
    org_id: str
    retention_rules: List[RetentionRule]
    description: str

class Authorization:
    id: str
    token: str
    status: str
    org_id: str
    permissions: List[Permission]

class Organization:
    id: str
    name: str
    description: str

class User:
    id: str
    name: str

class Task:
    id: str
    name: str
    flux: str
    org_id: str
    status: str

class Label:
    id: str
    name: str
    org_id: str
    properties: dict

class InfluxDBError(Exception):
    def __init__(self, response: HTTPResponse = None, message: str = None, reason: str = None): ...
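
The four `WritePrecision` values determine the unit of the integer timestamp attached to each point. As a standard-library illustration, the sketch below converts a `datetime` to an epoch integer in each unit. The `to_epoch` helper and the `UNITS_PER_SECOND` table are hypothetical, and treating naive datetimes as UTC is an assumption of this sketch rather than documented library behavior.

```python
from datetime import datetime, timezone

# Units per second for each precision string used by WritePrecision.
UNITS_PER_SECOND = {"s": 1, "ms": 1_000, "us": 1_000_000, "ns": 1_000_000_000}
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch(moment, precision="ns"):
    """Convert a datetime to an integer epoch timestamp in the given precision."""
    if moment.tzinfo is None:
        # Assumption of this sketch: naive datetimes are interpreted as UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    delta = moment - EPOCH
    # Integer arithmetic avoids float rounding at nanosecond scale.
    micros = (delta.days * 86_400 + delta.seconds) * 1_000_000 + delta.microseconds
    return micros * UNITS_PER_SECOND[precision] // 1_000_000


moment = datetime(2024, 1, 1, 0, 0, 0, 500_000, tzinfo=timezone.utc)
print(to_epoch(moment, "s"))   # 1704067200
print(to_epoch(moment, "ms"))  # 1704067200500
print(to_epoch(moment, "ns"))  # 1704067200500000000
```

Python's `datetime` resolves only to microseconds, so the nanosecond value always ends in three zeros; choosing a coarser precision truncates rather than rounds in this sketch.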