CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-influxdb-client

Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.

82

1.18x
Overview
Eval results
Files

docs/advanced-operations.md

Advanced Operations

Advanced functionality for performing specialized operations including data deletion, invokable scripts, and direct access to low-level service APIs for maximum control over InfluxDB operations. These APIs provide fine-grained control and advanced capabilities beyond standard read/write operations.

Capabilities

DeleteApi

API for deleting time series data from InfluxDB buckets with support for time-based and predicate-based deletion.

class DeleteApi:
    def __init__(self, influxdb_client): ...

    def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str,
        bucket: str,
        org: str = None
    ) -> None:
        """
        Delete time series data from InfluxDB.

        Parameters:
        - start (Union[str, datetime]): Start time for deletion range (RFC3339 string or datetime)
        - stop (Union[str, datetime]): Stop time for deletion range (RFC3339 string or datetime)
        - predicate (str): Delete predicate selecting the data to remove. Required;
          pass "" to delete everything in the time range. Predicates combine
          `key="value"` comparisons with AND only (OR is not supported).
        - bucket (str): Bucket ID or name to delete from. Required.
        - org (str, optional): Organization name or ID (uses client default if not specified)

        Note: Deletion is permanent and cannot be undone. Use predicates carefully.
        """

DeleteApi Usage Examples

Basic time-based deletion:

from datetime import datetime, timedelta, timezone

from influxdb_client import InfluxDBClient

client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")
delete_api = client.delete_api()

# Delete data from the last hour.
# Use a timezone-aware UTC timestamp: datetime.utcnow() is deprecated since
# Python 3.12 and returns a naive value.
now = datetime.now(timezone.utc)
one_hour_ago = now - timedelta(hours=1)

delete_api.delete(
    start=one_hour_ago,
    stop=now,
    predicate="",  # required argument; an empty predicate matches everything in the range
    bucket="sensor_data"
)
print("Deleted data from the last hour")

Predicate-based deletion:

# Delete specific measurement data
delete_api.delete(
    start="2023-01-01T00:00:00Z",
    stop="2023-01-02T00:00:00Z",
    predicate='_measurement="temperature" AND location="room1"',
    bucket="sensor_data",
    org="my-org"
)

# Delete data with specific tag values.
# Delete predicates only support AND (there is no OR operator), so removing
# several tag values takes one delete call per value.
for sensor_id in ("temp_001", "temp_002"):
    delete_api.delete(
        start=datetime(2023, 1, 1),
        stop=datetime(2023, 1, 31),
        predicate=f'sensor_id="{sensor_id}"',
        bucket="sensor_data"
    )

# Delete all data for a specific field
# NOTE(review): the InfluxDB OSS 2.x documentation states that deleting by
# `_field` is not supported — confirm against the target server before
# relying on this example.
delete_api.delete(
    start="2023-01-01T00:00:00Z",
    stop="2023-12-31T23:59:59Z", 
    predicate='_field="debug_info"',
    bucket="logs"
)

Range-based cleanup:

# Delete old data (data retention cleanup)
from datetime import timezone

# Timezone-aware "now": datetime.utcnow() is deprecated since Python 3.12.
cutoff_date = datetime.now(timezone.utc) - timedelta(days=90)
delete_api.delete(
    start="1970-01-01T00:00:00Z",  # Beginning of time
    stop=cutoff_date,
    predicate="",  # required argument; an empty predicate matches everything in the range
    bucket="archive_data"
)

# Delete test data.
# Delete predicates have no OR operator, so each environment is removed with
# its own call over the same time range.
for environment in ("test", "staging"):
    delete_api.delete(
        start="2023-06-01T00:00:00Z",
        stop="2023-06-02T00:00:00Z",
        predicate=f'environment="{environment}"',
        bucket="application_metrics"
    )

DeleteApiAsync

Asynchronous version of DeleteApi for non-blocking deletion operations.

class DeleteApiAsync:
    def __init__(self, influxdb_client): ...

    async def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str,
        bucket: str,
        org: str = None
    ) -> bool:
        """
        Asynchronously delete time series data from InfluxDB.

        Parameters:
        - start (Union[str, datetime]): Start time for deletion range (RFC3339 string or datetime)
        - stop (Union[str, datetime]): Stop time for deletion range (RFC3339 string or datetime)
        - predicate (str): Delete predicate (AND-combined `key="value"` terms). Required;
          pass "" to delete everything in the time range.
        - bucket (str): Bucket ID or name to delete from. Required.
        - org (str, optional): Organization name or ID (uses client default if not specified)

        Returns:
        bool: True when the data was deleted; failures are raised as exceptions
        """

DeleteApiAsync Usage Example

