or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-operations.md
client-management.md
index.md
querying-data.md
resource-management.md
writing-data.md

docs/advanced-operations.md

0

# Advanced Operations

1

2

Advanced functionality for specialized operations: data deletion, invokable scripts, and direct access to the low-level service APIs. These APIs give fine-grained control over InfluxDB and expose capabilities beyond the standard read/write operations.

3

4

## Capabilities

5

6

### DeleteApi

7

8

API for deleting time series data from InfluxDB buckets with support for time-based and predicate-based deletion.

9

10

```python { .api }

11

class DeleteApi:

12

def __init__(self, influxdb_client): ...

13

14

def delete(

15

self,

16

start: Union[str, datetime],

17

stop: Union[str, datetime],

18

predicate: str = "",

19

bucket: str = None,

20

org: str = None

21

) -> None:

22

"""

23

Delete time series data from InfluxDB.

24

25

Parameters:

26

- start (Union[str, datetime]): Start time for deletion range (RFC3339 or datetime)

27

- stop (Union[str, datetime]): Stop time for deletion range (RFC3339 or datetime)

28

- predicate (str, optional): InfluxDB predicate for filtering data to delete

29

- bucket (str, optional): Bucket name (uses client default if not specified)

30

- org (str, optional): Organization name or ID (uses client default if not specified)

31

32

Note: Deletion is permanent and cannot be undone. Use predicates carefully.

33

"""

34

```

35

36

#### DeleteApi Usage Examples

37

38

**Basic time-based deletion:**

39

```python

40

from influxdb_client import InfluxDBClient

41

from datetime import datetime, timedelta

42

43

client = InfluxDBClient(url="http://localhost:8086", token="token", org="my-org")

44

delete_api = client.delete_api()

45

46

# Delete data from the last hour

47

now = datetime.utcnow()

48

one_hour_ago = now - timedelta(hours=1)

49

50

delete_api.delete(

51

start=one_hour_ago,

52

stop=now,

53

bucket="sensor_data"

54

)

55

print("Deleted data from the last hour")

56

```

57

58

**Predicate-based deletion:**

59

```python

60

# Delete specific measurement data

61

delete_api.delete(

62

start="2023-01-01T00:00:00Z",

63

stop="2023-01-02T00:00:00Z",

64

predicate='_measurement="temperature" AND location="room1"',

65

bucket="sensor_data",

66

org="my-org"

67

)

68

69

# Delete data with specific tag values

70

delete_api.delete(

71

start=datetime(2023, 1, 1),

72

stop=datetime(2023, 1, 31),

73

predicate='sensor_id="temp_001" OR sensor_id="temp_002"',

74

bucket="sensor_data"

75

)

76

77

# Delete all data for a specific field

78

delete_api.delete(

79

start="2023-01-01T00:00:00Z",

80

stop="2023-12-31T23:59:59Z",

81

predicate='_field="debug_info"',

82

bucket="logs"

83

)

84

```

85

86

**Range-based cleanup:**

87

```python

88

# Delete old data (data retention cleanup)

89

cutoff_date = datetime.utcnow() - timedelta(days=90)

90

delete_api.delete(

91

start="1970-01-01T00:00:00Z", # Beginning of time

92

stop=cutoff_date,

93

bucket="archive_data"

94

)

95

96

# Delete test data

97

delete_api.delete(

98

start="2023-06-01T00:00:00Z",

99

stop="2023-06-02T00:00:00Z",

100

predicate='environment="test" OR environment="staging"',

101

bucket="application_metrics"

102

)

103

```

104

105

### DeleteApiAsync

106

107

Asynchronous version of DeleteApi for non-blocking deletion operations.

108

109

