Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.
82
Comprehensive functionality for configuring and managing InfluxDB client instances with support for both synchronous and asynchronous operations, connection pooling, authentication, health monitoring, and flexible configuration management through objects, files, or environment variables.
Main synchronous client class for connecting to InfluxDB 2.x with comprehensive configuration options and API access.
class InfluxDBClient:
def __init__(
self,
url: str,
token: str = None,
debug: bool = None,
timeout: int = 10000,
enable_gzip: bool = False,
org: str = None,
default_tags: dict = None,
verify_ssl: bool = True,
ssl_ca_cert: str = None,
cert_file: str = None,
cert_key_file: str = None,
cert_key_password: Union[str, Callable] = None,
ssl_context: Any = None,
proxy: str = None,
proxy_headers: dict = None,
connection_pool_maxsize: int = None,
retries: Any = None,
auth_basic: bool = False,
username: str = None,
password: str = None,
profilers: List[str] = None,
**kwargs
):
"""
Initialize InfluxDB client.
Parameters:
- url (str): InfluxDB server URL (e.g., "http://localhost:8086")
- token (str, optional): Authentication token
- debug (bool, optional): Enable debug logging
- timeout (int): Request timeout in milliseconds (default: 10000)
- enable_gzip (bool): Enable gzip compression (default: False)
- org (str, optional): Default organization name or ID
- default_tags (dict, optional): Tags to add to all points
- verify_ssl (bool): Verify SSL certificates (default: True)
- ssl_ca_cert (str, optional): Path to custom CA certificate file
- cert_file (str, optional): Path to client certificate for mTLS
- cert_key_file (str, optional): Path to client certificate key for mTLS
- cert_key_password (str/callable, optional): Password for mTLS private key
- ssl_context (ssl.SSLContext, optional): Custom SSL context
- proxy (str, optional): HTTP proxy URL (e.g., "http://proxy:8080")
- proxy_headers (dict, optional): Headers for proxy authentication
- connection_pool_maxsize (int, optional): Maximum connection pool size
- retries (urllib3.util.retry.Retry, optional): Retry strategy configuration
- auth_basic (bool): Enable basic authentication for InfluxDB 1.8.x
- username (str, optional): Username for credential authentication
- password (str, optional): Password for credential authentication
- profilers (List[str], optional): List of enabled Flux profilers
- **kwargs: Additional HTTP client configuration options
"""
def write_api(
self,
write_options: WriteOptions = WriteOptions(),
point_settings: PointSettings = PointSettings(),
**kwargs
) -> WriteApi:
"""
Get WriteApi instance for writing data.
Parameters:
- write_options (WriteOptions): Write behavior configuration
- point_settings (PointSettings): Default point settings
Returns:
WriteApi: API for writing data
"""
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
"""
Get QueryApi instance for querying data.
Parameters:
- query_options (QueryOptions): Query behavior configuration
Returns:
QueryApi: API for querying data
"""
def delete_api(self) -> DeleteApi:
"""
Get DeleteApi instance for deleting data.
Returns:
DeleteApi: API for deleting data
"""
def buckets_api(self) -> BucketsApi:
"""
Get BucketsApi instance for managing buckets.
Returns:
BucketsApi: API for bucket management
"""
def organizations_api(self) -> OrganizationsApi:
"""
Get OrganizationsApi instance for managing organizations.
Returns:
OrganizationsApi: API for organization management
"""
def users_api(self) -> UsersApi:
"""
Get UsersApi instance for managing users.
Returns:
UsersApi: API for user management
"""
def authorizations_api(self) -> AuthorizationsApi:
"""
Get AuthorizationsApi instance for managing authorizations.
Returns:
AuthorizationsApi: API for authorization management
"""
def tasks_api(self) -> TasksApi:
"""
Get TasksApi instance for managing tasks.
Returns:
TasksApi: API for task management
"""
def labels_api(self) -> LabelsApi:
"""
Get LabelsApi instance for managing labels.
Returns:
LabelsApi: API for label management
"""
def invokable_scripts_api(self) -> InvokableScriptsApi:
"""
Get InvokableScriptsApi instance for managing scripts.
Returns:
InvokableScriptsApi: API for script management
"""
def ping(self) -> bool:
"""
Check if InfluxDB server is reachable.
Returns:
bool: True if server is reachable
"""
def version(self) -> str:
"""
Get InfluxDB server version.
Returns:
str: Server version string
"""
def build(self) -> str:
"""
Get InfluxDB server build information.
Returns:
str: Build information
"""
def ready(self) -> Ready:
"""
Check if InfluxDB server is ready to accept requests.
Returns:
Ready: Server readiness status
"""
def health(self) -> HealthCheck:
"""
Get InfluxDB server health status (deprecated, use ping()).
Returns:
HealthCheck: Server health information
"""
def close(self) -> None:
"""
Close the client and clean up resources.
"""
def __enter__(self) -> 'InfluxDBClient': ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
@classmethod
def from_config_file(
cls,
config_file: str = "config.ini",
debug: bool = None,
enable_gzip: bool = False,
**kwargs
) -> 'InfluxDBClient':
"""
Create client from configuration file.
Parameters:
- config_file (str): Path to INI configuration file (default: "config.ini")
- debug (bool, optional): Enable debug logging
- enable_gzip (bool): Enable gzip compression
- **kwargs: Additional configuration options
Returns:
InfluxDBClient: Configured client instance
"""
@classmethod
def from_env_properties(
cls,
debug: bool = None,
enable_gzip: bool = False,
**kwargs
) -> 'InfluxDBClient':
"""
Create client from environment variables.
Parameters:
- debug (bool, optional): Enable debug logging
- enable_gzip (bool): Enable gzip compression
- **kwargs: Additional configuration options
Returns:
InfluxDBClient: Configured client instance
Environment variables:
- INFLUXDB_V2_URL: InfluxDB server URL
- INFLUXDB_V2_TOKEN: Authentication token
- INFLUXDB_V2_ORG: Default organization
- INFLUXDB_V2_TIMEOUT: Request timeout
"""Basic client initialization:
from influxdb_client import InfluxDBClient
# Direct initialization
client = InfluxDBClient(
url="http://localhost:8086",
token="your-auth-token",
org="my-organization",
timeout=30000, # 30 seconds
enable_gzip=True
)
# Use context manager for automatic cleanup
with InfluxDBClient(url="http://localhost:8086", token="token") as client:
# Use client APIs
write_api = client.write_api()
query_api = client.query_api()
# Client automatically closed when exiting context
client.close() # Manual cleanup if not using context managerConfiguration from file:
# config.ini file content:
# [influx2]
# url=http://localhost:8086
# token=your-token-here
# org=my-org
# timeout=20000
# verify_ssl=false
client = InfluxDBClient.from_config_file("config.ini")
# Custom config file location
client = InfluxDBClient.from_config_file("/path/to/custom-config.ini")Configuration from environment:
import os
# Set environment variables
os.environ['INFLUXDB_V2_URL'] = 'http://localhost:8086'
os.environ['INFLUXDB_V2_TOKEN'] = 'your-token'
os.environ['INFLUXDB_V2_ORG'] = 'my-org'
os.environ['INFLUXDB_V2_TIMEOUT'] = '15000'
# Create client from environment
client = InfluxDBClient.from_env_properties()Health and version checking:
# Check server connectivity
if client.ping():
print("Server is reachable")
# Get version information
version = client.version()
build = client.build()
print(f"InfluxDB version: {version}")
print(f"Build: {build}")
# Check readiness
ready_status = client.ready()
print(f"Server ready: {ready_status}")
else:
print("Cannot reach InfluxDB server")API access patterns:
# Get different API instances
write_api = client.write_api(
write_options=WriteOptions(write_type=WriteType.batching, batch_size=1000)
)
query_api = client.query_api(
query_options=QueryOptions(profilers=["query"])
)
# Resource management APIs
buckets_api = client.buckets_api()
users_api = client.users_api()
orgs_api = client.organizations_api()
# Advanced APIs
delete_api = client.delete_api()
scripts_api = client.invokable_scripts_api()Asynchronous version of InfluxDBClient for non-blocking operations with async/await patterns.
class InfluxDBClientAsync:
def __init__(
self,
url: str,
token: str = None,
org: str = None,
debug: bool = None,
timeout: int = 10000,
enable_gzip: bool = False,
**kwargs
):
"""
Initialize async InfluxDB client.
Parameters:
- url (str): InfluxDB server URL
- token (str, optional): Authentication token
- org (str, optional): Default organization name or ID
- debug (bool, optional): Enable debug logging
- timeout (int): Request timeout in milliseconds
- enable_gzip (bool): Enable gzip compression
- **kwargs: Additional HTTP client configuration
"""
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync:
"""
Get async QueryApi instance.
Parameters:
- query_options (QueryOptions): Query configuration
Returns:
QueryApiAsync: Async API for querying data
"""
def write_api(self, point_settings: PointSettings = PointSettings()) -> WriteApiAsync:
"""
Get async WriteApi instance.
Parameters:
- point_settings (PointSettings): Default point settings
Returns:
WriteApiAsync: Async API for writing data
"""
def delete_api(self) -> DeleteApiAsync:
"""
Get async DeleteApi instance.
Returns:
DeleteApiAsync: Async API for deleting data
"""
async def ping(self) -> bool:
"""
Asynchronously check if server is reachable.
Returns:
bool: True if server is reachable
"""
async def version(self) -> str:
"""
Asynchronously get server version.
Returns:
str: Server version string
"""
async def build(self) -> str:
"""
Asynchronously get server build information.
Returns:
str: Build information
"""
async def close(self) -> None:
"""
Close the async client and clean up resources.
"""
async def __aenter__(self) -> 'InfluxDBClientAsync': ...
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
@classmethod
def from_config_file(
cls,
config_file: str = "config.ini",
debug: bool = None,
enable_gzip: bool = False,
**kwargs
) -> 'InfluxDBClientAsync':
"""
Create async client from configuration file.
"""
@classmethod
def from_env_properties(
cls,
debug: bool = None,
enable_gzip: bool = False,
**kwargs
) -> 'InfluxDBClientAsync':
"""
Create async client from environment variables.
"""Basic async client usage:
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
async def main():
# Use async context manager
async with InfluxDBClientAsync(
url="http://localhost:8086",
token="your-token",
org="your-org"
) as client:
# Check connectivity
if await client.ping():
print(f"Connected to InfluxDB version: {await client.version()}")
# Get async APIs
query_api = client.query_api()
write_api = client.write_api()
# Use async operations
await write_api.write(bucket="test", record=point)
results = await query_api.query(flux_query)
asyncio.run(main())Concurrent async operations:
async def concurrent_operations():
async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
query_api = client.query_api()
# Execute multiple queries concurrently
queries = [
"from(bucket: 'bucket1') |> range(start: -1h)",
"from(bucket: 'bucket2') |> range(start: -1h)",
"from(bucket: 'bucket3') |> range(start: -1h)"
]
# Run all queries concurrently
results = await asyncio.gather(*[
query_api.query(query) for query in queries
])
for i, result in enumerate(results):
print(f"Query {i+1}: {len(result)} tables")
asyncio.run(concurrent_operations())Low-level HTTP client configuration class for advanced customization of client behavior.
class Configuration:
def __init__(self):
"""Initialize configuration with default settings."""
# Connection settings
host: str # InfluxDB server host
temp_folder_path: str # Temporary file storage path
# Authentication settings
api_key: dict # API key configuration
api_key_prefix: dict # API key prefix mapping
username: str # Basic auth username
password: str # Basic auth password
# HTTP client settings
debug: bool # Enable debug output
verify_ssl: bool # Verify SSL certificates
ssl_ca_cert: str # Path to CA certificate file
cert_file: str # Path to client certificate file
cert_key_file: str # Path to client certificate key file
cert_key_password: str # Client certificate key password
ssl_context: Any # Custom SSL context
# Proxy settings
proxy: str # Proxy server URL
proxy_headers: dict # Additional proxy headers
# Connection pool settings
connection_pool_maxsize: int # Maximum connection pool size
timeout: int # Request timeout in seconds
def to_debug_report(self) -> str:
"""
Generate debug report of configuration.
Returns:
str: Debug report string
"""Custom HTTP client configuration:
from influxdb_client import Configuration, InfluxDBClient
# Create custom configuration
config = Configuration()
config.host = "https://my-influxdb.example.com"
config.verify_ssl = True
config.ssl_ca_cert = "/path/to/ca.crt"
config.timeout = 30
config.connection_pool_maxsize = 20
config.debug = True
# Use with client
client = InfluxDBClient(
url="https://my-influxdb.example.com",
token="token",
configuration=config
)Proxy configuration:
config = Configuration()
config.proxy = "http://proxy.company.com:8080"
config.proxy_headers = {
"Proxy-Authorization": "Basic dXNlcjpwYXNz",
"Custom-Header": "value"
}
client = InfluxDBClient(
url="http://localhost:8086",
token="token",
configuration=config
)SSL/TLS configuration:
config = Configuration()
config.verify_ssl = True
config.ssl_ca_cert = "/etc/ssl/certs/ca-bundle.crt"
config.cert_file = "/path/to/client.crt"
config.cert_key_file = "/path/to/client.key"
config.cert_key_password = "key-password"
# For custom SSL context
import ssl
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
config.ssl_context = ssl_contextResponse types for server health and status monitoring.
class Ready:
"""Server readiness status."""
status: str # "ready" or other status
started: str # Server start timestamp
up: str # Server uptime
class HealthCheck:
"""Server health check response (deprecated)."""
name: str # Service name
status: str # Health status
version: str # Service version
commit: str # Git commit hash
class Routes:
"""Available API routes information."""
query: QueryRoute
write: WriteRoute
ping: PingRoute
# Additional route definitionsServer health monitoring:
def monitor_influxdb_health(client):
try:
# Basic connectivity check
if client.ping():
print("✓ InfluxDB server is reachable")
# Get detailed server information
version = client.version()
build = client.build()
ready_status = client.ready()
print(f"✓ Version: {version}")
print(f"✓ Build: {build}")
print(f"✓ Status: {ready_status.status}")
print(f"✓ Started: {ready_status.started}")
print(f"✓ Uptime: {ready_status.up}")
return True
else:
print("✗ InfluxDB server is not reachable")
return False
except Exception as e:
print(f"✗ Health check failed: {e}")
return False
# Usage
client = InfluxDBClient(url="http://localhost:8086", token="token")
is_healthy = monitor_influxdb_health(client)Async health monitoring:
async def async_health_check():
async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
if await client.ping():
version = await client.version()
build = await client.build()
print(f"Async health check: OK (v{version}, {build})")
else:
print("Async health check: FAILED")
asyncio.run(async_health_check())# Client configuration types
ClientConfig = Dict[str, Any] # Generic client configuration
ConnectionConfig = Dict[str, Any] # HTTP connection settings
# Authentication types
AuthToken = str # Authentication token
AuthCredentials = Dict[str, str] # Username/password auth
# Timeout and retry types
TimeoutConfig = Union[int, float] # Timeout in seconds/milliseconds
RetryConfig = Dict[str, Any] # Retry policy configuration
# Server response types
ServerVersion = str # Version string like "2.7.1"
ServerBuild = str # Build information string
ServerStatus = str # Status indicator
# Configuration file sections
ConfigSection = Dict[str, str] # INI file section
ConfigFile = Dict[str, ConfigSection] # Full INI configuration
# Environment variable names (constants)
ENV_URL = "INFLUXDB_V2_URL"
ENV_TOKEN = "INFLUXDB_V2_TOKEN"
ENV_ORG = "INFLUXDB_V2_ORG"
ENV_TIMEOUT = "INFLUXDB_V2_TIMEOUT"
# Exception types
class ConnectionError(InfluxDBError):
"""Raised when connection to InfluxDB fails."""
pass
class AuthenticationError(InfluxDBError):
"""Raised when authentication fails."""
pass
class ConfigurationError(InfluxDBError):
"""Raised when client configuration is invalid."""
passInstall with Tessl CLI
npx tessl i tessl/pypi-influxdb-clientdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10