Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.
npx @tessl/cli install tessl/pypi-influxdb-client@1.49.0A comprehensive Python client library for InfluxDB 2.x that provides both synchronous and asynchronous APIs for writing, querying, and managing time series data. The library offers high-level convenience APIs and low-level service access for maximum flexibility in interacting with InfluxDB instances.
pip install influxdb-clientfrom influxdb_client import InfluxDBClient, WriteApi, QueryApi, Point, WritePrecisionFor asynchronous operations:
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write_api_async import WriteApiAsync
from influxdb_client.client.query_api_async import QueryApiAsyncfrom influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
from datetime import datetime
# Initialize client
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="your-org")
# Write data
write_api = client.write_api(write_options=SYNCHRONOUS)
point = Point("measurement_name") \
.tag("location", "us-west") \
.field("temperature", 25.3) \
.time(datetime.utcnow(), WritePrecision.NS)
write_api.write(bucket="your-bucket", record=point)
# Query data
query_api = client.query_api()
query = '''
from(bucket: "your-bucket")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "measurement_name")
'''
tables = query_api.query(query)
for table in tables:
for record in table.records:
print(f"Time: {record.get_time()}, Value: {record.get_value()}")
client.close()The InfluxDB client library is built around a dual-API architecture:
The library provides both batched and immediate writing modes, streaming and materialized query results, and comprehensive resource management capabilities for buckets, users, organizations, and more.
Write time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels.
class WriteApi:
def write(
self,
bucket: str,
org: str = None,
record: Union[Point, str, List[Point], List[str], bytes, Any] = None,
write_precision: WritePrecision = WritePrecision.NS,
**kwargs
) -> None: ...
class Point:
def __init__(self, measurement_name: str): ...
def tag(self, key: str, value: str) -> 'Point': ...
def field(self, key: str, value: Any) -> 'Point': ...
def time(self, timestamp: Any, write_precision: WritePrecision = WritePrecision.NS) -> 'Point': ...
def to_line_protocol(self) -> str: ...
class WriteOptions:
def __init__(
self,
write_type: WriteType = WriteType.batching,
batch_size: int = 1000,
flush_interval: int = 1000,
jitter_interval: int = 0,
retry_interval: int = 5000,
max_retries: int = 5,
max_retry_delay: int = 125000,
max_retry_time: int = 180000,
exponential_base: int = 2
): ...Execute Flux queries against InfluxDB and process results in various formats including streaming, materialized tables, CSV, and pandas DataFrames.
class QueryApi:
def query(self, query: str, org: str = None, params: dict = None) -> TableList: ...
def query_stream(self, query: str, org: str = None, params: dict = None) -> Generator[FluxRecord]: ...
def query_csv(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> CSVIterator: ...
def query_raw(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> HTTPResponse: ...
def query_data_frame(
self,
query: str,
org: str = None,
data_frame_index: List[str] = None,
params: dict = None,
use_extension_dtypes: bool = False
): ...
class FluxRecord:
def get_time(self) -> datetime: ...
def get_value(self) -> Any: ...
def get_field(self) -> str: ...
def get_measurement(self) -> str: ...
def values(self) -> dict: ...
class TableList(list):
def to_json(self, indent: int = None) -> str: ...
def to_values(self, columns: List[str] = None) -> List[List[Any]]: ...Configure and manage InfluxDB client instances with support for both synchronous and asynchronous operations, connection pooling, authentication, and health monitoring.
class InfluxDBClient:
def __init__(
self,
url: str,
token: str = None,
debug: bool = None,
timeout: int = 10000,
enable_gzip: bool = False,
org: str = None,
**kwargs
): ...
def write_api(self, write_options: WriteOptions = None, point_settings: PointSettings = None) -> WriteApi: ...
def query_api(self, query_options: QueryOptions = None) -> QueryApi: ...
def delete_api(self) -> DeleteApi: ...
def ping(self) -> bool: ...
def version(self) -> str: ...
def close(self) -> None: ...
@classmethod
def from_config_file(cls, config_file: str = "config.ini", **kwargs) -> 'InfluxDBClient': ...
@classmethod
def from_env_properties(cls, **kwargs) -> 'InfluxDBClient': ...
class InfluxDBClientAsync:
def __init__(
self,
url: str,
token: str = None,
org: str = None,
debug: bool = None,
timeout: int = 10000,
enable_gzip: bool = False,
**kwargs
): ...
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync: ...
def write_api(self, point_settings: PointSettings = PointSettings()) -> WriteApiAsync: ...
def delete_api(self) -> DeleteApiAsync: ...
async def ping(self) -> bool: ...
async def close(self) -> None: ...Manage InfluxDB resources including buckets, organizations, users, authorizations, tasks, and labels through dedicated API classes.
class BucketsApi:
def create_bucket(
self,
bucket: Bucket = None,
bucket_name: str = None,
org_id: str = None,
retention_rules: List[RetentionRule] = None,
description: str = None,
org: str = None
) -> Bucket: ...
def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket: ...
def find_buckets(self, org: str = None, **kwargs) -> List[Bucket]: ...
def update_bucket(self, bucket: Bucket) -> Bucket: ...
def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket: ...
class AuthorizationsApi:
def create_authorization(
self,
org_id: str = None,
permissions: List[Permission] = None,
**kwargs
) -> Authorization: ...
def find_authorizations(self, **kwargs) -> List[Authorization]: ...
def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization: ...
class OrganizationsApi:
def create_organization(self, org: Organization = None, **kwargs) -> Organization: ...
def find_organizations(self, **kwargs) -> List[Organization]: ...
def find_organization_by_id(self, org_id: str) -> Organization: ...
class UsersApi:
def create_user(self, user: User = None, **kwargs) -> UserResponse: ...
def find_users(self, **kwargs) -> List[UserResponse]: ...
def me(self) -> UserResponse: ...
class TasksApi:
def create_task(self, task: Task = None, **kwargs) -> Task: ...
def find_tasks(self, **kwargs) -> List[Task]: ...
def run_manually(self, task_id: str, **kwargs) -> Run: ...
def find_runs(self, task_id: str, **kwargs) -> List[Run]: ...
class LabelsApi:
def create_label(self, label: Label = None, **kwargs) -> LabelResponse: ...
def find_labels(self, **kwargs) -> List[LabelResponse]: ...Perform advanced operations including data deletion, invokable scripts, and direct access to low-level service APIs for maximum control.
class DeleteApi:
def delete(
self,
start: Union[str, datetime],
stop: Union[str, datetime],
predicate: str = "",
bucket: str = None,
org: str = None
) -> None: ...
class InvokableScriptsApi:
def create_script(self, script_create_request: ScriptCreateRequest) -> Script: ...
def find_scripts(self, **kwargs) -> Scripts: ...
def invoke_script(
self,
script_id: str,
params: dict = None
) -> str: ...
def update_script(
self,
script_id: str,
script_update_request: ScriptUpdateRequest
) -> Script: ...
def delete_script(self, script_id: str) -> None: ...class WritePrecision(Enum):
NS = "ns" # nanoseconds
US = "us" # microseconds
MS = "ms" # milliseconds
S = "s" # seconds
class WriteType(Enum):
batching = "batching"
asynchronous = "asynchronous"
synchronous = "synchronous"
class PointSettings:
def __init__(self, default_tags: dict = None): ...
class QueryOptions:
def __init__(
self,
profilers: List[str] = None,
profiler_callback: Callable = None
): ...
class Configuration:
def __init__(self): ...
# HTTP client configuration attributes
host: str
api_key: dict
username: str
password: str
debug: bool
verify_ssl: bool
timeout: int
# Domain model classes (300+ available)
class Bucket:
id: str
name: str
org_id: str
retention_rules: List[RetentionRule]
description: str
class Authorization:
id: str
token: str
status: str
org_id: str
permissions: List[Permission]
class Organization:
id: str
name: str
description: str
class User:
id: str
name: str
class Task:
id: str
name: str
flux: str
org_id: str
status: str
class Label:
id: str
name: str
org_id: str
properties: dict
class InfluxDBError(Exception):
def __init__(self, response: HTTPResponse = None, message: str = None, reason: str = None): ...