0
# Resource Management
1
2
This section documents the dedicated API classes for managing InfluxDB resources: buckets, organizations, users, authorizations, tasks, and labels. Each class provides create, read, update, and delete (CRUD) operations for its resource type, and some add resource-specific operations such as running a task manually or retrieving task logs.
3
4
## Capabilities
5
6
### BucketsApi
7
8
API for managing InfluxDB buckets: create, update, and delete buckets, look them up by ID or name, and list them, including their retention rules and description.
9
10
```python { .api }
11
class BucketsApi:
12
def __init__(self, influxdb_client): ...
13
14
def create_bucket(
15
self,
16
bucket: Bucket = None,
17
bucket_name: str = None,
18
org_id: str = None,
19
retention_rules: List[RetentionRule] = None,
20
description: str = None,
21
org: str = None
22
) -> Bucket:
23
"""
24
Create a new bucket.
25
26
Parameters:
27
- bucket (Bucket, optional): Pre-configured bucket object
28
- bucket_name (str, optional): Name for the new bucket
29
- org_id (str, optional): Organization ID (takes precedence over org)
30
- retention_rules (List[RetentionRule], optional): Data retention policies
31
- description (str, optional): Bucket description
32
- org (str, optional): Organization name
33
34
Returns:
35
Bucket: Created bucket object
36
"""
37
38
def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket:
39
"""
40
Delete a bucket.
41
42
Parameters:
43
- bucket (Union[Bucket, str]): Bucket object or bucket ID
44
45
Returns:
46
Bucket: Deleted bucket object
47
"""
48
49
def find_bucket_by_id(self, bucket_id: str) -> Bucket:
50
"""
51
Find bucket by its ID.
52
53
Parameters:
54
- bucket_id (str): Bucket ID
55
56
Returns:
57
Bucket: Found bucket object or None
58
"""
59
60
def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket:
61
"""
62
Find bucket by name within organization.
63
64
Parameters:
65
- bucket_name (str): Bucket name to search for
66
- org (str, optional): Organization name or ID
67
68
Returns:
69
Bucket: Found bucket object or None
70
"""
71
72
def find_buckets(self, org: str = None, **kwargs) -> List[Bucket]:
73
"""
74
List all buckets, optionally filtered by organization.
75
76
Parameters:
77
- org (str, optional): Organization name or ID to filter by
78
- **kwargs: Additional query parameters (limit, offset, after, etc.)
79
80
Returns:
81
List[Bucket]: List of bucket objects
82
"""
83
84
def update_bucket(self, bucket: Bucket) -> Bucket:
85
"""
86
Update an existing bucket.
87
88
Parameters:
89
- bucket (Bucket): Bucket object with updated properties
90
91
Returns:
92
Bucket: Updated bucket object
93
"""
94
```
95
96
#### BucketsApi Usage Examples
97
98
**Basic bucket management:**
99
```python
100
from influxdb_client import InfluxDBClient, Bucket, RetentionRule
101
from datetime import timedelta
102
103
client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")
104
buckets_api = client.buckets_api()
105
106
# Create bucket with retention policy
107
retention_rule = RetentionRule(type="expire", every_seconds=int(timedelta(days=30).total_seconds()))
108
bucket = buckets_api.create_bucket(
109
bucket_name="sensor_data",
110
description="IoT sensor measurements",
111
retention_rules=[retention_rule],
112
org="my-org"
113
)
114
print(f"Created bucket: {bucket.name} (ID: {bucket.id})")
115
116
# Find bucket by name
117
found_bucket = buckets_api.find_bucket_by_name("sensor_data", org="my-org")
118
if found_bucket:
119
print(f"Found bucket: {found_bucket.name}")
120
121
# List all buckets
122
all_buckets = buckets_api.find_buckets()
123
for b in all_buckets:
124
print(f"Bucket: {b.name} - {b.description}")
125
```
126
127
**Advanced bucket configuration:**
128
```python
129
# Create bucket with multiple retention rules
130
retention_rules = [
131
RetentionRule(type="expire", every_seconds=86400 * 7), # 7 days
132
RetentionRule(type="expire", every_seconds=86400 * 365) # 1 year
133
]
134
135
# Create using Bucket object
136
new_bucket = Bucket(
137
name="analytics_data",
138
description="Analytics and reporting data",
139
org_id="organization_id_here",
140
retention_rules=retention_rules
141
)
142
143
created_bucket = buckets_api.create_bucket(bucket=new_bucket)
144
145
# Update bucket description
146
created_bucket.description = "Updated analytics and reporting data"
147
updated_bucket = buckets_api.update_bucket(created_bucket)
148
149
# Delete bucket
150
buckets_api.delete_bucket(updated_bucket.id)
151
```
152
153
### OrganizationsApi
154
155
API for managing InfluxDB organizations: create, update, and delete organizations, look one up by ID, and list all organizations.
156
157
```python { .api }
158
class OrganizationsApi:
159
def __init__(self, influxdb_client): ...
160
161
def create_organization(self, org: Organization = None, **kwargs) -> Organization:
162
"""
163
Create a new organization.
164
165
Parameters:
166
- org (Organization, optional): Pre-configured organization object
167
- **kwargs: Organization properties (name, description, etc.)
168
169
Returns:
170
Organization: Created organization object
171
"""
172
173
def delete_organization(self, org: Union[Organization, str]) -> Organization:
174
"""
175
Delete an organization.
176
177
Parameters:
178
- org (Union[Organization, str]): Organization object or organization ID
179
180
Returns:
181
Organization: Deleted organization object
182
"""
183
184
def find_organization_by_id(self, org_id: str) -> Organization:
185
"""
186
Find organization by its ID.
187
188
Parameters:
189
- org_id (str): Organization ID
190
191
Returns:
192
Organization: Found organization object or None
193
"""
194
195
def find_organizations(self, **kwargs) -> List[Organization]:
196
"""
197
List all organizations.
198
199
Parameters:
200
- **kwargs: Query parameters (limit, offset, descending, etc.)
201
202
Returns:
203
List[Organization]: List of organization objects
204
"""
205
206
def update_organization(self, org: Organization) -> Organization:
207
"""
208
Update an existing organization.
209
210
Parameters:
211
- org (Organization): Organization object with updated properties
212
213
Returns:
214
Organization: Updated organization object
215
"""
216
```
217
218
#### OrganizationsApi Usage Examples
219
220
**Organization management:**
221
```python
222
orgs_api = client.organizations_api()
223
224
# Create new organization
225
new_org = orgs_api.create_organization(name="Data Science Team", description="Analytics and ML workloads")
226
print(f"Created org: {new_org.name} (ID: {new_org.id})")
227
228
# List all organizations
229
organizations = orgs_api.find_organizations()
230
for org in organizations:
231
print(f"Org: {org.name} - {org.description}")
232
233
# Find specific organization
234
found_org = orgs_api.find_organization_by_id(new_org.id)
235
if found_org:
236
# Update organization
237
found_org.description = "Updated: Analytics, ML, and BI workloads"
238
updated_org = orgs_api.update_organization(found_org)
239
print(f"Updated org description: {updated_org.description}")
240
```
241
242
### UsersApi
243
244
API for managing InfluxDB users: create, update, and delete users, look one up by ID, list users, and retrieve information about the current user.
245
246
```python { .api }
247
class UsersApi:
248
def __init__(self, influxdb_client): ...
249
250
def create_user(self, user: User = None, **kwargs) -> UserResponse:
251
"""
252
Create a new user.
253
254
Parameters:
255
- user (User, optional): Pre-configured user object
256
- **kwargs: User properties (name, status, etc.)
257
258
Returns:
259
UserResponse: Created user object
260
"""
261
262
def delete_user(self, user: Union[UserResponse, str]) -> UserResponse:
263
"""
264
Delete a user.
265
266
Parameters:
267
- user (Union[UserResponse, str]): User object or user ID
268
269
Returns:
270
UserResponse: Deleted user object
271
"""
272
273
def find_user_by_id(self, user_id: str) -> UserResponse:
274
"""
275
Find user by ID.
276
277
Parameters:
278
- user_id (str): User ID
279
280
Returns:
281
UserResponse: Found user object or None
282
"""
283
284
def find_users(self, **kwargs) -> List[UserResponse]:
285
"""
286
List all users.
287
288
Parameters:
289
- **kwargs: Query parameters (limit, offset, after, name, id)
290
291
Returns:
292
List[UserResponse]: List of user objects
293
"""
294
295
def update_user(self, user: UserResponse) -> UserResponse:
296
"""
297
Update an existing user.
298
299
Parameters:
300
- user (UserResponse): User object with updated properties
301
302
Returns:
303
UserResponse: Updated user object
304
"""
305
306
def me(self) -> UserResponse:
307
"""
308
Get current user information.
309
310
Returns:
311
UserResponse: Current user object
312
"""
313
```
314
315
#### UsersApi Usage Examples
316
317
**User management:**
318
```python
319
users_api = client.users_api()
320
321
# Get current user info
322
current_user = users_api.me()
323
print(f"Current user: {current_user.name} (ID: {current_user.id})")
324
325
# Create new user
326
new_user = users_api.create_user(name="john.doe", status="active")
327
print(f"Created user: {new_user.name}")
328
329
# List all users
330
users = users_api.find_users()
331
for user in users:
332
print(f"User: {user.name} - Status: {user.status}")
333
334
# Find and update user
335
found_user = users_api.find_user_by_id(new_user.id)
336
if found_user:
337
found_user.status = "inactive"
338
updated_user = users_api.update_user(found_user)
339
print(f"Updated user status: {updated_user.status}")
340
```
341
342
### AuthorizationsApi
343
344
API for managing InfluxDB authentication tokens and permissions with support for fine-grained access control.
345
346
```python { .api }
347
class AuthorizationsApi:
348
def __init__(self, influxdb_client): ...
349
350
def create_authorization(
351
self,
352
org_id: str = None,
353
permissions: List[Permission] = None,
354
**kwargs
355
) -> Authorization:
356
"""
357
Create a new authorization token.
358
359
Parameters:
360
- org_id (str, optional): Organization ID for the authorization
361
- permissions (List[Permission], optional): List of permission objects
362
- **kwargs: Additional authorization properties (user_id, description, status)
363
364
Returns:
365
Authorization: Created authorization object with token
366
"""
367
368
def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization:
369
"""
370
Delete an authorization token.
371
372
Parameters:
373
- authorization (Union[Authorization, str]): Authorization object or ID
374
375
Returns:
376
Authorization: Deleted authorization object
377
"""
378
379
def find_authorization_by_id(self, authorization_id: str) -> Authorization:
380
"""
381
Find authorization by ID.
382
383
Parameters:
384
- authorization_id (str): Authorization ID
385
386
Returns:
387
Authorization: Found authorization object or None
388
"""
389
390
def find_authorizations(self, **kwargs) -> List[Authorization]:
391
"""
392
List authorizations.
393
394
Parameters:
395
- **kwargs: Query parameters (user_id, user, org_id, org, token)
396
397
Returns:
398
List[Authorization]: List of authorization objects
399
"""
400
401
def update_authorization(self, authorization: Authorization) -> Authorization:
402
"""
403
Update an authorization.
404
405
Parameters:
406
- authorization (Authorization): Authorization object with updated properties
407
408
Returns:
409
Authorization: Updated authorization object
410
"""
411
```
412
413
#### AuthorizationsApi Usage Examples
414
415
**Token and permission management:**
416
```python
417
from influxdb_client import Permission, PermissionResource
418
419
auth_api = client.authorizations_api()
420
421
# Create read permission for specific bucket
422
read_permission = Permission(
423
action="read",
424
resource=PermissionResource(type="buckets", id="bucket_id_here")
425
)
426
427
# Create write permission for specific bucket
428
write_permission = Permission(
429
action="write",
430
resource=PermissionResource(type="buckets", id="bucket_id_here")
431
)
432
433
# Create authorization with specific permissions
434
auth = auth_api.create_authorization(
435
org_id="org_id_here",
436
permissions=[read_permission, write_permission],
437
description="Sensor data access token"
438
)
439
print(f"Created token: {auth.token[:10]}...")
440
441
# List all authorizations for organization
442
authorizations = auth_api.find_authorizations(org_id="org_id_here")
443
for auth in authorizations:
444
print(f"Token: {auth.id} - Status: {auth.status}")
445
446
# Update authorization status
447
auth.status = "inactive"
448
updated_auth = auth_api.update_authorization(auth)
449
```
450
451
**Create organization-wide permissions:**
452
```python
453
# Create permissions for all buckets in organization
454
all_buckets_read = Permission(
455
action="read",
456
resource=PermissionResource(type="buckets", org_id="org_id_here")
457
)
458
459
all_buckets_write = Permission(
460
action="write",
461
resource=PermissionResource(type="buckets", org_id="org_id_here")
462
)
463
464
# Create an authorization with read/write access to all buckets in the organization
465
admin_auth = auth_api.create_authorization(
466
org_id="org_id_here",
467
permissions=[all_buckets_read, all_buckets_write],
468
description="Organization admin token"
469
)
470
```
471
472
### TasksApi
473
474
API for managing InfluxDB tasks: create, update, and delete tasks, run them manually, list, cancel, or retry individual runs, and retrieve execution logs.
475
476
```python { .api }
477
class TasksApi:
478
def __init__(self, influxdb_client): ...
479
480
def create_task(self, task: Task = None, **kwargs) -> Task:
481
"""
482
Create a new task.
483
484
Parameters:
485
- task (Task, optional): Pre-configured task object
486
- **kwargs: Task properties (name, flux, org_id, status, cron, every)
487
488
Returns:
489
Task: Created task object
490
"""
491
492
def delete_task(self, task: Union[Task, str]) -> Task:
493
"""
494
Delete a task.
495
496
Parameters:
497
- task (Union[Task, str]): Task object or task ID
498
499
Returns:
500
Task: Deleted task object
501
"""
502
503
def find_task_by_id(self, task_id: str) -> Task:
504
"""
505
Find task by ID.
506
507
Parameters:
508
- task_id (str): Task ID
509
510
Returns:
511
Task: Found task object or None
512
"""
513
514
def find_tasks(self, **kwargs) -> List[Task]:
515
"""
516
List tasks.
517
518
Parameters:
519
- **kwargs: Query parameters (after, user_id, org_id, org, status, limit)
520
521
Returns:
522
List[Task]: List of task objects
523
"""
524
525
def update_task(self, task: Task) -> Task:
526
"""
527
Update an existing task.
528
529
Parameters:
530
- task (Task): Task object with updated properties
531
532
Returns:
533
Task: Updated task object
534
"""
535
536
def run_manually(self, task_id: str, **kwargs) -> Run:
537
"""
538
Manually execute a task.
539
540
Parameters:
541
- task_id (str): Task ID to execute
542
- **kwargs: Execution parameters
543
544
Returns:
545
Run: Task execution run object
546
"""
547
548
def find_runs(self, task_id: str, **kwargs) -> List[Run]:
549
"""
550
List task execution runs.
551
552
Parameters:
553
- task_id (str): Task ID
554
- **kwargs: Query parameters (after, limit, after_time, before_time)
555
556
Returns:
557
List[Run]: List of task run objects
558
"""
559
560
def find_run_by_id(self, task_id: str, run_id: str) -> Run:
561
"""
562
Find specific task run.
563
564
Parameters:
565
- task_id (str): Task ID
566
- run_id (str): Run ID
567
568
Returns:
569
Run: Task run object or None
570
"""
571
572
def cancel_run(self, task_id: str, run_id: str) -> Run:
573
"""
574
Cancel a running task execution.
575
576
Parameters:
577
- task_id (str): Task ID
578
- run_id (str): Run ID to cancel
579
580
Returns:
581
Run: Cancelled run object
582
"""
583
584
def retry_run(self, task_id: str, run_id: str) -> Run:
585
"""
586
Retry a failed task execution.
587
588
Parameters:
589
- task_id (str): Task ID
590
- run_id (str): Run ID to retry
591
592
Returns:
593
Run: New run object for retry
594
"""
595
596
def find_logs(self, task_id: str, run_id: str = None, **kwargs) -> Logs:
597
"""
598
Get task execution logs.
599
600
Parameters:
601
- task_id (str): Task ID
602
- run_id (str, optional): Specific run ID (if None, gets logs for all runs)
603
- **kwargs: Log query parameters
604
605
Returns:
606
Logs: Task execution log entries
607
"""
608
```
609
610
#### TasksApi Usage Examples
611
612
**Task creation and management:**
613
```python
614
tasks_api = client.tasks_api()
615
616
# Create a data processing task
617
flux_script = '''
618
from(bucket: "raw_data")
619
|> range(start: -1h)
620
|> filter(fn: (r) => r._measurement == "temperature")
621
|> aggregateWindow(every: 5m, fn: mean)
622
|> to(bucket: "processed_data")
623
'''
624
625
task = tasks_api.create_task(
626
name="Temperature Aggregation",
627
flux=flux_script,
628
every="5m", # Run every 5 minutes
629
org_id="org_id_here"
630
)
631
print(f"Created task: {task.name} (ID: {task.id})")
632
633
# List all tasks
634
tasks = tasks_api.find_tasks(org_id="org_id_here")
635
for t in tasks:
636
print(f"Task: {t.name} - Status: {t.status}")
637
638
# Manually run task
639
run = tasks_api.run_manually(task.id)
640
print(f"Started manual run: {run.id}")
641
642
# Monitor task runs
643
runs = tasks_api.find_runs(task.id, limit=10)
644
for run in runs:
645
print(f"Run {run.id}: {run.status} at {run.started_at}")
646
647
# Get logs for failed runs
648
if run.status == "failed":
649
logs = tasks_api.find_logs(task.id, run.id)
650
print(f"Error logs: {logs}")
651
```
652
653
**Task scheduling patterns:**
654
```python
655
# Cron-based scheduling (daily_report_flux is a placeholder for a Flux script string you define)
656
cron_task = tasks_api.create_task(
657
name="Daily Report",
658
flux=daily_report_flux,
659
cron="0 9 * * *", # 9 AM daily
660
org_id="org_id_here"
661
)
662
663
# Interval-based scheduling (realtime_flux is a placeholder for a Flux script string you define)
664
interval_task = tasks_api.create_task(
665
name="Real-time Processing",
666
flux=realtime_flux,
667
every="30s", # Every 30 seconds
668
org_id="org_id_here"
669
)
670
671
# Update task status
672
interval_task.status = "inactive"
673
updated_task = tasks_api.update_task(interval_task)
674
```
675
676
### LabelsApi
677
678
API for managing InfluxDB labels for organizing and categorizing resources.
679
680
```python { .api }
681
class LabelsApi:
682
def __init__(self, influxdb_client): ...
683
684
def create_label(self, label: Label = None, **kwargs) -> LabelResponse:
685
"""
686
Create a new label.
687
688
Parameters:
689
- label (Label, optional): Pre-configured label object
690
- **kwargs: Label properties (name, org_id, properties)
691
692
Returns:
693
LabelResponse: Created label object
694
"""
695
696
def delete_label(self, label: Union[LabelResponse, str]) -> LabelResponse:
697
"""
698
Delete a label.
699
700
Parameters:
701
- label (Union[LabelResponse, str]): Label object or label ID
702
703
Returns:
704
LabelResponse: Deleted label object
705
"""
706
707
def find_label_by_id(self, label_id: str) -> LabelResponse:
708
"""
709
Find label by ID.
710
711
Parameters:
712
- label_id (str): Label ID
713
714
Returns:
715
LabelResponse: Found label object or None
716
"""
717
718
def find_labels(self, **kwargs) -> List[LabelResponse]:
719
"""
720
List labels.
721
722
Parameters:
723
- **kwargs: Query parameters (org_id, name, limit)
724
725
Returns:
726
List[LabelResponse]: List of label objects
727
"""
728
729
def update_label(self, label: LabelResponse) -> LabelResponse:
730
"""
731
Update an existing label.
732
733
Parameters:
734
- label (LabelResponse): Label object with updated properties
735
736
Returns:
737
LabelResponse: Updated label object
738
"""
739
```
740
741
#### LabelsApi Usage Examples
742
743
**Label management:**
744
```python
745
labels_api = client.labels_api()
746
747
# Create labels for organization
748
env_label = labels_api.create_label(
749
name="environment",
750
org_id="org_id_here",
751
properties={"color": "#FF5733", "description": "Environment classification"}
752
)
753
754
team_label = labels_api.create_label(
755
name="team:data-science",
756
org_id="org_id_here",
757
properties={"color": "#33A1FF", "description": "Data Science team resources"}
758
)
759
760
# List all labels
761
labels = labels_api.find_labels(org_id="org_id_here")
762
for label in labels:
763
print(f"Label: {label.name} - Color: {label.properties.get('color', 'N/A')}")
764
765
# Update label properties
766
env_label.properties["description"] = "Updated: Environment and deployment classification"
767
updated_label = labels_api.update_label(env_label)
768
```
769
770
## Types
771
772
```python { .api }
773
# Resource entity types
774
class Bucket:
775
id: str
776
name: str
777
org_id: str
778
description: str
779
retention_rules: List[RetentionRule]
780
created_at: datetime
781
updated_at: datetime
782
783
class Organization:
784
id: str
785
name: str
786
description: str
787
created_at: datetime
788
updated_at: datetime
789
790
class UserResponse:
791
id: str
792
name: str
793
status: str # "active", "inactive"
794
created_at: datetime
795
updated_at: datetime
796
797
class Authorization:
798
id: str
799
token: str
800
status: str # "active", "inactive"
801
description: str
802
org_id: str
803
user_id: str
804
permissions: List[Permission]
805
created_at: datetime
806
updated_at: datetime
807
808
class Task:
809
id: str
810
name: str
811
org_id: str
812
status: str # "active", "inactive"
813
flux: str
814
cron: str
815
every: str
816
created_at: datetime
817
updated_at: datetime
818
latest_completed: datetime
819
820
class LabelResponse:
821
id: str
822
name: str
823
org_id: str
824
properties: dict
825
created_at: datetime
826
updated_at: datetime
827
828
# Permission and access control types
829
class Permission:
830
action: str # "read", "write"
831
resource: PermissionResource
832
833
class PermissionResource:
834
type: str # "buckets", "tasks", "authorizations", etc.
835
id: str # Specific resource ID (optional)
836
org_id: str # Organization ID (optional)
837
838
# Retention policy types
839
class RetentionRule:
840
type: str # "expire"
841
every_seconds: int
842
shard_group_duration_seconds: int
843
844
# Task execution types
845
class Run:
846
id: str
847
task_id: str
848
status: str # "success", "failed", "canceled", "scheduled"
849
scheduled_for: datetime
850
started_at: datetime
851
finished_at: datetime
852
requested_at: datetime
853
log: List[LogEvent]
854
855
class Logs:
856
events: List[LogEvent]
857
858
class LogEvent:
859
run_id: str
860
time: datetime
861
level: str # "info", "error", "debug"
862
message: str
863
864
# Query and filter types
865
ResourceFilter = Dict[str, Any] # Generic resource filtering
866
PaginationParams = Dict[str, Union[str, int]] # Pagination parameters
867
868
# Exception types
869
class ResourceNotFoundError(InfluxDBError):
870
"""Raised when requested resource is not found."""
871
pass
872
873
class DuplicateResourceError(InfluxDBError):
874
"""Raised when attempting to create duplicate resource."""
875
pass
876
877
class InsufficientPermissionsError(InfluxDBError):
878
"""Raised when user lacks required permissions."""
879
pass
880
```