```python { .api }

110

class DeleteApiAsync:

111

def __init__(self, influxdb_client): ...

112

113

async def delete(

114

self,

115

start: Union[str, datetime],

116

stop: Union[str, datetime],

117

predicate: str = "",

118

bucket: str = None,

119

org: str = None

120

) -> None:

121

"""

122

Asynchronously delete time series data from InfluxDB.

123

124

Parameters: Same as DeleteApi.delete()

125

"""

126

```

127

128

#### DeleteApiAsync Usage Example

129

130

```python

131

import asyncio

132

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

133

134

async def cleanup_old_data():

135

async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:

136

delete_api = client.delete_api()

137

138

# Delete multiple ranges concurrently

139

deletion_tasks = []

140

141

# Task 1: Delete old test data

142

deletion_tasks.append(

143

delete_api.delete(

144

start="2023-01-01T00:00:00Z",

145

stop="2023-01-31T23:59:59Z",

146

predicate='environment="test"',

147

bucket="metrics"

148

)

149

)

150

151

# Task 2: Delete old logs

152

deletion_tasks.append(

153

delete_api.delete(

154

start="2023-01-01T00:00:00Z",

155

stop="2023-02-01T00:00:00Z",

156

bucket="application_logs"

157

)

158

)

159

160

# Execute all deletions concurrently

161

await asyncio.gather(*deletion_tasks)

162

print("All deletions completed")

163

164

asyncio.run(cleanup_old_data())

165

```

166

167

### InvokableScriptsApi

168

169

API for managing and executing InfluxDB invokable scripts for custom data processing and analysis.

170

171

```python { .api }

172

class InvokableScriptsApi:

173

def __init__(self, influxdb_client): ...

174

175

def create_script(self, script_create_request: ScriptCreateRequest) -> Script:

176

"""

177

Create a new invokable script.

178

179

Parameters:

180

- script_create_request (ScriptCreateRequest): Script configuration and code

181

182

Returns:

183

Script: Created script object

184

"""

185

186

def delete_script(self, script_id: str) -> None:

187

"""

188

Delete an invokable script.

189

190

Parameters:

191

- script_id (str): Script ID to delete

192

"""

193

194

def find_scripts(self, **kwargs) -> Scripts:

195

"""

196

List invokable scripts.

197

198

Parameters:

199

- **kwargs: Query parameters (limit, offset)

200

201

Returns:

202

Scripts: Collection of script objects

203

"""

204

205

def find_script_by_id(self, script_id: str) -> Script:

206

"""

207

Find script by ID.

208

209

Parameters:

210

- script_id (str): Script ID

211

212

Returns:

213

Script: Found script object or None

214

"""

215

216

def update_script(

217

self,

218

script_id: str,

219

script_update_request: ScriptUpdateRequest

220

) -> Script:

221

"""

222

Update an existing script.

223

224

Parameters:

225

- script_id (str): Script ID to update

226

- script_update_request (ScriptUpdateRequest): Updated script configuration

227

228

Returns:

229

Script: Updated script object

230

"""

231

232

def invoke_script(

233

self,

234

script_id: str,

235

params: dict = None

236

) -> str:

237

"""

238

Execute an invokable script.

239

240

Parameters:

241

- script_id (str): Script ID to execute

242

- params (dict, optional): Parameters to pass to the script

243

244

Returns:

245

str: Script execution result

246

"""

247

```

248

249

#### InvokableScriptsApi Usage Examples

250

251

**Script creation and management:**

252

```python

253

from influxdb_client import ScriptCreateRequest, ScriptLanguage

254

255

scripts_api = client.invokable_scripts_api()

256

257

# Create a data analysis script

258

flux_code = '''

259

import "array"

260

import "math"

261

262

// Calculate moving average for temperature data

263

data = from(bucket: params.bucket_name)

264

|> range(start: params.start_time)

265

|> filter(fn: (r) => r._measurement == "temperature")

266

|> filter(fn: (r) => r.location == params.location)

267

268

// Calculate 5-point moving average

269

data

270

|> timedMovingAverage(every: params.window_duration, period: params.window_duration * 5)

271

|> yield(name: "moving_average")

272

'''

273

274

script_request = ScriptCreateRequest(

275

name="temperature_analysis",

276

description="Calculate moving average for temperature sensors",

277

script=flux_code,

278

language=ScriptLanguage.flux

279

)

280

281

script = scripts_api.create_script(script_request)

282

print(f"Created script: {script.name} (ID: {script.id})")

283

284

# List all scripts

285

scripts = scripts_api.find_scripts()

286

for s in scripts.scripts:

287

print(f"Script: {s.name} - {s.description}")

288

```

