CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-influxdb-client

Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.

82

1.18x
Overview
Eval results
Files

resource-management.mddocs/

Resource Management

Comprehensive functionality for managing InfluxDB resources including buckets, organizations, users, authorizations, tasks, and labels through dedicated API classes. These APIs provide complete CRUD operations and advanced management capabilities for all InfluxDB entities.

Capabilities

BucketsApi

API for managing InfluxDB buckets with support for creation, deletion, updates, and querying with retention policies and metadata.

class BucketsApi:
    def __init__(self, influxdb_client): ...
    
    def create_bucket(
        self,
        bucket: Bucket = None,
        bucket_name: str = None,
        org_id: str = None,
        retention_rules: List[RetentionRule] = None,
        description: str = None,
        org: str = None
    ) -> Bucket:
        """
        Create a new bucket.

        Parameters:
        - bucket (Bucket, optional): Pre-configured bucket object
        - bucket_name (str, optional): Name for the new bucket
        - org_id (str, optional): Organization ID (takes precedence over org)
        - retention_rules (List[RetentionRule], optional): Data retention policies
        - description (str, optional): Bucket description
        - org (str, optional): Organization name

        Returns:
        Bucket: Created bucket object
        """
        
    def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket:
        """
        Delete a bucket.

        Parameters:
        - bucket (Union[Bucket, str]): Bucket object or bucket ID

        Returns:
        Bucket: Deleted bucket object
        """
        
    def find_bucket_by_id(self, bucket_id: str) -> Bucket:
        """
        Find bucket by its ID.

        Parameters:
        - bucket_id (str): Bucket ID

        Returns:
        Bucket: Found bucket object or None
        """
        
    def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket:
        """
        Find bucket by name within organization.

        Parameters:
        - bucket_name (str): Bucket name to search for
        - org (str, optional): Organization name or ID

        Returns:
        Bucket: Found bucket object or None
        """
        
    def find_buckets(self, org: str = None, **kwargs) -> List[Bucket]:
        """
        List all buckets, optionally filtered by organization.

        Parameters:
        - org (str, optional): Organization name or ID to filter by
        - **kwargs: Additional query parameters (limit, offset, after, etc.)

        Returns:
        List[Bucket]: List of bucket objects
        """
        
    def update_bucket(self, bucket: Bucket) -> Bucket:
        """
        Update an existing bucket.

        Parameters:
        - bucket (Bucket): Bucket object with updated properties

        Returns:
        Bucket: Updated bucket object
        """

BucketsApi Usage Examples

Basic bucket management:

from influxdb_client import InfluxDBClient, Bucket, RetentionRule
from datetime import timedelta

client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")
buckets_api = client.buckets_api()

# Create bucket with retention policy
retention_rule = RetentionRule(type="expire", every_seconds=int(timedelta(days=30).total_seconds()))
bucket = buckets_api.create_bucket(
    bucket_name="sensor_data",
    description="IoT sensor measurements",
    retention_rules=[retention_rule],
    org="my-org"
)
print(f"Created bucket: {bucket.name} (ID: {bucket.id})")

# Find bucket by name
found_bucket = buckets_api.find_bucket_by_name("sensor_data", org="my-org")
if found_bucket:
    print(f"Found bucket: {found_bucket.name}")

# List all buckets
all_buckets = buckets_api.find_buckets()
for b in all_buckets:
    print(f"Bucket: {b.name} - {b.description}")

Advanced bucket configuration:

# Create bucket with multiple retention rules
retention_rules = [
    RetentionRule(type="expire", every_seconds=86400 * 7),    # 7 days
    RetentionRule(type="expire", every_seconds=86400 * 365)   # 1 year
]

# Create using Bucket object
new_bucket = Bucket(
    name="analytics_data",
    description="Analytics and reporting data",
    org_id="organization_id_here",
    retention_rules=retention_rules
)

created_bucket = buckets_api.create_bucket(bucket=new_bucket)

# Update bucket description
created_bucket.description = "Updated analytics and reporting data"
updated_bucket = buckets_api.update_bucket(created_bucket)

# Delete bucket
buckets_api.delete_bucket(updated_bucket.id)

