InfluxDB client library for time series database operations with comprehensive API for data management and querying
—
Comprehensive utilities for efficient data handling including bulk data insertion, line protocol formatting, and query result processing. These tools optimize data operations and provide convenient abstractions for common data management tasks.
Class-based utility for efficient bulk data insertion with configurable auto-commit behavior and immutable data point definitions.
class SeriesHelper:
"""
Helper class for writing data points in bulk with immutable data points
and configurable auto-commit behavior.
Usage requires defining a Meta class with series configuration.
"""
def __init__(self, **kw):
"""
Create new data point with field values.
Parameters:
- **kw: Field values matching Meta.fields definitions
Note: Creates immutable data point that is queued for bulk commit
"""
@classmethod
def commit(cls, client=None):
"""
Commit all queued datapoints via InfluxDB client.
Parameters:
- client (InfluxDBClient): Client instance (default: uses Meta.client)
Returns:
bool: True if successful
Raises:
InfluxDBClientError: On write errors
"""
@classmethod
def _json_body_(cls):
"""
Generate JSON body for all queued datapoints.
Returns:
list: List of point dictionaries ready for InfluxDB
"""
@classmethod
def _reset_(cls):
"""
Reset internal data storage, clearing all queued points.
"""
@staticmethod
def _current_timestamp():
"""
Get current timestamp in nanoseconds since epoch.
Returns:
int: Current timestamp in nanoseconds
"""SeriesHelper requires configuration via a Meta class defining the series structure:
class Meta:
# Required attributes
series_name = 'measurement_name'
fields = ['field1', 'field2', 'field3']
tags = ['tag1', 'tag2']
# Optional attributes
bulk_size = 300 # Auto-commit after N points (0 disables)
client = influxdb_client_instance # Default client
autocommit = True # Enable auto-commit behavior

from influxdb import InfluxDBClient, SeriesHelper
# Configure client
client = InfluxDBClient(database='metrics')
# Define SeriesHelper subclass
class CpuMetrics(SeriesHelper):
class Meta:
series_name = 'cpu_usage'
fields = ['user', 'system', 'idle']
tags = ['host', 'cpu_core']
bulk_size = 100 # Auto-commit every 100 points
client = client
autocommit = True
# Create data points
CpuMetrics(host='server01', cpu_core='core0', user=25.5, system=10.2, idle=64.3)
CpuMetrics(host='server01', cpu_core='core1', user=30.1, system=8.7, idle=61.2)
CpuMetrics(host='server02', cpu_core='core0', user=45.8, system=15.3, idle=38.9)
# Manual commit (if autocommit disabled)
CpuMetrics.commit()
# Reset queued points
CpuMetrics._reset_()
# Generate JSON without committing
json_data = CpuMetrics._json_body_()
print(json_data)

# Multiple series helpers
class MemoryMetrics(SeriesHelper):
class Meta:
series_name = 'memory_usage'
fields = ['used', 'available', 'buffer', 'cache']
tags = ['host']
bulk_size = 50
client = client
class DiskMetrics(SeriesHelper):
class Meta:
series_name = 'disk_io'
fields = ['read_bytes', 'write_bytes', 'read_ops', 'write_ops']
tags = ['host', 'device']
bulk_size = 200
client = client
# Batch data collection
def collect_system_metrics(hosts):
for host in hosts:
# CPU data
cpu_data = get_cpu_stats(host)
for core, stats in cpu_data.items():
CpuMetrics(host=host, cpu_core=core, **stats)
# Memory data
mem_data = get_memory_stats(host)
MemoryMetrics(host=host, **mem_data)
# Disk data
disk_data = get_disk_stats(host)
for device, stats in disk_data.items():
DiskMetrics(host=host, device=device, **stats)
# All helpers auto-commit based on their bulk_size settings
collect_system_metrics(['server01', 'server02', 'server03'])
# Manual commit all at once
CpuMetrics.commit()
MemoryMetrics.commit()
DiskMetrics.commit()

