or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.mddata-management.mddatabase-operations.mddataframe-client.mdindex.mdlegacy.md

database-operations.mddocs/

0

# Database Operations

1

2

Complete database lifecycle management including database creation and deletion, user management with permissions, retention policies for data lifecycle, continuous queries for real-time aggregation, and measurement operations for schema management.

3

4

## Capabilities

5

6

### Database Management

7

8

Core database operations for creating, listing, and managing InfluxDB databases.

9

10

```python { .api }

11

def create_database(self, dbname):

12

"""

13

Create a new database.

14

15

Parameters:

16

- dbname (str): Name of the database to create

17

18

Returns:

19

bool: True if successful

20

21

Raises:

22

InfluxDBClientError: If database creation fails

23

"""

24

25

def drop_database(self, dbname):

26

"""

27

Delete a database and all its data.

28

29

Parameters:

30

- dbname (str): Name of the database to delete

31

32

Returns:

33

bool: True if successful

34

35

Raises:

36

InfluxDBClientError: If database deletion fails

37

"""

38

39

def get_list_database(self):

40

"""

41

Get list of all databases on the InfluxDB server.

42

43

Returns:

44

list: List of database names

45

46

Raises:

47

InfluxDBClientError: If unable to retrieve database list

48

"""

49

```

50

51

#### Database Management Examples

52

53

```python

54

from influxdb import InfluxDBClient

55

from influxdb.exceptions import InfluxDBClientError

56

57

client = InfluxDBClient(host='localhost', username='admin', password='admin')

58

59

# Create databases

60

try:

61

client.create_database('production_metrics')

62

client.create_database('development_metrics')

63

client.create_database('test_data')

64

print("Databases created successfully")

65

except InfluxDBClientError as e:

66

print(f"Failed to create database: {e}")

67

68

# List all databases

69

databases = client.get_list_database()

70

print("Available databases:")

71

for db in databases:

72

print(f" - {db['name']}")

73

74

# Switch to specific database

75

client.switch_database('production_metrics')

76

77

# Clean up test databases

78

try:

79

client.drop_database('test_data')

80

print("Test database deleted")

81

except InfluxDBClientError as e:

82

print(f"Failed to delete database: {e}")

83

84

# Database existence check example

85

databases = client.get_list_database()

86

db_exists = any(db['name'] == 'metrics' for db in databases)

87

88

if not db_exists:

89

client.create_database('metrics')

90

print("Created metrics database")

91

```

92

93

### User Management

94

95

Comprehensive user account management with role-based permissions and privilege control.

96

97

```python { .api }

98

def create_user(self, username, password, admin=False):

99

"""

100

Create a new user account.

101

102

Parameters:

103

- username (str): Username for the new account

104

- password (str): Password for the new account

105

- admin (bool): Grant admin privileges (default: False)

106

107

Returns:

108

bool: True if successful

109

110

Raises:

111

InfluxDBClientError: If user creation fails

112

"""

113

114

def drop_user(self, username):

115

"""

116

Delete a user account.

117

118

Parameters:

119

- username (str): Username of account to delete

120

121

Returns:

122

bool: True if successful

123

124

Raises:

125

InfluxDBClientError: If user deletion fails

126

"""

127

128

def get_list_users(self):

129

"""

130

Get list of all user accounts.

131

132

Returns:

133

list: List of user dictionaries with user information

134

135

Raises:

136

InfluxDBClientError: If unable to retrieve user list

137

"""

138

139

def set_user_password(self, username, password):

140

"""

141

Change a user's password.

142

143

Parameters:

144

- username (str): Username of account to modify

145

- password (str): New password

146

147

Returns:

148

bool: True if successful

149

150

Raises:

151

InfluxDBClientError: If password change fails

152

"""

153

154

def grant_admin_privileges(self, username):

155

"""

156

Grant administrative privileges to a user.

157

158

Parameters:

159

- username (str): Username to grant admin privileges

160

161

Returns:

162

bool: True if successful

163

164

Raises:

165

InfluxDBClientError: If privilege grant fails

166

"""

167

168

def revoke_admin_privileges(self, username):

169

"""

170

Revoke administrative privileges from a user.

171

172

Parameters:

173

- username (str): Username to revoke admin privileges

174

175

Returns:

176

bool: True if successful

177

178

Raises:

179

InfluxDBClientError: If privilege revocation fails

180

"""

181

182

def grant_privilege(self, privilege, database, username):

183

"""

184

Grant database-specific privilege to a user.

185

186

Parameters:

187

- privilege (str): Privilege type ('READ', 'WRITE', 'ALL')

188

- database (str): Database name for privilege scope

189

- username (str): Username to grant privilege

190

191

Returns:

192

bool: True if successful

193

194

Raises:

195

InfluxDBClientError: If privilege grant fails

196

"""

197

198

def revoke_privilege(self, privilege, database, username):

199

"""

200

Revoke database-specific privilege from a user.

201

202

Parameters:

203

- privilege (str): Privilege type ('READ', 'WRITE', 'ALL')

204

- database (str): Database name for privilege scope

205

- username (str): Username to revoke privilege

206

207

Returns:

208

bool: True if successful

209

210

Raises:

211

InfluxDBClientError: If privilege revocation fails

212

"""

213

214

def get_list_privileges(self, username):

215

"""

216

Get list of privileges for a specific user.

217

218

Parameters:

219

- username (str): Username to query privileges

220

221

Returns:

222

list: List of privilege dictionaries with database and privilege information

223

224

Raises:

225

InfluxDBClientError: If unable to retrieve privileges

226

"""

227

```