289

290

**Script execution with parameters:**

291

```python

292

# Execute script with parameters

293

execution_params = {

294

"bucket_name": "sensor_data",

295

"start_time": "-2h",

296

"location": "datacenter1",

297

"window_duration": "5m"

298

}

299

300

result = scripts_api.invoke_script(script.id, params=execution_params)

301

print(f"Script execution result: {result}")

302

303

# Update script

304

from influxdb_client import ScriptUpdateRequest

305

306

updated_script_code = '''

307

// Enhanced version with anomaly detection

308

import "array"

309

import "math"

310

311

data = from(bucket: params.bucket_name)

312

|> range(start: params.start_time)

313

|> filter(fn: (r) => r._measurement == "temperature")

314

315

// Detect anomalies using standard deviation

316

anomalies = data

317

|> aggregateWindow(every: 1h, fn: stddev)

318

|> filter(fn: (r) => r._value > params.anomaly_threshold)

319

|> yield(name: "anomalies")

320

321

// Also yield the moving average

322

data

323

|> timedMovingAverage(every: params.window_duration, period: params.window_duration * 5)

324

|> yield(name: "moving_average")

325

'''

326

327

update_request = ScriptUpdateRequest(

328

name="enhanced_temperature_analysis",

329

description="Temperature analysis with anomaly detection",

330

script=updated_script_code

331

)

332

333

updated_script = scripts_api.update_script(script.id, update_request)

334

```

335

336

**Parameterized script execution patterns:**

337

```python

338

# Create a flexible data export script

339

export_script_code = '''

340

// Flexible data export with filtering

341

data = from(bucket: params.source_bucket)

342

|> range(start: params.start_time, stop: params.stop_time)

343

344

// Apply optional measurement filter

345

filtered_data = if exists params.measurement then

346

data |> filter(fn: (r) => r._measurement == params.measurement)

347

else

348

data

349

350

// Apply optional tag filters

351

result = if exists params.tag_filters then

352

filtered_data |> filter(fn: (r) =>

353

array.from(rows: params.tag_filters)

354

|> array.any(fn: (tag) => r[tag.key] == tag.value))

355

else

356

filtered_data

357

358

// Export to destination

359

result |> to(bucket: params.destination_bucket)

360

'''

361

362

export_script_request = ScriptCreateRequest(

363

name="flexible_data_export",

364

description="Export filtered data between buckets",

365

script=export_script_code,

366

language=ScriptLanguage.flux

367

)

368

369

export_script = scripts_api.create_script(export_script_request)

370

371

# Use the export script

372

export_params = {

373

"source_bucket": "raw_data",

374

"destination_bucket": "processed_data",

375

"start_time": "-24h",

376

"stop_time": "now()",

377

"measurement": "cpu_usage",

378

"tag_filters": [

379

{"key": "host", "value": "web-server-1"},

380

{"key": "environment", "value": "production"}

381

]

382

}

383

384

export_result = scripts_api.invoke_script(export_script.id, params=export_params)

385

```

386

387

### Low-Level Service APIs

388

389

Direct access to InfluxDB's OpenAPI service layer for advanced use cases and custom integrations.

390

391