OrganizationsApi

API for managing InfluxDB organizations with support for creation, deletion, updates, and membership management.

class OrganizationsApi:
    def __init__(self, influxdb_client): ...
    
    def create_organization(self, org: Organization = None, **kwargs) -> Organization:
        """
        Create a new organization.

        Parameters:
        - org (Organization, optional): Pre-configured organization object
        - **kwargs: Organization properties (name, description, etc.)

        Returns:
        Organization: Created organization object
        """
        
    def delete_organization(self, org: Union[Organization, str]) -> Organization:
        """
        Delete an organization.

        Parameters:
        - org (Union[Organization, str]): Organization object or organization ID

        Returns:
        Organization: Deleted organization object
        """
        
    def find_organization_by_id(self, org_id: str) -> Organization:
        """
        Find organization by its ID.

        Parameters:
        - org_id (str): Organization ID

        Returns:
        Organization: Found organization object or None
        """
        
    def find_organizations(self, **kwargs) -> List[Organization]:
        """
        List all organizations.

        Parameters:
        - **kwargs: Query parameters (limit, offset, descending, etc.)

        Returns:
        List[Organization]: List of organization objects
        """
        
    def update_organization(self, org: Organization) -> Organization:
        """
        Update an existing organization.

        Parameters:
        - org (Organization): Organization object with updated properties

        Returns:
        Organization: Updated organization object
        """

OrganizationsApi Usage Examples

Organization management:

orgs_api = client.organizations_api()

# Create new organization
new_org = orgs_api.create_organization(name="Data Science Team", description="Analytics and ML workloads")
print(f"Created org: {new_org.name} (ID: {new_org.id})")

# List all organizations
organizations = orgs_api.find_organizations()
for org in organizations:
    print(f"Org: {org.name} - {org.description}")

# Find specific organization
found_org = orgs_api.find_organization_by_id(new_org.id)
if found_org:
    # Update organization
    found_org.description = "Updated: Analytics, ML, and BI workloads"
    updated_org = orgs_api.update_organization(found_org)
    print(f"Updated org description: {updated_org.description}")

UsersApi

API for managing InfluxDB users with support for user creation, updates, and current user information.

class UsersApi:
    def __init__(self, influxdb_client): ...
    
    def create_user(self, user: User = None, **kwargs) -> UserResponse:
        """
        Create a new user.

        Parameters:
        - user (User, optional): Pre-configured user object
        - **kwargs: User properties (name, status, etc.)

        Returns:
        UserResponse: Created user object
        """
        
    def delete_user(self, user: Union[UserResponse, str]) -> UserResponse:
        """
        Delete a user.

        Parameters:
        - user (Union[UserResponse, str]): User object or user ID

        Returns:
        UserResponse: Deleted user object
        """
        
    def find_user_by_id(self, user_id: str) -> UserResponse:
        """
        Find user by ID.

        Parameters:
        - user_id (str): User ID

        Returns:
        UserResponse: Found user object or None
        """
        
    def find_users(self, **kwargs) -> List[UserResponse]:
        """
        List all users.

        Parameters:
        - **kwargs: Query parameters (limit, offset, after, name, id)

        Returns:
        List[UserResponse]: List of user objects
        """
        
    def update_user(self, user: UserResponse) -> UserResponse:
        """
        Update an existing user.

        Parameters:
        - user (UserResponse): User object with updated properties

        Returns:
        UserResponse: Updated user object
        """
        
    def me(self) -> UserResponse:
        """
        Get current user information.

        Returns:
        UserResponse: Current user object
        """

UsersApi Usage Examples

User management:

users_api = client.users_api()

# Get current user info
current_user = users_api.me()
print(f"Current user: {current_user.name} (ID: {current_user.id})")

# Create new user
new_user = users_api.create_user(name="john.doe", status="active")
print(f"Created user: {new_user.name}")

# List all users
users = users_api.find_users()
for user in users:
    print(f"User: {user.name} - Status: {user.status}")

# Find and update user
found_user = users_api.find_user_by_id(new_user.id)
if found_user:
    found_user.status = "inactive"
    updated_user = users_api.update_user(found_user)
    print(f"Updated user status: {updated_user.status}")