228

229

#### User Management Examples

230

231

```python

232

# Admin client for user management

233

admin_client = InfluxDBClient(username='admin', password='admin_password')

234

235

# Create users with different roles

236

try:

237

# Create regular users

238

admin_client.create_user('developer', 'dev_password', admin=False)

239

admin_client.create_user('analyst', 'analyst_password', admin=False)

240

241

# Create admin user

242

admin_client.create_user('db_admin', 'admin_password', admin=True)

243

244

print("Users created successfully")

245

except InfluxDBClientError as e:

246

print(f"User creation failed: {e}")

247

248

# List all users

249

users = admin_client.get_list_users()

250

print("Current users:")

251

for user in users:

252

print(f" - {user['user']} (admin: {user['admin']})")

253

254

# Grant database-specific privileges

255

admin_client.grant_privilege('READ', 'production_metrics', 'analyst')

256

admin_client.grant_privilege('WRITE', 'development_metrics', 'developer')

257

admin_client.grant_privilege('ALL', 'production_metrics', 'developer')

258

259

# Check user privileges

260

dev_privileges = admin_client.get_list_privileges('developer')

261

print("Developer privileges:")

262

for priv in dev_privileges:

263

print(f" - {priv['database']}: {priv['privilege']}")

264

265

# Change passwords

266

admin_client.set_user_password('developer', 'new_secure_password')

267

268

# Revoke privileges

269

admin_client.revoke_privilege('WRITE', 'production_metrics', 'developer')

270

271

# Administrative privilege management

272

admin_client.grant_admin_privileges('senior_developer')

273

admin_client.revoke_admin_privileges('former_admin')

274

275

# Clean up users

276

admin_client.drop_user('temp_user')

277

278

# Bulk user setup example

279

try:

280

username = 'metrics_reader'

281

password = 'reader_password'

282

databases = ['production_metrics', 'development_metrics']

283

privilege = 'READ'

284

285

admin_client.create_user(username, password)

286

287

for database in databases:

288

admin_client.grant_privilege(privilege, database, username)

289

290

print(f"User {username} created with {privilege} access to {databases}")

291

292

except InfluxDBClientError as e:

293

print(f"Failed to setup user {username}: {e}")

294

```

295

296

### Retention Policy Management

297

298

Data lifecycle management through retention policies that automatically manage data expiration and storage optimization.

299

300

```python { .api }

301

def create_retention_policy(self, name, duration, replication, database=None,

302

default=False, shard_duration="0s"):

303

"""

304

Create a retention policy for automatic data lifecycle management.

305

306

Parameters:

307

- name (str): Name of the retention policy

308

- duration (str): How long to keep data (e.g., '7d', '4w', '52w', 'INF')

309

- replication (int): Number of data replicas (usually 1 for single node)

310

- database (str): Database name (default: current database)

311

- default (bool): Make this the default retention policy (default: False)

312

- shard_duration (str): Shard group duration (default: "0s" for auto)

313

314

Returns:

315

bool: True if successful

316

317

Raises:

318

InfluxDBClientError: If retention policy creation fails

319

"""

320

321

def alter_retention_policy(self, name, database=None, duration=None,

322

replication=None, default=None, shard_duration=None):

323

"""

324

Modify an existing retention policy.

325

326

Parameters:

327

- name (str): Name of retention policy to modify

328

- database (str): Database name (default: current database)

329

- duration (str): New duration (default: unchanged)

330

- replication (int): New replication factor (default: unchanged)

331

- default (bool): Make/unmake default policy (default: unchanged)

332

- shard_duration (str): New shard duration (default: unchanged)

333

334

Returns:

335

bool: True if successful

336

337

Raises:

338

InfluxDBClientError: If retention policy modification fails

339

"""

340

341

def drop_retention_policy(self, name, database=None):

342

"""

343

Delete a retention policy.

344

345

Parameters:

346

- name (str): Name of retention policy to delete

347

- database (str): Database name (default: current database)

348

349

Returns:

350

bool: True if successful

351

352

Raises:

353

InfluxDBClientError: If retention policy deletion fails

354

"""

355

356

def get_list_retention_policies(self, database=None):

357

"""

358

Get list of retention policies for a database.

359

360

Parameters:

361

- database (str): Database name (default: current database)

362

363

Returns:

364

list: List of retention policy dictionaries

365

366

Raises:

367

InfluxDBClientError: If unable to retrieve retention policies

368

"""

369

```

