or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

client.md · data-management.md · database-operations.md · dataframe-client.md · index.md · legacy.md

docs/data-management.md

0

# Data Management

1

2

Comprehensive utilities for efficient data handling including bulk data insertion, line protocol formatting, and query result processing. These tools optimize data operations and provide convenient abstractions for common data management tasks.

3

4

## Capabilities

5

6

### SeriesHelper - Bulk Data Operations

7

8

Class-based utility for efficient bulk data insertion with configurable auto-commit behavior and immutable data point definitions.

9

10

```python { .api }

11

class SeriesHelper:

12

"""

13

Helper class for writing data points in bulk with immutable data points

14

and configurable auto-commit behavior.

15

16

Usage requires defining a Meta class with series configuration.

17

"""

18

19

def __init__(self, **kw):

20

"""

21

Create new data point with field values.

22

23

Parameters:

24

- **kw: Field values matching Meta.fields definitions

25

26

Note: Creates immutable data point that is queued for bulk commit

27

"""

28

29

@classmethod

30

def commit(cls, client=None):

31

"""

32

Commit all queued datapoints via InfluxDB client.

33

34

Parameters:

35

- client (InfluxDBClient): Client instance (default: uses Meta.client)

36

37

Returns:

38

bool: True if successful

39

40

Raises:

41

InfluxDBClientError: On write errors

42

"""

43

44

@classmethod

45

def _json_body_(cls):

46

"""

47

Generate JSON body for all queued datapoints.

48

49

Returns:

50

list: List of point dictionaries ready for InfluxDB

51

"""

52

53

@classmethod

54

def _reset_(cls):

55

"""

56

Reset internal data storage, clearing all queued points.

57

"""

58

59

@staticmethod

60

def _current_timestamp():

61

"""

62

Get current timestamp in nanoseconds since epoch.

63

64

Returns:

65

int: Current timestamp in nanoseconds

66

"""

67

```

68

69

#### SeriesHelper Configuration

70

71

SeriesHelper requires configuration via a Meta class defining the series structure:

72

73

```python

74

class Meta:

75

# Required attributes

76

series_name = 'measurement_name'

77

fields = ['field1', 'field2', 'field3']

78

tags = ['tag1', 'tag2']

79

80

# Optional attributes

81

bulk_size = 300 # Auto-commit after N points (0 disables)

82

client = influxdb_client_instance # Default client

83

autocommit = True # Enable auto-commit behavior

84

```

85

86

#### SeriesHelper Usage Examples

87

88

```python

89

from influxdb import InfluxDBClient, SeriesHelper

90

91

# Configure client

92

client = InfluxDBClient(database='metrics')

93

94

# Define SeriesHelper subclass

95

class CpuMetrics(SeriesHelper):

96

class Meta:

97

series_name = 'cpu_usage'

98

fields = ['user', 'system', 'idle']

99

tags = ['host', 'cpu_core']

100

bulk_size = 100 # Auto-commit every 100 points

101

client = client

102

autocommit = True

103

104

# Create data points

105

CpuMetrics(host='server01', cpu_core='core0', user=25.5, system=10.2, idle=64.3)

106

CpuMetrics(host='server01', cpu_core='core1', user=30.1, system=8.7, idle=61.2)

107

CpuMetrics(host='server02', cpu_core='core0', user=45.8, system=15.3, idle=38.9)

108

109

# Manual commit (if autocommit disabled)

110

CpuMetrics.commit()

111

112

# Reset queued points

113

CpuMetrics._reset_()

114

115

# Generate JSON without committing

116

json_data = CpuMetrics._json_body_()

117

print(json_data)

118

```

119

120

#### Advanced SeriesHelper Patterns

121

122