AuthorizationsApi

API for managing InfluxDB authentication tokens and permissions with support for fine-grained access control.

class AuthorizationsApi:
    def __init__(self, influxdb_client): ...
    
    def create_authorization(
        self,
        org_id: str = None,
        permissions: List[Permission] = None,
        **kwargs
    ) -> Authorization:
        """
        Create a new authorization token.

        Parameters:
        - org_id (str, optional): Organization ID for the authorization
        - permissions (List[Permission], optional): List of permission objects
        - **kwargs: Additional authorization properties (user_id, description, status)

        Returns:
        Authorization: Created authorization object with token
        """
        
    def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization:
        """
        Delete an authorization token.

        Parameters:
        - authorization (Union[Authorization, str]): Authorization object or ID

        Returns:
        Authorization: Deleted authorization object
        """
        
    def find_authorization_by_id(self, authorization_id: str) -> Authorization:
        """
        Find authorization by ID.

        Parameters:
        - authorization_id (str): Authorization ID

        Returns:
        Authorization: Found authorization object or None
        """
        
    def find_authorizations(self, **kwargs) -> List[Authorization]:
        """
        List authorizations.

        Parameters:
        - **kwargs: Query parameters (user_id, user, org_id, org, token)

        Returns:
        List[Authorization]: List of authorization objects
        """
        
    def update_authorization(self, authorization: Authorization) -> Authorization:
        """
        Update an authorization.

        Parameters:
        - authorization (Authorization): Authorization object with updated properties

        Returns:
        Authorization: Updated authorization object
        """

AuthorizationsApi Usage Examples

Token and permission management:

from influxdb_client import Permission, PermissionResource

auth_api = client.authorizations_api()

# Create read permission for specific bucket
read_permission = Permission(
    action="read",
    resource=PermissionResource(type="buckets", id="bucket_id_here")
)

# Create write permission for specific bucket
write_permission = Permission(
    action="write", 
    resource=PermissionResource(type="buckets", id="bucket_id_here")
)

# Create authorization with specific permissions
auth = auth_api.create_authorization(
    org_id="org_id_here",
    permissions=[read_permission, write_permission],
    description="Sensor data access token"
)
print(f"Created token: {auth.token[:10]}...")

# List all authorizations for organization
authorizations = auth_api.find_authorizations(org_id="org_id_here")
for auth in authorizations:
    print(f"Token: {auth.id} - Status: {auth.status}")

# Update authorization status
auth.status = "inactive"
updated_auth = auth_api.update_authorization(auth)

Create organization-wide permissions:

# Create permissions for all buckets in organization
all_buckets_read = Permission(
    action="read",
    resource=PermissionResource(type="buckets", org_id="org_id_here")
)

all_buckets_write = Permission(
    action="write",
    resource=PermissionResource(type="buckets", org_id="org_id_here")
)

# Create admin-level authorization
admin_auth = auth_api.create_authorization(
    org_id="org_id_here",
    permissions=[all_buckets_read, all_buckets_write],
    description="Organization admin token"
)

TasksApi

API for managing InfluxDB tasks with support for task execution, monitoring, and log management.

