Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.
82
Comprehensive functionality for writing time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels. The writing system supports both synchronous and asynchronous operations with flexible configuration options.
Main API for writing data to InfluxDB buckets with configurable write options, retry policies, and batching behavior.
class WriteApi:
def __init__(
self,
influxdb_client,
write_options: WriteOptions = WriteOptions(),
point_settings: PointSettings = PointSettings()
): ...
def write(
self,
bucket: str,
org: str = None,
record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes, Any] = None,
write_precision: WritePrecision = WritePrecision.NS,
**kwargs
) -> None:
"""
Write data to InfluxDB.
Parameters:
- bucket (str): Destination bucket name
- org (str, optional): Organization name or ID
- record (Union[Point, str, List, DataFrame, bytes]): Data to write in various formats
- write_precision (WritePrecision, optional): Time precision for the record timestamps; defaults to WritePrecision.NS (nanoseconds)
- **kwargs: Additional write parameters (for example data_frame_measurement_name when writing a DataFrame)
Supported record formats:
- Point object or list of Point objects
- Line protocol string or list of strings
- Pandas DataFrame with proper time indexing
- Bytes representing line protocol data
"""
def flush(self) -> None:
"""
Flush any pending writes to InfluxDB.
"""
def close(self) -> None:
"""
Close the write API and flush any pending writes.
"""
def __enter__(self) -> 'WriteApi': ...
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

Basic point writing:
# The snippet uses datetime for the point timestamp, so it must import it.
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")
write_api = client.write_api(write_options=SYNCHRONOUS)

# Write single point
# A timezone-aware "now" replaces datetime.utcnow(), which is deprecated
# since Python 3.12.
point = Point("temperature") \
    .tag("location", "room1") \
    .field("value", 23.5) \
    .time(datetime.now(timezone.utc), WritePrecision.S)
write_api.write(bucket="sensors", record=point)

# Write multiple points
points = [
    Point("temperature").tag("location", "room1").field("value", 23.5),
    Point("temperature").tag("location", "room2").field("value", 24.1),
]
write_api.write(bucket="sensors", record=points)

# Write line protocol string
line_protocol = "temperature,location=room3 value=22.8"
write_api.write(bucket="sensors", record=line_protocol)

DataFrame writing:
import pandas as pd

# Create DataFrame with datetime index
df = pd.DataFrame({
    'temperature': [23.5, 24.1, 22.8],
    'humidity': [45.2, 43.1, 46.8],
    'location': ['room1', 'room2', 'room3'],
})
# Hourly index starting at 2023-01-01 00:00. Lowercase 'h' is the supported
# hourly alias; uppercase 'H' is deprecated since pandas 2.2.
df.index = pd.date_range('2023-01-01', periods=3, freq='h')
write_api.write(bucket="sensors", record=df, data_frame_measurement_name='climate')

Asynchronous version of WriteApi for non-blocking write operations.
class WriteApiAsync:
def __init__(
self,
influxdb_client,
point_settings: PointSettings = PointSettings()
): ...
async def write(
self,
bucket: str,
org: str = None,
record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes] = None,
**kwargs
) -> bool:
"""
Asynchronously write data to InfluxDB.
Parameters:
- bucket (str): Destination bucket name
- org (str, optional): Organization name or ID
- record: Data to write in supported formats
"""
async def close(self) -> None:
"""
Close the async write API.
"""

import asyncio
from influxdb_client import Point
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def write_data():
    """Write one point to InfluxDB through the asynchronous client."""
    # The org is set on the client (mirroring the synchronous example) so
    # that write() does not need an explicit org argument.
    async with InfluxDBClientAsync(url="http://localhost:8086", token="token", org="org") as client:
        write_api = client.write_api()
        point = Point("async_measurement") \
            .tag("source", "async") \
            .field("value", 42.0)
        await write_api.write(bucket="async_bucket", record=point)
asyncio.run(write_data())