```python

123

# Multiple series helpers

124

class MemoryMetrics(SeriesHelper):

125

class Meta:

126

series_name = 'memory_usage'

127

fields = ['used', 'available', 'buffer', 'cache']

128

tags = ['host']

129

bulk_size = 50

130

client = client

131

132

class DiskMetrics(SeriesHelper):

133

class Meta:

134

series_name = 'disk_io'

135

fields = ['read_bytes', 'write_bytes', 'read_ops', 'write_ops']

136

tags = ['host', 'device']

137

bulk_size = 200

138

client = client

139

140

# Batch data collection

141

def collect_system_metrics(hosts):

142

for host in hosts:

143

# CPU data

144

cpu_data = get_cpu_stats(host)

145

for core, stats in cpu_data.items():

146

CpuMetrics(host=host, cpu_core=core, **stats)

147

148

# Memory data

149

mem_data = get_memory_stats(host)

150

MemoryMetrics(host=host, **mem_data)

151

152

# Disk data

153

disk_data = get_disk_stats(host)

154

for device, stats in disk_data.items():

155

DiskMetrics(host=host, device=device, **stats)

156

157

# All helpers auto-commit based on their bulk_size settings

158

collect_system_metrics(['server01', 'server02', 'server03'])

159

160

# Manual commit all at once

161

CpuMetrics.commit()

162

MemoryMetrics.commit()

163

DiskMetrics.commit()

164

```

165

166

### Line Protocol Utilities

167

168

Functions for creating efficient line protocol formatted data, InfluxDB's native wire format for optimal write performance.

169

170

```python { .api }

171

def make_line(measurement, tags=None, fields=None, time=None, precision=None):

172

"""

173

Create single line protocol formatted string.

174

175

Parameters:

176

- measurement (str): Measurement name

177

- tags (dict): Tag key-value pairs (default: None)

178

- fields (dict): Field key-value pairs (default: None)

179

- time (int or datetime): Timestamp (default: current time)

180

- precision (str): Time precision ('n', 'u', 'ms', 's', 'm', 'h') (default: None, i.e. nanoseconds)

181

182

Returns:

183

str: Line protocol formatted string

184

185

Raises:

186

ValueError: If measurement or fields are missing

187

"""

188

189

def make_lines(data, precision=None):

190

"""

191

Create multiple line protocol strings from data structure.

192

193

Parameters:

194

- data (list): List of point dictionaries with measurement, tags, fields, time

195

- precision (str): Time precision ('n', 'u', 'ms', 's', 'm', 'h') (default: None, i.e. nanoseconds)

196

197

Returns:

198

str: Single newline-joined line protocol string (one line per point)

199

200

Raises:

201

ValueError: If data format is invalid

202

"""

203

204

def quote_ident(value):

205

"""

206

Quote identifier strings for line protocol.

207

208

Parameters:

209

- value (str): Identifier to quote

210

211

Returns:

212

str: Quoted identifier

213

"""

214

215

def quote_literal(value):

216

"""

217

Quote literal strings for line protocol.

218

219

Parameters:

220

- value (str): Literal to quote

221

222

Returns:

223

str: Quoted literal

224

"""

225

226

# Time reference constant

227

EPOCH: datetime # UTC epoch timestamp reference

228

```

229

230

#### Line Protocol Examples

231

232

```python

233

from influxdb.line_protocol import make_line, make_lines

234

from datetime import datetime, timezone

235

236

# Create single line

237

line = make_line(

238

measurement='cpu_usage',

239

tags={'host': 'server01', 'cpu': 'cpu0'},

240

fields={'user': 23.5, 'system': 12.3, 'idle': 64.2},

241

time=datetime.now(timezone.utc)

242

)

243

print(line)

244

# Output: cpu_usage,host=server01,cpu=cpu0 user=23.5,system=12.3,idle=64.2 1694068704000000000

245

246

# Create multiple lines from data

247

data = [

248

{

249

'measurement': 'cpu_usage',

250

'tags': {'host': 'server01'},

251

'fields': {'value': 75.5},

252

'time': '2023-09-07T07:18:24Z'

253

},

254

{

255

'measurement': 'memory_usage',

256

'tags': {'host': 'server01'},

257

'fields': {'value': 82.3},

258

'time': '2023-09-07T07:18:24Z'

259

}

260

]

261

262

lines = make_lines(data, precision='s')  # returns one newline-joined string

263

for line in lines.splitlines():

264

    print(line)

265

266

# Write line protocol directly

267

client = InfluxDBClient()

268

line_data = make_line('temperature', {'sensor': 'room1'}, {'value': 23.5})

269

client.write([line_data], protocol='line')

270

271

# Batch line protocol creation

272

def create_sensor_lines(sensor_readings):

273

lines = []

274

for reading in sensor_readings:

275

line = make_line(

276

measurement='sensor_data',

277

tags={'sensor_id': reading['id'], 'location': reading['location']},

278

fields={'temperature': reading['temp'], 'humidity': reading['humidity']},

279

time=reading['timestamp']

280

)

281

lines.append(line)

282

return lines

283

284

# High-performance writing

285

sensor_data = get_sensor_readings() # Get data

286

lines = create_sensor_lines(sensor_data)

287

client.write('\n'.join(lines), protocol='line')

288

```