class TasksApi:
    def __init__(self, influxdb_client): ...
    
    def create_task(self, task: Task = None, **kwargs) -> Task:
        """
        Create a new task.

        Parameters:
        - task (Task, optional): Pre-configured task object
        - **kwargs: Task properties (name, flux, org_id, status, cron, every)

        Returns:
        Task: Created task object
        """
        
    def delete_task(self, task: Union[Task, str]) -> Task:
        """
        Delete a task.

        Parameters:
        - task (Union[Task, str]): Task object or task ID

        Returns:
        Task: Deleted task object
        """
        
    def find_task_by_id(self, task_id: str) -> Task:
        """
        Find task by ID.

        Parameters:
        - task_id (str): Task ID

        Returns:
        Task: Found task object or None
        """
        
    def find_tasks(self, **kwargs) -> List[Task]:
        """
        List tasks.

        Parameters:
        - **kwargs: Query parameters (after, user_id, org_id, org, status, limit)

        Returns:
        List[Task]: List of task objects
        """
        
    def update_task(self, task: Task) -> Task:
        """
        Update an existing task.

        Parameters:
        - task (Task): Task object with updated properties

        Returns:
        Task: Updated task object
        """
        
    def run_manually(self, task_id: str, **kwargs) -> Run:
        """
        Manually execute a task.

        Parameters:
        - task_id (str): Task ID to execute
        - **kwargs: Execution parameters

        Returns:
        Run: Task execution run object
        """
        
    def find_runs(self, task_id: str, **kwargs) -> List[Run]:
        """
        List task execution runs.

        Parameters:
        - task_id (str): Task ID
        - **kwargs: Query parameters (after, limit, after_time, before_time)

        Returns:
        List[Run]: List of task run objects
        """
        
    def find_run_by_id(self, task_id: str, run_id: str) -> Run:
        """
        Find specific task run.

        Parameters:
        - task_id (str): Task ID
        - run_id (str): Run ID

        Returns:
        Run: Task run object or None
        """
        
    def cancel_run(self, task_id: str, run_id: str) -> Run:
        """
        Cancel a running task execution.

        Parameters:
        - task_id (str): Task ID
        - run_id (str): Run ID to cancel

        Returns:
        Run: Cancelled run object
        """
        
    def retry_run(self, task_id: str, run_id: str) -> Run:
        """
        Retry a failed task execution.

        Parameters:
        - task_id (str): Task ID
        - run_id (str): Run ID to retry

        Returns:
        Run: New run object for retry
        """
        
    def find_logs(self, task_id: str, run_id: str = None, **kwargs) -> Logs:
        """
        Get task execution logs.

        Parameters:
        - task_id (str): Task ID
        - run_id (str, optional): Specific run ID (if None, gets logs for all runs)
        - **kwargs: Log query parameters

        Returns:
        Logs: Task execution log entries
        """

TasksApi Usage Examples

Task creation and management:

tasks_api = client.tasks_api()

# Create a data processing task
flux_script = '''
from(bucket: "raw_data")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "temperature")
  |> aggregateWindow(every: 5m, fn: mean)
  |> to(bucket: "processed_data")
'''

task = tasks_api.create_task(
    name="Temperature Aggregation",
    flux=flux_script,
    every="5m",  # Run every 5 minutes
    org_id="org_id_here"
)
print(f"Created task: {task.name} (ID: {task.id})")

# List all tasks
tasks = tasks_api.find_tasks(org_id="org_id_here")
for t in tasks:
    print(f"Task: {t.name} - Status: {t.status}")

# Manually run task
run = tasks_api.run_manually(task.id)
print(f"Started manual run: {run.id}")

# Monitor task runs
runs = tasks_api.find_runs(task.id, limit=10)
for run in runs:
    print(f"Run {run.id}: {run.status} at {run.started_at}")
    
    # Get logs for failed runs
    if run.status == "failed":
        logs = tasks_api.find_logs(task.id, run.id)
        print(f"Error logs: {logs}")

Task scheduling patterns:

# Cron-based scheduling
cron_task = tasks_api.create_task(
    name="Daily Report",
    flux=daily_report_flux,
    cron="0 9 * * *",  # 9 AM daily
    org_id="org_id_here"
)

# Interval-based scheduling  
interval_task = tasks_api.create_task(
    name="Real-time Processing",
    flux=realtime_flux,
    every="30s",  # Every 30 seconds
    org_id="org_id_here"
)

# Update task status
interval_task.status = "inactive"
updated_task = tasks_api.update_task(interval_task)

LabelsApi

API for managing InfluxDB labels for organizing and categorizing resources.

