Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.
82
Advanced functionality for performing specialized operations including data deletion, invokable scripts, and direct access to low-level service APIs for maximum control over InfluxDB operations. These APIs provide fine-grained control and advanced capabilities beyond standard read/write operations.
API for deleting time series data from InfluxDB buckets with support for time-based and predicate-based deletion.
class DeleteApi:
    """Synchronous API for deleting time series data from InfluxDB buckets.

    Supports time-range deletion with an optional predicate to narrow the
    data that is removed.
    """

    def __init__(self, influxdb_client): ...

    def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str = "",
        bucket: str = None,
        org: str = None
    ) -> None:
        """
        Delete time series data from InfluxDB.

        Parameters:
        - start (Union[str, datetime]): Start time for deletion range (RFC3339 or datetime)
        - stop (Union[str, datetime]): Stop time for deletion range (RFC3339 or datetime)
        - predicate (str, optional): InfluxDB predicate for filtering data to delete
        - bucket (str, optional): Bucket name (uses client default if not specified)
        - org (str, optional): Organization name or ID (uses client default if not specified)

        Note: Deletion is permanent and cannot be undone. Use predicates carefully.
        """


# Basic time-based deletion:
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta

client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")
delete_api = client.delete_api()

# Delete data from the last hour
now = datetime.utcnow()
one_hour_ago = now - timedelta(hours=1)
delete_api.delete(
    start=one_hour_ago,
    stop=now,
    bucket="sensor_data"
)
print("Deleted data from the last hour")

# Predicate-based deletion:
# Delete specific measurement data
delete_api.delete(
    start="2023-01-01T00:00:00Z",
    stop="2023-01-02T00:00:00Z",
    predicate='_measurement="temperature" AND location="room1"',
    bucket="sensor_data",
    org="my-org"
)

# Delete data with specific tag values
delete_api.delete(
    start=datetime(2023, 1, 1),
    stop=datetime(2023, 1, 31),
    predicate='sensor_id="temp_001" OR sensor_id="temp_002"',
    bucket="sensor_data"
)

# Delete all data for a specific field
delete_api.delete(
    start="2023-01-01T00:00:00Z",
    stop="2023-12-31T23:59:59Z",
    predicate='_field="debug_info"',
    bucket="logs"
)

# Range-based cleanup:
# Delete old data (data retention cleanup)
cutoff_date = datetime.utcnow() - timedelta(days=90)
delete_api.delete(
    start="1970-01-01T00:00:00Z",  # Beginning of time
    stop=cutoff_date,
    bucket="archive_data"
)

# Delete test data
delete_api.delete(
    start="2023-06-01T00:00:00Z",
    stop="2023-06-02T00:00:00Z",
    predicate='environment="test" OR environment="staging"',
    bucket="application_metrics"
)

# Asynchronous version of DeleteApi for non-blocking deletion operations.
class DeleteApiAsync:
    """Asynchronous version of DeleteApi for non-blocking deletion operations."""

    def __init__(self, influxdb_client): ...

    async def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str = "",
        bucket: str = None,
        org: str = None
    ) -> None:
        """
        Asynchronously delete time series data from InfluxDB.

        Parameters: Same as DeleteApi.delete()
        """


import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync


async def cleanup_old_data():
    """Run several deletions concurrently against an async client."""
    async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
        delete_api = client.delete_api()

        # Delete multiple ranges concurrently
        deletion_tasks = []

        # Task 1: Delete old test data
        deletion_tasks.append(
            delete_api.delete(
                start="2023-01-01T00:00:00Z",
                stop="2023-01-31T23:59:59Z",
                predicate='environment="test"',
                bucket="metrics"
            )
        )

        # Task 2: Delete old logs
        deletion_tasks.append(
            delete_api.delete(
                start="2023-01-01T00:00:00Z",
                stop="2023-02-01T00:00:00Z",
                bucket="application_logs"
            )
        )

        # Execute all deletions concurrently
        await asyncio.gather(*deletion_tasks)
        print("All deletions completed")


