# Data Management

Comprehensive utilities for efficient data handling including bulk data insertion, line protocol formatting, and query result processing. These tools optimize data operations and provide convenient abstractions for common data management tasks.

## Capabilities

### SeriesHelper - Bulk Data Operations

Class-based utility for efficient bulk data insertion with configurable auto-commit behavior and immutable data point definitions.

```python { .api }
class SeriesHelper:
    """
    Helper class for writing data points in bulk with immutable data points
    and configurable auto-commit behavior.

    Usage requires defining a Meta class with series configuration.
    """

    def __init__(self, **kw):
        """
        Create new data point with field values.

        Parameters:
        - **kw: Field values matching Meta.fields definitions

        Note: Creates immutable data point that is queued for bulk commit
        """

    @classmethod
    def commit(cls, client=None):
        """
        Commit all queued datapoints via InfluxDB client.

        Parameters:
        - client (InfluxDBClient): Client instance (default: uses Meta.client)

        Returns:
        bool: True if successful

        Raises:
        InfluxDBClientError: On write errors
        """

    @classmethod
    def _json_body_(cls):
        """
        Generate JSON body for all queued datapoints.

        Returns:
        list: List of point dictionaries ready for InfluxDB
        """

    @classmethod
    def _reset_(cls):
        """
        Reset internal data storage, clearing all queued points.
        """

    @staticmethod
    def _current_timestamp():
        """
        Get current timestamp in nanoseconds since epoch.

        Returns:
        int: Current timestamp in nanoseconds
        """
```
#### SeriesHelper Configuration

SeriesHelper requires configuration via a Meta class defining the series structure:

```python
class Meta:
    # Required attributes
    series_name = 'measurement_name'
    fields = ['field1', 'field2', 'field3']
    tags = ['tag1', 'tag2']

    # Optional attributes
    bulk_size = 300  # Auto-commit after N points (0 disables)
    client = influxdb_client_instance  # Default client
    autocommit = True  # Enable auto-commit behavior
```

#### SeriesHelper Usage Examples

```python
from influxdb import InfluxDBClient, SeriesHelper

# Configure client
client = InfluxDBClient(database='metrics')

# Define SeriesHelper subclass
class CpuMetrics(SeriesHelper):
    class Meta:
        series_name = 'cpu_usage'
        fields = ['user', 'system', 'idle']
        tags = ['host', 'cpu_core']
        bulk_size = 100  # Auto-commit every 100 points
        client = client
        autocommit = True

# Create data points
CpuMetrics(host='server01', cpu_core='core0', user=25.5, system=10.2, idle=64.3)
CpuMetrics(host='server01', cpu_core='core1', user=30.1, system=8.7, idle=61.2)
CpuMetrics(host='server02', cpu_core='core0', user=45.8, system=15.3, idle=38.9)

# Manual commit (if autocommit disabled)
CpuMetrics.commit()

# Reset queued points
CpuMetrics._reset_()

# Generate JSON without committing
json_data = CpuMetrics._json_body_()
print(json_data)
```
#### Advanced SeriesHelper Patterns

```python
# Multiple series helpers
class MemoryMetrics(SeriesHelper):
    class Meta:
        series_name = 'memory_usage'
        fields = ['used', 'available', 'buffer', 'cache']
        tags = ['host']
        bulk_size = 50
        client = client

class DiskMetrics(SeriesHelper):
    class Meta:
        series_name = 'disk_io'
        fields = ['read_bytes', 'write_bytes', 'read_ops', 'write_ops']
        tags = ['host', 'device']
        bulk_size = 200
        client = client

# Batch data collection
def collect_system_metrics(hosts):
    for host in hosts:
        # CPU data
        cpu_data = get_cpu_stats(host)
        for core, stats in cpu_data.items():
            CpuMetrics(host=host, cpu_core=core, **stats)

        # Memory data
        mem_data = get_memory_stats(host)
        MemoryMetrics(host=host, **mem_data)

        # Disk data
        disk_data = get_disk_stats(host)
        for device, stats in disk_data.items():
            DiskMetrics(host=host, device=device, **stats)

# All helpers auto-commit based on their bulk_size settings
collect_system_metrics(['server01', 'server02', 'server03'])

# Manual commit all at once
CpuMetrics.commit()
MemoryMetrics.commit()
DiskMetrics.commit()
```
### Line Protocol Utilities

Functions for creating efficient line protocol formatted data, InfluxDB's native wire format for optimal write performance.

