or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-operations.md client-management.md index.md querying-data.md resource-management.md writing-data.md

docs/writing-data.md

0

# Writing Data

1

2

Comprehensive functionality for writing time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels. The writing system supports both synchronous and asynchronous operations with flexible configuration options.

3

4

## Capabilities

5

6

### WriteApi

7

8

Main API for writing data to InfluxDB buckets with configurable write options, retry policies, and batching behavior.

9

10

```python { .api }

11

class WriteApi:

12

def __init__(

13

self,

14

influxdb_client,

15

write_options: WriteOptions = WriteOptions(),

16

point_settings: PointSettings = PointSettings()

17

): ...

18

19

def write(

20

self,

21

bucket: str,

22

org: str = None,

23

record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes, Any] = None,

24

write_precision: WritePrecision = WritePrecision.NS,

25

**kwargs

26

) -> None:

27

"""

28

Write data to InfluxDB.

29

30

Parameters:

31

- bucket (str): Destination bucket name

32

- org (str, optional): Organization name or ID

33

- record (Union[Point, str, List, DataFrame, bytes]): Data to write in various formats

34

- **kwargs: Additional write parameters

35

36

Supported record formats:

37

- Point object or list of Point objects

38

- Line protocol string or list of strings

39

- Pandas DataFrame with proper time indexing

40

- Bytes representing line protocol data

41

"""

42

43

def flush(self) -> None:

44

"""

45

Flush any pending writes to InfluxDB.

46

"""

47

48

def close(self) -> None:

49

"""

50

Close the write API and flush any pending writes.

51

"""

52

53

def __enter__(self) -> 'WriteApi': ...

54

def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...

55

```

56

57

#### Usage Examples

58

59

**Basic point writing:**

60

```python

61

from influxdb_client import InfluxDBClient, Point, WritePrecision

62

from influxdb_client.client.write_api import SYNCHRONOUS

63

64

client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")

65

write_api = client.write_api(write_options=SYNCHRONOUS)

66

67

# Write single point

68

point = Point("temperature") \

69

.tag("location", "room1") \

70

.field("value", 23.5) \

71

.time(datetime.utcnow(), WritePrecision.S)

72

73

write_api.write(bucket="sensors", record=point)

74

75

# Write multiple points

76

points = [

77

Point("temperature").tag("location", "room1").field("value", 23.5),

78

Point("temperature").tag("location", "room2").field("value", 24.1)

79

]

80

write_api.write(bucket="sensors", record=points)

81

82

# Write line protocol string

83

line_protocol = "temperature,location=room3 value=22.8"

84

write_api.write(bucket="sensors", record=line_protocol)

85

```

86

87

**DataFrame writing:**

88

```python

89

import pandas as pd

90

91

# Create DataFrame with datetime index

92

df = pd.DataFrame({

93

'temperature': [23.5, 24.1, 22.8],

94

'humidity': [45.2, 43.1, 46.8],

95

'location': ['room1', 'room2', 'room3']

96

})

97

df.index = pd.date_range('2023-01-01', periods=3, freq='H')

98

99

write_api.write(bucket="sensors", record=df, data_frame_measurement_name='climate')

100

```

101

102

### WriteApiAsync

103

104

Asynchronous version of WriteApi for non-blocking write operations.

105

106

```python { .api }

107

class WriteApiAsync:

108

def __init__(

109

self,

110

influxdb_client,

111

point_settings: PointSettings = PointSettings()

112

): ...

113

114

async def write(

115

self,

116

bucket: str,

117

org: str = None,

118

record: Union[Point, str, List[Point], List[str], pandas.DataFrame, bytes] = None,

119

**kwargs

120

) -> bool:

121

"""

122

Asynchronously write data to InfluxDB.

123

124

Parameters:

125

- bucket (str): Destination bucket name

126

- org (str, optional): Organization name or ID

127

- record: Data to write in supported formats

128

"""

129

130

async def close(self) -> None:

131

"""

132

Close the async write API.

133

"""

134

```

135

136

#### Async Usage Example

137

138

```python

139

import asyncio

140

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

141

142

async def write_data():

143

async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:

144

write_api = client.write_api()

145

146

point = Point("async_measurement") \

147

.tag("source", "async") \

148

.field("value", 42.0)

149

150

await write_api.write(bucket="async_bucket", record=point)

151

152

asyncio.run(write_data())

153

```

154

155

### Point

156

157

Core class for representing individual data points with measurement, tags, fields, and timestamps.

158

159