asyncio.run(cleanup_old_data())

# API for managing and executing InfluxDB invokable scripts for custom data processing and analysis.
class InvokableScriptsApi:
    """API for managing and executing InfluxDB invokable scripts."""

    def __init__(self, influxdb_client): ...

    def create_script(self, script_create_request: ScriptCreateRequest) -> Script:
        """
        Create a new invokable script.

        Parameters:
        - script_create_request (ScriptCreateRequest): Script configuration and code

        Returns:
        Script: Created script object
        """

    def delete_script(self, script_id: str) -> None:
        """
        Delete an invokable script.

        Parameters:
        - script_id (str): Script ID to delete
        """

    def find_scripts(self, **kwargs) -> Scripts:
        """
        List invokable scripts.

        Parameters:
        - **kwargs: Query parameters (limit, offset)

        Returns:
        Scripts: Collection of script objects
        """

    def find_script_by_id(self, script_id: str) -> Script:
        """
        Find script by ID.

        Parameters:
        - script_id (str): Script ID

        Returns:
        Script: Found script object or None
        """

    def update_script(
        self,
        script_id: str,
        script_update_request: ScriptUpdateRequest
    ) -> Script:
        """
        Update an existing script.

        Parameters:
        - script_id (str): Script ID to update
        - script_update_request (ScriptUpdateRequest): Updated script configuration

        Returns:
        Script: Updated script object
        """

    def invoke_script(
        self,
        script_id: str,
        params: dict = None
    ) -> str:
        """
        Execute an invokable script.

        Parameters:
        - script_id (str): Script ID to execute
        - params (dict, optional): Parameters to pass to the script

        Returns:
        str: Script execution result
        """


# Script creation and management:
from influxdb_client import ScriptCreateRequest, ScriptLanguage

scripts_api = client.invokable_scripts_api()

# Create a data analysis script
flux_code = '''
import "array"
import "math"
// Calculate moving average for temperature data
data = from(bucket: params.bucket_name)
|> range(start: params.start_time)
|> filter(fn: (r) => r._measurement == "temperature")
|> filter(fn: (r) => r.location == params.location)
// Calculate 5-point moving average
data
|> timedMovingAverage(every: params.window_duration, period: params.window_duration * 5)
|> yield(name: "moving_average")
'''
script_request = ScriptCreateRequest(
    name="temperature_analysis",
    description="Calculate moving average for temperature sensors",
    script=flux_code,
    language=ScriptLanguage.flux
)
script = scripts_api.create_script(script_request)
print(f"Created script: {script.name} (ID: {script.id})")

# List all scripts
scripts = scripts_api.find_scripts()
for s in scripts.scripts:
    print(f"Script: {s.name} - {s.description}")

# Script execution with parameters:
# Execute script with parameters
execution_params = {
    "bucket_name": "sensor_data",
    "start_time": "-2h",
    "location": "datacenter1",
    "window_duration": "5m"
}
result = scripts_api.invoke_script(script.id, params=execution_params)
print(f"Script execution result: {result}")

# Update script
from influxdb_client import ScriptUpdateRequest

updated_script_code = '''
// Enhanced version with anomaly detection
import "array"
import "math"
data = from(bucket: params.bucket_name)
|> range(start: params.start_time)
|> filter(fn: (r) => r._measurement == "temperature")
// Detect anomalies using standard deviation
anomalies = data
|> aggregateWindow(every: 1h, fn: stddev)
|> filter(fn: (r) => r._value > params.anomaly_threshold)
|> yield(name: "anomalies")
// Also yield the moving average
data
|> timedMovingAverage(every: params.window_duration, period: params.window_duration * 5)
|> yield(name: "moving_average")
'''
update_request = ScriptUpdateRequest(
    name="enhanced_temperature_analysis",
    description="Temperature analysis with anomaly detection",
    script=updated_script_code
)
updated_script = scripts_api.update_script(script.id, update_request)