import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def cleanup_old_data():
    """Run two independent range deletions concurrently on one async client."""
    # `org` is set on the client so that delete() calls without an explicit
    # `org` argument have a default organization to fall back on.
    async with InfluxDBClientAsync(
        url="http://localhost:8086", token="token", org="my-org"
    ) as client:
        delete_api = client.delete_api()

        # Calling delete() only creates the coroutines; nothing is sent
        # until they are awaited by asyncio.gather() below.
        deletion_tasks = [
            # Task 1: Delete old test data
            delete_api.delete(
                start="2023-01-01T00:00:00Z",
                stop="2023-01-31T23:59:59Z",
                predicate='environment="test"',
                bucket="metrics"
            ),
            # Task 2: Delete old logs
            delete_api.delete(
                start="2023-01-01T00:00:00Z",
                stop="2023-02-01T00:00:00Z",
                predicate="",  # required argument; empty = everything in the range
                bucket="application_logs"
            ),
        ]

        # Execute all deletions concurrently
        await asyncio.gather(*deletion_tasks)
        print("All deletions completed")

asyncio.run(cleanup_old_data())

InvokableScriptsApi

API for managing and executing InfluxDB invokable scripts for custom data processing and analysis. Invokable scripts are an InfluxDB Cloud feature; they are not available on InfluxDB OSS 2.x.

class InvokableScriptsApi:
    """Create, list, update, delete and invoke API invokable scripts."""

    def __init__(self, influxdb_client): ...

    def create_script(self, create_request: ScriptCreateRequest) -> Script:
        """
        Create a new invokable script.

        Parameters:
        - create_request (ScriptCreateRequest): Script configuration and code

        Returns:
        Script: Created script object
        """

    def delete_script(self, script_id: str) -> None:
        """
        Delete an invokable script.

        Parameters:
        - script_id (str): Script ID to delete
        """

    def find_scripts(self, **kwargs) -> List[Script]:
        """
        List invokable scripts.

        Parameters:
        - **kwargs: Query parameters (limit, offset)

        Returns:
        List[Script]: The scripts themselves (already unwrapped from the
        `Scripts` response envelope), so iterate the result directly
        """

    def find_script_by_id(self, script_id: str) -> Script:
        """
        Find script by ID.

        NOTE(review): this method could not be matched to the upstream
        client's InvokableScriptsApi — confirm it exists in the installed
        version before relying on it.

        Parameters:
        - script_id (str): Script ID

        Returns:
        Script: Found script object or None
        """

    def update_script(
        self,
        script_id: str,
        update_request: ScriptUpdateRequest
    ) -> Script:
        """
        Update an existing script.

        Parameters:
        - script_id (str): Script ID to update
        - update_request (ScriptUpdateRequest): Updated script configuration

        Returns:
        Script: Updated script object
        """

    def invoke_script(
        self,
        script_id: str,
        params: dict = None
    ) -> TableList:
        """
        Execute an invokable script synchronously.

        Parameters:
        - script_id (str): Script ID to execute
        - params (dict, optional): Values bound to `params.<name>` inside the script

        Returns:
        TableList: Result tables; iterate tables, then `table.records`

        NOTE(review): upstream also appears to provide `invoke_script_stream`,
        `invoke_script_data_frame`, `invoke_script_csv` and `invoke_script_raw`
        for other result formats — confirm against the installed version.
        """

InvokableScriptsApi Usage Examples

Script creation and management:

from influxdb_client import ScriptCreateRequest, ScriptLanguage

scripts_api = client.invokable_scripts_api()

# Create a data analysis script
flux_code = '''
import "array"
import "math"

// Calculate moving average for temperature data
data = from(bucket: params.bucket_name)
  |> range(start: params.start_time)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> filter(fn: (r) => r.location == params.location)

// Calculate 5-point moving average
data
  |> timedMovingAverage(every: params.window_duration, period: params.window_duration * 5)
  |> yield(name: "moving_average")
'''

# The language constant is upper-case: ScriptLanguage.FLUX
# (there is no ScriptLanguage.flux attribute).
script_request = ScriptCreateRequest(
    name="temperature_analysis",
    description="Calculate moving average for temperature sensors",
    script=flux_code,
    language=ScriptLanguage.FLUX
)

script = scripts_api.create_script(script_request)
print(f"Created script: {script.name} (ID: {script.id})")

# List all scripts.
# find_scripts() already returns the list of Script objects, so iterate it
# directly rather than going through a `.scripts` attribute.
scripts = scripts_api.find_scripts()
for s in scripts:
    print(f"Script: {s.name} - {s.description}")

Script execution with parameters:

# Execute script with parameters
execution_params = {
    "bucket_name": "sensor_data",
    "start_time": "-2h",
    "location": "datacenter1", 
    "window_duration": "5m"
}

result = scripts_api.invoke_script(script.id, params=execution_params)
print(f"Script execution result: {result}")