370

371

#### Retention Policy Examples

372

373

```python

374

client = InfluxDBClient(database='production_metrics')

375

376

# Create retention policies for different data lifecycles

377

try:

378

# Short-term high-resolution data (1 week)

379

client.create_retention_policy(

380

name='realtime',

381

duration='7d',

382

replication=1,

383

default=True, # Make this the default policy

384

shard_duration='1h' # 1-hour shards for recent data

385

)

386

387

# Medium-term aggregated data (1 year)

388

client.create_retention_policy(

389

name='historical',

390

duration='52w',

391

replication=1,

392

shard_duration='1d' # Daily shards for historical data

393

)

394

395

# Long-term archive (infinite retention)

396

client.create_retention_policy(

397

name='archive',

398

duration='INF',

399

replication=1,

400

shard_duration='1w' # Weekly shards for archival data

401

)

402

403

print("Retention policies created successfully")

404

405

except InfluxDBClientError as e:

406

print(f"Failed to create retention policies: {e}")

407

408

# List retention policies

409

policies = client.get_list_retention_policies()

410

print("Current retention policies:")

411

for policy in policies:

412

print(f" - {policy['name']}: {policy['duration']} "

413

f"(default: {policy['default']}, shards: {policy['shardGroupDuration']})")

414

415

# Modify retention policy

416

client.alter_retention_policy(

417

name='historical',

418

duration='26w', # Reduce to 6 months

419

shard_duration='12h' # Smaller shards

420

)

421

422

# Write data to specific retention policy

423

points = [

424

{

425

"measurement": "cpu_usage",

426

"tags": {"host": "server01"},

427

"fields": {"value": 75.5},

428

"time": "2023-09-07T07:18:24Z"

429

}

430

]

431

432

# Write to specific retention policy

433

client.write_points(points, retention_policy='historical')

434

435

# Set different default policy

436

client.alter_retention_policy('historical', default=True)

437

438

# Clean up old retention policy

439

client.drop_retention_policy('old_policy')

440

441

# Tiered retention setup example

442

for database_name in ['production', 'staging', 'development']:

443

client.switch_database(database_name)

444

445

# Real-time data: 24 hours, high resolution

446

client.create_retention_policy(

447

'realtime', '24h', 1, default=True, shard_duration='1h'

448

)

449

450

# Daily aggregates: 30 days

451

client.create_retention_policy(

452

'daily', '30d', 1, shard_duration='1d'

453

)

454

455

# Monthly aggregates: 2 years

456

client.create_retention_policy(

457

'monthly', '104w', 1, shard_duration='1w'

458

)

459

460

print(f"Tiered retention setup complete for {database_name}")

461

```

462

463

### Continuous Query Management

464

465

Real-time data aggregation and downsampling through continuous queries that automatically process data as it arrives.

466

467

```python { .api }

468

def create_continuous_query(self, name, select, database=None, resample_opts=None):

469

"""

470

Create a continuous query for real-time data aggregation.

471

472

Parameters:

473

- name (str): Name of the continuous query

474

- select (str): SELECT statement for the aggregation

475

- database (str): Database name (default: current database)

476

- resample_opts (dict): Resampling options with 'FOR' and 'EVERY' keys

477

478

Returns:

479

bool: True if successful

480

481

Raises:

482

InfluxDBClientError: If continuous query creation fails

483

484

Note: resample_opts format: {'FOR': '2m', 'EVERY': '1m'}

485

"""

486

487

def drop_continuous_query(self, name, database=None):

488

"""

489

Delete a continuous query.

490

491

Parameters:

492

- name (str): Name of continuous query to delete

493

- database (str): Database name (default: current database)

494

495

Returns:

496

bool: True if successful

497

498

Raises:

499

InfluxDBClientError: If continuous query deletion fails

500

"""

501

502

def get_list_continuous_queries(self):

503

"""

504

Get list of all continuous queries across all databases.

505

506

Returns:

507

list: List of dictionaries containing continuous query information

508

509

Raises:

510

InfluxDBClientError: If unable to retrieve continuous queries

511

"""

512

```