# Parameterized script execution patterns:
# Create a flexible data export script
export_script_code = '''
// Flexible data export with filtering
data = from(bucket: params.source_bucket)
|> range(start: params.start_time, stop: params.stop_time)
// Apply optional measurement filter
filtered_data = if exists params.measurement then
data |> filter(fn: (r) => r._measurement == params.measurement)
else
data
// Apply optional tag filters
result = if exists params.tag_filters then
filtered_data |> filter(fn: (r) =>
array.from(rows: params.tag_filters)
|> array.any(fn: (tag) => r[tag.key] == tag.value))
else
filtered_data
// Export to destination
result |> to(bucket: params.destination_bucket)
'''
export_script_request = ScriptCreateRequest(
    name="flexible_data_export",
    description="Export filtered data between buckets",
    script=export_script_code,
    language=ScriptLanguage.flux
)
export_script = scripts_api.create_script(export_script_request)

# Use the export script
export_params = {
    "source_bucket": "raw_data",
    "destination_bucket": "processed_data",
    "start_time": "-24h",
    "stop_time": "now()",
    "measurement": "cpu_usage",
    "tag_filters": [
        {"key": "host", "value": "web-server-1"},
        {"key": "environment", "value": "production"}
    ]
}
export_result = scripts_api.invoke_script(export_script.id, params=export_params)

# Direct access to InfluxDB's OpenAPI service layer for advanced use cases and custom integrations.
# Core service classes (examples of the 40+ available services)
class QueryService:
    """Direct access to query API endpoints."""

    def post_query(
        self,
        org: str,
        query: Query,
        zap_trace_span: str = None,
        accept_encoding: str = None,
        content_encoding: str = None,
        **kwargs
    ): ...


class WriteService:
    """Direct access to write API endpoints."""

    def post_write(
        self,
        org: str,
        bucket: str,
        body: str,
        zap_trace_span: str = None,
        content_encoding: str = None,
        content_type: str = "text/plain; charset=utf-8",
        content_length: int = None,
        accept: str = None,
        precision: WritePrecision = None,
        **kwargs
    ): ...


class BucketsService:
    """Direct access to bucket management endpoints."""

    def get_buckets(
        self,
        zap_trace_span: str = None,
        org: str = None,
        org_id: str = None,
        after: str = None,
        limit: int = None,
        **kwargs
    ): ...

    def post_buckets(
        self,
        post_bucket_request: PostBucketRequest,
        zap_trace_span: str = None,
        **kwargs
    ): ...
class AuthorizationsService:
    """Direct access to authorization endpoints."""

    def get_authorizations(
        self,
        zap_trace_span: str = None,
        user_id: str = None,
        user: str = None,
        org_id: str = None,
        org: str = None,
        token: str = None,
        **kwargs
    ): ...


class HealthService:
    """Direct access to health check endpoints."""

    def get_health(self, zap_trace_span: str = None, **kwargs): ...


class SetupService:
    """Direct access to InfluxDB setup endpoints."""

    def post_setup(
        self,
        onboarding_request: OnboardingRequest,
        zap_trace_span: str = None,
        **kwargs
    ): ...


class BackupService:
    """Direct access to backup and restore endpoints."""

    def post_backup_kv(
        self,
        zap_trace_span: str = None,
        **kwargs
    ): ...