289

290

### ResultSet - Query Result Processing

291

292

Wrapper class for InfluxDB query results providing iteration, filtering, and data extraction capabilities.

293

294

```python { .api }

295

class ResultSet:

296

"""

297

Wrapper around InfluxDB query results with iteration and filtering capabilities.

298

"""

299

300

def __init__(self, series, raise_errors=True):

301

"""

302

Initialize ResultSet from query response.

303

304

Parameters:

305

- series (list): Raw series data from InfluxDB query response

306

- raise_errors (bool): Raise exceptions on query errors (default: True)

307

308

Raises:

309

InfluxDBClientError: If query errors and raise_errors=True

310

"""

311

312

def get_points(self, measurement=None, tags=None):

313

"""

314

Get data points matching optional filters.

315

316

Parameters:

317

- measurement (str): Filter by measurement name (default: None)

318

- tags (dict): Filter by tag key-value pairs (default: None)

319

320

Yields:

321

dict: Individual data points as dictionaries

322

323

Example:

324

for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):

325

print(point['time'], point['value'])

326

"""

327

328

def keys(self):

329

"""

330

Get list of measurement keys in the result set.

331

332

Returns:

333

list: List of measurement key tuples (name, tags)

334

"""

335

336

def items(self):

337

"""

338

Get key-value pairs for all series in result set.

339

340

Yields:

341

tuple: (key, points_generator) pairs

342

"""

343

344

@staticmethod

345

def point_from_cols_vals(cols, vals):

346

"""

347

Create point dictionary from column names and values.

348

349

Parameters:

350

- cols (list): Column names

351

- vals (list): Column values

352

353

Returns:

354

dict: Point dictionary with column names as keys

355

"""

356

357

def __getitem__(self, key):

358

"""

359

Retrieve series by key (deprecated - use get_points instead).

360

361

Parameters:

362

- key: Series key

363

364

Returns:

365

generator: Points for the specified series

366

"""

367

368

def __iter__(self):

369

"""

370

Iterate over all points in all series.

371

372

Yields:

373

dict: Individual data points

374

"""

375

376

def __len__(self):

377

"""

378

Get number of series in result set.

379

380

Returns:

381

int: Number of series

382

"""

383

384

def __repr__(self):

385

"""

386

String representation of ResultSet.

387

388

Returns:

389

str: ResultSet description

390

"""

391

392

# Properties

393

@property

394

def raw(self):

395

"""Raw JSON response from InfluxDB."""

396

397

@property

398

def error(self):

399

"""Error message from InfluxDB query (if any)."""

400

```

401

402

#### ResultSet Usage Examples

403

404

```python

405

from influxdb import InfluxDBClient

406

407

client = InfluxDBClient(database='metrics')

408

409

# Execute query

410

result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')

411

412

# Basic iteration

413

for point in result.get_points():

414

print(f"Time: {point['time']}, Value: {point['value']}")

415

416

# Filtered iteration

417

for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):

418

print(f"Server01 CPU: {point['value']}%")

419

420

# Get all keys

421

keys = result.keys()

422

print("Available measurements:", keys)

423

424

# Process by series

425

for key, points in result.items():

426

measurement, tags = key

427

print(f"Processing {measurement} with tags {tags}")

428

for point in points:

429

# Process each point

430

pass

431

432

# Check for errors

433

if result.error:

434

print("Query error:", result.error)

435

436

# Access raw response

437

raw_data = result.raw

438

print("Raw InfluxDB response:", raw_data)

439

440

# Convert to simple list

441

all_points = list(result.get_points())

442

print(f"Retrieved {len(all_points)} total points")

443

444

# Filter and aggregate

445

cpu_values = [

446

point['value'] for point in result.get_points()

447

if point.get('measurement') == 'cpu_usage'

448

]

449

avg_cpu = sum(cpu_values) / len(cpu_values) if cpu_values else 0

450

print(f"Average CPU usage: {avg_cpu:.2f}%")

451

```

