CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-influxdb

InfluxDB client library for time series database operations with comprehensive API for data management and querying

Pending
Overview
Eval results
Files

docs/database-operations.md

Database Operations

Complete database lifecycle management including database creation and deletion, user management with permissions, retention policies for data lifecycle, continuous queries for real-time aggregation, and measurement operations for schema management.

Capabilities

Database Management

Core database operations for creating, listing, and managing InfluxDB databases.

def create_database(self, dbname: str) -> bool:
    """
    Create a new database.
    
    Parameters:
    - dbname (str): Name of the database to create
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If database creation fails
    """

def drop_database(self, dbname: str) -> bool:
    """
    Delete a database and all its data.
    
    Warning: destructive and irreversible — every measurement and series
    in the database is removed.
    
    Parameters:
    - dbname (str): Name of the database to delete
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If database deletion fails
    """

def get_list_database(self) -> list:
    """
    Get list of all databases on the InfluxDB server.
    
    Returns:
    list: List of dictionaries, one per database, each with a 'name' key,
          e.g. [{'name': 'mydb'}] (see the usage examples below)
    
    Raises:
    InfluxDBClientError: If unable to retrieve database list
    """

Database Management Examples

# Example script: basic database lifecycle management.
# NOTE(review): requires a running InfluxDB server reachable at
# localhost with the given admin credentials — illustrative only.
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError

client = InfluxDBClient(host='localhost', username='admin', password='admin')

# Create databases
try:
    client.create_database('production_metrics')
    client.create_database('development_metrics')
    client.create_database('test_data')
    print("Databases created successfully")
except InfluxDBClientError as e:
    print(f"Failed to create database: {e}")

# List all databases; each entry is a dict with a 'name' key
databases = client.get_list_database()
print("Available databases:")
for db in databases:
    print(f"  - {db['name']}")

# Switch to specific database (affects subsequent queries/writes)
client.switch_database('production_metrics')

# Clean up test databases
try:
    client.drop_database('test_data')
    print("Test database deleted")
except InfluxDBClientError as e:
    print(f"Failed to delete database: {e}")

# Database existence check example: create only if missing,
# avoiding an error from a duplicate CREATE DATABASE
databases = client.get_list_database()
db_exists = any(db['name'] == 'metrics' for db in databases)

if not db_exists:
    client.create_database('metrics')
    print("Created metrics database")

User Management

Comprehensive user account management with role-based permissions and privilege control.

def create_user(self, username: str, password: str, admin: bool = False) -> bool:
    """
    Create a new user account.
    
    Parameters:
    - username (str): Username for the new account
    - password (str): Password for the new account  
    - admin (bool): Grant admin privileges (default: False)
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If user creation fails
    """

def drop_user(self, username: str) -> bool:
    """
    Delete a user account.
    
    Parameters:
    - username (str): Username of account to delete
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If user deletion fails
    """

def get_list_users(self) -> list:
    """
    Get list of all user accounts.
    
    Returns:
    list: List of user dictionaries; each has a 'user' (username) and
          'admin' (bool) key (see the usage examples below)
    
    Raises:
    InfluxDBClientError: If unable to retrieve user list
    """

def set_user_password(self, username: str, password: str) -> bool:
    """
    Change a user's password.
    
    Parameters:
    - username (str): Username of account to modify
    - password (str): New password
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If password change fails
    """

def grant_admin_privileges(self, username: str) -> bool:
    """
    Grant administrative privileges to a user.
    
    Parameters:
    - username (str): Username to grant admin privileges
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If privilege grant fails
    """

def revoke_admin_privileges(self, username: str) -> bool:
    """
    Revoke administrative privileges from a user.
    
    Parameters:
    - username (str): Username to revoke admin privileges
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If privilege revocation fails
    """

def grant_privilege(self, privilege: str, database: str, username: str) -> bool:
    """
    Grant database-specific privilege to a user.
    
    Parameters:
    - privilege (str): Privilege type ('READ', 'WRITE', 'ALL')
    - database (str): Database name for privilege scope
    - username (str): Username to grant privilege
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If privilege grant fails
    """

def revoke_privilege(self, privilege: str, database: str, username: str) -> bool:
    """
    Revoke database-specific privilege from a user.
    
    Parameters:
    - privilege (str): Privilege type ('READ', 'WRITE', 'ALL')
    - database (str): Database name for privilege scope
    - username (str): Username to revoke privilege
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If privilege revocation fails
    """