```python { .api }

160

class Point:

161

def __init__(self, measurement_name: str = None): ...

162

163

@staticmethod

164

def measurement(measurement: str) -> 'Point':

165

"""

166

Create a Point with the specified measurement name.

167

168

Parameters:

169

- measurement (str): Measurement name

170

171

Returns:

172

Point: New Point instance

173

"""

174

175

@staticmethod

176

def from_dict(

177

dictionary: dict,

178

write_precision: WritePrecision = WritePrecision.NS,

179

**kwargs

180

) -> 'Point':

181

"""

182

Create Point from dictionary representation.

183

184

Parameters:

185

- dictionary (dict): Dictionary with measurement, tags, fields, and time

186

- write_precision (WritePrecision): Time precision for timestamp

187

188

Returns:

189

Point: Point created from dictionary

190

"""

191

192

def tag(self, key: str, value: str) -> 'Point':

193

"""

194

Add a tag key-value pair to the point.

195

196

Parameters:

197

- key (str): Tag key

198

- value (str): Tag value

199

200

Returns:

201

Point: Self for method chaining

202

"""

203

204

def field(self, key: str, value: Any) -> 'Point':

205

"""

206

Add a field key-value pair to the point.

207

208

Parameters:

209

- key (str): Field key

210

- value (Any): Field value (int, float, str, bool)

211

212

Returns:

213

Point: Self for method chaining

214

"""

215

216

def time(

217

self,

218

timestamp: Union[datetime, str, int, float],

219

write_precision: WritePrecision = WritePrecision.NS

220

) -> 'Point':

221

"""

222

Set the timestamp for the point.

223

224

Parameters:

225

- timestamp: Timestamp in various formats

226

- write_precision (WritePrecision): Precision for the timestamp

227

228

Returns:

229

Point: Self for method chaining

230

"""

231

232

def to_line_protocol(self) -> str:

233

"""

234

Convert the point to line protocol format.

235

236

Returns:

237

str: Line protocol representation

238

"""

239

240

@property

241

def write_precision(self) -> WritePrecision:

242

"""

243

Get the write precision of this point.

244

245

Returns:

246

WritePrecision: The precision used for the timestamp

247

"""

248

249

@classmethod

250

def set_str_rep(cls, str_rep: str) -> None:

251

"""

252

Set string representation for Point class.

253

254

Parameters:

255

- str_rep (str): String representation format

256

"""

257

```

258

259

#### Point Usage Examples

260

261

**Basic point creation:**

262

```python

263

from influxdb_client import Point, WritePrecision

264

from datetime import datetime

265

266

# Method chaining

267

point = Point("cpu_usage") \

268

.tag("host", "server1") \

269

.tag("region", "us-west") \

270

.field("usage_percent", 85.2) \

271

.field("core_count", 8) \

272

.time(datetime.utcnow(), WritePrecision.S)

273

274

print(point.to_line_protocol())

275

# Example output (fields are serialized sorted by key; timestamp is in seconds because of WritePrecision.S): cpu_usage,host=server1,region=us-west core_count=8i,usage_percent=85.2 1640995200

276

```

277

278

**Creating from dictionary:**

279

```python

280

data = {

281

"measurement": "sensor_data",

282

"tags": {"location": "warehouse", "sensor_id": "temp001"},

283

"fields": {"temperature": 23.5, "battery": 87},

284

"time": datetime.utcnow()

285

}

286

287

point = Point.from_dict(data, WritePrecision.MS)

288

```

289

290

**Different timestamp formats:**

291

```python

292

from datetime import datetime

293

import time

294

295

# Using datetime object

296

point1 = Point("test").field("value", 1).time(datetime.utcnow(), WritePrecision.S)

297

298

# Using Unix timestamp

299

point2 = Point("test").field("value", 2).time(time.time(), WritePrecision.S)

300

301

# Using string timestamp (RFC3339)

302

point3 = Point("test").field("value", 3).time("2023-01-01T12:00:00Z", WritePrecision.S)

303

304

# Using nanosecond timestamp

305

point4 = Point("test").field("value", 4).time(1640995200000000000, WritePrecision.NS)

306

```

307

308

### WriteOptions

309

310

Configuration class for controlling write behavior including batching, retry policies, and write modes.

311

312

```python { .api }

313

class WriteOptions:

314

def __init__(

315

self,

316

write_type: WriteType = WriteType.batching,

317

batch_size: int = 1000,

318

flush_interval: int = 1000,

319

jitter_interval: int = 0,

320

retry_interval: int = 5000,

321

max_retries: int = 5,

322

max_retry_delay: int = 125000,

323

max_retry_time: int = 180000,

324

exponential_base: int = 2,

325

max_close_wait: int = 300000,

326

write_scheduler: Any = None

327

):

328

"""

329

Configure write operation behavior.

330

331

Parameters:

332

- write_type (WriteType): Write operation mode

333

- batch_size (int): Number of points to batch together

334

- flush_interval (int): Interval in milliseconds to flush batches

335

- jitter_interval (int): Random delay in milliseconds to add to flush_interval

336

- retry_interval (int): Initial retry delay in milliseconds

337

- max_retries (int): Maximum number of retry attempts

338

- max_retry_delay (int): Maximum retry delay in milliseconds

339

- max_retry_time (int): Maximum total retry time in milliseconds

340

- exponential_base (int): Base for exponential backoff calculation

341

- max_close_wait (int): Maximum time in milliseconds to wait for pending writes to flush when closing the write API

342

- write_scheduler: Custom scheduler for write operations

343

"""

344

```