```python { .api }
def make_line(measurement, tags=None, fields=None, time=None, precision=None):
    """
    Create single line protocol formatted string.

    Parameters:
    - measurement (str): Measurement name
    - tags (dict): Tag key-value pairs (default: None)
    - fields (dict): Field key-value pairs (default: None)
    - time (int or datetime): Timestamp (default: current time)
    - precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)

    Returns:
    str: Line protocol formatted string

    Raises:
    ValueError: If measurement or fields are missing
    """

def make_lines(data, precision=None):
    """
    Create multiple line protocol strings from data structure.

    Parameters:
    - data (list): List of point dictionaries with measurement, tags, fields, time
    - precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)

    Returns:
    list: List of line protocol formatted strings

    Raises:
    ValueError: If data format is invalid
    """

def quote_ident(value):
    """
    Quote identifier strings for line protocol.

    Parameters:
    - value (str): Identifier to quote

    Returns:
    str: Quoted identifier
    """

def quote_literal(value):
    """
    Quote literal strings for line protocol.

    Parameters:
    - value (str): Literal to quote

    Returns:
    str: Quoted literal
    """

# Time reference constant
EPOCH: datetime  # UTC epoch timestamp reference
```
#### Line Protocol Examples

```python
from influxdb.line_protocol import make_line, make_lines
from datetime import datetime, timezone

# Create single line
line = make_line(
    measurement='cpu_usage',
    tags={'host': 'server01', 'cpu': 'cpu0'},
    fields={'user': 23.5, 'system': 12.3, 'idle': 64.2},
    time=datetime.now(timezone.utc)
)
print(line)
# Output: cpu_usage,host=server01,cpu=cpu0 user=23.5,system=12.3,idle=64.2 1694068704000000000

# Create multiple lines from data
data = [
    {
        'measurement': 'cpu_usage',
        'tags': {'host': 'server01'},
        'fields': {'value': 75.5},
        'time': '2023-09-07T07:18:24Z'
    },
    {
        'measurement': 'memory_usage',
        'tags': {'host': 'server01'},
        'fields': {'value': 82.3},
        'time': '2023-09-07T07:18:24Z'
    }
]

lines = make_lines(data, precision='s')
for line in lines:
    print(line)

# Write line protocol directly
client = InfluxDBClient()
line_data = make_line('temperature', {'sensor': 'room1'}, {'value': 23.5})
client.write([line_data], protocol='line')

# Batch line protocol creation
def create_sensor_lines(sensor_readings):
    lines = []
    for reading in sensor_readings:
        line = make_line(
            measurement='sensor_data',
            tags={'sensor_id': reading['id'], 'location': reading['location']},
            fields={'temperature': reading['temp'], 'humidity': reading['humidity']},
            time=reading['timestamp']
        )
        lines.append(line)
    return lines

# High-performance writing
sensor_data = get_sensor_readings()  # Get data
lines = create_sensor_lines(sensor_data)
client.write('\n'.join(lines), protocol='line')
```
### ResultSet - Query Result Processing

Wrapper class for InfluxDB query results providing iteration, filtering, and data extraction capabilities.

```python { .api }
class ResultSet:
    """
    Wrapper around InfluxDB query results with iteration and filtering capabilities.
    """

    def __init__(self, series, raise_errors=True):
        """
        Initialize ResultSet from query response.

        Parameters:
        - series (list): Raw series data from InfluxDB query response
        - raise_errors (bool): Raise exceptions on query errors (default: True)

        Raises:
        InfluxDBClientError: If query errors and raise_errors=True
        """

    def get_points(self, measurement=None, tags=None):
        """
        Get data points matching optional filters.

        Parameters:
        - measurement (str): Filter by measurement name (default: None)
        - tags (dict): Filter by tag key-value pairs (default: None)

        Yields:
        dict: Individual data points as dictionaries

        Example:
        for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):
            print(point['time'], point['value'])
        """

    def keys(self):
        """
        Get list of measurement keys in the result set.

        Returns:
        list: List of measurement key tuples (name, tags)
        """

    def items(self):
        """
        Get key-value pairs for all series in result set.

        Yields:
        tuple: (key, points_generator) pairs
        """

    @staticmethod
    def point_from_cols_vals(cols, vals):
        """
        Create point dictionary from column names and values.

        Parameters:
        - cols (list): Column names
        - vals (list): Column values

        Returns:
        dict: Point dictionary with column names as keys
        """

    def __getitem__(self, key):
        """
        Retrieve series by key (deprecated - use get_points instead).

        Parameters:
        - key: Series key

        Returns:
        generator: Points for the specified series
        """

    def __iter__(self):
        """
        Iterate over all points in all series.

        Yields:
        dict: Individual data points
        """

    def __len__(self):
        """
        Get number of series in result set.

        Returns:
        int: Number of series
        """

    def __repr__(self):
        """
        String representation of ResultSet.

        Returns:
        str: ResultSet description
        """

    # Properties
    @property
    def raw(self):
        """Raw JSON response from InfluxDB."""

    @property
    def error(self):
        """Error message from InfluxDB query (if any)."""
```
#### ResultSet Usage Examples