513

514

#### Continuous Query Examples

515

516

```python

517

client = InfluxDBClient(database='production_metrics')

518

519

# Create continuous queries for automatic downsampling

520

try:

521

# Downsample CPU data to 5-minute averages

522

client.create_continuous_query(

523

name='cpu_5min_avg',

524

select="""

525

SELECT mean(value) as avg_cpu, max(value) as max_cpu, min(value) as min_cpu

526

INTO "historical"."cpu_5min"

527

FROM "realtime"."cpu_usage"

528

GROUP BY time(5m), host

529

"""

530

)

531

532

# Downsample memory data to hourly statistics

533

client.create_continuous_query(

534

name='memory_hourly_stats',

535

select="""

536

SELECT mean(used) as avg_used, max(used) as max_used,

537

mean(available) as avg_available

538

INTO "daily"."memory_hourly"

539

FROM "realtime"."memory_usage"

540

GROUP BY time(1h), host

541

"""

542

)

543

544

# Create alerts based on thresholds

545

client.create_continuous_query(

546

name='high_cpu_alerts',

547

select="""

548

SELECT mean(value) as avg_cpu

549

INTO "alerts"."high_cpu"

550

FROM "realtime"."cpu_usage"

551

WHERE value > 80

552

GROUP BY time(1m), host

553

"""

554

)

555

556

print("Continuous queries created successfully")

557

558

except InfluxDBClientError as e:

559

print(f"Failed to create continuous queries: {e}")

560

561

# Create continuous query with resampling options

562

client.create_continuous_query(

563

name='disk_io_resampled',

564

select="""

565

SELECT sum(read_bytes) as total_read, sum(write_bytes) as total_write

566

INTO "daily"."disk_io_hourly"

567

FROM "realtime"."disk_io"

568

GROUP BY time(1h), host, device

569

""",

570

resample_opts={

571

'FOR': '2h', # Process data for 2 hours back

572

'EVERY': '30m' # Run every 30 minutes

573

}

574

)

575

576

# List all continuous queries

577

cqs = client.get_list_continuous_queries()

578

print("Active continuous queries:")

579

for cq_info in cqs:

580

database = cq_info['name']

581

for cq in cq_info.get('cqs', []):

582

print(f" Database: {database}")

583

print(f" Name: {cq['name']}")

584

print(f" Query: {cq['query']}")

585

586

# Drop continuous query

587

client.drop_continuous_query('old_downsampling_query')

588

589

# Complex continuous query for business metrics

590

business_analytics_cq = """

591

SELECT sum(revenue) as total_revenue,

592

count(transaction_id) as transaction_count,

593

mean(order_value) as avg_order_value

594

INTO "business"."daily_metrics"

595

FROM "realtime"."transactions"

596

WHERE status = 'completed'

597

GROUP BY time(1d), region, product_category

598

"""

599

600

client.create_continuous_query(

601

name='daily_business_analytics',

602

select=business_analytics_cq

603

)

604

605

# Standard downsampling pattern example

606

retention_policies = {

607

'realtime': 'realtime',

608

'daily': 'daily',

609

'monthly': 'monthly'

610

}

611

612

measurement = 'cpu_usage'

613

fields = ['value']

614

615

# 5-minute averages

616

fields_select = ", ".join([f"mean({field}) as avg_{field}" for field in fields])

617

cq_5min = f"""

618

SELECT {fields_select}

619

INTO "{retention_policies['daily']}"."{measurement}_5min"

620

FROM "{retention_policies['realtime']}"."{measurement}"

621

GROUP BY time(5m), *

622

"""

623

624

client.create_continuous_query(f"{measurement}_5min_avg", cq_5min)

625

626

# Hourly statistics

627

stats_select = ", ".join([

628

f"mean({field}) as avg_{field}, max({field}) as max_{field}, min({field}) as min_{field}"

629

for field in fields

630

])

631

cq_hourly = f"""

632

SELECT {stats_select}

633

INTO "{retention_policies['monthly']}"."{measurement}_hourly"

634

FROM "{retention_policies['daily']}"."{measurement}_5min"

635

GROUP BY time(1h), *

636

"""

637

638

client.create_continuous_query(f"{measurement}_hourly_stats", cq_hourly)

639

640

print(f"Standard downsampling created for {measurement}")

641

```