# Update script
from influxdb_client import ScriptUpdateRequest

updated_script_code = '''
// Enhanced version with anomaly detection
import "array"
import "math"

data = from(bucket: params.bucket_name)
  |> range(start: params.start_time) 
  |> filter(fn: (r) => r._measurement == "temperature")

// Detect anomalies using standard deviation
anomalies = data
  |> aggregateWindow(every: 1h, fn: stddev)
  |> filter(fn: (r) => r._value > params.anomaly_threshold)
  |> yield(name: "anomalies")

// Also yield the moving average
data
  |> timedMovingAverage(every: params.window_duration, period: params.window_duration * 5)
  |> yield(name: "moving_average")
'''

update_request = ScriptUpdateRequest(
    name="enhanced_temperature_analysis",
    description="Temperature analysis with anomaly detection", 
    script=updated_script_code
)

updated_script = scripts_api.update_script(script.id, update_request)

Parameterized script execution patterns:

# Create a flexible data export script  
export_script_code = '''
// Flexible data export with filtering
data = from(bucket: params.source_bucket)
  |> range(start: params.start_time, stop: params.stop_time)

// Apply optional measurement filter
filtered_data = if exists params.measurement then
  data |> filter(fn: (r) => r._measurement == params.measurement)
else
  data

// Apply optional tag filters
result = if exists params.tag_filters then
  filtered_data |> filter(fn: (r) => 
    array.from(rows: params.tag_filters) 
      |> array.any(fn: (tag) => r[tag.key] == tag.value))
else
  filtered_data

// Export to destination
result |> to(bucket: params.destination_bucket)
'''

# The language constant is upper-case: ScriptLanguage.FLUX
# (there is no ScriptLanguage.flux attribute).
export_script_request = ScriptCreateRequest(
    name="flexible_data_export",
    description="Export filtered data between buckets",
    script=export_script_code,
    language=ScriptLanguage.FLUX
)

export_script = scripts_api.create_script(export_script_request)

# Use the export script
export_params = {
    "source_bucket": "raw_data",
    "destination_bucket": "processed_data",
    "start_time": "-24h",
    "stop_time": "now()",
    "measurement": "cpu_usage",
    "tag_filters": [
        {"key": "host", "value": "web-server-1"},
        {"key": "environment", "value": "production"}
    ]
}

export_result = scripts_api.invoke_script(export_script.id, params=export_params)

Low-Level Service APIs

Direct access to InfluxDB's OpenAPI service layer for advanced use cases and custom integrations.

# Core service classes (examples of the 40+ available services)
class QueryService:
    """Direct access to query API endpoints."""
    def post_query(
        self,
        org: str,
        query: Query,
        zap_trace_span: str = None,
        accept_encoding: str = None,
        content_encoding: str = None,
        **kwargs
    ): ...

class WriteService:
    """Direct access to write API endpoints."""
    def post_write(
        self,
        org: str,
        bucket: str,
        body: str,
        zap_trace_span: str = None,
        content_encoding: str = None,
        content_type: str = "text/plain; charset=utf-8",
        content_length: int = None,
        accept: str = None,
        precision: WritePrecision = None,
        **kwargs
    ): ...

class BucketsService:
    """Direct access to bucket management endpoints."""
    def get_buckets(
        self,
        zap_trace_span: str = None,
        org: str = None,
        org_id: str = None,
        after: str = None,
        limit: int = None,
        **kwargs
    ): ...
    
    def post_buckets(
        self,
        post_bucket_request: PostBucketRequest,
        zap_trace_span: str = None,
        **kwargs
    ): ...

class AuthorizationsService:
    """Direct access to authorization endpoints."""
    def get_authorizations(
        self,
        zap_trace_span: str = None,
        user_id: str = None,
        user: str = None,
        org_id: str = None,
        org: str = None,
        token: str = None,
        **kwargs
    ): ...

class HealthService:
    """Direct access to health check endpoints."""
    def get_health(self, zap_trace_span: str = None, **kwargs): ...

class SetupService:
    """Direct access to InfluxDB setup endpoints."""
    def post_setup(
        self,
        onboarding_request: OnboardingRequest,
        zap_trace_span: str = None,
        **kwargs
    ): ...

class BackupService:
    """Direct access to backup and restore endpoints.""" 
    def post_backup_kv(
        self,
        zap_trace_span: str = None,
        **kwargs
    ): ...