def get_list_privileges(self, username: str) -> list:
    """
    Get list of privileges for a specific user.
    
    Parameters:
    - username (str): Username to query privileges
    
    Returns:
    list: List of privilege dictionaries; each has a 'database' and a
          'privilege' key (see the usage examples below)
    
    Raises:
    InfluxDBClientError: If unable to retrieve privileges
    """

User Management Examples

# Example script: user and privilege administration.
# NOTE(review): requires a running InfluxDB server and admin
# credentials — illustrative only.

# Admin client for user management
admin_client = InfluxDBClient(username='admin', password='admin_password')

# Create users with different roles
try:
    # Create regular users
    admin_client.create_user('developer', 'dev_password', admin=False)
    admin_client.create_user('analyst', 'analyst_password', admin=False)
    
    # Create admin user
    admin_client.create_user('db_admin', 'admin_password', admin=True)
    
    print("Users created successfully")
except InfluxDBClientError as e:
    print(f"User creation failed: {e}")

# List all users; each entry is a dict with 'user' and 'admin' keys
users = admin_client.get_list_users()
print("Current users:")
for user in users:
    print(f"  - {user['user']} (admin: {user['admin']})")

# Grant database-specific privileges
admin_client.grant_privilege('READ', 'production_metrics', 'analyst')
admin_client.grant_privilege('WRITE', 'development_metrics', 'developer')
admin_client.grant_privilege('ALL', 'production_metrics', 'developer')

# Check user privileges; each entry is a dict with 'database'/'privilege'
dev_privileges = admin_client.get_list_privileges('developer')
print("Developer privileges:")
for priv in dev_privileges:
    print(f"  - {priv['database']}: {priv['privilege']}")

# Change passwords
admin_client.set_user_password('developer', 'new_secure_password')

# Revoke privileges
admin_client.revoke_privilege('WRITE', 'production_metrics', 'developer')

# Administrative privilege management
# (assumes 'senior_developer' and 'former_admin' accounts already exist)
admin_client.grant_admin_privileges('senior_developer')
admin_client.revoke_admin_privileges('former_admin')

# Clean up users
admin_client.drop_user('temp_user')

# Bulk user setup example: one user granted the same privilege on
# several databases
try:
    username = 'metrics_reader'
    password = 'reader_password'
    databases = ['production_metrics', 'development_metrics']
    privilege = 'READ'
    
    admin_client.create_user(username, password)
    
    for database in databases:
        admin_client.grant_privilege(privilege, database, username)
        
    print(f"User {username} created with {privilege} access to {databases}")
    
except InfluxDBClientError as e:
    print(f"Failed to setup user {username}: {e}")

Retention Policy Management

Data lifecycle management through retention policies that automatically manage data expiration and storage optimization.

def create_retention_policy(self, name: str, duration: str, replication: int, database=None, 
                           default: bool = False, shard_duration: str = "0s"):
    """
    Create a retention policy for automatic data lifecycle management.
    
    Parameters:
    - name (str): Name of the retention policy
    - duration (str): How long to keep data (e.g., '7d', '4w', '52w', 'INF')
    - replication (int): Number of data replicas (usually 1 for single node)
    - database (str): Database name (default: current database)
    - default (bool): Make this the default retention policy (default: False)
    - shard_duration (str): Shard group duration (default: "0s" for auto)
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If retention policy creation fails
    """

def alter_retention_policy(self, name: str, database=None, duration=None, 
                          replication=None, default=None, shard_duration=None):
    """
    Modify an existing retention policy.
    
    Only the parameters that are passed (non-None) are changed; all
    others keep their current server-side values.
    
    Parameters:
    - name (str): Name of retention policy to modify
    - database (str): Database name (default: current database)  
    - duration (str): New duration (default: unchanged)
    - replication (int): New replication factor (default: unchanged)
    - default (bool): Make/unmake default policy (default: unchanged)
    - shard_duration (str): New shard duration (default: unchanged)
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If retention policy modification fails
    """

def drop_retention_policy(self, name: str, database=None) -> bool:
    """
    Delete a retention policy.
    
    Parameters:
    - name (str): Name of retention policy to delete
    - database (str): Database name (default: current database)
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If retention policy deletion fails
    """

