CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-influxdb

InfluxDB client library for time series database operations with comprehensive API for data management and querying

Pending
Overview
Eval results
Files

docs/data-management.md

Data Management

Comprehensive utilities for efficient data handling including bulk data insertion, line protocol formatting, and query result processing. These tools optimize data operations and provide convenient abstractions for common data management tasks.

Capabilities

SeriesHelper - Bulk Data Operations

Class-based utility for efficient bulk data insertion with configurable auto-commit behavior and immutable data point definitions.

class SeriesHelper:
    """
    Base class for writing data points in bulk.

    Each subclass describes one series through a nested Meta class (series
    name, field names, tag names and optional commit settings). Instantiating
    the subclass queues one immutable data point; queued points are sent
    together by commit() or, when configured, by auto-commit.

    Usage requires defining a Meta class with series configuration.
    """
    
    def __init__(self, **kw):
        """
        Create a new data point and queue it for the next bulk commit.

        Parameters:
        - **kw: Values for the point, keyed by the names declared in
          Meta.fields and Meta.tags (the usage examples pass tag values such
          as host=... alongside field values)

        Note: The created data point is immutable and stays queued until it
        is committed or the queue is reset
        """
    
    @classmethod
    def commit(cls, client=None):
        """
        Commit all queued datapoints via InfluxDB client.

        Parameters:
        - client (InfluxDBClient): Client to write with; when omitted the
          client configured as Meta.client is used

        Returns:
        bool: True if successful

        Raises:
        InfluxDBClientError: On write errors
        """
    
    @classmethod  
    def _json_body_(cls):
        """
        Generate the JSON body for all queued datapoints without sending it.

        Returns:
        list: List of point dictionaries ready for InfluxDB
        """
    
    @classmethod
    def _reset_(cls):
        """
        Reset internal data storage, clearing all queued points.

        Points that have not been committed yet are discarded.
        """
    
    @staticmethod
    def _current_timestamp():
        """
        Get the current timestamp used for points created without a time.

        Returns:
        int: Current timestamp in nanoseconds

        NOTE(review): the upstream influxdb-python helper appears to return a
        datetime object here rather than an integer nanosecond count --
        confirm against the installed version.
        """

SeriesHelper Configuration

SeriesHelper requires configuration via a Meta class defining the series structure:

class Meta:
    # Series configuration read by the enclosing SeriesHelper subclass.

    # Required attributes
    series_name = 'measurement_name'  # Name of the series the points belong to
    fields = ['field1', 'field2', 'field3']  # Field names accepted as keyword arguments
    tags = ['tag1', 'tag2']  # Tag names accepted as keyword arguments
    
    # Optional attributes  
    bulk_size = 300  # Auto-commit after N points (0 disables)
    # NOTE(review): the influxdb-python README states that autocommit must be
    # True for bulk_size to take effect, and upstream appears to force values
    # below 1 up to 1 instead of disabling auto-commit -- confirm.
    client = influxdb_client_instance  # Default client (placeholder for an InfluxDBClient instance)
    autocommit = True  # Enable auto-commit behavior

SeriesHelper Usage Examples

from influxdb import InfluxDBClient, SeriesHelper

# Configure client
client = InfluxDBClient(database='metrics')

# Define SeriesHelper subclass
class CpuMetrics(SeriesHelper):
    """Bulk writer for per-core CPU usage points."""

    class Meta:
        series_name = 'cpu_usage'
        fields = ['user', 'system', 'idle']
        tags = ['host', 'cpu_core']
        bulk_size = 100  # Auto-commit every 100 points
        client = client
        autocommit = True

# Create data points (each instantiation queues one point)
CpuMetrics(host='server01', cpu_core='core0', user=25.5, system=10.2, idle=64.3)
CpuMetrics(host='server01', cpu_core='core1', user=30.1, system=8.7, idle=61.2)
CpuMetrics(host='server02', cpu_core='core0', user=45.8, system=15.3, idle=38.9)

# Generate JSON without committing.
# This has to happen before commit()/_reset_(): once the queue has been
# flushed or cleared there is nothing left to render.
json_data = CpuMetrics._json_body_()
print(json_data)