Functions for creating efficient line protocol formatted data, InfluxDB's native wire format for optimal write performance.
def make_line(measurement, tags=None, fields=None, time=None, precision=None):
"""
Create single line protocol formatted string.
Parameters:
- measurement (str): Measurement name
- tags (dict): Tag key-value pairs (default: None)
- fields (dict): Field key-value pairs (default: None)
- time (int or datetime): Timestamp (default: current time)
- precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
Returns:
str: Line protocol formatted string
Raises:
ValueError: If measurement or fields are missing
"""
def make_lines(data, precision=None):
"""
Create multiple line protocol strings from data structure.
Parameters:
- data (list): List of point dictionaries with measurement, tags, fields, time
- precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
Returns:
list: List of line protocol formatted strings
Raises:
ValueError: If data format is invalid
"""
def quote_ident(value):
"""
Quote identifier strings for line protocol.
Parameters:
- value (str): Identifier to quote
Returns:
str: Quoted identifier
"""
def quote_literal(value):
"""
Quote literal strings for line protocol.
Parameters:
- value (str): Literal to quote
Returns:
str: Quoted literal
"""
# Time reference constant
EPOCH: datetime # UTC epoch timestamp reference

from influxdb.line_protocol import make_line, make_lines
from datetime import datetime, timezone
# Create single line
line = make_line(
measurement='cpu_usage',
tags={'host': 'server01', 'cpu': 'cpu0'},
fields={'user': 23.5, 'system': 12.3, 'idle': 64.2},
time=datetime.now(timezone.utc)
)
print(line)
# Output: cpu_usage,host=server01,cpu=cpu0 user=23.5,system=12.3,idle=64.2 1694068704000000000
# Create multiple lines from data
data = [
{
'measurement': 'cpu_usage',
'tags': {'host': 'server01'},
'fields': {'value': 75.5},
'time': '2023-09-07T07:18:24Z'
},
{
'measurement': 'memory_usage',
'tags': {'host': 'server01'},
'fields': {'value': 82.3},
'time': '2023-09-07T07:18:24Z'
}
]
lines = make_lines(data, precision='s')
for line in lines:
print(line)
# Write line protocol directly
client = InfluxDBClient()
line_data = make_line('temperature', {'sensor': 'room1'}, {'value': 23.5})
client.write([line_data], protocol='line')
# Batch line protocol creation
def create_sensor_lines(sensor_readings):
lines = []
for reading in sensor_readings:
line = make_line(
measurement='sensor_data',
tags={'sensor_id': reading['id'], 'location': reading['location']},
fields={'temperature': reading['temp'], 'humidity': reading['humidity']},
time=reading['timestamp']
)
lines.append(line)
return lines
# High-performance writing
sensor_data = get_sensor_readings() # Get data
lines = create_sensor_lines(sensor_data)
client.write('\n'.join(lines), protocol='line')

