CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-influxdb

InfluxDB client library for time series database operations with comprehensive API for data management and querying

Pending
Overview
Eval results
Files

docs/dataframe-client.md

DataFrame Client

The DataFrameClient extends InfluxDBClient to provide seamless integration with pandas DataFrames, enabling efficient data analysis workflows and simplified data exchange between InfluxDB and Python data science tools.

Requirement: This client requires the pandas library to be installed.

Capabilities

Client Initialization

DataFrameClient inherits all InfluxDBClient functionality while adding DataFrame-specific methods.

class DataFrameClient(InfluxDBClient):
    """InfluxDBClient subclass that reads and writes pandas DataFrames."""

    def __init__(self, host='localhost', port=8086, username='root', password='root', 
                 database=None, ssl=False, verify_ssl=False, timeout=None, retries=3,
                 use_udp=False, udp_port=4444, proxies=None, pool_size=10, path='',
                 cert=None, gzip=False, session=None, headers=None, socket_options=None):
        """
        Initialize the DataFrame client.

        Accepts the same parameters as InfluxDBClient.

        Raises:
            ImportError: If pandas is not installed
        """

Usage Example

from influxdb import DataFrameClient
import pandas as pd

# Create DataFrame client (the constructor raises ImportError if pandas is missing)
client = DataFrameClient(host='localhost', port=8086, database='mydb')

# Reaching this line means the client was constructed, so pandas is available
print("DataFrame client ready for pandas integration")

DataFrame Writing

Write pandas DataFrames directly to InfluxDB with flexible column mapping and type handling.

def write_points(self, dataframe, measurement, tags=None, tag_columns=None, 
                 field_columns=None, time_precision=None, database=None, 
                 retention_policy=None, batch_size=None, protocol='line', 
                 numeric_precision=None):
    """
    Write a pandas DataFrame to InfluxDB.

    Parameters:
    - dataframe (pandas.DataFrame): Data to write; must be indexed by a DatetimeIndex or PeriodIndex
    - measurement (str): Measurement name
    - tags (dict): Global tags added to every point (default: None)
    - tag_columns (list): DataFrame columns to write as tags (default: None)
    - field_columns (list): DataFrame columns to write as fields; when omitted, every non-tag column is a field (default: None)
    - time_precision (str): Time precision ('h', 'm', 's', 'ms', 'u', 'n'); nanoseconds are spelled 'n' here, not 'ns' (default: None)
    - database (str): Database name override (default: None)
    - retention_policy (str): Retention policy name (default: None)
    - batch_size (int): Points per batch; None writes everything at once (default: None)
    - protocol (str): Write protocol, 'line' (recommended) or 'json' (default: 'line')
    - numeric_precision (int or str): Decimal places for floats, or 'full' to keep full precision (default: None)

    Returns:
    bool: True if successful

    Raises:
    InfluxDBClientError: On write errors
    TypeError: If dataframe is not a pandas DataFrame, or its index is not a DatetimeIndex or PeriodIndex
    """

DataFrame Writing Examples

from datetime import datetime, timezone

import pandas as pd

# Three consecutive one-minute UTC readings starting at 07:00 on 2023-09-07
sample_times = [
    datetime(2023, 9, 7, 7, minute, 0, tzinfo=timezone.utc)
    for minute in range(3)
]

# Build the sample DataFrame and promote the timestamps to the index in one go
df = pd.DataFrame({
    'timestamp': sample_times,
    'host': ['server01', 'server01', 'server02'],
    'region': ['us-west', 'us-west', 'us-east'],
    'cpu_usage': [65.2, 70.1, 45.8],
    'memory_usage': [78.5, 82.3, 56.7],
    'disk_io': [1200, 1450, 890],
}).set_index('timestamp')

# Name only the tag columns; every remaining column
# (cpu_usage, memory_usage, disk_io) is written as a field
client.write_points(df, 'system_metrics', tag_columns=['host', 'region'])

# Select the fields explicitly: only cpu_usage is written
client.write_points(
    df,
    'cpu_metrics',
    tag_columns=['host', 'region'],
    field_columns=['cpu_usage'],
)

# Global tags are attached to every point alongside the per-row tags
client.write_points(
    df,
    'system_metrics',
    tags={'environment': 'production'},
    tag_columns=['host'],
    field_columns=['cpu_usage', 'memory_usage'],
)

# Send a large DataFrame in batches of 10,000 points
large_df = pd.DataFrame(...)  # Large dataset
client.write_points(large_df, 'bulk_data', batch_size=10000)

DataFrame Querying