def get_list_retention_policies(self, database=None) -> list:
    """
    Get list of retention policies for a database.
    
    Parameters:
    - database (str): Database name (default: current database)
    
    Returns:
    list: List of retention policy dictionaries; each has 'name',
          'duration', 'default' and 'shardGroupDuration' keys
          (see the usage examples below)
    
    Raises:
    InfluxDBClientError: If unable to retrieve retention policies
    """

Retention Policy Examples

# Example script: tiered retention policy setup.
# NOTE(review): requires a running InfluxDB server and the databases
# referenced below — illustrative only.
client = InfluxDBClient(database='production_metrics')

# Create retention policies for different data lifecycles
try:
    # Short-term high-resolution data (1 week)
    client.create_retention_policy(
        name='realtime',
        duration='7d',
        replication=1,
        default=True,  # Make this the default policy
        shard_duration='1h'  # 1-hour shards for recent data
    )
    
    # Medium-term aggregated data (1 year)
    client.create_retention_policy(
        name='historical',
        duration='52w',
        replication=1,
        shard_duration='1d'  # Daily shards for historical data
    )
    
    # Long-term archive (infinite retention)
    client.create_retention_policy(
        name='archive',
        duration='INF',
        replication=1,
        shard_duration='1w'  # Weekly shards for archival data
    )
    
    print("Retention policies created successfully")
    
except InfluxDBClientError as e:
    print(f"Failed to create retention policies: {e}")

# List retention policies; entries are dicts keyed by
# 'name'/'duration'/'default'/'shardGroupDuration'
policies = client.get_list_retention_policies()
print("Current retention policies:")
for policy in policies:
    print(f"  - {policy['name']}: {policy['duration']} "
          f"(default: {policy['default']}, shards: {policy['shardGroupDuration']})")

# Modify retention policy (only the passed parameters change)
client.alter_retention_policy(
    name='historical',
    duration='26w',  # Reduce to 6 months
    shard_duration='12h'  # Smaller shards
)

# Write data to specific retention policy
points = [
    {
        "measurement": "cpu_usage",
        "tags": {"host": "server01"},
        "fields": {"value": 75.5},
        "time": "2023-09-07T07:18:24Z"
    }
]

# Write to specific retention policy
client.write_points(points, retention_policy='historical')

# Set different default policy
client.alter_retention_policy('historical', default=True)

# Clean up old retention policy
client.drop_retention_policy('old_policy')

# Tiered retention setup example: same three-tier scheme applied to
# each environment's database in turn
for database_name in ['production', 'staging', 'development']:
    client.switch_database(database_name)
    
    # Real-time data: 24 hours, high resolution
    client.create_retention_policy(
        'realtime', '24h', 1, default=True, shard_duration='1h'
    )
    
    # Daily aggregates: 30 days
    client.create_retention_policy(
        'daily', '30d', 1, shard_duration='1d'
    )
    
    # Monthly aggregates: 2 years  
    client.create_retention_policy(
        'monthly', '104w', 1, shard_duration='1w'
    )
    
    print(f"Tiered retention setup complete for {database_name}")

Continuous Query Management

Real-time data aggregation and downsampling through continuous queries that automatically process data as it arrives.

def create_continuous_query(self, name: str, select: str, database=None, resample_opts=None) -> bool:
    """
    Create a continuous query for real-time data aggregation.
    
    Parameters:
    - name (str): Name of the continuous query
    - select (str): SELECT statement for the aggregation
    - database (str): Database name (default: current database)
    - resample_opts (dict): Resampling options with 'FOR' and 'EVERY' keys
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If continuous query creation fails
    
    Note: resample_opts format: {'FOR': '2m', 'EVERY': '1m'}
    """

def drop_continuous_query(self, name: str, database=None) -> bool:
    """
    Delete a continuous query.
    
    Parameters:
    - name (str): Name of continuous query to delete
    - database (str): Database name (default: current database)
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If continuous query deletion fails
    """

def get_list_continuous_queries(self) -> list:
    """
    Get list of all continuous queries across all databases.
    
    Returns:
    list: One dictionary per database, with 'name' (the database name)
          and 'cqs' (a list of dicts, each with 'name' and 'query' keys)
          — see the usage examples below
    
    Raises:
    InfluxDBClientError: If unable to retrieve continuous queries
    """

Continuous Query Examples