Core class for representing individual data points with measurement, tags, fields, and timestamps.
class Point:
def __init__(self, measurement_name: str = None): ...
@staticmethod
def measurement(measurement: str) -> 'Point':
"""
Create a Point with the specified measurement name.
Parameters:
- measurement (str): Measurement name
Returns:
Point: New Point instance
"""
@staticmethod
def from_dict(
dictionary: dict,
write_precision: WritePrecision = WritePrecision.NS,
**kwargs
) -> 'Point':
"""
Create Point from dictionary representation.
Parameters:
- dictionary (dict): Dictionary with measurement, tags, fields, and time
- write_precision (WritePrecision): Time precision for timestamp
Returns:
Point: Point created from dictionary
"""
def tag(self, key: str, value: str) -> 'Point':
"""
Add a tag key-value pair to the point.
Parameters:
- key (str): Tag key
- value (str): Tag value
Returns:
Point: Self for method chaining
"""
def field(self, key: str, value: Any) -> 'Point':
"""
Add a field key-value pair to the point.
Parameters:
- key (str): Field key
- value (Any): Field value (int, float, str, bool)
Returns:
Point: Self for method chaining
"""
def time(
self,
timestamp: Union[datetime, str, int, float],
write_precision: WritePrecision = WritePrecision.NS
) -> 'Point':
"""
Set the timestamp for the point.
Parameters:
- timestamp: Timestamp in various formats
- write_precision (WritePrecision): Precision for the timestamp
Returns:
Point: Self for method chaining
"""
def to_line_protocol(self) -> str:
"""
Convert the point to line protocol format.
Returns:
str: Line protocol representation
"""
@property
def write_precision(self) -> WritePrecision:
"""
Get the write precision of this point.
Returns:
WritePrecision: The precision used for the timestamp
"""
@classmethod
def set_str_rep(cls, str_rep: str) -> None:
"""
Set string representation for Point class.
Parameters:
- str_rep (str): String representation format
"""

Basic point creation:
from influxdb_client import Point, WritePrecision
from datetime import datetime
# Method chaining
point = Point("cpu_usage") \
.tag("host", "server1") \
.tag("region", "us-west") \
.field("usage_percent", 85.2) \
.field("core_count", 8) \
.time(datetime.utcnow(), WritePrecision.S)
print(point.to_line_protocol())
# Example output (fields appear in key order; the timestamp is in seconds because of WritePrecision.S):
# cpu_usage,host=server1,region=us-west core_count=8i,usage_percent=85.2 1640995200

Creating from dictionary:
data = {
"measurement": "sensor_data",
"tags": {"location": "warehouse", "sensor_id": "temp001"},
"fields": {"temperature": 23.5, "battery": 87},
"time": datetime.utcnow()
}
point = Point.from_dict(data, WritePrecision.MS)

Different timestamp formats:
import time
from datetime import datetime, timezone

from influxdb_client import Point, WritePrecision

# Using datetime object (timezone-aware; datetime.utcnow() is deprecated since Python 3.12)
point1 = Point("test").field("value", 1).time(datetime.now(timezone.utc), WritePrecision.S)
# Using Unix timestamp: time.time() returns a float, so truncate it to whole
# seconds to match WritePrecision.S
point2 = Point("test").field("value", 2).time(int(time.time()), WritePrecision.S)
# Using string timestamp (RFC3339)
point3 = Point("test").field("value", 3).time("2023-01-01T12:00:00Z", WritePrecision.S)
# Using nanosecond timestamp
point4 = Point("test").field("value", 4).time(1640995200000000000, WritePrecision.NS)