# Manual commit (needed if autocommit is disabled, or to flush points that
# are still below bulk_size)
CpuMetrics.commit()

# Reset queued points (discards anything not yet committed)
CpuMetrics._reset_()

Advanced SeriesHelper Patterns

# Multiple series helpers
class MemoryMetrics(SeriesHelper):
    """Bulk writer for per-host memory usage points."""

    class Meta:
        series_name = 'memory_usage'
        fields = ['used', 'available', 'buffer', 'cache']
        tags = ['host']
        bulk_size = 50  # Flush to the client every 50 queued points
        client = client
        # bulk_size is only honoured when autocommit is enabled; without it
        # points accumulate until commit() is called explicitly
        autocommit = True

class DiskMetrics(SeriesHelper):
    """Bulk writer for per-device disk I/O points."""

    class Meta:
        series_name = 'disk_io'
        fields = ['read_bytes', 'write_bytes', 'read_ops', 'write_ops']
        tags = ['host', 'device']
        bulk_size = 200  # Flush to the client every 200 queued points
        client = client
        # bulk_size is only honoured when autocommit is enabled; without it
        # points accumulate until commit() is called explicitly
        autocommit = True

# Batch data collection
def collect_system_metrics(hosts):
    """
    Queue CPU, memory and disk data points for every host.

    Parameters:
    - hosts: Iterable of host names to collect metrics for

    For each host this queues one CpuMetrics point per core, one
    MemoryMetrics point, and one DiskMetrics point per device.
    """
    for hostname in hosts:
        # One CPU point per core
        for core_id, core_stats in get_cpu_stats(hostname).items():
            CpuMetrics(host=hostname, cpu_core=core_id, **core_stats)

        # A single memory point per host
        MemoryMetrics(host=hostname, **get_memory_stats(hostname))

        # One disk point per device
        for device_name, device_stats in get_disk_stats(hostname).items():
            DiskMetrics(host=hostname, device=device_name, **device_stats)

# Queue metrics for three hosts; each helper flushes itself according to its
# own bulk_size / auto-commit configuration
collect_system_metrics(['server01', 'server02', 'server03'])

# Manually commit whatever is still queued, one helper after another
for helper in (CpuMetrics, MemoryMetrics, DiskMetrics):
    helper.commit()

Line Protocol Utilities

Functions for producing data in line protocol, InfluxDB's native wire format, for optimal write performance.

def make_line(measurement, tags=None, fields=None, time=None, precision=None):
    """
    Build one line-protocol string for a single data point.

    Parameters:
    - measurement (str): Measurement name
    - tags (dict): Tag key-value pairs (default: None)
    - fields (dict): Field key-value pairs (default: None)
    - time (int or datetime): Timestamp (default: current time)
    - precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)

    Returns:
    str: Line protocol formatted string

    Raises:
    ValueError: If measurement or fields are missing

    NOTE(review): upstream appears to omit the timestamp from the line when
    time is None (leaving the server to assign one) rather than inserting the
    client's current time -- confirm.
    """

def make_lines(data, precision=None):
    """
    Create line protocol output for several data points at once.

    Parameters:
    - data (list): List of point dictionaries with measurement, tags, fields, time
    - precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)

    Returns:
    list: List of line protocol formatted strings

    Raises:
    ValueError: If data format is invalid

    NOTE(review): the upstream influxdb-python implementation appears to
    expect a dict of the form {'points': [...], 'tags': {...}} and to return
    ONE newline-terminated string rather than a list of strings -- confirm
    against the installed version before relying on the types stated above.
    """

def quote_ident(value):
    """
    Return the given identifier string in quoted form.

    Parameters:
    - value (str): Identifier to quote

    Returns:
    str: Quoted identifier

    NOTE(review): upstream this helper appears to be used for quoting
    identifiers inside InfluxQL query strings rather than for line protocol
    -- confirm.
    """

