InfluxDB client library for time series database operations, with a comprehensive API for data management and querying
The DataFrameClient extends InfluxDBClient to provide seamless integration with pandas DataFrames, enabling efficient data analysis workflows and simplified data exchange between InfluxDB and Python data science tools.
Requirement: This client requires the pandas library to be installed.
DataFrameClient inherits all InfluxDBClient functionality while adding DataFrame-specific methods.
class DataFrameClient(InfluxDBClient):
def __init__(self, host='localhost', port=8086, username='root', password='root',
database=None, ssl=False, verify_ssl=False, timeout=None, retries=3,
use_udp=False, udp_port=4444, proxies=None, pool_size=10, path='',
cert=None, gzip=False, session=None, headers=None, socket_options=None):
"""
Initialize DataFrame client with same parameters as InfluxDBClient.
Raises:
ImportError: If pandas is not installed
"""from influxdb import DataFrameClient
import pandas as pd
# Create DataFrame client
client = DataFrameClient(host='localhost', port=8086, database='mydb')
# Verify pandas is available
print("DataFrame client ready for pandas integration")Write pandas DataFrames directly to InfluxDB with flexible column mapping and type handling.
Write pandas DataFrames directly to InfluxDB with flexible column mapping and type handling.
def write_points(self, dataframe, measurement, tags=None, tag_columns=None,
field_columns=None, time_precision=None, database=None,
retention_policy=None, batch_size=None, protocol='line',
numeric_precision=None):
"""
Write pandas DataFrame to InfluxDB.
Parameters:
- dataframe (pandas.DataFrame): Data to write
- measurement (str): Measurement name
- tags (dict): Global tags for all points (default: None)
- tag_columns (list): DataFrame columns to use as tags (default: None)
- field_columns (list): DataFrame columns to use as fields (default: None)
- time_precision (str): Time precision ('s', 'ms', 'u', 'n') (default: None)
- database (str): Database name override (default: None)
- retention_policy (str): Retention policy name (default: None)
- batch_size (int): Points per batch (default: None)
- protocol (str): Write protocol, 'line' or 'json' ('line' recommended) (default: 'line')
- numeric_precision (int): Decimal precision for floats (default: None)
Returns:
bool: True if successful
Raises:
InfluxDBClientError: On write errors
TypeError: If dataframe is not a pandas DataFrame with a DatetimeIndex or PeriodIndex
"""import pandas as pd
from datetime import datetime, timezone
# Create sample DataFrame
df = pd.DataFrame({
'timestamp': [
datetime(2023, 9, 7, 7, 0, 0, tzinfo=timezone.utc),
datetime(2023, 9, 7, 7, 1, 0, tzinfo=timezone.utc),
datetime(2023, 9, 7, 7, 2, 0, tzinfo=timezone.utc)
],
'host': ['server01', 'server01', 'server02'],
'region': ['us-west', 'us-west', 'us-east'],
'cpu_usage': [65.2, 70.1, 45.8],
'memory_usage': [78.5, 82.3, 56.7],
'disk_io': [1200, 1450, 890]
})
# Set timestamp as index
df.set_index('timestamp', inplace=True)
# Write DataFrame with automatic field detection
client.write_points(
dataframe=df,
measurement='system_metrics',
tag_columns=['host', 'region'] # These columns become tags
# cpu_usage, memory_usage, disk_io automatically become fields
)
# Write with explicit field selection
client.write_points(
dataframe=df,
measurement='cpu_metrics',
tag_columns=['host', 'region'],
field_columns=['cpu_usage'] # Only cpu_usage as field
)
# Write with global tags
client.write_points(
dataframe=df,
measurement='system_metrics',
tags={'environment': 'production'}, # Added to all points
tag_columns=['host'],
field_columns=['cpu_usage', 'memory_usage']
)
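Note that write_points requires the DataFrame index to be a DatetimeIndex (or PeriodIndex); a minimal sketch of converting plain timestamps before writing (column and measurement names are illustrative):
# Build a DatetimeIndex first; a plain RangeIndex raises TypeError
df2 = pd.DataFrame({'value': [1.0, 2.0]})
df2.index = pd.to_datetime(['2023-09-07T07:00:00Z', '2023-09-07T07:01:00Z'])
client.write_points(dataframe=df2, measurement='simple_metrics')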
# Batch writing for large DataFrames
large_df = pd.DataFrame(...) # Large dataset
client.write_points(
dataframe=large_df,
measurement='bulk_data',
batch_size=10000
)
Execute InfluxQL queries and receive results as pandas DataFrames for immediate analysis.
def query(self, query, params=None, bind_params=None, epoch=None,
expected_response_code=200, database=None, raise_errors=True,
chunked=False, chunk_size=0, method="GET", dropna=True,
data_frame_index=None):
"""
Query InfluxDB and return results as pandas DataFrame.
Parameters:
- query (str): InfluxQL query string
- params (dict): URL parameters (default: None)
- bind_params (dict): Query parameter bindings (default: None)
- epoch (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
- expected_response_code (int): Expected HTTP status (default: 200)
- database (str): Database name override (default: None)
- raise_errors (bool): Raise exceptions on query errors (default: True)
- chunked (bool): Enable chunked responses (default: False)
- chunk_size (int): Chunk size for chunked responses (default: 0)
- method (str): HTTP method ('GET' or 'POST') (default: 'GET')
- dropna (bool): Drop rows with NaN values (default: True)
- data_frame_index (str): Column to use as DataFrame index (default: None)
Returns:
dict: Maps measurement names (or (measurement, tags) tuples when grouping by tags) to pandas DataFrames
Raises:
InfluxDBClientError: On query errors
"""# Basic query returning DataFrame
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')
# result is a dict: {'cpu_usage': DataFrame}
df = result['cpu_usage']
print(df.head())
print(df.describe())
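Grouped queries can yield intervals with no data; the dropna parameter above controls whether those all-NaN rows are dropped. A short sketch:
# Keep NaN rows from empty intervals instead of dropping them
result = client.query(
    'SELECT mean(cpu_usage) FROM system_metrics WHERE time >= now() - 1h GROUP BY time(5m)',
    dropna=False
)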
# Query with time grouping
result = client.query("""
SELECT mean(value) as avg_cpu, max(value) as max_cpu
FROM cpu_usage
WHERE time >= now() - 24h
GROUP BY time(1h), host
""")
# GROUP BY on a tag keys results by (measurement, tags) tuples,
# one DataFrame per tag combination
for key, df in result.items():
    print(key)  # e.g. ('cpu_usage', (('host', 'server01'),))
# Multiple measurements in one query
result = client.query("""
SELECT * FROM cpu_usage;
SELECT * FROM memory_usage
""")
cpu_df = result['cpu_usage']
memory_df = result['memory_usage']
# Set a custom DataFrame index (time is already the default index)
result = client.query(
'SELECT * FROM system_metrics ORDER BY time DESC LIMIT 1000',
data_frame_index='time' # Use time column as index
)
df = result['system_metrics']
# DataFrame indexed by time for time series analysis
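Queries that match nothing return an empty dict, so guard lookups before indexing by measurement name; a defensive sketch:
# An unmatched query yields an empty dict rather than a DataFrame
result = client.query('SELECT * FROM system_metrics WHERE time > now()')
df = result.get('system_metrics')
if df is None:
    print('No data returned')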
# Query with parameters
result = client.query(
'SELECT * FROM metrics WHERE host = $host AND time >= $start_time',
bind_params={
'host': 'server01',
'start_time': '2023-09-07T00:00:00Z'
}
)
The DataFrame integration enables seamless use of pandas time series analysis tools.
# Query data for analysis
result = client.query("""
SELECT mean(cpu_usage) as cpu, mean(memory_usage) as memory
FROM system_metrics
WHERE time >= now() - 7d
GROUP BY time(1h)
""")
df = result['system_metrics']
# Time series analysis with pandas; DataFrameClient results already carry a
# DatetimeIndex, so no to_datetime/set_index conversion is needed here
# Rolling averages
df['cpu_rolling_mean'] = df['cpu'].rolling(window=6).mean() # 6-hour window
df['memory_rolling_mean'] = df['memory'].rolling(window=6).mean()
# Resampling
daily_avg = df.resample('D').mean()
hourly_max = df.resample('H').max()
# Statistical analysis
correlation = df['cpu'].corr(df['memory'])
cpu_stats = df['cpu'].describe()
# Plotting with matplotlib
import matplotlib.pyplot as plt
df[['cpu', 'cpu_rolling_mean']].plot(figsize=(12, 6))
plt.title('CPU Usage Over Time')
plt.ylabel('CPU Usage (%)')
plt.show()
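Because resampled frames keep their DatetimeIndex, derived series can be written straight back to InfluxDB; a sketch using a hypothetical downsampled measurement name:
# Persist the daily averages as a separate downsampled measurement
client.write_points(
    dataframe=daily_avg.dropna(),
    measurement='system_metrics_daily'
)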
Handle data type mapping between InfluxDB and pandas effectively.
import numpy as np
# Specify data types when writing
df = pd.DataFrame({
'time': pd.date_range('2023-09-07', periods=100, freq='1min'),
'sensor_id': ['sensor_' + str(i % 10) for i in range(100)],
'temperature': np.random.normal(25.0, 2.0, 100),
'humidity': np.random.normal(60.0, 10.0, 100),
'is_active': [True] * 50 + [False] * 50
})
df.set_index('time', inplace=True)
# Write with type preservation
client.write_points(
dataframe=df,
measurement='sensor_data',
tag_columns=['sensor_id'], # String tags
field_columns=['temperature', 'humidity', 'is_active'], # Mixed field types
numeric_precision=2 # Round floats to 2 decimal places
)
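InfluxDB fixes a field's type on its first write and rejects later writes with a conflicting type, so cast columns explicitly before writing; a brief sketch with the columns above:
# Cast so a field first written as float is never later sent as int
df = df.astype({'temperature': 'float64', 'humidity': 'float64'})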
# Query back with proper types
result = client.query('SELECT * FROM sensor_data LIMIT 10')
df_result = result['sensor_data']
# Verify data types
print(df_result.dtypes)
print(df_result['is_active'].unique())  # Boolean values preserved
DataFrameClient inherits all InfluxDBClient error handling plus pandas-specific errors.
from influxdb import DataFrameClient
from influxdb.exceptions import InfluxDBClientError
try:
# DataFrameClient raises ImportError at construction if pandas is missing
client = DataFrameClient()
result = client.query('SELECT * FROM measurement')
except ImportError as e:
print("pandas is required for DataFrameClient:", e)
except InfluxDBClientError as e:
print("InfluxDB error:", e)
# Validate DataFrame before writing
def safe_write_dataframe(client, df, measurement):
if not isinstance(df, pd.DataFrame):
raise TypeError("Input must be a pandas DataFrame")
if df.empty:
print("Warning: DataFrame is empty, skipping write")
return
try:
client.write_points(df, measurement=measurement)
print(f"Successfully wrote {len(df)} points to {measurement}")
except Exception as e:
print(f"Failed to write DataFrame: {e}")# Use line protocol for better performance
Several techniques improve performance when writing large DataFrames.
# Use line protocol for better performance
client.write_points(df, measurement='metrics', protocol='line')
# Batch large DataFrames
large_df = pd.read_csv('large_dataset.csv')
client.write_points(large_df, measurement='bulk_data', batch_size=10000)
# Pre-process DataFrames for efficiency
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df = df.sort_index() # Pre-sort for better write performance
# Use appropriate data types
df = df.astype({
'sensor_id': 'category', # Use category for repeated strings
'value': 'float32' # Use float32 if precision allows
})
# Process large datasets in chunks
def write_large_csv(client, filepath, measurement, chunk_size=10000):
for chunk in pd.read_csv(filepath, chunksize=chunk_size):
# Process chunk
chunk['timestamp'] = pd.to_datetime(chunk['timestamp'])
chunk.set_index('timestamp', inplace=True)
# Write chunk
client.write_points(
dataframe=chunk,
measurement=measurement,
protocol='line'
)
print(f"Processed {len(chunk)} records")
# Use context manager for automatic cleanup
with DataFrameClient(database='large_db') as client:
write_large_csv(client, 'massive_dataset.csv', 'sensor_readings')
Install with Tessl CLI
npx tessl i tessl/pypi-influxdb