Wrapper class for InfluxDB query results providing iteration, filtering, and data extraction capabilities.
class ResultSet:
"""
Wrapper around InfluxDB query results with iteration and filtering capabilities.
"""
def __init__(self, series, raise_errors=True):
"""
Initialize ResultSet from query response.
Parameters:
- series (list): Raw series data from InfluxDB query response
- raise_errors (bool): Raise exceptions on query errors (default: True)
Raises:
InfluxDBClientError: If query errors and raise_errors=True
"""
def get_points(self, measurement=None, tags=None):
"""
Get data points matching optional filters.
Parameters:
- measurement (str): Filter by measurement name (default: None)
- tags (dict): Filter by tag key-value pairs (default: None)
Yields:
dict: Individual data points as dictionaries
Example:
for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):
print(point['time'], point['value'])
"""
def keys(self):
"""
Get list of measurement keys in the result set.
Returns:
list: List of measurement key tuples (name, tags)
"""
def items(self):
"""
Get key-value pairs for all series in result set.
Yields:
tuple: (key, points_generator) pairs
"""
@staticmethod
def point_from_cols_vals(cols, vals):
"""
Create point dictionary from column names and values.
Parameters:
- cols (list): Column names
- vals (list): Column values
Returns:
dict: Point dictionary with column names as keys
"""
def __getitem__(self, key):
"""
Retrieve series by key (deprecated - use get_points instead).
Parameters:
- key: Series key
Returns:
generator: Points for the specified series
"""
def __iter__(self):
"""
Iterate over all points in all series.
Yields:
dict: Individual data points
"""
def __len__(self):
"""
Get number of series in result set.
Returns:
int: Number of series
"""
def __repr__(self):
"""
String representation of ResultSet.
Returns:
str: ResultSet description
"""
# Properties
@property
def raw(self):
"""Raw JSON response from InfluxDB."""
@property
def error(self):
"""Error message from InfluxDB query (if any)."""from influxdb import InfluxDBClient
client = InfluxDBClient(database='metrics')
# Execute query
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')
# Basic iteration
for point in result.get_points():
print(f"Time: {point['time']}, Value: {point['value']}")
# Filtered iteration
for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):
print(f"Server01 CPU: {point['value']}%")
# Get all keys
keys = result.keys()
print("Available measurements:", keys)
# Process by series
for key, points in result.items():
measurement, tags = key
print(f"Processing {measurement} with tags {tags}")
for point in points:
# Process each point
pass
# Check for errors
if result.error:
print("Query error:", result.error)
# Access raw response
raw_data = result.raw
print("Raw InfluxDB response:", raw_data)
# Convert to simple list
all_points = list(result.get_points())
print(f"Retrieved {len(all_points)} total points")
# Filter and aggregate
cpu_values = [
point['value'] for point in result.get_points()
if point.get('measurement') == 'cpu_usage'
]
avg_cpu = sum(cpu_values) / len(cpu_values) if cpu_values else 0
print(f"Average CPU usage: {avg_cpu:.2f}%")# Complex query with multiple measurements
query = """
SELECT mean(value) as avg_value, max(value) as max_value
FROM cpu_usage, memory_usage
WHERE time >= now() - 24h
GROUP BY time(1h), host
"""
result = client.query(query)
# Group results by measurement and host
from collections import defaultdict
results_by_measurement = defaultdict(list)
for point in result.get_points():
measurement = point.get('name', 'unknown') # Series name
host = point.get('host', 'unknown')
results_by_measurement[f"{measurement}_{host}"].append({
'time': point['time'],
'avg_value': point.get('avg_value'),
'max_value': point.get('max_value')
})
# Process grouped results
for key, points in results_by_measurement.items():
print(f"Processing {key}: {len(points)} time points")
# Calculate trends
values = [p['avg_value'] for p in points if p['avg_value'] is not None]
if len(values) >= 2:
trend = values[-1] - values[0] # Simple trend calculation
print(f" Trend: {trend:+.2f}")
# Export results to different formats
def export_results(result_set, format='json'):
"""Export ResultSet to various formats."""
points = list(result_set.get_points())
if format == 'json':
import json
return json.dumps(points, default=str)
elif format == 'csv':
import csv, io
if not points:
return ""
output = io.StringIO()
writer = csv.DictWriter(output, fieldnames=points[0].keys())
writer.writeheader()
writer.writerows(points)
return output.getvalue()
elif format == 'dataframe':
import pandas as pd
return pd.DataFrame(points)
# Export query results
json_data = export_results(result, 'json')
csv_data = export_results(result, 'csv')
df = export_results(result, 'dataframe')

# Use line protocol for maximum write performance
from influxdb.line_protocol import make_lines
# Batch create line protocol
data_points = [
{'measurement': 'metrics', 'tags': {'host': f'server{i:02d}'},
'fields': {'value': i * 10.5}}
for i in range(1000)
]
lines = make_lines(data_points)
line_data = '\n'.join(lines)
client.write(line_data, protocol='line')
# SeriesHelper for structured bulk inserts
class HighThroughputMetrics(SeriesHelper):
class Meta:
series_name = 'high_volume_data'
fields = ['value1', 'value2', 'value3']
tags = ['source', 'type']
bulk_size = 10000 # Large batch size
client = client
# Stream processing pattern
def process_data_stream(data_stream):
for batch in data_stream.batches(size=1000):
for record in batch:
HighThroughputMetrics(
source=record['source'],
type=record['type'],
value1=record['v1'],
value2=record['v2'],
value3=record['v3']
)
# Auto-commits when bulk_size reached
# Memory-efficient result processing
def process_large_query_results(query):
result = client.query(query, chunked=True, chunk_size=10000)
# Process points in chunks to avoid memory issues
chunk_count = 0
for point in result.get_points():
# Process individual point
process_point(point)
chunk_count += 1
if chunk_count % 10000 == 0:
print(f"Processed {chunk_count} points...")Install with Tessl CLI
npx tessl i tessl/pypi-influxdb