def quote_literal(value):
    """
    Return the given literal string in quoted form.

    Parameters:
    - value (str): Literal to quote

    Returns:
    str: Quoted literal

    NOTE(review): upstream this helper appears to be used for quoting string
    literals inside InfluxQL query strings rather than for line protocol
    -- confirm.
    """

# Time reference constant: declared as a datetime and described as the UTC
# epoch that timestamps are measured against
EPOCH: datetime  # UTC epoch timestamp reference

Line Protocol Examples

from influxdb.line_protocol import make_line, make_lines
from datetime import datetime, timezone

# Create single line
line = make_line(
    measurement='cpu_usage',
    tags={'host': 'server01', 'cpu': 'cpu0'},
    fields={'user': 23.5, 'system': 12.3, 'idle': 64.2},
    time=datetime.now(timezone.utc)
)
print(line)
# Example output (tag and field keys are emitted in sorted order; the trailing
# timestamp depends on the current time):
# cpu_usage,cpu=cpu0,host=server01 idle=64.2,system=12.3,user=23.5 1694068704000000000

# Create multiple lines from data
data = [
    {
        'measurement': 'cpu_usage',
        'tags': {'host': 'server01'},
        'fields': {'value': 75.5},
        'time': '2023-09-07T07:18:24Z'
    },
    {
        'measurement': 'memory_usage',
        'tags': {'host': 'server01'},
        'fields': {'value': 82.3},
        'time': '2023-09-07T07:18:24Z'
    }
]

# make_lines() expects a dict with a 'points' list and returns a single
# newline-terminated string, so split it to look at individual lines
lines = make_lines({'points': data}, precision='s')
for line in lines.splitlines():
    print(line)

# Write line protocol directly
client = InfluxDBClient()
line_data = make_line('temperature', {'sensor': 'room1'}, {'value': 23.5})
# write() does not add the target database on its own; name it via the 'db'
# request parameter or the server rejects the write
client.write([line_data], params={'db': 'metrics'}, protocol='line')

# Batch line protocol creation
def create_sensor_lines(sensor_readings):
    """
    Convert sensor readings into line-protocol strings.

    Parameters:
    - sensor_readings: Iterable of mappings providing the keys 'id',
      'location', 'temp', 'humidity' and 'timestamp'

    Returns:
    list: One 'sensor_data' line per reading, in input order
    """
    return [
        make_line(
            measurement='sensor_data',
            tags={'sensor_id': entry['id'], 'location': entry['location']},
            fields={'temperature': entry['temp'], 'humidity': entry['humidity']},
            time=entry['timestamp'],
        )
        for entry in sensor_readings
    ]

# High-performance writing
sensor_data = get_sensor_readings()  # Get data
lines = create_sensor_lines(sensor_data)
# write() does not add the target database on its own; name it via the 'db'
# request parameter or the server rejects the write
client.write('\n'.join(lines), params={'db': 'metrics'}, protocol='line')

ResultSet - Query Result Processing

Wrapper class for InfluxDB query results providing iteration, filtering, and data extraction capabilities.

