# InfluxDB Client

A comprehensive Python client library for InfluxDB 2.x that provides both synchronous and asynchronous APIs for writing, querying, and managing time series data. The library offers high-level convenience APIs and low-level service access for maximum flexibility in interacting with InfluxDB instances.

## Package Information

- **Package Name**: influxdb-client
- **Language**: Python
- **Installation**: `pip install influxdb-client`
- **Version**: 1.49.0

## Core Imports

```python
from influxdb_client import InfluxDBClient, WriteApi, QueryApi, Point, WritePrecision
```

For asynchronous operations:

```python
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
from influxdb_client.client.write_api_async import WriteApiAsync
from influxdb_client.client.query_api_async import QueryApiAsync
```

## Basic Usage

```python
from datetime import datetime, timezone

from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS

# Initialize client
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="your-org")

# Write data (SYNCHRONOUS sends each write immediately instead of batching)
write_api = client.write_api(write_options=SYNCHRONOUS)
point = Point("measurement_name") \
    .tag("location", "us-west") \
    .field("temperature", 25.3) \
    .time(datetime.now(timezone.utc), WritePrecision.NS)  # timezone-aware UTC timestamp

write_api.write(bucket="your-bucket", record=point)

# Query data
query_api = client.query_api()
query = '''
from(bucket: "your-bucket")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "measurement_name")
'''
tables = query_api.query(query)

for table in tables:
    for record in table.records:
        print(f"Time: {record.get_time()}, Value: {record.get_value()}")

# Release the underlying HTTP resources
client.close()
```

## Architecture

The InfluxDB client library is built around a dual-API architecture:

- **High-level APIs**: Convenient wrapper classes (WriteApi, QueryApi, etc.) that handle common operations with sensible defaults
- **Low-level Services**: Direct API access classes for advanced usage and custom configurations
- **Sync/Async Support**: Parallel synchronous and asynchronous implementations for all core functionality
- **Domain Models**: Auto-generated classes representing all InfluxDB API entities
- **Configuration System**: Flexible configuration through objects, files, or environment variables

The library provides both batched and immediate writing modes, streaming and materialized query results, and comprehensive resource management capabilities for buckets, users, organizations, and more.
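
To make the difference between batched and immediate writing concrete, here is a minimal conceptual sketch using only the standard library. `BatchBuffer` is a hypothetical class invented for this illustration and is not how the library implements batching; in particular it flushes only on record count and does not model any time-based flushing.

```python
from typing import Callable, List


class BatchBuffer:
    """Collect records and hand them to `sink` in groups of `batch_size`."""

    def __init__(self, sink: Callable[[List[str]], None], batch_size: int = 1000) -> None:
        self._sink = sink
        self._batch_size = batch_size
        self._pending: List[str] = []

    def write(self, record: str) -> None:
        """Queue one record and flush as soon as a full batch is available."""
        self._pending.append(record)
        if len(self._pending) >= self._batch_size:
            self.flush()

    def flush(self) -> None:
        """Send whatever is queued, even if the batch is not full."""
        if self._pending:
            self._sink(self._pending)
            self._pending = []  # start a fresh list so sent batches stay intact


sent: List[List[str]] = []  # stands in for the network call
buffer = BatchBuffer(sink=sent.append, batch_size=2)
for i in range(5):
    buffer.write(f"m value={i}i")
buffer.flush()  # push the final partial batch

print([len(batch) for batch in sent])
```

Five records with a batch size of 2 reach the sink as batches of 2, 2, and 1. Setting `batch_size=1` in this sketch behaves like an immediate write, since every record is sent as soon as it arrives.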

## Capabilities

### Writing Data

Write time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels.

```python { .api }
class WriteApi:
    def write(
        self,
        bucket: str,
        org: str = None,
        record: Union[Point, str, List[Point], List[str], bytes, Any] = None,
        write_precision: WritePrecision = WritePrecision.NS,
        **kwargs
    ) -> None: ...

class Point:
    def __init__(self, measurement_name: str): ...
    def tag(self, key: str, value: str) -> 'Point': ...
    def field(self, key: str, value: Any) -> 'Point': ...
    def time(self, timestamp: Any, write_precision: WritePrecision = WritePrecision.NS) -> 'Point': ...
    def to_line_protocol(self) -> str: ...

class WriteOptions:
    def __init__(
        self,
        write_type: WriteType = WriteType.batching,
        batch_size: int = 1000,
        flush_interval: int = 1000,
        jitter_interval: int = 0,
        retry_interval: int = 5000,
        max_retries: int = 5,
        max_retry_delay: int = 125000,
        max_retry_time: int = 180000,
        exponential_base: int = 2
    ): ...
```
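
The retry-related defaults in `WriteOptions` are easier to reason about with numbers. The sketch below assumes the intervals are in milliseconds and computes a plain capped exponential backoff from `retry_interval`, `exponential_base`, `max_retries`, and `max_retry_delay`. It is an approximation for intuition only: `sketch_retry_delays` is a hypothetical helper, and the library's real strategy may randomize delays or follow server hints, which this sketch does not model.

```python
from typing import List


def sketch_retry_delays(
    retry_interval: int = 5000,
    max_retries: int = 5,
    max_retry_delay: int = 125000,
    exponential_base: int = 2,
) -> List[int]:
    """Return one capped delay in milliseconds per retry attempt."""
    delays = []
    for attempt in range(max_retries):
        # Grow the delay geometrically, then clamp it to the configured ceiling.
        delay = retry_interval * exponential_base ** attempt
        delays.append(min(delay, max_retry_delay))
    return delays


default_delays = sketch_retry_delays()
print(default_delays)                      # delays for the default settings
print(sum(default_delays))                 # total wait across all retries
print(sketch_retry_delays(max_retries=7))  # later attempts hit the cap
```

Under these assumptions the default settings give delays of 5000, 10000, 20000, 40000, and 80000 ms, a total of 155000 ms, which stays below the default `max_retry_time` of 180000.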

[Writing Data](./writing-data.md)

### Querying Data

Execute Flux queries against InfluxDB and process results in various formats including streaming, materialized tables, CSV, and pandas DataFrames.

```python { .api }
class QueryApi:
    def query(self, query: str, org: str = None, params: dict = None) -> TableList: ...
    def query_stream(self, query: str, org: str = None, params: dict = None) -> Generator[FluxRecord, Any, None]: ...
    def query_csv(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> CSVIterator: ...
    def query_raw(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> HTTPResponse: ...
    def query_data_frame(
        self,
        query: str,
        org: str = None,
        data_frame_index: List[str] = None,
        params: dict = None,
        use_extension_dtypes: bool = False
    ): ...

class FluxRecord:
    values: dict  # attribute holding all column values, keyed by column name
    def get_time(self) -> datetime: ...
    def get_value(self) -> Any: ...
    def get_field(self) -> str: ...
    def get_measurement(self) -> str: ...

class TableList(list):
    def to_json(self, indent: int = None) -> str: ...
    def to_values(self, columns: List[str] = None) -> List[List[Any]]: ...
```
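
Every method above takes the Flux query as a plain string. As a small illustration of assembling such a string in code, the sketch below builds the same pipeline used in Basic Usage. `build_flux_query` is a hypothetical helper, not part of the library, and using `json.dumps` to quote names is an assumption that holds for plain names without control characters.

```python
import json


def build_flux_query(bucket: str, measurement: str, start: str = "-1h") -> str:
    """Assemble a Flux pipeline that filters one measurement in a time range."""

    def quote(text: str) -> str:
        # Produces a double-quoted literal with quotes and backslashes escaped.
        return json.dumps(text, ensure_ascii=False)

    return "\n".join([
        f"from(bucket: {quote(bucket)})",
        f"  |> range(start: {start})",
        f"  |> filter(fn: (r) => r._measurement == {quote(measurement)})",
    ])


query = build_flux_query("your-bucket", "measurement_name")
print(query)
```

The resulting string can be passed as the `query` argument of any of the methods listed above. Values that come from untrusted input are better kept out of string assembly altogether; the `params` argument in the signatures above exists for passing values separately.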

[Querying Data](./querying-data.md)

### Client Management

Configure and manage InfluxDB client instances with support for both synchronous and asynchronous operations, connection pooling, authentication, and health monitoring.

```python { .api }
class InfluxDBClient:
    def __init__(
        self,
        url: str,
        token: str = None,
        debug: bool = None,
        timeout: int = 10000,
        enable_gzip: bool = False,
        org: str = None,
        **kwargs
    ): ...
    def write_api(self, write_options: WriteOptions = None, point_settings: PointSettings = None) -> WriteApi: ...
    def query_api(self, query_options: QueryOptions = None) -> QueryApi: ...
    def delete_api(self) -> DeleteApi: ...
    def ping(self) -> bool: ...
    def version(self) -> str: ...
    def close(self) -> None: ...
    @classmethod
    def from_config_file(cls, config_file: str = "config.ini", **kwargs) -> 'InfluxDBClient': ...
    @classmethod
    def from_env_properties(cls, **kwargs) -> 'InfluxDBClient': ...

class InfluxDBClientAsync:
    def __init__(
        self,
        url: str,
        token: str = None,
        org: str = None,
        debug: bool = None,
        timeout: int = 10000,
        enable_gzip: bool = False,
        **kwargs
    ): ...
    def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync: ...
    def write_api(self, point_settings: PointSettings = PointSettings()) -> WriteApiAsync: ...
    def delete_api(self) -> DeleteApiAsync: ...
    async def ping(self) -> bool: ...
    async def close(self) -> None: ...
```
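
`from_config_file` reads connection settings from an INI file (`config.ini` by default). The sketch below shows what such a file might look like and how the same values can be inspected with the standard library. The `[influx2]` section name and the `url`, `org`, `token`, and `timeout` keys are assumptions based on the library's commonly documented layout and should be checked against the client management reference; `read_influx_settings` is a hypothetical helper, not a library function.

```python
import configparser
import os
import tempfile

# Assumed file layout; the section and key names are not confirmed by this page.
CONFIG_TEXT = """\
[influx2]
url = http://localhost:8086
org = your-org
token = your-token
timeout = 10000
"""


def read_influx_settings(path: str) -> dict:
    """Load connection settings from an INI file (hypothetical helper)."""
    # Interpolation is disabled so a '%' inside a token is read literally.
    parser = configparser.ConfigParser(interpolation=None)
    parser.read(path, encoding="utf-8")
    section = parser["influx2"]
    return {
        "url": section["url"],
        "org": section.get("org"),
        "token": section.get("token"),
        "timeout": section.getint("timeout", fallback=10000),
    }


# Write the sample file to a temporary directory and read it back.
with tempfile.TemporaryDirectory() as workdir:
    config_path = os.path.join(workdir, "config.ini")
    with open(config_path, "w", encoding="utf-8") as handle:
        handle.write(CONFIG_TEXT)
    settings = read_influx_settings(config_path)

print(settings)
```

Keeping the token in a file or in the environment, rather than in source code, is the main reason to prefer `from_config_file` or `from_env_properties` over passing literals to the constructor.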

[Client Management](./client-management.md)

### Resource Management

Manage InfluxDB resources including buckets, organizations, users, authorizations, tasks, and labels through dedicated API classes.

```python { .api }
class BucketsApi:
    def create_bucket(
        self,
        bucket: Bucket = None,
        bucket_name: str = None,
        org_id: str = None,
        retention_rules: List[RetentionRule] = None,
        description: str = None,
        org: str = None
    ) -> Bucket: ...
    def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket: ...
    def find_buckets(self, org: str = None, **kwargs) -> List[Bucket]: ...
    def update_bucket(self, bucket: Bucket) -> Bucket: ...
    def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket: ...

class AuthorizationsApi:
    def create_authorization(
        self,
        org_id: str = None,
        permissions: List[Permission] = None,
        **kwargs
    ) -> Authorization: ...
    def find_authorizations(self, **kwargs) -> List[Authorization]: ...
    def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization: ...

class OrganizationsApi:
    def create_organization(self, org: Organization = None, **kwargs) -> Organization: ...
    def find_organizations(self, **kwargs) -> List[Organization]: ...
    def find_organization_by_id(self, org_id: str) -> Organization: ...

class UsersApi:
    def create_user(self, user: User = None, **kwargs) -> UserResponse: ...
    def find_users(self, **kwargs) -> List[UserResponse]: ...
    def me(self) -> UserResponse: ...

class TasksApi:
    def create_task(self, task: Task = None, **kwargs) -> Task: ...
    def find_tasks(self, **kwargs) -> List[Task]: ...
    def run_manually(self, task_id: str, **kwargs) -> Run: ...
    def find_runs(self, task_id: str, **kwargs) -> List[Run]: ...

class LabelsApi:
    def create_label(self, label: Label = None, **kwargs) -> LabelResponse: ...
    def find_labels(self, **kwargs) -> List[LabelResponse]: ...
```
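
To show what `create_bucket` has to describe, the sketch below assembles a plain dictionary with a bucket name, an organization ID, and an optional expiry rule. The JSON field names (`orgID`, `retentionRules`, `everySeconds`) are assumptions recalled from the InfluxDB 2.x HTTP API and are not confirmed by this page; `sketch_bucket_payload` is a hypothetical helper and the organization ID is a placeholder.

```python
import json
from datetime import timedelta
from typing import Optional


def sketch_bucket_payload(
    name: str,
    org_id: str,
    retention: Optional[timedelta] = None,
    description: Optional[str] = None,
) -> dict:
    """Describe a bucket as a JSON-ready dict (field names are assumptions)."""
    payload = {"name": name, "orgID": org_id, "retentionRules": []}
    if retention is not None:
        # Retention is expressed as a whole number of seconds.
        payload["retentionRules"].append(
            {"type": "expire", "everySeconds": int(retention.total_seconds())}
        )
    if description is not None:
        payload["description"] = description
    return payload


payload = sketch_bucket_payload(
    "your-bucket",
    "0123456789abcdef",  # placeholder organization ID
    retention=timedelta(days=30),
)
print(json.dumps(payload, indent=2))
```

In this sketch a 30-day retention becomes 2592000 seconds, and omitting `retention` leaves the rule list empty.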

[Resource Management](./resource-management.md)

### Advanced Operations

Perform advanced operations including data deletion, invokable scripts, and direct access to low-level service APIs for maximum control.

```python { .api }
class DeleteApi:
    def delete(
        self,
        start: Union[str, datetime],
        stop: Union[str, datetime],
        predicate: str = "",
        bucket: str = None,
        org: str = None
    ) -> None: ...

class InvokableScriptsApi:
    def create_script(self, script_create_request: ScriptCreateRequest) -> Script: ...
    def find_scripts(self, **kwargs) -> Scripts: ...
    def invoke_script(
        self,
        script_id: str,
        params: dict = None
    ) -> str: ...
    def update_script(
        self,
        script_id: str,
        script_update_request: ScriptUpdateRequest
    ) -> Script: ...
    def delete_script(self, script_id: str) -> None: ...
```
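
`DeleteApi.delete` accepts `start` and `stop` as either strings or `datetime` objects, plus a predicate string. The sketch below prepares those three values with the standard library. The RFC 3339 timestamp format and the `key="value" AND ...` predicate syntax are assumptions based on the InfluxDB 2.x delete endpoint; `to_rfc3339` and `build_predicate` are hypothetical helpers, not library functions.

```python
from datetime import datetime, timezone
from typing import Dict, Union


def to_rfc3339(moment: Union[str, datetime]) -> str:
    """Return a UTC RFC 3339 timestamp; strings are passed through unchanged."""
    if isinstance(moment, str):
        return moment
    if moment.tzinfo is None:
        # Assumption for this sketch: naive datetimes are already in UTC.
        moment = moment.replace(tzinfo=timezone.utc)
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")


def build_predicate(measurement: str, tags: Dict[str, str]) -> str:
    """Join a measurement match and tag matches with AND."""
    clauses = [f'_measurement="{measurement}"']
    clauses += [f'{key}="{value}"' for key, value in sorted(tags.items())]
    return " AND ".join(clauses)


delete_args = {
    "start": to_rfc3339(datetime(2024, 1, 1, tzinfo=timezone.utc)),
    "stop": to_rfc3339("2024-01-02T00:00:00Z"),
    "predicate": build_predicate("measurement_name", {"location": "us-west"}),
}
print(delete_args)
```

The three values map onto the `start`, `stop`, and `predicate` parameters in the signature above; `bucket` and `org` still have to be supplied by the caller.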

[Advanced Operations](./advanced-operations.md)

## Types

```python { .api }
class WritePrecision(Enum):
    NS = "ns"  # nanoseconds
    US = "us"  # microseconds
    MS = "ms"  # milliseconds
    S = "s"    # seconds

class WriteType(Enum):
    batching = "batching"
    asynchronous = "asynchronous"
    synchronous = "synchronous"

class PointSettings:
    def __init__(self, default_tags: dict = None): ...

class QueryOptions:
    def __init__(
        self,
        profilers: List[str] = None,
        profiler_callback: Callable = None
    ): ...

class Configuration:
    def __init__(self): ...
    # HTTP client configuration attributes
    host: str
    api_key: dict
    username: str
    password: str
    debug: bool
    verify_ssl: bool
    timeout: int

# Domain model classes (300+ available)
class Bucket:
    id: str
    name: str
    org_id: str
    retention_rules: List[RetentionRule]
    description: str

class Authorization:
    id: str
    token: str
    status: str
    org_id: str
    permissions: List[Permission]

class Organization:
    id: str
    name: str
    description: str

class User:
    id: str
    name: str

class Task:
    id: str
    name: str
    flux: str
    org_id: str
    status: str

class Label:
    id: str
    name: str
    org_id: str
    properties: dict

class InfluxDBError(Exception):
    def __init__(self, response: HTTPResponse = None, message: str = None, reason: str = None): ...
```
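
The four `WritePrecision` values determine the unit of the integer timestamp that accompanies each point. The following sketch converts one timezone-aware `datetime` into an epoch integer for each unit, keyed by the string values listed above. `to_epoch` is a hypothetical helper written for illustration and is not the conversion the library performs internally; it uses integer arithmetic throughout to avoid floating-point rounding.

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)

# Nanoseconds per unit, keyed by the WritePrecision string values.
NANOS_PER_UNIT = {"ns": 1, "us": 1_000, "ms": 1_000_000, "s": 1_000_000_000}


def to_epoch(moment: datetime, precision: str = "ns") -> int:
    """Convert a timezone-aware datetime to an epoch integer in `precision`."""
    delta = moment - EPOCH  # requires `moment` to be timezone-aware
    nanos = (delta.days * 86_400 + delta.seconds) * 1_000_000_000
    nanos += delta.microseconds * 1_000
    # Integer division truncates anything finer than the requested unit.
    return nanos // NANOS_PER_UNIT[precision]


# Half a second past midnight UTC on 2024-01-01.
moment = datetime(2024, 1, 1, 0, 0, 0, 500_000, tzinfo=timezone.utc)
for precision in ("s", "ms", "us", "ns"):
    print(precision, to_epoch(moment, precision))
```

For this input the values are 1704067200 (s), 1704067200500 (ms), 1704067200500000 (us), and 1704067200500000000 (ns). The half second is dropped at second precision, which illustrates why a coarser precision can make distinct points share a timestamp.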