0
# Writing Data
1
2
Comprehensive functionality for writing time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels. The writing system supports both synchronous and asynchronous operations with flexible configuration options.
3
4
## Capabilities
5
6
### WriteApi
7
8
Main API for writing data to InfluxDB buckets with configurable write options, retry policies, and batching behavior.
9
10
```python { .api }
11
class WriteApi:
12
def __init__(
13
self,
14
influxdb_client,
15
write_options: WriteOptions = WriteOptions(),
16
point_settings: PointSettings = PointSettings()
17
): ...
18
19
def write(
20
self,
21
bucket: str,
22
org: str = None,
23
record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes, Any] = None,
24
write_precision: WritePrecision = WritePrecision.NS,
25
**kwargs
26
) -> None:
27
"""
28
Write data to InfluxDB.
29
30
Parameters:
31
- bucket (str): Destination bucket name
32
- org (str, optional): Organization name or ID
33
- record (Union[Point, str, List, DataFrame, bytes]): Data to write in various formats
34
- **kwargs: Additional write parameters
35
36
Supported record formats:
37
- Point object or list of Point objects
38
- Line protocol string or list of strings
39
- Pandas DataFrame with proper time indexing
40
- Bytes representing line protocol data
41
"""
42
43
def flush(self) -> None:
44
"""
45
Flush any pending writes to InfluxDB.
46
"""
47
48
def close(self) -> None:
49
"""
50
Close the write API and flush any pending writes.
51
"""
52
53
def __enter__(self) -> 'WriteApi': ...
54
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
55
```
56
57
#### Usage Examples
58
59
**Basic point writing:**
60
```python
61
from influxdb_client import InfluxDBClient, Point, WritePrecision
62
from influxdb_client.client.write_api import SYNCHRONOUS
63
64
client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")
65
write_api = client.write_api(write_options=SYNCHRONOUS)
66
67
# Write single point
68
point = Point("temperature") \
69
.tag("location", "room1") \
70
.field("value", 23.5) \
71
.time(datetime.utcnow(), WritePrecision.S)
72
73
write_api.write(bucket="sensors", record=point)
74
75
# Write multiple points
76
points = [
77
Point("temperature").tag("location", "room1").field("value", 23.5),
78
Point("temperature").tag("location", "room2").field("value", 24.1)
79
]
80
write_api.write(bucket="sensors", record=points)
81
82
# Write line protocol string
83
line_protocol = "temperature,location=room3 value=22.8"
84
write_api.write(bucket="sensors", record=line_protocol)
85
```
86
87
**DataFrame writing:**
88
```python
89
import pandas as pd
90
91
# Create DataFrame with datetime index
92
df = pd.DataFrame({
93
'temperature': [23.5, 24.1, 22.8],
94
'humidity': [45.2, 43.1, 46.8],
95
'location': ['room1', 'room2', 'room3']
96
})
97
df.index = pd.date_range('2023-01-01', periods=3, freq='H')
98
99
write_api.write(bucket="sensors", record=df, data_frame_measurement_name='climate')
100
```
101
102
### WriteApiAsync
103
104
Asynchronous version of WriteApi for non-blocking write operations.
105
106
```python { .api }
107
class WriteApiAsync:
108
def __init__(
109
self,
110
influxdb_client,
111
point_settings: PointSettings = PointSettings()
112
): ...
113
114
async def write(
115
self,
116
bucket: str,
117
org: str = None,
118
record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes] = None,
119
**kwargs
120
) -> bool:
121
"""
122
Asynchronously write data to InfluxDB.
123
124
Parameters:
125
- bucket (str): Destination bucket name
126
- org (str, optional): Organization name or ID
127
- record: Data to write in supported formats
128
"""
129
130
async def close(self) -> None:
131
"""
132
Close the async write API.
133
"""
134
```
135
136
#### Async Usage Example
137
138
```python
139
import asyncio
140
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
141
142
async def write_data():
143
async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
144
write_api = client.write_api()
145
146
point = Point("async_measurement") \
147
.tag("source", "async") \
148
.field("value", 42.0)
149
150
await write_api.write(bucket="async_bucket", record=point)
151
152
asyncio.run(write_data())
153
```
154
155
### Point
156
157
Core class for representing individual data points with measurement, tags, fields, and timestamps.
158
159
```python { .api }
160
class Point:
161
def __init__(self, measurement_name: str = None): ...
162
163
@staticmethod
164
def measurement(measurement: str) -> 'Point':
165
"""
166
Create a Point with the specified measurement name.
167
168
Parameters:
169
- measurement (str): Measurement name
170
171
Returns:
172
Point: New Point instance
173
"""
174
175
@staticmethod
176
def from_dict(
177
dictionary: dict,
178
write_precision: WritePrecision = WritePrecision.NS,
179
**kwargs
180
) -> 'Point':
181
"""
182
Create Point from dictionary representation.
183
184
Parameters:
185
- dictionary (dict): Dictionary with measurement, tags, fields, and time
186
- write_precision (WritePrecision): Time precision for timestamp
187
188
Returns:
189
Point: Point created from dictionary
190
"""
191
192
def tag(self, key: str, value: str) -> 'Point':
193
"""
194
Add a tag key-value pair to the point.
195
196
Parameters:
197
- key (str): Tag key
198
- value (str): Tag value
199
200
Returns:
201
Point: Self for method chaining
202
"""
203
204
def field(self, key: str, value: Any) -> 'Point':
205
"""
206
Add a field key-value pair to the point.
207
208
Parameters:
209
- key (str): Field key
210
- value (Any): Field value (int, float, str, bool)
211
212
Returns:
213
Point: Self for method chaining
214
"""
215
216
def time(
217
self,
218
timestamp: Union[datetime, str, int, float],
219
write_precision: WritePrecision = WritePrecision.NS
220
) -> 'Point':
221
"""
222
Set the timestamp for the point.
223
224
Parameters:
225
- timestamp: Timestamp in various formats
226
- write_precision (WritePrecision): Precision for the timestamp
227
228
Returns:
229
Point: Self for method chaining
230
"""
231
232
def to_line_protocol(self) -> str:
233
"""
234
Convert the point to line protocol format.
235
236
Returns:
237
str: Line protocol representation
238
"""
239
240
@property
241
def write_precision(self) -> WritePrecision:
242
"""
243
Get the write precision of this point.
244
245
Returns:
246
WritePrecision: The precision used for the timestamp
247
"""
248
249
@classmethod
250
def set_str_rep(cls, str_rep: str) -> None:
251
"""
252
Set string representation for Point class.
253
254
Parameters:
255
- str_rep (str): String representation format
256
"""
257
```
258
259
#### Point Usage Examples
260
261
**Basic point creation:**
262
```python
263
from influxdb_client import Point, WritePrecision
264
from datetime import datetime
265
266
# Method chaining
267
point = Point("cpu_usage") \
268
.tag("host", "server1") \
269
.tag("region", "us-west") \
270
.field("usage_percent", 85.2) \
271
.field("core_count", 8) \
272
.time(datetime.utcnow(), WritePrecision.S)
273
274
print(point.to_line_protocol())
275
# Output: cpu_usage,host=server1,region=us-west usage_percent=85.2,core_count=8i 1640995200000000000
276
```
277
278
**Creating from dictionary:**
279
```python
280
data = {
281
"measurement": "sensor_data",
282
"tags": {"location": "warehouse", "sensor_id": "temp001"},
283
"fields": {"temperature": 23.5, "battery": 87},
284
"time": datetime.utcnow()
285
}
286
287
point = Point.from_dict(data, WritePrecision.MS)
288
```
289
290
**Different timestamp formats:**
291
```python
292
from datetime import datetime
293
import time
294
295
# Using datetime object
296
point1 = Point("test").field("value", 1).time(datetime.utcnow(), WritePrecision.S)
297
298
# Using Unix timestamp
299
point2 = Point("test").field("value", 2).time(time.time(), WritePrecision.S)
300
301
# Using string timestamp (RFC3339)
302
point3 = Point("test").field("value", 3).time("2023-01-01T12:00:00Z", WritePrecision.S)
303
304
# Using nanosecond timestamp
305
point4 = Point("test").field("value", 4).time(1640995200000000000, WritePrecision.NS)
306
```
307
308
### WriteOptions
309
310
Configuration class for controlling write behavior including batching, retry policies, and write modes.
311
312
```python { .api }
313
class WriteOptions:
314
def __init__(
315
self,
316
write_type: WriteType = WriteType.batching,
317
batch_size: int = 1000,
318
flush_interval: int = 1000,
319
jitter_interval: int = 0,
320
retry_interval: int = 5000,
321
max_retries: int = 5,
322
max_retry_delay: int = 125000,
323
max_retry_time: int = 180000,
324
exponential_base: int = 2,
325
max_close_wait: int = 300000,
326
write_scheduler: Any = None
327
):
328
"""
329
Configure write operation behavior.
330
331
Parameters:
332
- write_type (WriteType): Write operation mode
333
- batch_size (int): Number of points to batch together
334
- flush_interval (int): Interval in milliseconds to flush batches
335
- jitter_interval (int): Random delay in milliseconds to add to flush_interval
336
- retry_interval (int): Initial retry delay in milliseconds
337
- max_retries (int): Maximum number of retry attempts
338
- max_retry_delay (int): Maximum retry delay in milliseconds
339
- max_retry_time (int): Maximum total retry time in milliseconds
340
- exponential_base (int): Base for exponential backoff calculation
341
- max_close_wait (int): Maximum wait time when closing write API
342
- write_scheduler: Custom scheduler for write operations
343
"""
344
```
345
346
#### Predefined WriteOptions
347
348
```python { .api }
349
# Available as constants
350
SYNCHRONOUS: WriteOptions # Immediate synchronous writes
351
ASYNCHRONOUS: WriteOptions # Asynchronous writes with default batching
352
```
353
354
#### WriteOptions Usage Examples
355
356
**Custom write options:**
357
```python
358
from influxdb_client import WriteOptions, WriteType
359
from influxdb_client.client.write.retry import WritesRetry
360
361
# High-throughput batching configuration
362
high_throughput_options = WriteOptions(
363
write_type=WriteType.batching,
364
batch_size=5000,
365
flush_interval=500, # 500ms
366
max_retries=3,
367
retry_interval=1000
368
)
369
370
# Synchronous writes with retries
371
sync_options = WriteOptions(
372
write_type=WriteType.synchronous,
373
max_retries=5,
374
retry_interval=2000,
375
max_retry_delay=30000
376
)
377
378
# Use with WriteAPI
379
write_api = client.write_api(write_options=high_throughput_options)
380
```
381
382
**Retry strategy configuration:**
383
```python
384
# Configure exponential backoff
385
exponential_options = WriteOptions(
386
write_type=WriteType.batching,
387
retry_interval=1000, # Start with 1 second
388
max_retry_delay=60000, # Max 60 seconds
389
exponential_base=2, # Double the delay each time
390
max_retries=5
391
)
392
393
# Configure linear backoff
394
linear_options = WriteOptions(
395
write_type=WriteType.batching,
396
retry_interval=5000, # 5 second intervals
397
max_retry_delay=5000, # Keep constant delay
398
exponential_base=1, # No exponential increase
399
max_retries=3
400
)
401
```
402
403
### PointSettings
404
405
Configuration for default tags applied to all points written through a WriteApi instance.
406
407
```python { .api }
408
class PointSettings:
409
def __init__(self, default_tags: dict = None):
410
"""
411
Configure default tags for all points.
412
413
Parameters:
414
- default_tags (dict): Tags to add to every point
415
"""
416
417
def add_default_tag(self, key: str, value: str) -> None:
418
"""
419
Add a default tag that will be applied to all points.
420
421
Parameters:
422
- key (str): Tag key
423
- value (str): Tag value
424
"""
425
```
426
427
#### PointSettings Usage Example
428
429
```python
430
from influxdb_client import PointSettings
431
432
# Configure default tags
433
point_settings = PointSettings(default_tags={
434
"environment": "production",
435
"application": "sensor-collector",
436
"version": "1.2.3"
437
})
438
439
# Add additional default tag
440
point_settings.add_default_tag("datacenter", "us-west-2")
441
442
# Use with WriteAPI
443
write_api = client.write_api(
444
write_options=SYNCHRONOUS,
445
point_settings=point_settings
446
)
447
448
# All points written will automatically include default tags
449
point = Point("temperature").field("value", 23.5)
450
write_api.write(bucket="sensors", record=point)
451
# Actual point written: temperature,environment=production,application=sensor-collector,version=1.2.3,datacenter=us-west-2 value=23.5
452
```
453
454
## Types
455
456
```python { .api }
457
class WritePrecision(Enum):
458
"""Time precision constants for timestamps."""
459
NS = "ns" # nanoseconds (default)
460
US = "us" # microseconds
461
MS = "ms" # milliseconds
462
S = "s" # seconds
463
464
class WriteType(Enum):
465
"""Write operation modes."""
466
batching = "batching" # Background batching (default)
467
asynchronous = "asynchronous" # Async individual writes
468
synchronous = "synchronous" # Immediate synchronous writes
469
470
# Retry configuration
471
class WritesRetry:
472
def __init__(
473
self,
474
total: int = 3,
475
retry_interval: int = 5000,
476
max_retry_delay: int = 125000,
477
max_retry_time: int = 180000,
478
exponential_base: int = 2,
479
jitter_interval: int = 0
480
): ...
481
482
# Exception types
483
class WriteApiError(InfluxDBError):
484
"""Raised when write operations fail."""
485
pass
486
487
class WriteRetryError(WriteApiError):
488
"""Raised when write retries are exhausted."""
489
pass
490
```