```python { .api }

392

# Core service classes (examples of the 40+ available services)

393

class QueryService:

394

"""Direct access to query API endpoints."""

395

def post_query(

396

self,

397

org: str,

398

query: Query,

399

zap_trace_span: str = None,

400

accept_encoding: str = None,

401

content_encoding: str = None,

402

**kwargs

403

): ...

404

405

class WriteService:

406

"""Direct access to write API endpoints."""

407

def post_write(

408

self,

409

org: str,

410

bucket: str,

411

body: str,

412

zap_trace_span: str = None,

413

content_encoding: str = None,

414

content_type: str = "text/plain; charset=utf-8",

415

content_length: int = None,

416

accept: str = None,

417

precision: WritePrecision = None,

418

**kwargs

419

): ...

420

421

class BucketsService:

422

"""Direct access to bucket management endpoints."""

423

def get_buckets(

424

self,

425

zap_trace_span: str = None,

426

org: str = None,

427

org_id: str = None,

428

after: str = None,

429

limit: int = None,

430

**kwargs

431

): ...

432

433

def post_buckets(

434

self,

435

post_bucket_request: PostBucketRequest,

436

zap_trace_span: str = None,

437

**kwargs

438

): ...

439

440

class AuthorizationsService:

441

"""Direct access to authorization endpoints."""

442

def get_authorizations(

443

self,

444

zap_trace_span: str = None,

445

user_id: str = None,

446

user: str = None,

447

org_id: str = None,

448

org: str = None,

449

token: str = None,

450

**kwargs

451

): ...

452

453

class HealthService:

454

"""Direct access to health check endpoints."""

455

def get_health(self, zap_trace_span: str = None, **kwargs): ...

456

457

class SetupService:

458

"""Direct access to InfluxDB setup endpoints."""

459

def post_setup(

460

self,

461

onboarding_request: OnboardingRequest,

462

zap_trace_span: str = None,

463

**kwargs

464

): ...

465

466

class BackupService:

467

"""Direct access to backup and restore endpoints."""

468

def post_backup_kv(

469

self,

470

zap_trace_span: str = None,

471

**kwargs

472

): ...

473

474

class TelegrafsService:

475

"""Direct access to Telegraf configuration endpoints."""

476

def get_telegrafs(

477

self,

478

zap_trace_span: str = None,

479

org_id: str = None,

480

**kwargs

481

): ...

482

483

# Advanced management services

484

class DashboardsService:

485

"""Direct access to dashboard management endpoints."""

486

def get_dashboards(

487

self,

488

zap_trace_span: str = None,

489

owner: str = None,

490

sort_by: str = None,

491

**kwargs

492

): ...

493

494

class ChecksService:

495

"""Direct access to monitoring check endpoints."""

496

def get_checks(

497

self,

498

zap_trace_span: str = None,

499

org_id: str = None,

500

**kwargs

501

): ...

502

503

class NotificationRulesService:

504

"""Direct access to notification rule endpoints."""

505

def get_notification_rules(

506

self,

507

zap_trace_span: str = None,

508

org_id: str = None,

509

**kwargs

510

): ...

511

```

512

513

#### Low-Level Service Usage Examples

514

515

**Direct service access:**

516

```python

517

from influxdb_client.service.query_service import QueryService

518

from influxdb_client.domain.query import Query

519

520

# Get direct access to services

521

query_service = QueryService(client.api_client)

522

523

# Execute query using low-level service

524

query_obj = Query(query='from(bucket: "test") |> range(start: -1h)')

525

response = query_service.post_query(

526

org="my-org",

527

query=query_obj,

528

accept_encoding="gzip"

529

)

530

531

# Process raw response

532

print(f"Query response: {response}")

533

```

534

535

**Custom HTTP headers and advanced options:**

536

```python

537

from influxdb_client.service.write_service import WriteService

538

539

write_service = WriteService(client.api_client)

540

541

# Write with custom headers and options

542

line_protocol = "measurement,tag1=value1 field1=1.0"

543

response = write_service.post_write(

544

org="my-org",

545

bucket="my-bucket",

546

body=line_protocol,

547

content_encoding="gzip",

548

precision=WritePrecision.S,

549

zap_trace_span="custom-trace-id"

550

)

551

```

