# Database Operations

Complete database lifecycle management including database creation and deletion, user management with permissions, retention policies for data lifecycle, continuous queries for real-time aggregation, and measurement operations for schema management.

## Capabilities

### Database Management

Core database operations for creating, listing, and managing InfluxDB databases.

```python { .api }
def create_database(self, dbname):
    """
    Create a new database.

    Parameters:
    - dbname (str): Name of the database to create

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If database creation fails
    """

def drop_database(self, dbname):
    """
    Delete a database and all its data.

    Parameters:
    - dbname (str): Name of the database to delete

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If database deletion fails
    """

def get_list_database(self):
    """
    Get list of all databases on the InfluxDB server.

    Returns:
        list: List of database names

    Raises:
        InfluxDBClientError: If unable to retrieve database list
    """
```

#### Database Management Examples

```python
from influxdb import InfluxDBClient
from influxdb.exceptions import InfluxDBClientError

client = InfluxDBClient(host='localhost', username='admin', password='admin')

# Create databases
try:
    client.create_database('production_metrics')
    client.create_database('development_metrics')
    client.create_database('test_data')
    print("Databases created successfully")
except InfluxDBClientError as e:
    print(f"Failed to create database: {e}")

# List all databases
databases = client.get_list_database()
print("Available databases:")
for db in databases:
    print(f" - {db['name']}")

# Switch to specific database
client.switch_database('production_metrics')

# Clean up test databases
try:
    client.drop_database('test_data')
    print("Test database deleted")
except InfluxDBClientError as e:
    print(f"Failed to delete database: {e}")

# Database existence check example
databases = client.get_list_database()
db_exists = any(db['name'] == 'metrics' for db in databases)

if not db_exists:
    client.create_database('metrics')
    print("Created metrics database")
```

### User Management

Comprehensive user account management with role-based permissions and privilege control.

```python { .api }
def create_user(self, username, password, admin=False):
    """
    Create a new user account.

    Parameters:
    - username (str): Username for the new account
    - password (str): Password for the new account
    - admin (bool): Grant admin privileges (default: False)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If user creation fails
    """

def drop_user(self, username):
    """
    Delete a user account.

    Parameters:
    - username (str): Username of account to delete

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If user deletion fails
    """

def get_list_users(self):
    """
    Get list of all user accounts.

    Returns:
        list: List of user dictionaries with user information

    Raises:
        InfluxDBClientError: If unable to retrieve user list
    """

def set_user_password(self, username, password):
    """
    Change a user's password.

    Parameters:
    - username (str): Username of account to modify
    - password (str): New password

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If password change fails
    """

def grant_admin_privileges(self, username):
    """
    Grant administrative privileges to a user.

    Parameters:
    - username (str): Username to grant admin privileges

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If privilege grant fails
    """

def revoke_admin_privileges(self, username):
    """
    Revoke administrative privileges from a user.

    Parameters:
    - username (str): Username to revoke admin privileges

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If privilege revocation fails
    """

def grant_privilege(self, privilege, database, username):
    """
    Grant database-specific privilege to a user.

    Parameters:
    - privilege (str): Privilege type ('READ', 'WRITE', 'ALL')
    - database (str): Database name for privilege scope
    - username (str): Username to grant privilege

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If privilege grant fails
    """

def revoke_privilege(self, privilege, database, username):
    """
    Revoke database-specific privilege from a user.

    Parameters:
    - privilege (str): Privilege type ('READ', 'WRITE', 'ALL')
    - database (str): Database name for privilege scope
    - username (str): Username to revoke privilege

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If privilege revocation fails
    """

def get_list_privileges(self, username):
    """
    Get list of privileges for a specific user.

    Parameters:
    - username (str): Username to query privileges

    Returns:
        list: List of privilege dictionaries with database and privilege information

    Raises:
        InfluxDBClientError: If unable to retrieve privileges
    """
```

#### User Management Examples

