CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-influxdb-client

Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.

82

1.18x
Overview
Eval results
Files

writing-data.mddocs/

Writing Data

Comprehensive functionality for writing time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels. The writing system supports both synchronous and asynchronous operations with flexible configuration options.

Capabilities

WriteApi

Main API for writing data to InfluxDB buckets with configurable write options, retry policies, and batching behavior.

class WriteApi:
    def __init__(
        self,
        influxdb_client,
        write_options: WriteOptions = WriteOptions(),
        point_settings: PointSettings = PointSettings()
    ): ...
    
    def write(
        self,
        bucket: str,
        org: str = None,
        record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes, Any] = None,
        write_precision: WritePrecision = WritePrecision.NS,
        **kwargs
    ) -> None:
        """
        Write data to InfluxDB.

        Parameters:
        - bucket (str): Destination bucket name
        - org (str, optional): Organization name or ID 
        - record (Union[Point, str, List, DataFrame, bytes]): Data to write in various formats
        - **kwargs: Additional write parameters

        Supported record formats:
        - Point object or list of Point objects
        - Line protocol string or list of strings
        - Pandas DataFrame with proper time indexing
        - Bytes representing line protocol data
        """
        
    def flush(self) -> None:
        """
        Flush any pending writes to InfluxDB.
        """
        
    def close(self) -> None:
        """
        Close the write API and flush any pending writes.
        """
        
    def __enter__(self) -> 'WriteApi': ...
    def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

Usage Examples

Basic point writing:

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")
write_api = client.write_api(write_options=SYNCHRONOUS)

# Write single point
point = Point("temperature") \
    .tag("location", "room1") \
    .field("value", 23.5) \
    .time(datetime.utcnow(), WritePrecision.S)

write_api.write(bucket="sensors", record=point)

# Write multiple points
points = [
    Point("temperature").tag("location", "room1").field("value", 23.5),
    Point("temperature").tag("location", "room2").field("value", 24.1)
]
write_api.write(bucket="sensors", record=points)

# Write line protocol string
line_protocol = "temperature,location=room3 value=22.8"
write_api.write(bucket="sensors", record=line_protocol)

DataFrame writing:

import pandas as pd

# Create DataFrame with datetime index
df = pd.DataFrame({
    'temperature': [23.5, 24.1, 22.8],
    'humidity': [45.2, 43.1, 46.8],
    'location': ['room1', 'room2', 'room3']
})
df.index = pd.date_range('2023-01-01', periods=3, freq='H')

write_api.write(bucket="sensors", record=df, data_frame_measurement_name='climate')

WriteApiAsync

Asynchronous version of WriteApi for non-blocking write operations.

class WriteApiAsync:
    def __init__(
        self,
        influxdb_client,
        point_settings: PointSettings = PointSettings()
    ): ...
    
    async def write(
        self,
        bucket: str,
        org: str = None,
        record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes] = None,
        **kwargs
    ) -> bool:
        """
        Asynchronously write data to InfluxDB.

        Parameters:
        - bucket (str): Destination bucket name
        - org (str, optional): Organization name or ID
        - record: Data to write in supported formats
        """
        
    async def close(self) -> None:
        """
        Close the async write API.
        """

Async Usage Example

import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def write_data():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
        write_api = client.write_api()
        
        point = Point("async_measurement") \
            .tag("source", "async") \
            .field("value", 42.0)
            
        await write_api.write(bucket="async_bucket", record=point)
        
asyncio.run(write_data())

Point

Core class for representing individual data points with measurement, tags, fields, and timestamps.