class TelegrafsService:
    """Direct access to Telegraf configuration endpoints."""

    def get_telegrafs(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...
# Advanced management services
class DashboardsService:
    """Direct access to dashboard management endpoints."""

    def get_dashboards(
        self,
        zap_trace_span: str = None,
        owner: str = None,
        sort_by: str = None,
        **kwargs
    ): ...


class ChecksService:
    """Direct access to monitoring check endpoints."""

    def get_checks(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...


class NotificationRulesService:
    """Direct access to notification rule endpoints."""

    def get_notification_rules(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...


# Direct service access:
from influxdb_client.service.query_service import QueryService
from influxdb_client.domain.query import Query

# Get direct access to services
query_service = QueryService(client.api_client)

# Execute query using low-level service
query_obj = Query(query='from(bucket: "test") |> range(start: -1h)')
response = query_service.post_query(
    org="my-org",
    query=query_obj,
    accept_encoding="gzip"
)

# Process raw response
print(f"Query response: {response}")

# Custom HTTP headers and advanced options:
from influxdb_client.service.write_service import WriteService

write_service = WriteService(client.api_client)

# Write with custom headers and options
line_protocol = "measurement,tag1=value1 field1=1.0"
response = write_service.post_write(
    org="my-org",
    bucket="my-bucket",
    body=line_protocol,
    content_encoding="gzip",
    precision=WritePrecision.S,
    zap_trace_span="custom-trace-id"
)

# Advanced bucket management:
from influxdb_client.service.buckets_service import BucketsService
from influxdb_client.domain.post_bucket_request import PostBucketRequest
from influxdb_client.domain.retention_rule import RetentionRule

buckets_service = BucketsService(client.api_client)

# Create bucket with advanced options
retention_rule = RetentionRule(
    type="expire",
    every_seconds=86400 * 30,  # 30 days
    shard_group_duration_seconds=3600  # 1 hour shard groups
)
bucket_request = PostBucketRequest(
    name="advanced_bucket",
    org_id="org_id_here",
    description="Bucket with custom shard configuration",
    retention_rules=[retention_rule]
)
response = buckets_service.post_buckets(
    post_bucket_request=bucket_request,
    zap_trace_span="bucket-creation-trace"
)

# Deletion-related types
DeletePredicate = str  # InfluxDB predicate expression
TimeRange = Tuple[Union[str, datetime], Union[str, datetime]]  # Start and stop time pair


# Script-related types
class ScriptCreateRequest:
    name: str
    description: str
    script: str
    language: ScriptLanguage


class ScriptUpdateRequest:
    name: str
    description: str
    script: str


class Script:
    id: str
    name: str
    description: str
    script: str
    language: ScriptLanguage
    created_at: datetime
    updated_at: datetime


class Scripts:
    scripts: List[Script]


class ScriptLanguage(Enum):
    flux = "flux"


# Low-level API types
class Query:
    query: str
    type: str
    params: dict


class PostBucketRequest:
    org_id: str
    name: str
    description: str
    retention_rules: List[RetentionRule]
    schema_type: str


class OnboardingRequest:
    username: str
    password: str
    org: str
    bucket: str
    retention_period_hrs: int
    retention_period_ns: int
    token: str


# Service response types
ServiceResponse = Dict[str, Any]  # Generic service response
RawHTTPResponse = Any  # Raw HTTP response from services


# Advanced configuration types
class TracingConfig:
    zap_trace_span: str
    custom_headers: Dict[str, str]


class CompressionConfig:
    content_encoding: str  # "gzip", "identity"
    accept_encoding: str  # "gzip", "deflate"


# Exception types for advanced operations
class DeletionError(InfluxDBError):
    """Raised when data deletion fails."""
    pass


class ScriptExecutionError(InfluxDBError):
    """Raised when script execution fails."""
    pass


class ServiceAPIError(InfluxDBError):
    """Raised when low-level service calls fail."""
    pass


class InvalidPredicateError(DeletionError):
    """Raised when deletion predicate is invalid."""
    pass
# Constants for advanced operations
DEFAULT_DELETE_PRECISION = WritePrecision.NS
MAX_DELETE_RANGE_DAYS = 7  # Recommended maximum range for single deletion
SCRIPT_TIMEOUT_MS = 30000  # Default script execution timeout

# Service endpoint constants
QUERY_ENDPOINT = "/api/v2/query"
WRITE_ENDPOINT = "/api/v2/write"
DELETE_ENDPOINT = "/api/v2/delete"
SCRIPTS_ENDPOINT = "/api/v2/scripts"

# Install with Tessl CLI
npx tessl i tessl/pypi-influxdb-client

docs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10