552

553

**Advanced bucket management:**

554

```python

555

from influxdb_client.service.buckets_service import BucketsService

556

from influxdb_client.domain.post_bucket_request import PostBucketRequest

557

from influxdb_client.domain.retention_rule import RetentionRule

558

559

buckets_service = BucketsService(client.api_client)

560

561

# Create bucket with advanced options

562

retention_rule = RetentionRule(

563

type="expire",

564

every_seconds=86400 * 30, # 30 days

565

shard_group_duration_seconds=3600 # 1 hour shard groups

566

)

567

568

bucket_request = PostBucketRequest(

569

name="advanced_bucket",

570

org_id="org_id_here",

571

description="Bucket with custom shard configuration",

572

retention_rules=[retention_rule]

573

)

574

575

response = buckets_service.post_buckets(

576

post_bucket_request=bucket_request,

577

zap_trace_span="bucket-creation-trace"

578

)

579

```

580

581

## Types

582

583

```python { .api }

584

# Deletion-related types

585

DeletePredicate = str # InfluxDB predicate expression

586

TimeRange = Tuple[Union[str, datetime], Union[str, datetime]] # Start and stop time pair

587

588

# Script-related types

589

class ScriptCreateRequest:

590

name: str

591

description: str

592

script: str

593

language: ScriptLanguage

594

595

class ScriptUpdateRequest:

596

name: str

597

description: str

598

script: str

599

600

class Script:

601

id: str

602

name: str

603

description: str

604

script: str

605

language: ScriptLanguage

606

created_at: datetime

607

updated_at: datetime

608

609

class Scripts:

610

scripts: List[Script]

611

612

class ScriptLanguage(Enum):

613

flux = "flux"

614

615

# Low-level API types

616

class Query:

617

query: str

618

type: str

619

params: dict

620

621

class PostBucketRequest:

622

org_id: str

623

name: str

624

description: str

625

retention_rules: List[RetentionRule]

626

schema_type: str

627

628

class OnboardingRequest:

629

username: str

630

password: str

631

org: str

632

bucket: str

633

retention_period_hrs: int

634

retention_period_ns: int

635

token: str

636

637

# Service response types

638

ServiceResponse = Dict[str, Any] # Generic service response

639

RawHTTPResponse = Any # Raw HTTP response from services

640

641

# Advanced configuration types

642

class TracingConfig:

643

zap_trace_span: str

644

custom_headers: Dict[str, str]

645

646

class CompressionConfig:

647

content_encoding: str # "gzip", "identity"

648

accept_encoding: str # "gzip", "deflate"

649

650

# Exception types for advanced operations

651

class DeletionError(InfluxDBError):

652

"""Raised when data deletion fails."""

653

pass

654

655

class ScriptExecutionError(InfluxDBError):

656

"""Raised when script execution fails."""

657

pass

658

659

class ServiceAPIError(InfluxDBError):

660

"""Raised when low-level service calls fail."""

661

pass

662

663

class InvalidPredicateError(DeletionError):

664

"""Raised when deletion predicate is invalid."""

665

pass

666

667

# Constants for advanced operations

668

DEFAULT_DELETE_PRECISION = WritePrecision.NS

669

MAX_DELETE_RANGE_DAYS = 7 # Recommended maximum range for single deletion

670

SCRIPT_TIMEOUT_MS = 30000 # Default script execution timeout

671

672

# Service endpoint constants

673

QUERY_ENDPOINT = "/api/v2/query"

674

WRITE_ENDPOINT = "/api/v2/write"

675

DELETE_ENDPOINT = "/api/v2/delete"

676

SCRIPTS_ENDPOINT = "/api/v2/scripts"

677

```