class ResultSet:
    """
    Wrapper around an InfluxDB query result.

    Provides iteration over the returned series, filtering of points by
    measurement and tags, and access to the raw response and error message.
    """
    
    def __init__(self, series, raise_errors=True):
        """
        Initialize ResultSet from query response.

        Parameters:
        - series (list): Raw series data from InfluxDB query response
        - raise_errors (bool): Raise exceptions on query errors (default: True)

        Raises:
        InfluxDBClientError: If query errors and raise_errors=True
        """
    
    def get_points(self, measurement=None, tags=None):
        """
        Get data points, optionally restricted to one measurement and/or tag set.

        Parameters:
        - measurement (str): Filter by measurement name (default: None)
        - tags (dict): Filter by tag key-value pairs (default: None)

        Yields:
        dict: Individual data points as dictionaries

        Example:
        for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):
            print(point['time'], point['value'])
        """
    
    def keys(self):
        """
        Get the keys identifying each series in the result set.

        Returns:
        list: List of measurement key tuples (name, tags)
        """
    
    def items(self):
        """
        Get key-value pairs for all series in result set.

        Each key is a (name, tags) tuple as returned by keys(); each value
        produces the points of that series.

        Yields:
        tuple: (key, points_generator) pairs

        NOTE(review): upstream appears to return a list of these pairs rather
        than yielding them lazily -- confirm.
        """
    
    @staticmethod
    def point_from_cols_vals(cols, vals):
        """
        Create point dictionary from column names and values.

        Parameters:
        - cols (list): Column names
        - vals (list): Column values

        Returns:
        dict: Point dictionary with column names as keys
        """
    
    def __getitem__(self, key):
        """
        Retrieve series by key (deprecated - use get_points instead).

        Parameters:
        - key: Series key

        Returns:
        generator: Points for the specified series
        """
    
    def __iter__(self):
        """
        Iterate over all points in all series.

        Yields:
        dict: Individual data points

        NOTE(review): upstream appears to yield one list of points per series
        here, not individual point dictionaries -- confirm, and prefer
        get_points() when individual points are needed.
        """
    
    def __len__(self):
        """
        Get number of series in result set.

        Returns:
        int: Number of series
        """
    
    def __repr__(self):
        """
        String representation of ResultSet.

        Returns:
        str: ResultSet description
        """
    
    # Read-only properties
    @property
    def raw(self):
        """Raw JSON response from InfluxDB."""
        
    @property  
    def error(self):
        """Error message from InfluxDB query (if any)."""

ResultSet Usage Examples

from influxdb import InfluxDBClient

client = InfluxDBClient(database='metrics')

# Execute query
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')

# Basic iteration
for point in result.get_points():
    print(f"Time: {point['time']}, Value: {point['value']}")

# Filtered iteration
for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):
    print(f"Server01 CPU: {point['value']}%")

# Get all keys
keys = result.keys()
print("Available measurements:", keys)

# Process by series
for key, points in result.items():
    measurement, tags = key
    print(f"Processing {measurement} with tags {tags}")
    for point in points:
        # Process each point
        pass

# Check for errors
if result.error:
    print("Query error:", result.error)

# Access raw response
raw_data = result.raw
print("Raw InfluxDB response:", raw_data)

# Convert to simple list
all_points = list(result.get_points())
print(f"Retrieved {len(all_points)} total points")

# Filter and aggregate.
# Point dictionaries only hold column values; the measurement name is part of
# the series key, so filter through get_points(measurement=...) instead of
# looking for a 'measurement' entry inside each point.
cpu_values = [
    point['value'] for point in result.get_points(measurement='cpu_usage')
]
avg_cpu = sum(cpu_values) / len(cpu_values) if cpu_values else 0
print(f"Average CPU usage: {avg_cpu:.2f}%")

Advanced ResultSet Processing

from collections import defaultdict

# Complex query with multiple measurements
query = """
    SELECT mean(value) as avg_value, max(value) as max_value 
    FROM cpu_usage, memory_usage 
    WHERE time >= now() - 24h 
    GROUP BY time(1h), host
"""

result = client.query(query)

# Group results by measurement and host.
# The measurement name and the GROUP BY tags live in each series key
# (name, tags), not in the individual point dictionaries, so walk the result
# series by series.
results_by_measurement = defaultdict(list)
for (measurement, tags), series_points in result.items():
    name = measurement or 'unknown'
    host = (tags or {}).get('host', 'unknown')  # tags may be absent for ungrouped series

    for point in series_points:
        results_by_measurement[f"{name}_{host}"].append({
            'time': point['time'],
            'avg_value': point.get('avg_value'),
            'max_value': point.get('max_value')
        })

# Process grouped results
for key, points in results_by_measurement.items():
    print(f"Processing {key}: {len(points)} time points")

    # Calculate trends (empty time buckets yield None and are skipped)
    values = [p['avg_value'] for p in points if p['avg_value'] is not None]
    if len(values) >= 2:
        trend = values[-1] - values[0]  # Simple trend calculation
        print(f"  Trend: {trend:+.2f}")