```python
# Admin client for user management
admin_client = InfluxDBClient(username='admin', password='admin_password')

# Create users with different roles
try:
    # Create regular users
    admin_client.create_user('developer', 'dev_password', admin=False)
    admin_client.create_user('analyst', 'analyst_password', admin=False)

    # Create admin user
    admin_client.create_user('db_admin', 'admin_password', admin=True)

    print("Users created successfully")
except InfluxDBClientError as e:
    print(f"User creation failed: {e}")

# List all users
users = admin_client.get_list_users()
print("Current users:")
for user in users:
    print(f" - {user['user']} (admin: {user['admin']})")

# Grant database-specific privileges
admin_client.grant_privilege('READ', 'production_metrics', 'analyst')
admin_client.grant_privilege('WRITE', 'development_metrics', 'developer')
admin_client.grant_privilege('ALL', 'production_metrics', 'developer')

# Check user privileges
dev_privileges = admin_client.get_list_privileges('developer')
print("Developer privileges:")
for priv in dev_privileges:
    print(f" - {priv['database']}: {priv['privilege']}")

# Change passwords
admin_client.set_user_password('developer', 'new_secure_password')

# Revoke privileges
admin_client.revoke_privilege('WRITE', 'production_metrics', 'developer')

# Administrative privilege management
admin_client.grant_admin_privileges('senior_developer')
admin_client.revoke_admin_privileges('former_admin')

# Clean up users
admin_client.drop_user('temp_user')

# Bulk user setup example
try:
    username = 'metrics_reader'
    password = 'reader_password'
    databases = ['production_metrics', 'development_metrics']
    privilege = 'READ'

    admin_client.create_user(username, password)

    for database in databases:
        admin_client.grant_privilege(privilege, database, username)

    print(f"User {username} created with {privilege} access to {databases}")

except InfluxDBClientError as e:
    print(f"Failed to setup user {username}: {e}")
```

### Retention Policy Management

Data lifecycle management through retention policies that automatically manage data expiration and storage optimization.

```python { .api }
def create_retention_policy(self, name, duration, replication, database=None,
                            default=False, shard_duration="0s"):
    """
    Create a retention policy for automatic data lifecycle management.

    Parameters:
    - name (str): Name of the retention policy
    - duration (str): How long to keep data (e.g., '7d', '4w', '52w', 'INF')
    - replication (int): Number of data replicas (usually 1 for single node)
    - database (str): Database name (default: current database)
    - default (bool): Make this the default retention policy (default: False)
    - shard_duration (str): Shard group duration (default: "0s" for auto)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If retention policy creation fails
    """

def alter_retention_policy(self, name, database=None, duration=None,
                           replication=None, default=None, shard_duration=None):
    """
    Modify an existing retention policy.

    Parameters:
    - name (str): Name of retention policy to modify
    - database (str): Database name (default: current database)
    - duration (str): New duration (default: unchanged)
    - replication (int): New replication factor (default: unchanged)
    - default (bool): Make/unmake default policy (default: unchanged)
    - shard_duration (str): New shard duration (default: unchanged)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If retention policy modification fails
    """

def drop_retention_policy(self, name, database=None):
    """
    Delete a retention policy.

    Parameters:
    - name (str): Name of retention policy to delete
    - database (str): Database name (default: current database)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If retention policy deletion fails
    """

def get_list_retention_policies(self, database=None):
    """
    Get list of retention policies for a database.

    Parameters:
    - database (str): Database name (default: current database)

    Returns:
        list: List of retention policy dictionaries

    Raises:
        InfluxDBClientError: If unable to retrieve retention policies
    """
```

#### Retention Policy Examples

```python
client = InfluxDBClient(database='production_metrics')

# Create retention policies for different data lifecycles
try:
    # Short-term high-resolution data (1 week)
    client.create_retention_policy(
        name='realtime',
        duration='7d',
        replication=1,
        default=True,  # Make this the default policy
        shard_duration='1h'  # 1-hour shards for recent data
    )

    # Medium-term aggregated data (1 year)
    client.create_retention_policy(
        name='historical',
        duration='52w',
        replication=1,
        shard_duration='1d'  # Daily shards for historical data
    )

    # Long-term archive (infinite retention)
    client.create_retention_policy(
        name='archive',
        duration='INF',
        replication=1,
        shard_duration='1w'  # Weekly shards for archival data
    )

    print("Retention policies created successfully")

except InfluxDBClientError as e:
    print(f"Failed to create retention policies: {e}")

# List retention policies
policies = client.get_list_retention_policies()
print("Current retention policies:")
for policy in policies:
    print(f" - {policy['name']}: {policy['duration']} "
          f"(default: {policy['default']}, shards: {policy['shardGroupDuration']})")

# Modify retention policy
client.alter_retention_policy(
    name='historical',
    duration='26w',  # Reduce to 6 months
    shard_duration='12h'  # Smaller shards
)

# Write data to specific retention policy
points = [
    {
        "measurement": "cpu_usage",
        "tags": {"host": "server01"},
        "fields": {"value": 75.5},
        "time": "2023-09-07T07:18:24Z"
    }
]

# Write to specific retention policy
client.write_points(points, retention_policy='historical')

# Set different default policy
client.alter_retention_policy('historical', default=True)

# Clean up old retention policy
client.drop_retention_policy('old_policy')

# Tiered retention setup example
for database_name in ['production', 'staging', 'development']:
    client.switch_database(database_name)

    # Real-time data: 24 hours, high resolution
    client.create_retention_policy(
        'realtime', '24h', 1, default=True, shard_duration='1h'
    )

    # Daily aggregates: 30 days
    client.create_retention_policy(
        'daily', '30d', 1, shard_duration='1d'
    )

    # Monthly aggregates: 2 years
    client.create_retention_policy(
        'monthly', '104w', 1, shard_duration='1w'
    )

    print(f"Tiered retention setup complete for {database_name}")
```