# Example script: continuous queries for downsampling and alerting.
# NOTE(review): requires a running InfluxDB server plus the retention
# policies ("realtime", "daily", "monthly", ...) referenced in the INTO
# clauses — illustrative only.
client = InfluxDBClient(database='production_metrics')

# Create continuous queries for automatic downsampling
try:
    # Downsample CPU data to 5-minute averages
    client.create_continuous_query(
        name='cpu_5min_avg',
        select="""
            SELECT mean(value) as avg_cpu, max(value) as max_cpu, min(value) as min_cpu
            INTO "historical"."cpu_5min" 
            FROM "realtime"."cpu_usage" 
            GROUP BY time(5m), host
        """
    )
    
    # Downsample memory data to hourly statistics  
    client.create_continuous_query(
        name='memory_hourly_stats',
        select="""
            SELECT mean(used) as avg_used, max(used) as max_used, 
                   mean(available) as avg_available
            INTO "daily"."memory_hourly"
            FROM "realtime"."memory_usage"
            GROUP BY time(1h), host
        """
    )
    
    # Create alerts based on thresholds
    client.create_continuous_query(
        name='high_cpu_alerts',
        select="""
            SELECT mean(value) as avg_cpu
            INTO "alerts"."high_cpu"
            FROM "realtime"."cpu_usage"
            WHERE value > 80
            GROUP BY time(1m), host
        """
    )
    
    print("Continuous queries created successfully")
    
except InfluxDBClientError as e:
    print(f"Failed to create continuous queries: {e}")

# Create continuous query with resampling options
client.create_continuous_query(
    name='disk_io_resampled',
    select="""
        SELECT sum(read_bytes) as total_read, sum(write_bytes) as total_write
        INTO "daily"."disk_io_hourly"
        FROM "realtime"."disk_io"
        GROUP BY time(1h), host, device
    """,
    resample_opts={
        'FOR': '2h',      # Process data for 2 hours back
        'EVERY': '30m'    # Run every 30 minutes
    }
)

# List all continuous queries; each top-level entry is one database
# ('name') with its CQs under the 'cqs' key
cqs = client.get_list_continuous_queries()
print("Active continuous queries:")
for cq_info in cqs:
    database = cq_info['name']
    for cq in cq_info.get('cqs', []):
        print(f"  Database: {database}")
        print(f"    Name: {cq['name']}")
        print(f"    Query: {cq['query']}")

# Drop continuous query
client.drop_continuous_query('old_downsampling_query')

# Complex continuous query for business metrics
business_analytics_cq = """
    SELECT sum(revenue) as total_revenue, 
           count(transaction_id) as transaction_count,
           mean(order_value) as avg_order_value
    INTO "business"."daily_metrics"
    FROM "realtime"."transactions"
    WHERE status = 'completed'
    GROUP BY time(1d), region, product_category
"""

client.create_continuous_query(
    name='daily_business_analytics',
    select=business_analytics_cq
)

# Standard downsampling pattern example: build the SELECT statements
# programmatically from a field list
retention_policies = {
    'realtime': 'realtime',
    'daily': 'daily', 
    'monthly': 'monthly'
}

measurement = 'cpu_usage'
fields = ['value']

# 5-minute averages
fields_select = ", ".join([f"mean({field}) as avg_{field}" for field in fields])
cq_5min = f"""
    SELECT {fields_select}
    INTO "{retention_policies['daily']}"."{measurement}_5min"
    FROM "{retention_policies['realtime']}"."{measurement}"
    GROUP BY time(5m), *
"""

client.create_continuous_query(f"{measurement}_5min_avg", cq_5min)

# Hourly statistics
stats_select = ", ".join([
    f"mean({field}) as avg_{field}, max({field}) as max_{field}, min({field}) as min_{field}"
    for field in fields
])
cq_hourly = f"""
    SELECT {stats_select}
    INTO "{retention_policies['monthly']}"."{measurement}_hourly"
    FROM "{retention_policies['daily']}"."{measurement}_5min"
    GROUP BY time(1h), *
"""

client.create_continuous_query(f"{measurement}_hourly_stats", cq_hourly)

print(f"Standard downsampling created for {measurement}")

Measurement Operations

Schema management operations for working with measurements, series, and data organization.

def get_list_measurements(self) -> list:
    """
    Get list of measurements in the current database.
    
    Returns:
    list: List of measurement dictionaries, each with a 'name' key
          (see the usage examples below)
    
    Raises:
    InfluxDBClientError: If unable to retrieve measurements
    """