class Point:
    def __init__(self, measurement_name: str = None): ...
    
    @staticmethod
    def measurement(measurement: str) -> 'Point':
        """
        Create a Point with the specified measurement name.
        
        Parameters:
        - measurement (str): Measurement name
        
        Returns:
        Point: New Point instance
        """
        
    @staticmethod
    def from_dict(
        dictionary: dict,
        write_precision: WritePrecision = WritePrecision.NS,
        **kwargs
    ) -> 'Point':
        """
        Create Point from dictionary representation.
        
        Parameters:
        - dictionary (dict): Dictionary with measurement, tags, fields, and time
        - write_precision (WritePrecision): Time precision for timestamp
        
        Returns:
        Point: Point created from dictionary
        """
    
    def tag(self, key: str, value: str) -> 'Point':
        """
        Add a tag key-value pair to the point.
        
        Parameters:
        - key (str): Tag key
        - value (str): Tag value
        
        Returns:
        Point: Self for method chaining
        """
        
    def field(self, key: str, value: Any) -> 'Point':
        """
        Add a field key-value pair to the point.
        
        Parameters:
        - key (str): Field key  
        - value (Any): Field value (int, float, str, bool)
        
        Returns:
        Point: Self for method chaining
        """
        
    def time(
        self,
        timestamp: Union[datetime, str, int, float],
        write_precision: WritePrecision = WritePrecision.NS
    ) -> 'Point':
        """
        Set the timestamp for the point.
        
        Parameters:
        - timestamp: Timestamp in various formats
        - write_precision (WritePrecision): Precision for the timestamp
        
        Returns:
        Point: Self for method chaining
        """
        
    def to_line_protocol(self) -> str:
        """
        Convert the point to line protocol format.
        
        Returns:
        str: Line protocol representation
        """
        
    @property
    def write_precision(self) -> WritePrecision:
        """
        Get the write precision of this point.
        
        Returns:
        WritePrecision: The precision used for the timestamp
        """
        
    @classmethod
    def set_str_rep(cls, str_rep: str) -> None:
        """
        Set string representation for Point class.
        
        Parameters:
        - str_rep (str): String representation format
        """

Point Usage Examples

Basic point creation:

from influxdb_client import Point, WritePrecision
from datetime import datetime

# Method chaining
point = Point("cpu_usage") \
    .tag("host", "server1") \
    .tag("region", "us-west") \
    .field("usage_percent", 85.2) \
    .field("core_count", 8) \
    .time(datetime.utcnow(), WritePrecision.S)

print(point.to_line_protocol())
# Output: cpu_usage,host=server1,region=us-west usage_percent=85.2,core_count=8i 1640995200000000000

Creating from dictionary:

data = {
    "measurement": "sensor_data",
    "tags": {"location": "warehouse", "sensor_id": "temp001"},
    "fields": {"temperature": 23.5, "battery": 87},
    "time": datetime.utcnow()
}

point = Point.from_dict(data, WritePrecision.MS)

Different timestamp formats:

from datetime import datetime
import time

# Using datetime object
point1 = Point("test").field("value", 1).time(datetime.utcnow(), WritePrecision.S)

# Using Unix timestamp
point2 = Point("test").field("value", 2).time(time.time(), WritePrecision.S)

# Using string timestamp (RFC3339)
point3 = Point("test").field("value", 3).time("2023-01-01T12:00:00Z", WritePrecision.S)

# Using nanosecond timestamp
point4 = Point("test").field("value", 4).time(1640995200000000000, WritePrecision.NS)

WriteOptions

Configuration class for controlling write behavior including batching, retry policies, and write modes.

class WriteOptions:
    def __init__(
        self,
        write_type: WriteType = WriteType.batching,
        batch_size: int = 1000,
        flush_interval: int = 1000,
        jitter_interval: int = 0,
        retry_interval: int = 5000,
        max_retries: int = 5,
        max_retry_delay: int = 125000,
        max_retry_time: int = 180000,
        exponential_base: int = 2,
        max_close_wait: int = 300000,
        write_scheduler: Any = None
    ):
        """
        Configure write operation behavior.
        
        Parameters:
        - write_type (WriteType): Write operation mode
        - batch_size (int): Number of points to batch together
        - flush_interval (int): Interval in milliseconds to flush batches
        - jitter_interval (int): Random delay in milliseconds to add to flush_interval
        - retry_interval (int): Initial retry delay in milliseconds
        - max_retries (int): Maximum number of retry attempts
        - max_retry_delay (int): Maximum retry delay in milliseconds
        - max_retry_time (int): Maximum total retry time in milliseconds
        - exponential_base (int): Base for exponential backoff calculation
        - max_close_wait (int): Maximum wait time when closing write API
        - write_scheduler: Custom scheduler for write operations
        """