class LabelsApi:
    def __init__(self, influxdb_client): ...
    
    def create_label(self, label: Label = None, **kwargs) -> LabelResponse:
        """
        Create a new label.

        Parameters:
        - label (Label, optional): Pre-configured label object
        - **kwargs: Label properties (name, org_id, properties)

        Returns:
        LabelResponse: Created label object
        """
        
    def delete_label(self, label: Union[LabelResponse, str]) -> LabelResponse:
        """
        Delete a label.

        Parameters:
        - label (Union[LabelResponse, str]): Label object or label ID

        Returns:
        LabelResponse: Deleted label object
        """
        
    def find_label_by_id(self, label_id: str) -> LabelResponse:
        """
        Find label by ID.

        Parameters:
        - label_id (str): Label ID

        Returns:
        LabelResponse: Found label object or None
        """
        
    def find_labels(self, **kwargs) -> List[LabelResponse]:
        """
        List labels.

        Parameters:
        - **kwargs: Query parameters (org_id, name, limit)

        Returns:
        List[LabelResponse]: List of label objects
        """
        
    def update_label(self, label: LabelResponse) -> LabelResponse:
        """
        Update an existing label.

        Parameters:
        - label (LabelResponse): Label object with updated properties

        Returns:
        LabelResponse: Updated label object
        """

LabelsApi Usage Examples

Label management:

labels_api = client.labels_api()

# Create labels for organization
env_label = labels_api.create_label(
    name="environment",
    org_id="org_id_here",
    properties={"color": "#FF5733", "description": "Environment classification"}
)

team_label = labels_api.create_label(
    name="team:data-science", 
    org_id="org_id_here",
    properties={"color": "#33A1FF", "description": "Data Science team resources"}
)

# List all labels
labels = labels_api.find_labels(org_id="org_id_here")
for label in labels:
    print(f"Label: {label.name} - Color: {label.properties.get('color', 'N/A')}")

# Update label properties
env_label.properties["description"] = "Updated: Environment and deployment classification"
updated_label = labels_api.update_label(env_label)

Types

# Resource entity types
class Bucket:
    id: str
    name: str
    org_id: str
    description: str
    retention_rules: List[RetentionRule]
    created_at: datetime
    updated_at: datetime

class Organization:
    id: str
    name: str
    description: str
    created_at: datetime
    updated_at: datetime

class UserResponse:
    id: str
    name: str
    status: str  # "active", "inactive"
    created_at: datetime
    updated_at: datetime

class Authorization:
    id: str
    token: str
    status: str  # "active", "inactive"
    description: str
    org_id: str
    user_id: str
    permissions: List[Permission]
    created_at: datetime
    updated_at: datetime

class Task:
    id: str
    name: str
    org_id: str
    status: str  # "active", "inactive"
    flux: str
    cron: str
    every: str
    created_at: datetime
    updated_at: datetime
    latest_completed: datetime

class LabelResponse:
    id: str
    name: str
    org_id: str
    properties: dict
    created_at: datetime
    updated_at: datetime

# Permission and access control types
class Permission:
    action: str  # "read", "write"
    resource: PermissionResource

class PermissionResource:
    type: str    # "buckets", "tasks", "authorizations", etc.
    id: str      # Specific resource ID (optional)
    org_id: str  # Organization ID (optional)

# Retention policy types
class RetentionRule:
    type: str  # "expire"
    every_seconds: int
    shard_group_duration_seconds: int

# Task execution types
class Run:
    id: str
    task_id: str
    status: str  # "success", "failed", "canceled", "scheduled"
    scheduled_for: datetime
    started_at: datetime
    finished_at: datetime
    requested_at: datetime
    log: List[LogEvent]

class Logs:
    events: List[LogEvent]

class LogEvent:
    run_id: str
    time: datetime
    level: str  # "info", "error", "debug"
    message: str

# Query and filter types
ResourceFilter = Dict[str, Any]  # Generic resource filtering
PaginationParams = Dict[str, Union[str, int]]  # Pagination parameters

# Exception types
class ResourceNotFoundError(InfluxDBError):
    """Raised when requested resource is not found."""
    pass

class DuplicateResourceError(InfluxDBError):
    """Raised when attempting to create duplicate resource."""
    pass

class InsufficientPermissionsError(InfluxDBError):
    """Raised when user lacks required permissions."""
    pass

Install with Tessl CLI

npx tessl i tessl/pypi-influxdb-client

docs

advanced-operations.md

client-management.md

index.md

querying-data.md

resource-management.md

writing-data.md

tile.json