Execute InfluxQL queries and receive results as pandas DataFrames for immediate analysis.

def query(self, query, params=None, bind_params=None, epoch=None, 
          expected_response_code=200, database=None, raise_errors=True, 
          chunked=False, chunk_size=0, method="GET", dropna=True, 
          data_frame_index=None):
    """
    Query InfluxDB and return the results as pandas DataFrames.

    Parameters:
    - query (str): InfluxQL query string
    - params (dict): Additional URL parameters for the request (default: None)
    - bind_params (dict): Values substituted for $name placeholders in the query (default: None)
    - epoch (str): Return timestamps as epoch values with this precision ('h', 'm', 's', 'ms', 'u', 'ns') (default: None)
    - expected_response_code (int): Expected HTTP status (default: 200)
    - database (str): Database name override (default: None)
    - raise_errors (bool): Raise exceptions on query errors (default: True)
    - chunked (bool): Enable chunked responses (default: False)
    - chunk_size (int): Chunk size for chunked responses (default: 0)
    - method (str): HTTP method ('GET' or 'POST') (default: 'GET')
    - dropna (bool): Drop columns in which every value is missing (default: True)
    - data_frame_index (list): Columns to use as the DataFrame index; when None the time column is used (default: None)

    Returns:
    dict: For SELECT queries, a dictionary mapping each series to a pandas
    DataFrame. The key is the measurement name, or a
    (measurement, ((tag_key, tag_value), ...)) tuple when the series carries
    tags (for example after GROUP BY on a tag).

    Raises:
    InfluxDBClientError: On query errors
    """

DataFrame Querying Examples

# Basic query returning DataFrame
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')

# result is a dict: {'cpu_usage': DataFrame}
df = result['cpu_usage']
print(df.head())
print(df.describe())

# Query with time grouping
result = client.query("""
    SELECT mean(value) as avg_cpu, max(value) as max_cpu
    FROM cpu_usage 
    WHERE time >= now() - 24h 
    GROUP BY time(1h), host
""")

# Grouping by a tag returns one DataFrame per series. Each key is a
# (measurement, ((tag_key, tag_value), ...)) tuple, not the bare
# measurement name, so iterate over the items instead of indexing by name.
for (measurement, tag_pairs), host_df in result.items():
    print(measurement, dict(tag_pairs))
    print(host_df.head())

# Multiple measurements in one query. Use a single statement with a
# comma-separated FROM clause: ';'-separated statements return one result
# per statement rather than one combined dict.
result = client.query('SELECT * FROM cpu_usage, memory_usage')

cpu_df = result['cpu_usage']
memory_df = result['memory_usage']

# Set custom DataFrame index
result = client.query(
    'SELECT * FROM system_metrics ORDER BY time DESC LIMIT 1000',
    data_frame_index=['time']  # List of columns to use as the index
)

df = result['system_metrics']
# DataFrame indexed by time for time series analysis

# Query with parameters ($name placeholders are filled from bind_params)
result = client.query(
    'SELECT * FROM metrics WHERE host = $host AND time >= $start_time',
    bind_params={
        'host': 'server01',
        'start_time': '2023-09-07T00:00:00Z'
    }
)

Time Series Analysis Integration

The DataFrame integration enables seamless use of pandas time series analysis tools.

Analysis Examples

import matplotlib.pyplot as plt

# Query data for analysis
result = client.query("""
    SELECT mean(cpu_usage) as cpu, mean(memory_usage) as memory
    FROM system_metrics 
    WHERE time >= now() - 7d 
    GROUP BY time(1h)
""")

df = result['system_metrics']

# The returned DataFrame is already indexed by time (a DatetimeIndex); there
# is no 'time' column left to convert, so it can be analysed directly.

# Rolling averages
df['cpu_rolling_mean'] = df['cpu'].rolling(window=6).mean()  # 6-hour window
df['memory_rolling_mean'] = df['memory'].rolling(window=6).mean()

# Resampling ('h' is the hourly alias; uppercase 'H' is deprecated in
# recent pandas releases)
daily_avg = df.resample('D').mean()
hourly_max = df.resample('h').max()

# Statistical analysis
correlation = df['cpu'].corr(df['memory'])
cpu_stats = df['cpu'].describe()

# Plotting with matplotlib
df[['cpu', 'cpu_rolling_mean']].plot(figsize=(12, 6))
plt.title('CPU Usage Over Time')
plt.ylabel('CPU Usage (%)')
plt.show()

DataFrame Data Types and Conversion

Handle data type mapping between InfluxDB and pandas effectively.

