InfluxDB client library for time series database operations with comprehensive API for data management and querying
—
This section covers complete database lifecycle management: database creation and deletion, user management with permissions, retention policies for data lifecycle control, continuous queries for real-time aggregation, and measurement operations for schema management.
Core database operations for creating, listing, and managing InfluxDB databases.
def create_database(self, dbname):
"""
Create a new database.
Parameters:
- dbname (str): Name of the database to create
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If database creation fails
"""
def drop_database(self, dbname):
"""
Delete a database and all its data.
Parameters:
- dbname (str): Name of the database to delete
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If database deletion fails
"""
def get_list_database(self):
"""
Get list of all databases on the InfluxDB server.
Returns:
list: List of dictionaries, one per database, each with a 'name' key (e.g. [{'name': 'metrics'}])
Raises:
InfluxDBClientError: If unable to retrieve database list
"""

from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError
client = InfluxDBClient(host='localhost', username='admin', password='admin')
# Create databases
try:
client.create_database('production_metrics')
client.create_database('development_metrics')
client.create_database('test_data')
print("Databases created successfully")
except InfluxDBClientError as e:
print(f"Failed to create database: {e}")
# List all databases
databases = client.get_list_database()
print("Available databases:")
for db in databases:
print(f" - {db['name']}")
# Switch to specific database
client.switch_database('production_metrics')
# Clean up test databases
try:
client.drop_database('test_data')
print("Test database deleted")
except InfluxDBClientError as e:
print(f"Failed to delete database: {e}")
# Database existence check example
databases = client.get_list_database()
db_exists = any(db['name'] == 'metrics' for db in databases)
if not db_exists:
client.create_database('metrics')
print("Created metrics database")

Comprehensive user account management with role-based permissions and privilege control.
def create_user(self, username, password, admin=False):
"""
Create a new user account.
Parameters:
- username (str): Username for the new account
- password (str): Password for the new account
- admin (bool): Grant admin privileges (default: False)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If user creation fails
"""
def drop_user(self, username):
"""
Delete a user account.
Parameters:
- username (str): Username of account to delete
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If user deletion fails
"""
def get_list_users(self):
"""
Get list of all user accounts.
Returns:
list: List of user dictionaries with user information
Raises:
InfluxDBClientError: If unable to retrieve user list
"""
def set_user_password(self, username, password):
"""
Change a user's password.
Parameters:
- username (str): Username of account to modify
- password (str): New password
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If password change fails
"""
def grant_admin_privileges(self, username):
"""
Grant administrative privileges to a user.
Parameters:
- username (str): Username to grant admin privileges
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If privilege grant fails
"""
def revoke_admin_privileges(self, username):
"""
Revoke administrative privileges from a user.
Parameters:
- username (str): Username to revoke admin privileges
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If privilege revocation fails
"""
def grant_privilege(self, privilege, database, username):
"""
Grant database-specific privilege to a user.
Parameters:
- privilege (str): Privilege type ('READ', 'WRITE', 'ALL')
- database (str): Database name for privilege scope
- username (str): Username to grant privilege
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If privilege grant fails
"""
def revoke_privilege(self, privilege, database, username):
"""
Revoke database-specific privilege from a user.
Parameters:
- privilege (str): Privilege type ('READ', 'WRITE', 'ALL')
- database (str): Database name for privilege scope
- username (str): Username to revoke privilege
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If privilege revocation fails
"""
def get_list_privileges(self, username):
"""
Get list of privileges for a specific user.
Parameters:
- username (str): Username to query privileges
Returns:
list: List of privilege dictionaries with database and privilege information
Raises:
InfluxDBClientError: If unable to retrieve privileges
"""

# Admin client for user management
admin_client = InfluxDBClient(username='admin', password='admin_password')
# Create users with different roles
try:
# Create regular users
admin_client.create_user('developer', 'dev_password', admin=False)
admin_client.create_user('analyst', 'analyst_password', admin=False)
# Create admin user
admin_client.create_user('db_admin', 'admin_password', admin=True)
print("Users created successfully")
except InfluxDBClientError as e:
print(f"User creation failed: {e}")
# List all users
users = admin_client.get_list_users()
print("Current users:")
for user in users:
print(f" - {user['user']} (admin: {user['admin']})")
# Grant database-specific privileges
admin_client.grant_privilege('READ', 'production_metrics', 'analyst')
admin_client.grant_privilege('WRITE', 'development_metrics', 'developer')
admin_client.grant_privilege('ALL', 'production_metrics', 'developer')
# Check user privileges
dev_privileges = admin_client.get_list_privileges('developer')
print("Developer privileges:")
for priv in dev_privileges:
print(f" - {priv['database']}: {priv['privilege']}")
# Change passwords
admin_client.set_user_password('developer', 'new_secure_password')
# Revoke privileges
admin_client.revoke_privilege('WRITE', 'production_metrics', 'developer')
# Administrative privilege management
admin_client.grant_admin_privileges('senior_developer')
admin_client.revoke_admin_privileges('former_admin')
# Clean up users
admin_client.drop_user('temp_user')
# Bulk user setup example
try:
username = 'metrics_reader'
password = 'reader_password'
databases = ['production_metrics', 'development_metrics']
privilege = 'READ'
admin_client.create_user(username, password)
for database in databases:
admin_client.grant_privilege(privilege, database, username)
print(f"User {username} created with {privilege} access to {databases}")
except InfluxDBClientError as e:
print(f"Failed to setup user {username}: {e}")

Data lifecycle management through retention policies that automatically manage data expiration and storage optimization.
def create_retention_policy(self, name, duration, replication, database=None,
default=False, shard_duration="0s"):
"""
Create a retention policy for automatic data lifecycle management.
Parameters:
- name (str): Name of the retention policy
- duration (str): How long to keep data (e.g., '7d', '4w', '52w', 'INF')
- replication (int): Number of data replicas (usually 1 for single node)
- database (str): Database name (default: current database)
- default (bool): Make this the default retention policy (default: False)
- shard_duration (str): Shard group duration (default: "0s" for auto)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If retention policy creation fails
"""
def alter_retention_policy(self, name, database=None, duration=None,
replication=None, default=None, shard_duration=None):
"""
Modify an existing retention policy.
Parameters:
- name (str): Name of retention policy to modify
- database (str): Database name (default: current database)
- duration (str): New duration (default: unchanged)
- replication (int): New replication factor (default: unchanged)
- default (bool): Make/unmake default policy (default: unchanged)
- shard_duration (str): New shard duration (default: unchanged)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If retention policy modification fails
"""
def drop_retention_policy(self, name, database=None):
"""
Delete a retention policy.
Parameters:
- name (str): Name of retention policy to delete
- database (str): Database name (default: current database)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If retention policy deletion fails
"""
def get_list_retention_policies(self, database=None):
"""
Get list of retention policies for a database.
Parameters:
- database (str): Database name (default: current database)
Returns:
list: List of retention policy dictionaries
Raises:
InfluxDBClientError: If unable to retrieve retention policies
"""

client = InfluxDBClient(database='production_metrics')
# Create retention policies for different data lifecycles
try:
# Short-term high-resolution data (1 week)
client.create_retention_policy(
name='realtime',
duration='7d',
replication=1,
default=True, # Make this the default policy
shard_duration='1h' # 1-hour shards for recent data
)
# Medium-term aggregated data (1 year)
client.create_retention_policy(
name='historical',
duration='52w',
replication=1,
shard_duration='1d' # Daily shards for historical data
)
# Long-term archive (infinite retention)
client.create_retention_policy(
name='archive',
duration='INF',
replication=1,
shard_duration='1w' # Weekly shards for archival data
)
print("Retention policies created successfully")
except InfluxDBClientError as e:
print(f"Failed to create retention policies: {e}")
# List retention policies
policies = client.get_list_retention_policies()
print("Current retention policies:")
for policy in policies:
print(f" - {policy['name']}: {policy['duration']} "
f"(default: {policy['default']}, shards: {policy['shardGroupDuration']})")
# Modify retention policy
client.alter_retention_policy(
name='historical',
duration='26w', # Reduce to 6 months
shard_duration='12h' # Smaller shards
)
# Write data to specific retention policy
points = [
{
"measurement": "cpu_usage",
"tags": {"host": "server01"},
"fields": {"value": 75.5},
"time": "2023-09-07T07:18:24Z"
}
]
# Write to specific retention policy
client.write_points(points, retention_policy='historical')
# Set different default policy
client.alter_retention_policy('historical', default=True)
# Clean up old retention policy
client.drop_retention_policy('old_policy')
# Tiered retention setup example
for database_name in ['production', 'staging', 'development']:
client.switch_database(database_name)
# Real-time data: 24 hours, high resolution
client.create_retention_policy(
'realtime', '24h', 1, default=True, shard_duration='1h'
)
# Daily aggregates: 30 days
client.create_retention_policy(
'daily', '30d', 1, shard_duration='1d'
)
# Monthly aggregates: 2 years
client.create_retention_policy(
'monthly', '104w', 1, shard_duration='1w'
)
print(f"Tiered retention setup complete for {database_name}")

Real-time data aggregation and downsampling through continuous queries that automatically process data as it arrives.
def create_continuous_query(self, name, select, database=None, resample_opts=None):
"""
Create a continuous query for real-time data aggregation.
Parameters:
- name (str): Name of the continuous query
- select (str): SELECT statement for the aggregation
- database (str): Database name (default: current database)
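- resample_opts (str): Resampling clause inserted after the RESAMPLE keyword, e.g. 'EVERY 1m FOR 2m' (default: None, no RESAMPLE clause)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If continuous query creation fails
Note: resample_opts is a plain string, not a dict; InfluxQL requires EVERY to come before FOR, e.g. 'EVERY 1m FOR 2m'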
"""
def drop_continuous_query(self, name, database=None):
"""
Delete a continuous query.
Parameters:
- name (str): Name of continuous query to delete
- database (str): Database name (default: current database)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If continuous query deletion fails
"""
def get_list_continuous_queries(self):
"""
Get list of all continuous queries across all databases.
Returns:
list: List of single-key dictionaries, one per database, mapping the database name to its list of continuous queries (each a dict with 'name' and 'query' keys)
Raises:
InfluxDBClientError: If unable to retrieve continuous queries
"""

client = InfluxDBClient(database='production_metrics')
# Create continuous queries for automatic downsampling
try:
# Downsample CPU data to 5-minute averages
client.create_continuous_query(
name='cpu_5min_avg',
select="""
SELECT mean(value) as avg_cpu, max(value) as max_cpu, min(value) as min_cpu
INTO "historical"."cpu_5min"
FROM "realtime"."cpu_usage"
GROUP BY time(5m), host
"""
)
# Downsample memory data to hourly statistics
client.create_continuous_query(
name='memory_hourly_stats',
select="""
SELECT mean(used) as avg_used, max(used) as max_used,
mean(available) as avg_available
INTO "daily"."memory_hourly"
FROM "realtime"."memory_usage"
GROUP BY time(1h), host
"""
)
# Create alerts based on thresholds
client.create_continuous_query(
name='high_cpu_alerts',
select="""
SELECT mean(value) as avg_cpu
INTO "alerts"."high_cpu"
FROM "realtime"."cpu_usage"
WHERE value > 80
GROUP BY time(1m), host
"""
)
print("Continuous queries created successfully")
except InfluxDBClientError as e:
print(f"Failed to create continuous queries: {e}")
# Create continuous query with resampling options
client.create_continuous_query(
    name='disk_io_resampled',
    select="""
SELECT sum(read_bytes) as total_read, sum(write_bytes) as total_write
INTO "daily"."disk_io_hourly"
FROM "realtime"."disk_io"
GROUP BY time(1h), host, device
""",
    # resample_opts is the literal text placed after the RESAMPLE keyword.
    # It must be a string (the client concatenates it into the statement;
    # a dict raises TypeError), and InfluxQL requires EVERY before FOR.
    # Here: run every 30 minutes, each run covering the last 2 hours.
    resample_opts='EVERY 30m FOR 2h'
)
# List all continuous queries
cqs = client.get_list_continuous_queries()
print("Active continuous queries:")
# The client returns one single-key dict per database, mapping the database
# name to its continuous queries:
#   [{'db_name': [{'name': ..., 'query': ...}, ...]}, ...]
# There are no fixed 'name'/'cqs' keys on the outer dict, so iterate items().
for cq_info in cqs:
    for database, queries in cq_info.items():
        for cq in queries:
            print(f" Database: {database}")
            print(f" Name: {cq['name']}")
            print(f" Query: {cq['query']}")
# Drop continuous query
client.drop_continuous_query('old_downsampling_query')
# Complex continuous query for business metrics
business_analytics_cq = """
SELECT sum(revenue) as total_revenue,
count(transaction_id) as transaction_count,
mean(order_value) as avg_order_value
INTO "business"."daily_metrics"
FROM "realtime"."transactions"
WHERE status = 'completed'
GROUP BY time(1d), region, product_category
"""
client.create_continuous_query(
name='daily_business_analytics',
select=business_analytics_cq
)
# Standard downsampling pattern example
retention_policies = {
'realtime': 'realtime',
'daily': 'daily',
'monthly': 'monthly'
}
measurement = 'cpu_usage'
fields = ['value']
# 5-minute averages
fields_select = ", ".join([f"mean({field}) as avg_{field}" for field in fields])
cq_5min = f"""
SELECT {fields_select}
INTO "{retention_policies['daily']}"."{measurement}_5min"
FROM "{retention_policies['realtime']}"."{measurement}"
GROUP BY time(5m), *
"""
client.create_continuous_query(f"{measurement}_5min_avg", cq_5min)
# Hourly statistics
stats_select = ", ".join([
f"mean({field}) as avg_{field}, max({field}) as max_{field}, min({field}) as min_{field}"
for field in fields
])
cq_hourly = f"""
SELECT {stats_select}
INTO "{retention_policies['monthly']}"."{measurement}_hourly"
FROM "{retention_policies['daily']}"."{measurement}_5min"
GROUP BY time(1h), *
"""
client.create_continuous_query(f"{measurement}_hourly_stats", cq_hourly)
print(f"Standard downsampling created for {measurement}")

Schema management operations for working with measurements, series, and data organization.
def get_list_measurements(self):
"""
Get list of measurements in the current database.
Returns:
list: List of measurement dictionaries
Raises:
InfluxDBClientError: If unable to retrieve measurements
"""
def drop_measurement(self, measurement):
"""
Delete a measurement and all its data.
Parameters:
- measurement (str): Name of measurement to delete
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If measurement deletion fails
"""
def get_list_series(self, database=None, measurement=None, tags=None):
"""
Get list of series matching optional filters.
Parameters:
- database (str): Database name (default: current database)
- measurement (str): Filter by measurement name (default: None)
- tags (dict): Filter by tag key-value pairs (default: None)
Returns:
list: List of series keys as strings in the form "measurement,tag1=value1,tag2=value2"
Raises:
InfluxDBClientError: If unable to retrieve series
"""
def delete_series(self, database=None, measurement=None, tags=None):
"""
Delete series matching the specified criteria.
Parameters:
- database (str): Database name (default: current database)
- measurement (str): Filter by measurement name (default: None)
- tags (dict): Filter by tag key-value pairs (default: None)
Returns:
None: nothing is returned on success; failure raises the exception listed below
Raises:
InfluxDBClientError: If series deletion fails
Warning: This operation is destructive and cannot be undone
"""

client = InfluxDBClient(database='production_metrics')
# List all measurements
measurements = client.get_list_measurements()
print("Available measurements:")
for measurement in measurements:
print(f" - {measurement['name']}")
# Get detailed series information
series_list = client.get_list_series()
print(f"Total series count: {len(series_list)}")
# Filter series by measurement
cpu_series = client.get_list_series(measurement='cpu_usage')
print(f"CPU usage series: {len(cpu_series)}")
for series in cpu_series[:5]: # Show first 5
print(f" - {series}")
# Filter series by tags
server01_series = client.get_list_series(tags={'host': 'server01'})
print(f"Series for server01: {len(server01_series)}")
# Delete specific series (be careful!)
# Delete test data series
client.delete_series(measurement='test_data', tags={'environment': 'test'})
# Drop entire measurement (removes all series and data)
client.drop_measurement('deprecated_measurement')
# Measurement analysis example
measurement_name = 'cpu_usage'
series = client.get_list_series(measurement=measurement_name)
tag_combinations = {}
for series_key in series:
# Parse series key to extract tags
# Series format: "measurement,tag1=value1,tag2=value2"
if ',' in series_key:
tags_part = series_key.split(',', 1)[1]
tag_pairs = tags_part.split(',')
tag_combo = tuple(sorted(tag_pairs))
tag_combinations[tag_combo] = tag_combinations.get(tag_combo, 0) + 1
print(f"Measurement: {measurement_name}")
print(f"Total series: {len(series)}")
print(f"Unique tag combinations: {len(tag_combinations)}")
# Show most common tag combinations
sorted_combos = sorted(tag_combinations.items(), key=lambda x: x[1], reverse=True)
print("Most common tag combinations:")
for combo, count in sorted_combos[:5]:
print(f" {combo}: {count} series")
# Clean up measurements by pattern example
measurements = client.get_list_measurements()
patterns_to_delete = ['test_', 'temp_', 'debug_']
for measurement_info in measurements:
measurement_name = measurement_info['name']
for pattern in patterns_to_delete:
if pattern in measurement_name:
print(f"Deleting measurement: {measurement_name}")
try:
client.drop_measurement(measurement_name)
except InfluxDBClientError as e:
print(f"Failed to delete {measurement_name}: {e}")Install with Tessl CLI
npx tessl i tessl/pypi-influxdb