452

453

#### Advanced ResultSet Processing

454

455

```python

456

# Complex query with multiple measurements

457

query = """

458

SELECT mean(value) as avg_value, max(value) as max_value

459

FROM cpu_usage, memory_usage

460

WHERE time >= now() - 24h

461

GROUP BY time(1h), host

462

"""

463

464

result = client.query(query)

465

466

# Group results by measurement and host

467

from collections import defaultdict

468

469

results_by_measurement = defaultdict(list)

470

for point in result.get_points():

471

measurement = point.get('name', 'unknown') # Series name

472

host = point.get('host', 'unknown')

473

474

results_by_measurement[f"{measurement}_{host}"].append({

475

'time': point['time'],

476

'avg_value': point.get('avg_value'),

477

'max_value': point.get('max_value')

478

})

479

480

# Process grouped results

481

for key, points in results_by_measurement.items():

482

print(f"Processing {key}: {len(points)} time points")

483

484

# Calculate trends

485

values = [p['avg_value'] for p in points if p['avg_value'] is not None]

486

if len(values) >= 2:

487

trend = values[-1] - values[0] # Simple trend calculation

488

print(f" Trend: {trend:+.2f}")

489

490

# Export results to different formats

491

def export_results(result_set, format='json'):

492

"""Export ResultSet to various formats."""

493

points = list(result_set.get_points())

494

495

if format == 'json':

496

import json

497

return json.dumps(points, default=str)

498

499

elif format == 'csv':

500

import csv, io

501

if not points:

502

return ""

503

504

output = io.StringIO()

505

writer = csv.DictWriter(output, fieldnames=points[0].keys())

506

writer.writeheader()

507

writer.writerows(points)

508

return output.getvalue()

509

510

elif format == 'dataframe':

511

import pandas as pd

512

return pd.DataFrame(points)

513

514

# Export query results

515

json_data = export_results(result, 'json')

516

csv_data = export_results(result, 'csv')

517

df = export_results(result, 'dataframe')

518

```

519

520

## Performance Optimization

521

522

### Efficient Data Operations

523

524

```python

525

# Use line protocol for maximum write performance

526

from influxdb.line_protocol import make_lines

527

528

# Batch create line protocol

529

data_points = [

530

{'measurement': 'metrics', 'tags': {'host': f'server{i:02d}'},

531

'fields': {'value': i * 10.5}}

532

for i in range(1000)

533

]

534

535

# make_lines already returns a single newline-joined string — no extra join needed
line_data = make_lines(data_points)

537

client.write(line_data, protocol='line')

538

539

# SeriesHelper for structured bulk inserts

540

class HighThroughputMetrics(SeriesHelper):

541

class Meta:

542

series_name = 'high_volume_data'

543

fields = ['value1', 'value2', 'value3']

544

tags = ['source', 'type']

545

bulk_size = 10000 # Large batch size

546

client = client

547

548

# Stream processing pattern

549

def process_data_stream(data_stream):

550

for batch in data_stream.batches(size=1000):

551

for record in batch:

552

HighThroughputMetrics(

553

source=record['source'],

554

type=record['type'],

555

value1=record['v1'],

556

value2=record['v2'],

557

value3=record['v3']

558

)

559

# Auto-commits when bulk_size reached

560

561

# Memory-efficient result processing

562

def process_large_query_results(query):

563

result = client.query(query, chunked=True, chunk_size=10000)

564

565

# Process points in chunks to avoid memory issues

566

chunk_count = 0

567

for point in result.get_points():

568

# Process individual point

569

process_point(point)

570

571

chunk_count += 1

572

if chunk_count % 10000 == 0:

573

print(f"Processed {chunk_count} points...")

574

```