# Advanced Operations

Advanced functionality for specialized operations: data deletion, invokable scripts, and direct access to low-level service APIs. These APIs provide fine-grained control and capabilities beyond standard read/write operations.

## Capabilities

### DeleteApi

API for deleting time series data from InfluxDB buckets with support for time-based and predicate-based deletion.

```python { .api }
class DeleteApi:
    def __init__(self, influxdb_client): ...

    def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str = "",
        bucket: str = None,
        org: str = None
    ) -> None:
        """
        Delete time series data from InfluxDB.

        Parameters:
        - start (Union[str, datetime]): Start time for deletion range (RFC3339 or datetime)
        - stop (Union[str, datetime]): Stop time for deletion range (RFC3339 or datetime)
        - predicate (str, optional): InfluxDB delete predicate for filtering data to delete
        - bucket (str, optional): Bucket name (uses client default if not specified)
        - org (str, optional): Organization name or ID (uses client default if not specified)

        Note: Deletion is permanent and cannot be undone. Use predicates carefully.
        """
```

#### DeleteApi Usage Examples

**Basic time-based deletion:**
```python
from influxdb_client import InfluxDBClient
from datetime import datetime, timedelta

client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")
delete_api = client.delete_api()

# Delete data from the last hour
now = datetime.utcnow()
one_hour_ago = now - timedelta(hours=1)

delete_api.delete(
    start=one_hour_ago,
    stop=now,
    bucket="sensor_data"
)
print("Deleted data from the last hour")
```
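
**Formatting RFC3339 timestamps:**

`delete()` also accepts RFC3339 strings. If you prefer strings over `datetime` objects, a minimal conversion helper (a sketch, not part of the client; it expects timezone-aware datetimes):

```python
from datetime import datetime, timezone

def to_rfc3339(dt: datetime) -> str:
    """Format an aware datetime as the RFC3339 UTC string the delete endpoint accepts."""
    return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

print(to_rfc3339(datetime(2023, 1, 2, 3, 4, 5, tzinfo=timezone.utc)))  # 2023-01-02T03:04:05Z
```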

**Predicate-based deletion:**
```python
# Delete specific measurement data
delete_api.delete(
    start="2023-01-01T00:00:00Z",
    stop="2023-01-02T00:00:00Z",
    predicate='_measurement="temperature" AND location="room1"',
    bucket="sensor_data",
    org="my-org"
)

# Delete predicates only support `=` comparisons joined with AND (OR is not
# supported), so delete multiple tag values with one call per value
for sensor_id in ("temp_001", "temp_002"):
    delete_api.delete(
        start=datetime(2023, 1, 1),
        stop=datetime(2023, 1, 31),
        predicate=f'sensor_id="{sensor_id}"',
        bucket="sensor_data"
    )

# Delete all data for a specific measurement (deleting by field is not supported)
delete_api.delete(
    start="2023-01-01T00:00:00Z",
    stop="2023-12-31T23:59:59Z",
    predicate='_measurement="debug_info"',
    bucket="logs"
)
```
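
**Building predicates programmatically:**

Predicate strings are easy to get wrong by hand. A small helper (hypothetical, not part of the client API) can assemble the AND-joined equality clauses that delete predicates support:

```python
def build_delete_predicate(measurement: str = None, tags: dict = None) -> str:
    """Assemble an AND-joined delete predicate from a measurement and tag values."""
    clauses = []
    if measurement is not None:
        clauses.append(f'_measurement="{measurement}"')
    for key, value in (tags or {}).items():
        clauses.append(f'{key}="{value}"')
    return " AND ".join(clauses)

predicate = build_delete_predicate("temperature", {"location": "room1"})
print(predicate)  # _measurement="temperature" AND location="room1"
```

The result can be passed directly as the `predicate` argument of `delete()`.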

**Range-based cleanup:**
```python
# Delete old data (data retention cleanup)
cutoff_date = datetime.utcnow() - timedelta(days=90)
delete_api.delete(
    start="1970-01-01T00:00:00Z",  # Beginning of time
    stop=cutoff_date,
    bucket="archive_data"
)

# Delete test data (one call per environment; OR is not supported in predicates)
for environment in ("test", "staging"):
    delete_api.delete(
        start="2023-06-01T00:00:00Z",
        stop="2023-06-02T00:00:00Z",
        predicate=f'environment="{environment}"',
        bucket="application_metrics"
    )
```
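
**Windowed deletion for large ranges:**

Deleting a very large range in one call can be slow. A sketch that splits a range into bounded windows (the seven-day cap here mirrors the recommended maximum; tune it for your deployment), each of which would then get its own `delete()` call:

```python
from datetime import datetime, timedelta, timezone

def deletion_windows(start: datetime, stop: datetime, max_days: int = 7):
    """Split [start, stop) into consecutive windows of at most max_days each."""
    windows = []
    cursor = start
    while cursor < stop:
        window_end = min(cursor + timedelta(days=max_days), stop)
        windows.append((cursor, window_end))
        cursor = window_end
    return windows

start = datetime(2023, 1, 1, tzinfo=timezone.utc)
stop = datetime(2023, 1, 31, tzinfo=timezone.utc)
for window_start, window_end in deletion_windows(start, stop):
    # delete_api.delete(start=window_start, stop=window_end, bucket="archive_data")
    print(window_start.date(), "->", window_end.date())
```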

### DeleteApiAsync

Asynchronous version of DeleteApi for non-blocking deletion operations.

```python { .api }
class DeleteApiAsync:
    def __init__(self, influxdb_client): ...

    async def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str = "",
        bucket: str = None,
        org: str = None
    ) -> None:
        """
        Asynchronously delete time series data from InfluxDB.

        Parameters: Same as DeleteApi.delete()
        """
```

#### DeleteApiAsync Usage Example

```python
import asyncio
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

async def cleanup_old_data():
    async with InfluxDBClientAsync(url="http://localhost:8086", token="token", org="my-org") as client:
        delete_api = client.delete_api()

        # Delete multiple ranges concurrently
        deletion_tasks = []

        # Task 1: Delete old test data
        deletion_tasks.append(
            delete_api.delete(
                start="2023-01-01T00:00:00Z",
                stop="2023-01-31T23:59:59Z",
                predicate='environment="test"',
                bucket="metrics"
            )
        )

        # Task 2: Delete old logs
        deletion_tasks.append(
            delete_api.delete(
                start="2023-01-01T00:00:00Z",
                stop="2023-02-01T00:00:00Z",
                bucket="application_logs"
            )
        )

        # Execute all deletions concurrently
        await asyncio.gather(*deletion_tasks)
        print("All deletions completed")

asyncio.run(cleanup_old_data())
```
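
**Bounding deletion concurrency:**

`asyncio.gather` starts every coroutine at once, which can overwhelm the server when many ranges are deleted together. A sketch of capping concurrency with a semaphore (the `fake_delete` coroutine is a stand-in for `delete_api.delete(...)`):

```python
import asyncio

async def bounded_gather(coros, limit: int = 4):
    """Await coroutines with at most `limit` running concurrently."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(coro):
        async with semaphore:
            return await coro

    return await asyncio.gather(*(run_one(c) for c in coros))

async def fake_delete(bucket: str) -> str:
    # Stand-in for a delete_api.delete(...) coroutine
    await asyncio.sleep(0)
    return bucket

results = asyncio.run(bounded_gather([fake_delete(f"bucket_{i}") for i in range(10)], limit=3))
print(results)
```

`gather` preserves input order, so results line up with the submitted tasks.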

### InvokableScriptsApi

API for managing and executing InfluxDB invokable scripts for custom data processing and analysis.

```python { .api }
class InvokableScriptsApi:
    def __init__(self, influxdb_client): ...

    def create_script(self, script_create_request: ScriptCreateRequest) -> Script:
        """
        Create a new invokable script.

        Parameters:
        - script_create_request (ScriptCreateRequest): Script configuration and code

        Returns:
        Script: Created script object
        """

    def delete_script(self, script_id: str) -> None:
        """
        Delete an invokable script.

        Parameters:
        - script_id (str): Script ID to delete
        """

    def find_scripts(self, **kwargs) -> Scripts:
        """
        List invokable scripts.

        Parameters:
        - **kwargs: Query parameters (limit, offset)

        Returns:
        Scripts: Collection of script objects
        """

    def find_script_by_id(self, script_id: str) -> Script:
        """
        Find script by ID.

        Parameters:
        - script_id (str): Script ID

        Returns:
        Script: Found script object or None
        """

    def update_script(
        self,
        script_id: str,
        script_update_request: ScriptUpdateRequest
    ) -> Script:
        """
        Update an existing script.

        Parameters:
        - script_id (str): Script ID to update
        - script_update_request (ScriptUpdateRequest): Updated script configuration

        Returns:
        Script: Updated script object
        """

    def invoke_script(
        self,
        script_id: str,
        params: dict = None
    ) -> str:
        """
        Execute an invokable script.

        Parameters:
        - script_id (str): Script ID to execute
        - params (dict, optional): Parameters to pass to the script

        Returns:
        str: Script execution result
        """
```

#### InvokableScriptsApi Usage Examples

**Script creation and management:**
```python
from influxdb_client import ScriptCreateRequest, ScriptLanguage

scripts_api = client.invokable_scripts_api()

# Create a data analysis script
flux_code = '''
// Calculate moving average for temperature data
data = from(bucket: params.bucket_name)
    |> range(start: params.start_time)
    |> filter(fn: (r) => r._measurement == "temperature")
    |> filter(fn: (r) => r.location == params.location)

// Calculate the moving average over the configured period
data
    |> timedMovingAverage(every: params.window_duration, period: params.period_duration)
    |> yield(name: "moving_average")
'''

script_request = ScriptCreateRequest(
    name="temperature_analysis",
    description="Calculate moving average for temperature sensors",
    script=flux_code,
    language=ScriptLanguage.FLUX
)

script = scripts_api.create_script(script_request)
print(f"Created script: {script.name} (ID: {script.id})")

# List all scripts
scripts = scripts_api.find_scripts()
for s in scripts.scripts:
    print(f"Script: {s.name} - {s.description}")
```

**Script execution with parameters:**
```python
# Execute script with parameters
execution_params = {
    "bucket_name": "sensor_data",
    "start_time": "-2h",
    "location": "datacenter1",
    "window_duration": "5m",
    "period_duration": "25m"  # 5 x window_duration
}

result = scripts_api.invoke_script(script.id, params=execution_params)
print(f"Script execution result: {result}")

# Update script
from influxdb_client import ScriptUpdateRequest

updated_script_code = '''
// Enhanced version with anomaly detection
data = from(bucket: params.bucket_name)
    |> range(start: params.start_time)
    |> filter(fn: (r) => r._measurement == "temperature")

// Detect anomalies using standard deviation
anomalies = data
    |> aggregateWindow(every: 1h, fn: stddev)
    |> filter(fn: (r) => r._value > params.anomaly_threshold)
    |> yield(name: "anomalies")

// Also yield the moving average
data
    |> timedMovingAverage(every: params.window_duration, period: params.period_duration)
    |> yield(name: "moving_average")
'''

update_request = ScriptUpdateRequest(
    name="enhanced_temperature_analysis",
    description="Temperature analysis with anomaly detection",
    script=updated_script_code
)

updated_script = scripts_api.update_script(script.id, update_request)
```

**Parameterized script execution patterns:**
```python
# Create a flexible data export script
export_script_code = '''
// Flexible data export with filtering (range ends at now())
data = from(bucket: params.source_bucket)
    |> range(start: params.start_time, stop: now())

// Apply optional measurement filter
filtered_data = if exists params.measurement then
    data |> filter(fn: (r) => r._measurement == params.measurement)
else
    data

// Apply optional host filter
result = if exists params.hosts then
    filtered_data |> filter(fn: (r) => contains(value: r.host, set: params.hosts))
else
    filtered_data

// Export to destination
result |> to(bucket: params.destination_bucket)
'''

export_script_request = ScriptCreateRequest(
    name="flexible_data_export",
    description="Export filtered data between buckets",
    script=export_script_code,
    language=ScriptLanguage.FLUX
)

export_script = scripts_api.create_script(export_script_request)

# Use the export script
export_params = {
    "source_bucket": "raw_data",
    "destination_bucket": "processed_data",
    "start_time": "-24h",
    "measurement": "cpu_usage",
    "hosts": ["web-server-1", "web-server-2"]
}

export_result = scripts_api.invoke_script(export_script.id, params=export_params)
```
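
**Validating script parameters:**

`invoke_script` parameters are serialized as JSON, so values such as `datetime` objects fail at invocation time. A small pre-flight check (a hypothetical helper, not part of the client) surfaces the problem earlier:

```python
import json

def validate_script_params(params: dict) -> dict:
    """Raise ValueError if params cannot be serialized to JSON."""
    try:
        json.dumps(params)
    except (TypeError, ValueError) as exc:
        raise ValueError(f"Script params must be JSON-serializable: {exc}") from exc
    return params

checked = validate_script_params({"start_time": "-24h", "hosts": ["web-server-1"]})
print(checked)
```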

### Low-Level Service APIs

Direct access to InfluxDB's OpenAPI service layer for advanced use cases and custom integrations.

```python { .api }
# Core service classes (examples of the 40+ available services)
class QueryService:
    """Direct access to query API endpoints."""
    def post_query(
        self,
        org: str,
        query: Query,
        zap_trace_span: str = None,
        accept_encoding: str = None,
        content_encoding: str = None,
        **kwargs
    ): ...

class WriteService:
    """Direct access to write API endpoints."""
    def post_write(
        self,
        org: str,
        bucket: str,
        body: str,
        zap_trace_span: str = None,
        content_encoding: str = None,
        content_type: str = "text/plain; charset=utf-8",
        content_length: int = None,
        accept: str = None,
        precision: WritePrecision = None,
        **kwargs
    ): ...

class BucketsService:
    """Direct access to bucket management endpoints."""
    def get_buckets(
        self,
        zap_trace_span: str = None,
        org: str = None,
        org_id: str = None,
        after: str = None,
        limit: int = None,
        **kwargs
    ): ...

    def post_buckets(
        self,
        post_bucket_request: PostBucketRequest,
        zap_trace_span: str = None,
        **kwargs
    ): ...

class AuthorizationsService:
    """Direct access to authorization endpoints."""
    def get_authorizations(
        self,
        zap_trace_span: str = None,
        user_id: str = None,
        user: str = None,
        org_id: str = None,
        org: str = None,
        token: str = None,
        **kwargs
    ): ...

class HealthService:
    """Direct access to health check endpoints."""
    def get_health(self, zap_trace_span: str = None, **kwargs): ...

class SetupService:
    """Direct access to InfluxDB setup endpoints."""
    def post_setup(
        self,
        onboarding_request: OnboardingRequest,
        zap_trace_span: str = None,
        **kwargs
    ): ...

class BackupService:
    """Direct access to backup and restore endpoints."""
    def post_backup_kv(
        self,
        zap_trace_span: str = None,
        **kwargs
    ): ...

class TelegrafsService:
    """Direct access to Telegraf configuration endpoints."""
    def get_telegrafs(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...

# Advanced management services
class DashboardsService:
    """Direct access to dashboard management endpoints."""
    def get_dashboards(
        self,
        zap_trace_span: str = None,
        owner: str = None,
        sort_by: str = None,
        **kwargs
    ): ...

class ChecksService:
    """Direct access to monitoring check endpoints."""
    def get_checks(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...

class NotificationRulesService:
    """Direct access to notification rule endpoints."""
    def get_notification_rules(
        self,
        zap_trace_span: str = None,
        org_id: str = None,
        **kwargs
    ): ...
```

#### Low-Level Service Usage Examples

**Direct service access:**
```python
from influxdb_client.service.query_service import QueryService
from influxdb_client.domain.query import Query

# Get direct access to services
query_service = QueryService(client.api_client)

# Execute a query using the low-level service
query_obj = Query(query='from(bucket: "test") |> range(start: -1h)')
response = query_service.post_query(
    org="my-org",
    query=query_obj,
    accept_encoding="gzip"
)

# Process raw response
print(f"Query response: {response}")
```

**Custom HTTP headers and advanced options:**
```python
from influxdb_client import WritePrecision
from influxdb_client.service.write_service import WriteService

write_service = WriteService(client.api_client)

# Write with custom headers and options
line_protocol = "measurement,tag1=value1 field1=1.0"
response = write_service.post_write(
    org="my-org",
    bucket="my-bucket",
    body=line_protocol,
    content_encoding="gzip",
    precision=WritePrecision.S,
    zap_trace_span="custom-trace-id"
)
```

**Advanced bucket management:**
```python
from influxdb_client.service.buckets_service import BucketsService
from influxdb_client.domain.post_bucket_request import PostBucketRequest
from influxdb_client.domain.retention_rule import RetentionRule

buckets_service = BucketsService(client.api_client)

# Create a bucket with advanced options
retention_rule = RetentionRule(
    type="expire",
    every_seconds=86400 * 30,  # 30 days
    shard_group_duration_seconds=3600  # 1-hour shard groups
)

bucket_request = PostBucketRequest(
    name="advanced_bucket",
    org_id="org_id_here",
    description="Bucket with custom shard configuration",
    retention_rules=[retention_rule]
)

response = buckets_service.post_buckets(
    post_bucket_request=bucket_request,
    zap_trace_span="bucket-creation-trace"
)
```

## Types

```python { .api }
# Deletion-related types
DeletePredicate = str  # InfluxDB delete predicate expression
TimeRange = Tuple[Union[str, datetime], Union[str, datetime]]  # Start and stop time pair

# Script-related types
class ScriptCreateRequest:
    name: str
    description: str
    script: str
    language: ScriptLanguage

class ScriptUpdateRequest:
    name: str
    description: str
    script: str

class Script:
    id: str
    name: str
    description: str
    script: str
    language: ScriptLanguage
    created_at: datetime
    updated_at: datetime

class Scripts:
    scripts: List[Script]

class ScriptLanguage(Enum):
    FLUX = "flux"

# Low-level API types
class Query:
    query: str
    type: str
    params: dict

class PostBucketRequest:
    org_id: str
    name: str
    description: str
    retention_rules: List[RetentionRule]
    schema_type: str

class OnboardingRequest:
    username: str
    password: str
    org: str
    bucket: str
    retention_period_hrs: int
    retention_period_ns: int
    token: str

# Service response types
ServiceResponse = Dict[str, Any]  # Generic service response
RawHTTPResponse = Any  # Raw HTTP response from services

# Advanced configuration types
class TracingConfig:
    zap_trace_span: str
    custom_headers: Dict[str, str]

class CompressionConfig:
    content_encoding: str  # "gzip", "identity"
    accept_encoding: str  # "gzip", "deflate"

# Exception types for advanced operations
class DeletionError(InfluxDBError):
    """Raised when data deletion fails."""
    pass

class ScriptExecutionError(InfluxDBError):
    """Raised when script execution fails."""
    pass

class ServiceAPIError(InfluxDBError):
    """Raised when low-level service calls fail."""
    pass

class InvalidPredicateError(DeletionError):
    """Raised when the deletion predicate is invalid."""
    pass

# Constants for advanced operations
DEFAULT_DELETE_PRECISION = WritePrecision.NS
MAX_DELETE_RANGE_DAYS = 7  # Recommended maximum range for a single deletion
SCRIPT_TIMEOUT_MS = 30000  # Default script execution timeout

# Service endpoint constants
QUERY_ENDPOINT = "/api/v2/query"
WRITE_ENDPOINT = "/api/v2/write"
DELETE_ENDPOINT = "/api/v2/delete"
SCRIPTS_ENDPOINT = "/api/v2/scripts"
```