345

346

#### Predefined WriteOptions

347

348

```python { .api }

349

# Available as constants

350

SYNCHRONOUS: WriteOptions # Immediate synchronous writes

351

ASYNCHRONOUS: WriteOptions # Asynchronous individual writes (WriteType.asynchronous, no batching)

352

```

353

354

#### WriteOptions Usage Examples

355

356

**Custom write options:**

357

```python

358

from influxdb_client import WriteOptions, WriteType

359

from influxdb_client.client.write.retry import WritesRetry

360

361

# High-throughput batching configuration

362

high_throughput_options = WriteOptions(

363

write_type=WriteType.batching,

364

batch_size=5000,

365

flush_interval=500, # 500ms

366

max_retries=3,

367

retry_interval=1000

368

)

369

370

# Synchronous writes with retries

371

sync_options = WriteOptions(

372

write_type=WriteType.synchronous,

373

max_retries=5,

374

retry_interval=2000,

375

max_retry_delay=30000

376

)

377

378

# Use with WriteAPI

379

write_api = client.write_api(write_options=high_throughput_options)

380

```

381

382

**Retry strategy configuration:**

383

```python

384

# Configure exponential backoff

385

exponential_options = WriteOptions(

386

write_type=WriteType.batching,

387

retry_interval=1000, # Start with 1 second

388

max_retry_delay=60000, # Max 60 seconds

389

exponential_base=2, # Double the delay each time

390

max_retries=5

391

)

392

393

# Configure fixed-interval (constant) backoff

394

linear_options = WriteOptions(

395

write_type=WriteType.batching,

396

retry_interval=5000, # 5 second intervals

397

max_retry_delay=5000, # Keep constant delay

398

exponential_base=1, # No exponential increase

399

max_retries=3

400

)

401

```

402

403

### PointSettings

404

405

Configuration for default tags applied to all points written through a WriteApi instance.

406

407

```python { .api }

408

class PointSettings:

409

def __init__(self, default_tags: dict = None):

410

"""

411

Configure default tags for all points.

412

413

Parameters:

414

- default_tags (dict): Tags to add to every point

415

"""

416

417

def add_default_tag(self, key: str, value: str) -> None:

418

"""

419

Add a default tag that will be applied to all points.

420

421

Parameters:

422

- key (str): Tag key

423

- value (str): Tag value

424

"""

425

```

426

427

#### PointSettings Usage Example

428

429

```python

430

from influxdb_client import PointSettings

431

432

# Configure default tags

433

point_settings = PointSettings(default_tags={

434

"environment": "production",

435

"application": "sensor-collector",

436

"version": "1.2.3"

437

})

438

439

# Add additional default tag

440

point_settings.add_default_tag("datacenter", "us-west-2")

441

442

# Use with WriteAPI

443

write_api = client.write_api(

444

write_options=SYNCHRONOUS,

445

point_settings=point_settings

446

)

447

448

# All points written will automatically include default tags

449

point = Point("temperature").field("value", 23.5)

450

write_api.write(bucket="sensors", record=point)

451

# Actual point written (tags are serialized sorted by key): temperature,application=sensor-collector,datacenter=us-west-2,environment=production,version=1.2.3 value=23.5

452

```

453

454

## Types

455

456

```python { .api }

457

class WritePrecision(Enum):

458

"""Time precision constants for timestamps."""

459

NS = "ns" # nanoseconds (default)

460

US = "us" # microseconds

461

MS = "ms" # milliseconds

462

S = "s" # seconds

463

464

class WriteType(Enum):

465

"""Write operation modes."""

466

batching = "batching" # Background batching (default)

467

asynchronous = "asynchronous" # Async individual writes

468

synchronous = "synchronous" # Immediate synchronous writes

469

470

# Retry configuration

471

class WritesRetry:

472

def __init__(

473

self,

474

total: int = 3,

475

retry_interval: int = 5000,

476

max_retry_delay: int = 125000,

477

max_retry_time: int = 180000,

478

exponential_base: int = 2,

479

jitter_interval: int = 0

480

): ...

481

482

# Exception types

483

class WriteApiError(InfluxDBError):

484

"""Raised when write operations fail."""

485

pass

486

487

class WriteRetryError(WriteApiError):

488

"""Raised when write retries are exhausted."""

489

pass

490

```