Comprehensive Python client library for InfluxDB 2.x with sync/async APIs for writing, querying, and managing time series data.
82
Comprehensive functionality for managing InfluxDB resources including buckets, organizations, users, authorizations, tasks, and labels through dedicated API classes. These APIs provide complete CRUD operations and advanced management capabilities for all InfluxDB entities.
API for managing InfluxDB buckets with support for creation, deletion, updates, and querying with retention policies and metadata.
class BucketsApi:
def __init__(self, influxdb_client): ...
def create_bucket(
self,
bucket: Bucket = None,
bucket_name: str = None,
org_id: str = None,
retention_rules: List[RetentionRule] = None,
description: str = None,
org: str = None
) -> Bucket:
"""
Create a new bucket.
Parameters:
- bucket (Bucket, optional): Pre-configured bucket object
- bucket_name (str, optional): Name for the new bucket
- org_id (str, optional): Organization ID (takes precedence over org)
- retention_rules (List[RetentionRule], optional): Data retention policies
- description (str, optional): Bucket description
- org (str, optional): Organization name
Returns:
Bucket: Created bucket object
"""
def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket:
"""
Delete a bucket.
Parameters:
- bucket (Union[Bucket, str]): Bucket object or bucket ID
Returns:
Bucket: Deleted bucket object
"""
def find_bucket_by_id(self, bucket_id: str) -> Bucket:
"""
Find bucket by its ID.
Parameters:
- bucket_id (str): Bucket ID
Returns:
Bucket: Found bucket object or None
"""
def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket:
"""
Find bucket by name within organization.
Parameters:
- bucket_name (str): Bucket name to search for
- org (str, optional): Organization name or ID
Returns:
Bucket: Found bucket object or None
"""
def find_buckets(self, org: str = None, **kwargs) -> List[Bucket]:
"""
List all buckets, optionally filtered by organization.
Parameters:
- org (str, optional): Organization name or ID to filter by
- **kwargs: Additional query parameters (limit, offset, after, etc.)
Returns:
List[Bucket]: List of bucket objects
"""
def update_bucket(self, bucket: Bucket) -> Bucket:
"""
Update an existing bucket.
Parameters:
- bucket (Bucket): Bucket object with updated properties
Returns:
Bucket: Updated bucket object
"""Basic bucket management:
from influxdb_client import InfluxDBClient, Bucket, RetentionRule
from datetime import timedelta
client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")
buckets_api = client.buckets_api()
# Create bucket with retention policy
retention_rule = RetentionRule(type="expire", every_seconds=int(timedelta(days=30).total_seconds()))
bucket = buckets_api.create_bucket(
bucket_name="sensor_data",
description="IoT sensor measurements",
retention_rules=[retention_rule],
org="my-org"
)
print(f"Created bucket: {bucket.name} (ID: {bucket.id})")
# Find bucket by name
found_bucket = buckets_api.find_bucket_by_name("sensor_data", org="my-org")
if found_bucket:
print(f"Found bucket: {found_bucket.name}")
# List all buckets
all_buckets = buckets_api.find_buckets()
for b in all_buckets:
print(f"Bucket: {b.name} - {b.description}")Advanced bucket configuration:
# Create bucket with multiple retention rules
retention_rules = [
RetentionRule(type="expire", every_seconds=86400 * 7), # 7 days
RetentionRule(type="expire", every_seconds=86400 * 365) # 1 year
]
# Create using Bucket object
new_bucket = Bucket(
name="analytics_data",
description="Analytics and reporting data",
org_id="organization_id_here",
retention_rules=retention_rules
)
created_bucket = buckets_api.create_bucket(bucket=new_bucket)
# Update bucket description
created_bucket.description = "Updated analytics and reporting data"
updated_bucket = buckets_api.update_bucket(created_bucket)
# Delete bucket
buckets_api.delete_bucket(updated_bucket.id)API for managing InfluxDB organizations with support for creation, deletion, updates, and membership management.
class OrganizationsApi:
def __init__(self, influxdb_client): ...
def create_organization(self, org: Organization = None, **kwargs) -> Organization:
"""
Create a new organization.
Parameters:
- org (Organization, optional): Pre-configured organization object
- **kwargs: Organization properties (name, description, etc.)
Returns:
Organization: Created organization object
"""
def delete_organization(self, org: Union[Organization, str]) -> Organization:
"""
Delete an organization.
Parameters:
- org (Union[Organization, str]): Organization object or organization ID
Returns:
Organization: Deleted organization object
"""
def find_organization_by_id(self, org_id: str) -> Organization:
"""
Find organization by its ID.
Parameters:
- org_id (str): Organization ID
Returns:
Organization: Found organization object or None
"""
def find_organizations(self, **kwargs) -> List[Organization]:
"""
List all organizations.
Parameters:
- **kwargs: Query parameters (limit, offset, descending, etc.)
Returns:
List[Organization]: List of organization objects
"""
def update_organization(self, org: Organization) -> Organization:
"""
Update an existing organization.
Parameters:
- org (Organization): Organization object with updated properties
Returns:
Organization: Updated organization object
"""Organization management:
orgs_api = client.organizations_api()
# Create new organization
new_org = orgs_api.create_organization(name="Data Science Team", description="Analytics and ML workloads")
print(f"Created org: {new_org.name} (ID: {new_org.id})")
# List all organizations
organizations = orgs_api.find_organizations()
for org in organizations:
print(f"Org: {org.name} - {org.description}")
# Find specific organization
found_org = orgs_api.find_organization_by_id(new_org.id)
if found_org:
# Update organization
found_org.description = "Updated: Analytics, ML, and BI workloads"
updated_org = orgs_api.update_organization(found_org)
print(f"Updated org description: {updated_org.description}")API for managing InfluxDB users with support for user creation, updates, and current user information.
class UsersApi:
def __init__(self, influxdb_client): ...
def create_user(self, user: User = None, **kwargs) -> UserResponse:
"""
Create a new user.
Parameters:
- user (User, optional): Pre-configured user object
- **kwargs: User properties (name, status, etc.)
Returns:
UserResponse: Created user object
"""
def delete_user(self, user: Union[UserResponse, str]) -> UserResponse:
"""
Delete a user.
Parameters:
- user (Union[UserResponse, str]): User object or user ID
Returns:
UserResponse: Deleted user object
"""
def find_user_by_id(self, user_id: str) -> UserResponse:
"""
Find user by ID.
Parameters:
- user_id (str): User ID
Returns:
UserResponse: Found user object or None
"""
def find_users(self, **kwargs) -> List[UserResponse]:
"""
List all users.
Parameters:
- **kwargs: Query parameters (limit, offset, after, name, id)
Returns:
List[UserResponse]: List of user objects
"""
def update_user(self, user: UserResponse) -> UserResponse:
"""
Update an existing user.
Parameters:
- user (UserResponse): User object with updated properties
Returns:
UserResponse: Updated user object
"""
def me(self) -> UserResponse:
"""
Get current user information.
Returns:
UserResponse: Current user object
"""User management:
users_api = client.users_api()
# Get current user info
current_user = users_api.me()
print(f"Current user: {current_user.name} (ID: {current_user.id})")
# Create new user
new_user = users_api.create_user(name="john.doe", status="active")
print(f"Created user: {new_user.name}")
# List all users
users = users_api.find_users()
for user in users:
print(f"User: {user.name} - Status: {user.status}")
# Find and update user
found_user = users_api.find_user_by_id(new_user.id)
if found_user:
found_user.status = "inactive"
updated_user = users_api.update_user(found_user)
print(f"Updated user status: {updated_user.status}")API for managing InfluxDB authentication tokens and permissions with support for fine-grained access control.
class AuthorizationsApi:
def __init__(self, influxdb_client): ...
def create_authorization(
self,
org_id: str = None,
permissions: List[Permission] = None,
**kwargs
) -> Authorization:
"""
Create a new authorization token.
Parameters:
- org_id (str, optional): Organization ID for the authorization
- permissions (List[Permission], optional): List of permission objects
- **kwargs: Additional authorization properties (user_id, description, status)
Returns:
Authorization: Created authorization object with token
"""
def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization:
"""
Delete an authorization token.
Parameters:
- authorization (Union[Authorization, str]): Authorization object or ID
Returns:
Authorization: Deleted authorization object
"""
def find_authorization_by_id(self, authorization_id: str) -> Authorization:
"""
Find authorization by ID.
Parameters:
- authorization_id (str): Authorization ID
Returns:
Authorization: Found authorization object or None
"""
def find_authorizations(self, **kwargs) -> List[Authorization]:
"""
List authorizations.
Parameters:
- **kwargs: Query parameters (user_id, user, org_id, org, token)
Returns:
List[Authorization]: List of authorization objects
"""
def update_authorization(self, authorization: Authorization) -> Authorization:
"""
Update an authorization.
Parameters:
- authorization (Authorization): Authorization object with updated properties
Returns:
Authorization: Updated authorization object
"""Token and permission management:
from influxdb_client import Permission, PermissionResource
auth_api = client.authorizations_api()
# Create read permission for specific bucket
read_permission = Permission(
action="read",
resource=PermissionResource(type="buckets", id="bucket_id_here")
)
# Create write permission for specific bucket
write_permission = Permission(
action="write",
resource=PermissionResource(type="buckets", id="bucket_id_here")
)
# Create authorization with specific permissions
auth = auth_api.create_authorization(
org_id="org_id_here",
permissions=[read_permission, write_permission],
description="Sensor data access token"
)
print(f"Created token: {auth.token[:10]}...")
# List all authorizations for organization
authorizations = auth_api.find_authorizations(org_id="org_id_here")
for auth in authorizations:
print(f"Token: {auth.id} - Status: {auth.status}")
# Update authorization status
auth.status = "inactive"
updated_auth = auth_api.update_authorization(auth)Create organization-wide permissions:
# Create permissions for all buckets in organization
all_buckets_read = Permission(
action="read",
resource=PermissionResource(type="buckets", org_id="org_id_here")
)
all_buckets_write = Permission(
action="write",
resource=PermissionResource(type="buckets", org_id="org_id_here")
)
# Create admin-level authorization
admin_auth = auth_api.create_authorization(
org_id="org_id_here",
permissions=[all_buckets_read, all_buckets_write],
description="Organization admin token"
)API for managing InfluxDB tasks with support for task execution, monitoring, and log management.
class TasksApi:
def __init__(self, influxdb_client): ...
def create_task(self, task: Task = None, **kwargs) -> Task:
"""
Create a new task.
Parameters:
- task (Task, optional): Pre-configured task object
- **kwargs: Task properties (name, flux, org_id, status, cron, every)
Returns:
Task: Created task object
"""
def delete_task(self, task: Union[Task, str]) -> Task:
"""
Delete a task.
Parameters:
- task (Union[Task, str]): Task object or task ID
Returns:
Task: Deleted task object
"""
def find_task_by_id(self, task_id: str) -> Task:
"""
Find task by ID.
Parameters:
- task_id (str): Task ID
Returns:
Task: Found task object or None
"""
def find_tasks(self, **kwargs) -> List[Task]:
"""
List tasks.
Parameters:
- **kwargs: Query parameters (after, user_id, org_id, org, status, limit)
Returns:
List[Task]: List of task objects
"""
def update_task(self, task: Task) -> Task:
"""
Update an existing task.
Parameters:
- task (Task): Task object with updated properties
Returns:
Task: Updated task object
"""
def run_manually(self, task_id: str, **kwargs) -> Run:
"""
Manually execute a task.
Parameters:
- task_id (str): Task ID to execute
- **kwargs: Execution parameters
Returns:
Run: Task execution run object
"""
def find_runs(self, task_id: str, **kwargs) -> List[Run]:
"""
List task execution runs.
Parameters:
- task_id (str): Task ID
- **kwargs: Query parameters (after, limit, after_time, before_time)
Returns:
List[Run]: List of task run objects
"""
def find_run_by_id(self, task_id: str, run_id: str) -> Run:
"""
Find specific task run.
Parameters:
- task_id (str): Task ID
- run_id (str): Run ID
Returns:
Run: Task run object or None
"""
def cancel_run(self, task_id: str, run_id: str) -> Run:
"""
Cancel a running task execution.
Parameters:
- task_id (str): Task ID
- run_id (str): Run ID to cancel
Returns:
Run: Cancelled run object
"""
def retry_run(self, task_id: str, run_id: str) -> Run:
"""
Retry a failed task execution.
Parameters:
- task_id (str): Task ID
- run_id (str): Run ID to retry
Returns:
Run: New run object for retry
"""
def find_logs(self, task_id: str, run_id: str = None, **kwargs) -> Logs:
"""
Get task execution logs.
Parameters:
- task_id (str): Task ID
- run_id (str, optional): Specific run ID (if None, gets logs for all runs)
- **kwargs: Log query parameters
Returns:
Logs: Task execution log entries
"""Task creation and management:
tasks_api = client.tasks_api()
# Create a data processing task
flux_script = '''
from(bucket: "raw_data")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "temperature")
|> aggregateWindow(every: 5m, fn: mean)
|> to(bucket: "processed_data")
'''
task = tasks_api.create_task(
name="Temperature Aggregation",
flux=flux_script,
every="5m", # Run every 5 minutes
org_id="org_id_here"
)
print(f"Created task: {task.name} (ID: {task.id})")
# List all tasks
tasks = tasks_api.find_tasks(org_id="org_id_here")
for t in tasks:
print(f"Task: {t.name} - Status: {t.status}")
# Manually run task
run = tasks_api.run_manually(task.id)
print(f"Started manual run: {run.id}")
# Monitor task runs
runs = tasks_api.find_runs(task.id, limit=10)
for run in runs:
print(f"Run {run.id}: {run.status} at {run.started_at}")
# Get logs for failed runs
if run.status == "failed":
logs = tasks_api.find_logs(task.id, run.id)
print(f"Error logs: {logs}")Task scheduling patterns:
# Cron-based scheduling
cron_task = tasks_api.create_task(
name="Daily Report",
flux=daily_report_flux,
cron="0 9 * * *", # 9 AM daily
org_id="org_id_here"
)
# Interval-based scheduling
interval_task = tasks_api.create_task(
name="Real-time Processing",
flux=realtime_flux,
every="30s", # Every 30 seconds
org_id="org_id_here"
)
# Update task status
interval_task.status = "inactive"
updated_task = tasks_api.update_task(interval_task)API for managing InfluxDB labels for organizing and categorizing resources.
class LabelsApi:
def __init__(self, influxdb_client): ...
def create_label(self, label: Label = None, **kwargs) -> LabelResponse:
"""
Create a new label.
Parameters:
- label (Label, optional): Pre-configured label object
- **kwargs: Label properties (name, org_id, properties)
Returns:
LabelResponse: Created label object
"""
def delete_label(self, label: Union[LabelResponse, str]) -> LabelResponse:
"""
Delete a label.
Parameters:
- label (Union[LabelResponse, str]): Label object or label ID
Returns:
LabelResponse: Deleted label object
"""
def find_label_by_id(self, label_id: str) -> LabelResponse:
"""
Find label by ID.
Parameters:
- label_id (str): Label ID
Returns:
LabelResponse: Found label object or None
"""
def find_labels(self, **kwargs) -> List[LabelResponse]:
"""
List labels.
Parameters:
- **kwargs: Query parameters (org_id, name, limit)
Returns:
List[LabelResponse]: List of label objects
"""
def update_label(self, label: LabelResponse) -> LabelResponse:
"""
Update an existing label.
Parameters:
- label (LabelResponse): Label object with updated properties
Returns:
LabelResponse: Updated label object
"""Label management:
labels_api = client.labels_api()
# Create labels for organization
env_label = labels_api.create_label(
name="environment",
org_id="org_id_here",
properties={"color": "#FF5733", "description": "Environment classification"}
)
team_label = labels_api.create_label(
name="team:data-science",
org_id="org_id_here",
properties={"color": "#33A1FF", "description": "Data Science team resources"}
)
# List all labels
labels = labels_api.find_labels(org_id="org_id_here")
for label in labels:
print(f"Label: {label.name} - Color: {label.properties.get('color', 'N/A')}")
# Update label properties
env_label.properties["description"] = "Updated: Environment and deployment classification"
updated_label = labels_api.update_label(env_label)# Resource entity types
class Bucket:
id: str
name: str
org_id: str
description: str
retention_rules: List[RetentionRule]
created_at: datetime
updated_at: datetime
class Organization:
id: str
name: str
description: str
created_at: datetime
updated_at: datetime
class UserResponse:
id: str
name: str
status: str # "active", "inactive"
created_at: datetime
updated_at: datetime
class Authorization:
id: str
token: str
status: str # "active", "inactive"
description: str
org_id: str
user_id: str
permissions: List[Permission]
created_at: datetime
updated_at: datetime
class Task:
id: str
name: str
org_id: str
status: str # "active", "inactive"
flux: str
cron: str
every: str
created_at: datetime
updated_at: datetime
latest_completed: datetime
class LabelResponse:
id: str
name: str
org_id: str
properties: dict
created_at: datetime
updated_at: datetime
# Permission and access control types
class Permission:
action: str # "read", "write"
resource: PermissionResource
class PermissionResource:
type: str # "buckets", "tasks", "authorizations", etc.
id: str # Specific resource ID (optional)
org_id: str # Organization ID (optional)
# Retention policy types
class RetentionRule:
type: str # "expire"
every_seconds: int
shard_group_duration_seconds: int
# Task execution types
class Run:
id: str
task_id: str
status: str # "success", "failed", "canceled", "scheduled"
scheduled_for: datetime
started_at: datetime
finished_at: datetime
requested_at: datetime
log: List[LogEvent]
class Logs:
events: List[LogEvent]
class LogEvent:
run_id: str
time: datetime
level: str # "info", "error", "debug"
message: str
# Query and filter types
ResourceFilter = Dict[str, Any] # Generic resource filtering
PaginationParams = Dict[str, Union[str, int]] # Pagination parameters
# Exception types
class ResourceNotFoundError(InfluxDBError):
"""Raised when requested resource is not found."""
pass
class DuplicateResourceError(InfluxDBError):
"""Raised when attempting to create duplicate resource."""
pass
class InsufficientPermissionsError(InfluxDBError):
"""Raised when user lacks required permissions."""
passInstall with Tessl CLI
npx tessl i tessl/pypi-influxdb-clientdocs
evals
scenario-1
scenario-2
scenario-3
scenario-4
scenario-5
scenario-6
scenario-7
scenario-8
scenario-9
scenario-10