Configuration class for controlling write behavior including batching, retry policies, and write modes.
class WriteOptions:
def __init__(
self,
write_type: WriteType = WriteType.batching,
batch_size: int = 1000,
flush_interval: int = 1000,
jitter_interval: int = 0,
retry_interval: int = 5000,
max_retries: int = 5,
max_retry_delay: int = 125000,
max_retry_time: int = 180000,
exponential_base: int = 2,
max_close_wait: int = 300000,
write_scheduler: Any = None
):
"""
Configure write operation behavior.
Parameters:
- write_type (WriteType): Write operation mode
- batch_size (int): Number of points to batch together
- flush_interval (int): Interval in milliseconds to flush batches
- jitter_interval (int): Random delay in milliseconds to add to flush_interval
- retry_interval (int): Initial retry delay in milliseconds
- max_retries (int): Maximum number of retry attempts
- max_retry_delay (int): Maximum retry delay in milliseconds
- max_retry_time (int): Maximum total retry time in milliseconds
- exponential_base (int): Base for exponential backoff calculation
- max_close_wait (int): Maximum wait time when closing write API
- write_scheduler: Custom scheduler for write operations
"""

# Available as constants
SYNCHRONOUS: WriteOptions # Immediate synchronous writes
ASYNCHRONOUS: WriteOptions # Asynchronous individual writes (WriteType.asynchronous), no batching

Custom write options:
from influxdb_client import WriteOptions, WriteType
from influxdb_client.client.write.retry import WritesRetry
# High-throughput batching configuration
high_throughput_options = WriteOptions(
write_type=WriteType.batching,
batch_size=5000,
flush_interval=500, # 500ms
max_retries=3,
retry_interval=1000
)
# Synchronous writes with retries
sync_options = WriteOptions(
write_type=WriteType.synchronous,
max_retries=5,
retry_interval=2000,
max_retry_delay=30000
)
# Use with WriteAPI
write_api = client.write_api(write_options=high_throughput_options)

Retry strategy configuration:
# Configure exponential backoff
exponential_options = WriteOptions(
write_type=WriteType.batching,
retry_interval=1000, # Start with 1 second
max_retry_delay=60000, # Max 60 seconds
exponential_base=2, # Double the delay each time
max_retries=5
)
# Configure fixed-interval backoff (exponential_base=1 keeps the delay constant between retries)
linear_options = WriteOptions(
write_type=WriteType.batching,
retry_interval=5000, # 5 second intervals
max_retry_delay=5000, # Keep constant delay
exponential_base=1, # No exponential increase
max_retries=3
)

Configuration for default tags applied to all points written through a WriteApi instance.
class PointSettings:
def __init__(self, default_tags: dict = None):
"""
Configure default tags for all points.
Parameters:
- default_tags (dict): Tags to add to every point
"""
def add_default_tag(self, key: str, value: str) -> None:
"""
Add a default tag that will be applied to all points.
Parameters:
- key (str): Tag key
- value (str): Tag value
"""

from influxdb_client import PointSettings
# Configure default tags
point_settings = PointSettings(default_tags={
"environment": "production",
"application": "sensor-collector",
"version": "1.2.3"
})
# Add additional default tag
point_settings.add_default_tag("datacenter", "us-west-2")
# Use with WriteAPI
write_api = client.write_api(
write_options=SYNCHRONOUS,
point_settings=point_settings
)
# All points written will automatically include default tags
point = Point("temperature").field("value", 23.5)
write_api.write(bucket="sensors", record=point)
# Actual point written (tags appear in key order): temperature,application=sensor-collector,datacenter=us-west-2,environment=production,version=1.2.3 value=23.5

class WritePrecision(Enum):
"""Time precision constants for timestamps."""
NS = "ns" # nanoseconds (default)
US = "us" # microseconds
MS = "ms" # milliseconds
S = "s" # seconds
class WriteType(Enum):
"""Write operation modes."""
batching = "batching" # Background batching (default)
asynchronous = "asynchronous" # Async individual writes
synchronous = "synchronous" # Immediate synchronous writes
# Retry configuration
class WritesRetry:
def __init__(
self,
total: int = 3,
retry_interval: int = 5000,
max_retry_delay: int = 125000,
max_retry_time: int = 180000,
exponential_base: int = 2,
jitter_interval: int = 0
): ...
# Exception types
class WriteApiError(InfluxDBError):
"""Raised when write operations fail."""
pass
class WriteRetryError(WriteApiError):
"""Raised when write retries are exhausted."""
pass

Install with Tessl CLI
npx tessl i tessl/pypi-influxdb-client

docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10