642

643

### Measurement Operations

644

645

Schema management operations for working with measurements, series, and data organization.

646

647

```python { .api }

648

def get_list_measurements(self):

649

"""

650

Get list of measurements in the current database.

651

652

Returns:

653

list: List of measurement dictionaries

654

655

Raises:

656

InfluxDBClientError: If unable to retrieve measurements

657

"""

658

659

def drop_measurement(self, measurement):

660

"""

661

Delete a measurement and all its data.

662

663

Parameters:

664

- measurement (str): Name of measurement to delete

665

666

Returns:

667

bool: True if successful

668

669

Raises:

670

InfluxDBClientError: If measurement deletion fails

671

"""

672

673

def get_list_series(self, database=None, measurement=None, tags=None):

674

"""

675

Get list of series matching optional filters.

676

677

Parameters:

678

- database (str): Database name (default: current database)

679

- measurement (str): Filter by measurement name (default: None)

680

- tags (dict): Filter by tag key-value pairs (default: None)

681

682

Returns:

683

list: List of series information

684

685

Raises:

686

InfluxDBClientError: If unable to retrieve series

687

"""

688

689

def delete_series(self, database=None, measurement=None, tags=None):

690

"""

691

Delete series matching the specified criteria.

692

693

Parameters:

694

- database (str): Database name (default: current database)

695

- measurement (str): Filter by measurement name (default: None)

696

- tags (dict): Filter by tag key-value pairs (default: None)

697

698

Returns:

699

bool: True if successful

700

701

Raises:

702

InfluxDBClientError: If series deletion fails

703

704

Warning: This operation is destructive and cannot be undone

705

"""

706

```

707

708

#### Measurement Operations Examples

709

710

```python

711

client = InfluxDBClient(database='production_metrics')

712

713

# List all measurements

714

measurements = client.get_list_measurements()

715

print("Available measurements:")

716

for measurement in measurements:

717

print(f" - {measurement['name']}")

718

719

# Get detailed series information

720

series_list = client.get_list_series()

721

print(f"Total series count: {len(series_list)}")

722

723

# Filter series by measurement

724

cpu_series = client.get_list_series(measurement='cpu_usage')

725

print(f"CPU usage series: {len(cpu_series)}")

726

for series in cpu_series[:5]: # Show first 5

727

print(f" - {series}")

728

729

# Filter series by tags

730

server01_series = client.get_list_series(tags={'host': 'server01'})

731

print(f"Series for server01: {len(server01_series)}")

732

733

# Delete specific series (be careful!)

734

# Delete test data series

735

client.delete_series(measurement='test_data', tags={'environment': 'test'})

736

737

# Drop entire measurement (removes all series and data)

738

client.drop_measurement('deprecated_measurement')

739

740

# Measurement analysis example

741

measurement_name = 'cpu_usage'

742

series = client.get_list_series(measurement=measurement_name)

743

744

tag_combinations = {}

745

for series_key in series:

746

# Parse series key to extract tags

747

# Series format: "measurement,tag1=value1,tag2=value2"

748

if ',' in series_key:

749

tags_part = series_key.split(',', 1)[1]

750

tag_pairs = tags_part.split(',')

751

tag_combo = tuple(sorted(tag_pairs))

752

tag_combinations[tag_combo] = tag_combinations.get(tag_combo, 0) + 1

753

754

print(f"Measurement: {measurement_name}")

755

print(f"Total series: {len(series)}")

756

print(f"Unique tag combinations: {len(tag_combinations)}")

757

758

# Show most common tag combinations

759

sorted_combos = sorted(tag_combinations.items(), key=lambda x: x[1], reverse=True)

760

print("Most common tag combinations:")

761

for combo, count in sorted_combos[:5]:

762

print(f" {combo}: {count} series")

763

764

# Clean up measurements by pattern example

765

measurements = client.get_list_measurements()

766

patterns_to_delete = ['test_', 'temp_', 'debug_']

767

768

for measurement_info in measurements:

769

measurement_name = measurement_info['name']

770

771

for pattern in patterns_to_delete:

772

if pattern in measurement_name:

773

print(f"Deleting measurement: {measurement_name}")

774

try:

775

client.drop_measurement(measurement_name)

776

except InfluxDBClientError as e:

777

print(f"Failed to delete {measurement_name}: {e}")

778

```