```python
from influxdb import InfluxDBClient

client = InfluxDBClient(database='metrics')

# Execute query
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')

# Basic iteration
for point in result.get_points():
    print(f"Time: {point['time']}, Value: {point['value']}")

# Filtered iteration
for point in result.get_points(measurement='cpu_usage', tags={'host': 'server01'}):
    print(f"Server01 CPU: {point['value']}%")

# Get all keys
keys = result.keys()
print("Available measurements:", keys)

# Process by series
for key, points in result.items():
    measurement, tags = key
    print(f"Processing {measurement} with tags {tags}")
    for point in points:
        # Process each point
        pass

# Check for errors
if result.error:
    print("Query error:", result.error)

# Access raw response
raw_data = result.raw
print("Raw InfluxDB response:", raw_data)

# Convert to simple list
all_points = list(result.get_points())
print(f"Retrieved {len(all_points)} total points")

# Filter and aggregate
cpu_values = [
    point['value'] for point in result.get_points()
    if point.get('measurement') == 'cpu_usage'
]
avg_cpu = sum(cpu_values) / len(cpu_values) if cpu_values else 0
print(f"Average CPU usage: {avg_cpu:.2f}%")
```
#### Advanced ResultSet Processing

```python
# Complex query with multiple measurements
query = """
SELECT mean(value) as avg_value, max(value) as max_value
FROM cpu_usage, memory_usage
WHERE time >= now() - 24h
GROUP BY time(1h), host
"""

result = client.query(query)

# Group results by measurement and host
from collections import defaultdict

results_by_measurement = defaultdict(list)
for point in result.get_points():
    measurement = point.get('name', 'unknown')  # Series name
    host = point.get('host', 'unknown')

    results_by_measurement[f"{measurement}_{host}"].append({
        'time': point['time'],
        'avg_value': point.get('avg_value'),
        'max_value': point.get('max_value')
    })

# Process grouped results
for key, points in results_by_measurement.items():
    print(f"Processing {key}: {len(points)} time points")

    # Calculate trends
    values = [p['avg_value'] for p in points if p['avg_value'] is not None]
    if len(values) >= 2:
        trend = values[-1] - values[0]  # Simple trend calculation
        print(f"  Trend: {trend:+.2f}")

# Export results to different formats
def export_results(result_set, format='json'):
    """Export ResultSet to various formats."""
    points = list(result_set.get_points())

    if format == 'json':
        import json
        return json.dumps(points, default=str)

    elif format == 'csv':
        import csv, io
        if not points:
            return ""

        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=points[0].keys())
        writer.writeheader()
        writer.writerows(points)
        return output.getvalue()

    elif format == 'dataframe':
        import pandas as pd
        return pd.DataFrame(points)

# Export query results
json_data = export_results(result, 'json')
csv_data = export_results(result, 'csv')
df = export_results(result, 'dataframe')
```
## Performance Optimization

### Efficient Data Operations

```python
# Use line protocol for maximum write performance
from influxdb.line_protocol import make_lines

# Batch create line protocol
data_points = [
    {'measurement': 'metrics', 'tags': {'host': f'server{i:02d}'},
     'fields': {'value': i * 10.5}}
    for i in range(1000)
]

lines = make_lines(data_points)
line_data = '\n'.join(lines)
client.write(line_data, protocol='line')

# SeriesHelper for structured bulk inserts
class HighThroughputMetrics(SeriesHelper):
    class Meta:
        series_name = 'high_volume_data'
        fields = ['value1', 'value2', 'value3']
        tags = ['source', 'type']
        bulk_size = 10000  # Large batch size
        client = client

# Stream processing pattern
def process_data_stream(data_stream):
    for batch in data_stream.batches(size=1000):
        for record in batch:
            HighThroughputMetrics(
                source=record['source'],
                type=record['type'],
                value1=record['v1'],
                value2=record['v2'],
                value3=record['v3']
            )
            # Auto-commits when bulk_size reached

# Memory-efficient result processing
def process_large_query_results(query):
    result = client.query(query, chunked=True, chunk_size=10000)

    # Process points in chunks to avoid memory issues
    chunk_count = 0
    for point in result.get_points():
        # Process individual point
        process_point(point)

        chunk_count += 1
        if chunk_count % 10000 == 0:
            print(f"Processed {chunk_count} points...")
```