### Continuous Query Management

Real-time data aggregation and downsampling through continuous queries that automatically process data as it arrives.

```python { .api }
def create_continuous_query(self, name, select, database=None, resample_opts=None):
    """
    Create a continuous query for real-time data aggregation.

    Parameters:
    - name (str): Name of the continuous query
    - select (str): SELECT statement for the aggregation
    - database (str): Database name (default: current database)
    - resample_opts (dict): Resampling options with 'FOR' and 'EVERY' keys

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If continuous query creation fails

    Note: resample_opts format: {'FOR': '2m', 'EVERY': '1m'}
    """

def drop_continuous_query(self, name, database=None):
    """
    Delete a continuous query.

    Parameters:
    - name (str): Name of continuous query to delete
    - database (str): Database name (default: current database)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If continuous query deletion fails
    """

def get_list_continuous_queries(self):
    """
    Get list of all continuous queries across all databases.

    Returns:
        list: List of dictionaries containing continuous query information

    Raises:
        InfluxDBClientError: If unable to retrieve continuous queries
    """
```

#### Continuous Query Examples

```python
client = InfluxDBClient(database='production_metrics')

# Create continuous queries for automatic downsampling
try:
    # Downsample CPU data to 5-minute averages
    client.create_continuous_query(
        name='cpu_5min_avg',
        select="""
            SELECT mean(value) as avg_cpu, max(value) as max_cpu, min(value) as min_cpu
            INTO "historical"."cpu_5min"
            FROM "realtime"."cpu_usage"
            GROUP BY time(5m), host
        """
    )

    # Downsample memory data to hourly statistics
    client.create_continuous_query(
        name='memory_hourly_stats',
        select="""
            SELECT mean(used) as avg_used, max(used) as max_used,
                   mean(available) as avg_available
            INTO "daily"."memory_hourly"
            FROM "realtime"."memory_usage"
            GROUP BY time(1h), host
        """
    )

    # Create alerts based on thresholds
    client.create_continuous_query(
        name='high_cpu_alerts',
        select="""
            SELECT mean(value) as avg_cpu
            INTO "alerts"."high_cpu"
            FROM "realtime"."cpu_usage"
            WHERE value > 80
            GROUP BY time(1m), host
        """
    )

    print("Continuous queries created successfully")

except InfluxDBClientError as e:
    print(f"Failed to create continuous queries: {e}")

# Create continuous query with resampling options
client.create_continuous_query(
    name='disk_io_resampled',
    select="""
        SELECT sum(read_bytes) as total_read, sum(write_bytes) as total_write
        INTO "daily"."disk_io_hourly"
        FROM "realtime"."disk_io"
        GROUP BY time(1h), host, device
    """,
    resample_opts={
        'FOR': '2h',  # Process data for 2 hours back
        'EVERY': '30m'  # Run every 30 minutes
    }
)

# List all continuous queries
cqs = client.get_list_continuous_queries()
print("Active continuous queries:")
for cq_info in cqs:
    database = cq_info['name']
    for cq in cq_info.get('cqs', []):
        print(f" Database: {database}")
        print(f" Name: {cq['name']}")
        print(f" Query: {cq['query']}")

# Drop continuous query
client.drop_continuous_query('old_downsampling_query')

# Complex continuous query for business metrics
business_analytics_cq = """
SELECT sum(revenue) as total_revenue,
       count(transaction_id) as transaction_count,
       mean(order_value) as avg_order_value
INTO "business"."daily_metrics"
FROM "realtime"."transactions"
WHERE status = 'completed'
GROUP BY time(1d), region, product_category
"""

client.create_continuous_query(
    name='daily_business_analytics',
    select=business_analytics_cq
)

# Standard downsampling pattern example
retention_policies = {
    'realtime': 'realtime',
    'daily': 'daily',
    'monthly': 'monthly'
}

measurement = 'cpu_usage'
fields = ['value']

# 5-minute averages
fields_select = ", ".join([f"mean({field}) as avg_{field}" for field in fields])
cq_5min = f"""
SELECT {fields_select}
INTO "{retention_policies['daily']}"."{measurement}_5min"
FROM "{retention_policies['realtime']}"."{measurement}"
GROUP BY time(5m), *
"""

client.create_continuous_query(f"{measurement}_5min_avg", cq_5min)

# Hourly statistics
stats_select = ", ".join([
    f"mean({field}) as avg_{field}, max({field}) as max_{field}, min({field}) as min_{field}"
    for field in fields
])
cq_hourly = f"""
SELECT {stats_select}
INTO "{retention_policies['monthly']}"."{measurement}_hourly"
FROM "{retention_policies['daily']}"."{measurement}_5min"
GROUP BY time(1h), *
"""

client.create_continuous_query(f"{measurement}_hourly_stats", cq_hourly)

print(f"Standard downsampling created for {measurement}")
```