Predefined WriteOptions

# Available as constants
SYNCHRONOUS: WriteOptions  # Immediate synchronous writes
ASYNCHRONOUS: WriteOptions  # Asynchronous writes with default batching

WriteOptions Usage Examples

Custom write options:

from influxdb_client import WriteOptions, WriteType
from influxdb_client.client.write.retry import WritesRetry

# High-throughput batching configuration
high_throughput_options = WriteOptions(
    write_type=WriteType.batching,
    batch_size=5000,
    flush_interval=500,  # 500ms
    max_retries=3,
    retry_interval=1000
)

# Synchronous writes with retries
sync_options = WriteOptions(
    write_type=WriteType.synchronous,
    max_retries=5,
    retry_interval=2000,
    max_retry_delay=30000
)

# Use with WriteAPI
write_api = client.write_api(write_options=high_throughput_options)

Retry strategy configuration:

# Configure exponential backoff
exponential_options = WriteOptions(
    write_type=WriteType.batching,
    retry_interval=1000,     # Start with 1 second
    max_retry_delay=60000,   # Max 60 seconds
    exponential_base=2,      # Double the delay each time
    max_retries=5
)

# Configure linear backoff
linear_options = WriteOptions(
    write_type=WriteType.batching,
    retry_interval=5000,     # 5 second intervals
    max_retry_delay=5000,    # Keep constant delay
    exponential_base=1,      # No exponential increase
    max_retries=3
)

PointSettings

Configuration for default tags applied to all points written through a WriteApi instance.

class PointSettings:
    def __init__(self, default_tags: dict = None):
        """
        Configure default tags for all points.
        
        Parameters:
        - default_tags (dict): Tags to add to every point
        """
        
    def add_default_tag(self, key: str, value: str) -> None:
        """
        Add a default tag that will be applied to all points.
        
        Parameters:
        - key (str): Tag key
        - value (str): Tag value
        """

PointSettings Usage Example

from influxdb_client import PointSettings

# Configure default tags
point_settings = PointSettings(default_tags={
    "environment": "production",
    "application": "sensor-collector",
    "version": "1.2.3"
})

# Add additional default tag
point_settings.add_default_tag("datacenter", "us-west-2")

# Use with WriteAPI
write_api = client.write_api(
    write_options=SYNCHRONOUS,
    point_settings=point_settings
)

# All points written will automatically include default tags
point = Point("temperature").field("value", 23.5)
write_api.write(bucket="sensors", record=point)
# Actual point written: temperature,environment=production,application=sensor-collector,version=1.2.3,datacenter=us-west-2 value=23.5

Types

class WritePrecision(Enum):
    """Time precision constants for timestamps."""
    NS = "ns"  # nanoseconds (default)
    US = "us"  # microseconds
    MS = "ms"  # milliseconds  
    S = "s"    # seconds

class WriteType(Enum):
    """Write operation modes."""
    batching = "batching"        # Background batching (default)
    asynchronous = "asynchronous"  # Async individual writes
    synchronous = "synchronous"    # Immediate synchronous writes

# Retry configuration
class WritesRetry:
    def __init__(
        self,
        total: int = 3,
        retry_interval: int = 5000,
        max_retry_delay: int = 125000,
        max_retry_time: int = 180000,
        exponential_base: int = 2,
        jitter_interval: int = 0
    ): ...

# Exception types
class WriteApiError(InfluxDBError):
    """Raised when write operations fail."""
    pass

class WriteRetryError(WriteApiError):
    """Raised when write retries are exhausted."""
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb-client

docs

advanced-operations.md

client-management.md

index.md

querying-data.md

resource-management.md

writing-data.md

tile.json