import numpy as np
import pandas as pd

# Build a frame with mixed column types (strings, floats, booleans) so the
# type handling on write can be demonstrated
df = pd.DataFrame({
    'time': pd.date_range('2023-09-07', periods=100, freq='1min'),
    'sensor_id': ['sensor_' + str(i % 10) for i in range(100)],
    'temperature': np.random.normal(25.0, 2.0, 100),
    'humidity': np.random.normal(60.0, 10.0, 100),
    'is_active': [True] * 50 + [False] * 50
})

# The time column becomes the index used for point timestamps
df.set_index('time', inplace=True)

# Write the frame, keeping each column's type
client.write_points(
    df,
    'sensor_data',
    tag_columns=['sensor_id'],  # String tags
    field_columns=['temperature', 'humidity', 'is_active'],  # Mixed field types
    numeric_precision=2,  # Round floats to 2 decimal places
)

# Read a few rows back
result = client.query('SELECT * FROM sensor_data LIMIT 10')
df_result = result['sensor_data']

# Inspect the column types that came back
print(df_result.dtypes)
print(df_result['is_active'].unique())  # Boolean values preserved

Error Handling

DataFrameClient inherits all InfluxDBClient error handling plus pandas-specific errors.

from influxdb import DataFrameClient
from influxdb.exceptions import InfluxDBClientError

try:
    # The constructor raises ImportError if pandas is not installed
    client = DataFrameClient()
    
    # Query failures are reported as InfluxDBClientError
    result = client.query('SELECT * FROM measurement')
    
except ImportError as e:
    print("pandas is required for DataFrameClient:", e)
    
except InfluxDBClientError as e:
    print("InfluxDB error:", e)

# Validate DataFrame before writing
def safe_write_dataframe(client, df, measurement):
    """Validate *df*, write it to *measurement*, and report the outcome.

    Parameters:
    - client: Object exposing ``write_points(dataframe, measurement=...)``
    - df (pandas.DataFrame): Data to write
    - measurement (str): Target measurement name

    Returns:
    bool: True if the write succeeded; False if the DataFrame was empty
    (nothing written) or the write failed

    Raises:
    TypeError: If df is not a pandas DataFrame
    """
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if df.empty:
        print("Warning: DataFrame is empty, skipping write")
        return False

    # Keep the try body to the one call that can fail. This helper is a
    # best-effort wrapper, so any write error is reported, not propagated.
    try:
        client.write_points(df, measurement=measurement)
    except Exception as e:
        print(f"Failed to write DataFrame: {e}")
        return False

    print(f"Successfully wrote {len(df)} points to {measurement}")
    return True

Performance Tips

Optimize DataFrame Operations

# Use line protocol for better performance
client.write_points(df, measurement='metrics', protocol='line')

# Batch large DataFrames. write_points needs a datetime index, so parse the
# timestamp column and make it the index while loading
# (assumes the CSV has a 'timestamp' column)
large_df = pd.read_csv(
    'large_dataset.csv',
    parse_dates=['timestamp'],
    index_col='timestamp',
)
client.write_points(large_df, measurement='bulk_data', batch_size=10000)

# Pre-process DataFrames for efficiency
# (here df still carries its timestamps in a regular column)
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df = df.sort_index()  # Pre-sort for better write performance

# Use appropriate data types
df = df.astype({
    'sensor_id': 'category',  # Use category for repeated strings
    'value': 'float32'        # Use float32 if precision allows
})

Memory Management

# Process large datasets in chunks
def write_large_csv(client, filepath, measurement, chunk_size=10000):
    """Stream a CSV file into InfluxDB one chunk at a time.

    The file is read in pieces of ``chunk_size`` rows. Each piece has its
    ``timestamp`` column parsed and promoted to the index, is written with
    the line protocol, and then its row count is printed.
    """
    reader = pd.read_csv(filepath, chunksize=chunk_size)

    for piece in reader:
        # Parse the timestamps, then index the piece by them
        piece['timestamp'] = pd.to_datetime(piece['timestamp'])
        indexed = piece.set_index('timestamp')

        client.write_points(
            dataframe=indexed, measurement=measurement, protocol='line'
        )

        print(f"Processed {len(indexed)} records")

# Use the client as a context manager so cleanup runs automatically when the
# block exits, even if the write raises
with DataFrameClient(database='large_db') as client:
    write_large_csv(client, 'massive_dataset.csv', 'sensor_readings')

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb

docs

client.md

data-management.md

database-operations.md

dataframe-client.md

index.md

legacy.md

tile.json