### Measurement Operations

Schema management operations for working with measurements, series, and data organization.

```python { .api }
def get_list_measurements(self):
    """
    Get list of measurements in the current database.

    Returns:
        list: List of measurement dictionaries

    Raises:
        InfluxDBClientError: If unable to retrieve measurements
    """

def drop_measurement(self, measurement):
    """
    Delete a measurement and all its data.

    Parameters:
    - measurement (str): Name of measurement to delete

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If measurement deletion fails
    """

def get_list_series(self, database=None, measurement=None, tags=None):
    """
    Get list of series matching optional filters.

    Parameters:
    - database (str): Database name (default: current database)
    - measurement (str): Filter by measurement name (default: None)
    - tags (dict): Filter by tag key-value pairs (default: None)

    Returns:
        list: List of series information

    Raises:
        InfluxDBClientError: If unable to retrieve series
    """

def delete_series(self, database=None, measurement=None, tags=None):
    """
    Delete series matching the specified criteria.

    Parameters:
    - database (str): Database name (default: current database)
    - measurement (str): Filter by measurement name (default: None)
    - tags (dict): Filter by tag key-value pairs (default: None)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: If series deletion fails

    Warning: This operation is destructive and cannot be undone
    """
```

#### Measurement Operations Examples

```python
client = InfluxDBClient(database='production_metrics')

# List all measurements
measurements = client.get_list_measurements()
print("Available measurements:")
for measurement in measurements:
    print(f" - {measurement['name']}")

# Get detailed series information
series_list = client.get_list_series()
print(f"Total series count: {len(series_list)}")

# Filter series by measurement
cpu_series = client.get_list_series(measurement='cpu_usage')
print(f"CPU usage series: {len(cpu_series)}")
for series in cpu_series[:5]:  # Show first 5
    print(f" - {series}")

# Filter series by tags
server01_series = client.get_list_series(tags={'host': 'server01'})
print(f"Series for server01: {len(server01_series)}")

# Delete specific series (be careful!)
# Delete test data series
client.delete_series(measurement='test_data', tags={'environment': 'test'})

# Drop entire measurement (removes all series and data)
client.drop_measurement('deprecated_measurement')

# Measurement analysis example
measurement_name = 'cpu_usage'
series = client.get_list_series(measurement=measurement_name)

tag_combinations = {}
for series_key in series:
    # Parse series key to extract tags
    # Series format: "measurement,tag1=value1,tag2=value2"
    if ',' in series_key:
        tags_part = series_key.split(',', 1)[1]
        tag_pairs = tags_part.split(',')
        tag_combo = tuple(sorted(tag_pairs))
        tag_combinations[tag_combo] = tag_combinations.get(tag_combo, 0) + 1

print(f"Measurement: {measurement_name}")
print(f"Total series: {len(series)}")
print(f"Unique tag combinations: {len(tag_combinations)}")

# Show most common tag combinations
sorted_combos = sorted(tag_combinations.items(), key=lambda x: x[1], reverse=True)
print("Most common tag combinations:")
for combo, count in sorted_combos[:5]:
    print(f" {combo}: {count} series")

# Clean up measurements by pattern example
measurements = client.get_list_measurements()
patterns_to_delete = ['test_', 'temp_', 'debug_']

for measurement_info in measurements:
    measurement_name = measurement_info['name']

    for pattern in patterns_to_delete:
        if pattern in measurement_name:
            print(f"Deleting measurement: {measurement_name}")
            try:
                client.drop_measurement(measurement_name)
            except InfluxDBClientError as e:
                print(f"Failed to delete {measurement_name}: {e}")
```