# Export results to different formats
def export_results(result_set, format='json'):
    """
    Export the points of a ResultSet as JSON, CSV, or a pandas DataFrame.

    Parameters:
    - result_set: Object exposing get_points(), e.g. a ResultSet
    - format (str): 'json', 'csv' or 'dataframe' (default: 'json')

    Returns:
    str for 'json' and 'csv' (an empty string for CSV when there are no
    points); pandas.DataFrame for 'dataframe'

    Raises:
    ValueError: If format is not one of the supported names
    """
    points = list(result_set.get_points())

    if format == 'json':
        import json
        # default=str stringifies any value json cannot encode natively
        return json.dumps(points, default=str)

    if format == 'csv':
        import csv
        import io
        if not points:
            return ""

        # Use the union of all keys (first-seen order) so points coming from
        # series with different columns do not make DictWriter raise; missing
        # values are written as empty cells.
        fieldnames = list(dict.fromkeys(key for point in points for key in point))
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(points)
        return output.getvalue()

    if format == 'dataframe':
        import pandas as pd
        return pd.DataFrame(points)

    # Fail loudly instead of silently returning None for a misspelled format
    raise ValueError(f"Unsupported export format: {format!r}")

# Export the same query result once per supported format
json_data, csv_data, df = (
    export_results(result, fmt) for fmt in ('json', 'csv', 'dataframe')
)

Performance Optimization

Efficient Data Operations

# Use line protocol for maximum write performance
from influxdb.line_protocol import make_lines

# Batch create line protocol
data_points = [
    {
        'measurement': 'metrics',
        'tags': {'host': f'server{i:02d}'},
        'fields': {'value': i * 10.5},
    }
    for i in range(1000)
]

# make_lines() takes a dict with a 'points' list and already returns the
# newline-joined payload as one string, so no further join is needed
line_data = make_lines({'points': data_points})
# write() does not add the target database on its own; name it via the 'db'
# request parameter or the server rejects the write
client.write(line_data, params={'db': 'metrics'}, protocol='line')

# SeriesHelper for structured bulk inserts
class HighThroughputMetrics(SeriesHelper):
    """Bulk writer for high-volume data using a large batch size."""

    class Meta:
        series_name = 'high_volume_data'
        fields = ['value1', 'value2', 'value3']
        tags = ['source', 'type']
        bulk_size = 10000  # Large batch size
        client = client
        # bulk_size is only honoured when autocommit is enabled; without it
        # points accumulate until commit() is called explicitly
        autocommit = True

# Stream processing pattern
def process_data_stream(data_stream):
    """
    Queue one HighThroughputMetrics point per record of a data stream.

    Parameters:
    - data_stream: Object whose batches(size=...) method yields iterables of
      records; each record provides the keys 'source', 'type', 'v1', 'v2'
      and 'v3'

    Points are flushed automatically whenever the helper's bulk_size is
    reached; the remainder is committed once the stream is exhausted.
    """
    for batch in data_stream.batches(size=1000):
        for record in batch:
            HighThroughputMetrics(
                source=record['source'],
                type=record['type'],
                value1=record['v1'],
                value2=record['v2'],
                value3=record['v3']
            )
        # Auto-commits when bulk_size reached

    # Flush the tail: points still below bulk_size would otherwise stay
    # queued in memory and never reach the database
    HighThroughputMetrics.commit()

# Memory-efficient result processing
def process_large_query_results(query):
    """
    Run a chunked query and process every returned point.

    Parameters:
    - query (str): Query to execute

    Progress is printed every 10000 processed points.
    """
    result = client.query(query, chunked=True, chunk_size=10000)

    # Recent influxdb-python releases return a generator of ResultSet objects
    # (one per chunk) for chunked queries, while older ones return a single
    # ResultSet; normalise both shapes to an iterable of ResultSets.
    chunks = [result] if hasattr(result, 'get_points') else result

    # Process points chunk by chunk to avoid holding everything in memory
    point_count = 0
    for chunk in chunks:
        for point in chunk.get_points():
            # Process individual point
            process_point(point)

            point_count += 1
            if point_count % 10000 == 0:
                print(f"Processed {point_count} points...")

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb

docs

client.md

data-management.md

database-operations.md

dataframe-client.md

index.md

legacy.md

tile.json