class TelegrafsService:
    """Direct access to Telegraf configuration endpoints."""
    def get_telegrafs(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...

# Advanced management services
class DashboardsService:
    """Direct access to dashboard management endpoints."""
    def get_dashboards(
        self,
        zap_trace_span: str = None,
        owner: str = None,
        sort_by: str = None,
        **kwargs
    ): ...

class ChecksService:
    """Direct access to monitoring check endpoints."""
    def get_checks(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...

class NotificationRulesService:
    """Direct access to notification rule endpoints."""
    def get_notification_rules(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...

Low-Level Service Usage Examples

Direct service access:

from influxdb_client.service.query_service import QueryService
from influxdb_client.domain.query import Query

# Get direct access to services  
query_service = QueryService(client.api_client)

# Execute query using low-level service
query_obj = Query(query='from(bucket: "test") |> range(start: -1h)')
response = query_service.post_query(
    org="my-org",
    query=query_obj,
    accept_encoding="gzip"
)

# Process raw response
print(f"Query response: {response}")

Custom HTTP headers and advanced options:

from influxdb_client import WritePrecision
from influxdb_client.service.write_service import WriteService

write_service = WriteService(client.api_client)

# Write with custom headers and options
line_protocol = "measurement,tag1=value1 field1=1.0"
response = write_service.post_write(
    org="my-org",
    bucket="my-bucket",
    body=line_protocol,
    # No content_encoding="gzip" here: the body above is plain text, and
    # declaring gzip for an uncompressed payload leaves the server trying to
    # decompress data that was never compressed. To send compressed writes,
    # create the client with enable_gzip=True instead.
    precision=WritePrecision.S,
    zap_trace_span="custom-trace-id"
)

Advanced bucket management:

from influxdb_client.service.buckets_service import BucketsService
from influxdb_client.domain.post_bucket_request import PostBucketRequest
from influxdb_client.domain.retention_rule import RetentionRule

buckets_service = BucketsService(client.api_client)

# Create bucket with advanced options
retention_rule = RetentionRule(
    type="expire",
    every_seconds=86400 * 30,  # 30 days
    shard_group_duration_seconds=3600  # 1 hour shard groups
)

bucket_request = PostBucketRequest(
    name="advanced_bucket",
    org_id="org_id_here",
    description="Bucket with custom shard configuration",
    retention_rules=[retention_rule]
)

response = buckets_service.post_buckets(
    post_bucket_request=bucket_request,
    zap_trace_span="bucket-creation-trace"
)

Types

# Deletion-related types
DeletePredicate = str  # InfluxDB predicate expression
TimeRange = Tuple[Union[str, datetime], Union[str, datetime]]  # Start and stop time pair

# Script-related types
class ScriptCreateRequest:
    name: str
    description: str
    script: str
    language: ScriptLanguage

class ScriptUpdateRequest:
    name: str
    description: str
    script: str

class Script:
    id: str
    name: str
    description: str
    script: str
    language: ScriptLanguage
    created_at: datetime
    updated_at: datetime

class Scripts:
    scripts: List[Script]

class ScriptLanguage(Enum):
    """Languages accepted for invokable scripts."""

    # Upper-case member name: the client exposes this constant as
    # ScriptLanguage.FLUX.
    FLUX = "flux"

# Low-level API types
class Query:
    query: str
    type: str
    params: dict

class PostBucketRequest:
    org_id: str
    name: str
    description: str
    retention_rules: List[RetentionRule]
    schema_type: str

class OnboardingRequest:
    username: str
    password: str
    org: str
    bucket: str
    retention_period_hrs: int
    retention_period_ns: int
    token: str

# Service response types
ServiceResponse = Dict[str, Any]  # Generic service response
RawHTTPResponse = Any  # Raw HTTP response from services

# Advanced configuration types
class TracingConfig:
    zap_trace_span: str
    custom_headers: Dict[str, str]

class CompressionConfig:
    content_encoding: str  # "gzip", "identity"
    accept_encoding: str   # "gzip", "deflate"

# Exception types for advanced operations
class DeletionError(InfluxDBError):
    """Raised when data deletion fails."""
    pass

class ScriptExecutionError(InfluxDBError):
    """Raised when script execution fails."""
    pass

class ServiceAPIError(InfluxDBError):
    """Raised when low-level service calls fail."""
    pass

class InvalidPredicateError(DeletionError):
    """Raised when deletion predicate is invalid."""
    pass

# Constants for advanced operations
DEFAULT_DELETE_PRECISION = WritePrecision.NS
MAX_DELETE_RANGE_DAYS = 7  # Recommended maximum range for single deletion
SCRIPT_TIMEOUT_MS = 30000  # Default script execution timeout

# Service endpoint constants  
QUERY_ENDPOINT = "/api/v2/query"
WRITE_ENDPOINT = "/api/v2/write" 
DELETE_ENDPOINT = "/api/v2/delete"
SCRIPTS_ENDPOINT = "/api/v2/scripts"

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb-client

docs

advanced-operations.md

client-management.md

index.md

querying-data.md

resource-management.md

writing-data.md

tile.json