def drop_measurement(self, measurement: str) -> bool:
    """
    Delete a measurement and all its data.
    
    Warning: destructive and irreversible — all series belonging to the
    measurement are removed.
    
    Parameters:
    - measurement (str): Name of measurement to delete
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If measurement deletion fails
    """

def get_list_series(self, database=None, measurement=None, tags=None) -> list:
    """
    Get list of series matching optional filters.
    
    Parameters:
    - database (str): Database name (default: current database)
    - measurement (str): Filter by measurement name (default: None)
    - tags (dict): Filter by tag key-value pairs (default: None)
    
    Returns:
    list: List of series information. The parsing example below treats
          each item as a series-key string of the form
          "measurement,tag1=value1,tag2=value2" — confirm the exact
          item type against the client version in use.
    
    Raises:
    InfluxDBClientError: If unable to retrieve series
    """

def delete_series(self, database=None, measurement=None, tags=None) -> bool:
    """
    Delete series matching the specified criteria.
    
    Parameters:
    - database (str): Database name (default: current database)
    - measurement (str): Filter by measurement name (default: None)
    - tags (dict): Filter by tag key-value pairs (default: None)
    
    Returns:
    bool: True if successful
    
    Raises:
    InfluxDBClientError: If series deletion fails
    
    Warning: This operation is destructive and cannot be undone
    """

Measurement Operations Examples

# Example script: measurement and series inspection/cleanup.
# NOTE(review): requires a running InfluxDB server with the referenced
# database and measurements — illustrative only.
client = InfluxDBClient(database='production_metrics')

# List all measurements; each entry is a dict with a 'name' key
measurements = client.get_list_measurements()
print("Available measurements:")
for measurement in measurements:
    print(f"  - {measurement['name']}")

# Get detailed series information
series_list = client.get_list_series()
print(f"Total series count: {len(series_list)}")

# Filter series by measurement
cpu_series = client.get_list_series(measurement='cpu_usage')
print(f"CPU usage series: {len(cpu_series)}")
for series in cpu_series[:5]:  # Show first 5
    print(f"  - {series}")

# Filter series by tags
server01_series = client.get_list_series(tags={'host': 'server01'})
print(f"Series for server01: {len(server01_series)}")

# Delete specific series (be careful!)
# Delete test data series
client.delete_series(measurement='test_data', tags={'environment': 'test'})

# Drop entire measurement (removes all series and data)
client.drop_measurement('deprecated_measurement')

# Measurement analysis example: count how many series share each
# distinct tag combination
measurement_name = 'cpu_usage'
series = client.get_list_series(measurement=measurement_name)

tag_combinations = {}
for series_key in series:
    # Parse series key to extract tags
    # Series format: "measurement,tag1=value1,tag2=value2"
    # NOTE(review): assumes items are strings — this breaks if the
    # client returns dicts, and if tag values themselves contain commas
    if ',' in series_key:
        tags_part = series_key.split(',', 1)[1]
        tag_pairs = tags_part.split(',')
        tag_combo = tuple(sorted(tag_pairs))
        tag_combinations[tag_combo] = tag_combinations.get(tag_combo, 0) + 1

print(f"Measurement: {measurement_name}")
print(f"Total series: {len(series)}")
print(f"Unique tag combinations: {len(tag_combinations)}")

# Show most common tag combinations
sorted_combos = sorted(tag_combinations.items(), key=lambda x: x[1], reverse=True)
print("Most common tag combinations:")
for combo, count in sorted_combos[:5]:
    print(f"  {combo}: {count} series")

# Clean up measurements by pattern example
# NOTE(review): substring match — 'test_' also matches 'latest_foo';
# use startswith if prefix-only matching is intended
measurements = client.get_list_measurements()
patterns_to_delete = ['test_', 'temp_', 'debug_']

for measurement_info in measurements:
    measurement_name = measurement_info['name']
    
    for pattern in patterns_to_delete:
        if pattern in measurement_name:
            print(f"Deleting measurement: {measurement_name}")
            try:
                client.drop_measurement(measurement_name)
            except InfluxDBClientError as e:
                print(f"Failed to delete {measurement_name}: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb

docs

client.md

data-management.md

database-operations.md

dataframe-client.md

index.md

legacy.md

tile.json