0
# InfluxDB Client
1
2
A comprehensive Python client library for InfluxDB 2.x that provides both synchronous and asynchronous APIs for writing, querying, and managing time series data. The library offers high-level convenience APIs and low-level service access for maximum flexibility in interacting with InfluxDB instances.
3
4
## Package Information
5
6
- **Package Name**: influxdb-client
7
- **Language**: Python
8
- **Installation**: `pip install influxdb-client`
9
- **Version**: 1.49.0
10
11
## Core Imports
12
13
```python
14
from influxdb_client import InfluxDBClient, WriteApi, QueryApi, Point, WritePrecision
15
```
16
17
For asynchronous operations:
18
19
```python
20
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
21
from influxdb_client.client.write_api_async import WriteApiAsync
22
from influxdb_client.client.query_api_async import QueryApiAsync
23
```
24
25
## Basic Usage
26
27
```python
28
from influxdb_client import InfluxDBClient, Point, WritePrecision
29
from influxdb_client.client.write_api import SYNCHRONOUS
30
from datetime import datetime
31
32
# Initialize client
33
client = InfluxDBClient(url="http://localhost:8086", token="your-token", org="your-org")
34
35
# Write data
36
write_api = client.write_api(write_options=SYNCHRONOUS)
37
point = Point("measurement_name") \
38
.tag("location", "us-west") \
39
.field("temperature", 25.3) \
40
.time(datetime.utcnow(), WritePrecision.NS)
41
42
write_api.write(bucket="your-bucket", record=point)
43
44
# Query data
45
query_api = client.query_api()
46
query = '''
47
from(bucket: "your-bucket")
48
|> range(start: -1h)
49
|> filter(fn: (r) => r._measurement == "measurement_name")
50
'''
51
tables = query_api.query(query)
52
53
for table in tables:
54
for record in table.records:
55
print(f"Time: {record.get_time()}, Value: {record.get_value()}")
56
57
client.close()
58
```
59
60
## Architecture
61
62
The InfluxDB client library is built around a dual-API architecture:
63
64
- **High-level APIs**: Convenient wrapper classes (WriteApi, QueryApi, etc.) that handle common operations with sensible defaults
65
- **Low-level Services**: Direct API access classes for advanced usage and custom configurations
66
- **Sync/Async Support**: Parallel synchronous and asynchronous implementations for all core functionality
67
- **Domain Models**: Auto-generated classes representing all InfluxDB API entities
68
- **Configuration System**: Flexible configuration through objects, files, or environment variables
69
70
The library provides both batched and immediate writing modes, streaming and materialized query results, and comprehensive resource management capabilities for buckets, users, organizations, and more.
71
72
## Capabilities
73
74
### Writing Data
75
76
Write time series data points to InfluxDB using the WriteApi with support for different write modes, batching, retry policies, and precision levels.
77
78
```python { .api }
79
class WriteApi:
80
def write(
81
self,
82
bucket: str,
83
org: str = None,
84
record: Union[Point, str, List[Point], List[str], bytes, Any] = None,
85
write_precision: WritePrecision = WritePrecision.NS,
86
**kwargs
87
) -> None: ...
88
89
class Point:
90
def __init__(self, measurement_name: str): ...
91
def tag(self, key: str, value: str) -> 'Point': ...
92
def field(self, key: str, value: Any) -> 'Point': ...
93
def time(self, timestamp: Any, write_precision: WritePrecision = WritePrecision.NS) -> 'Point': ...
94
def to_line_protocol(self) -> str: ...
95
96
class WriteOptions:
97
def __init__(
98
self,
99
write_type: WriteType = WriteType.batching,
100
batch_size: int = 1000,
101
flush_interval: int = 1000,
102
jitter_interval: int = 0,
103
retry_interval: int = 5000,
104
max_retries: int = 5,
105
max_retry_delay: int = 125000,
106
max_retry_time: int = 180000,
107
exponential_base: int = 2
108
): ...
109
```
110
111
[Writing Data](./writing-data.md)
112
113
### Querying Data
114
115
Execute Flux queries against InfluxDB and process results in various formats including streaming, materialized tables, CSV, and pandas DataFrames.
116
117
```python { .api }
118
class QueryApi:
119
def query(self, query: str, org: str = None, params: dict = None) -> TableList: ...
120
def query_stream(self, query: str, org: str = None, params: dict = None) -> Generator[FluxRecord]: ...
121
def query_csv(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> CSVIterator: ...
122
def query_raw(self, query: str, org: str = None, dialect: Dialect = None, params: dict = None) -> HTTPResponse: ...
123
def query_data_frame(
124
self,
125
query: str,
126
org: str = None,
127
data_frame_index: List[str] = None,
128
params: dict = None,
129
use_extension_dtypes: bool = False
130
): ...
131
132
class FluxRecord:
133
def get_time(self) -> datetime: ...
134
def get_value(self) -> Any: ...
135
def get_field(self) -> str: ...
136
def get_measurement(self) -> str: ...
137
def values(self) -> dict: ...
138
139
class TableList(list):
140
def to_json(self, indent: int = None) -> str: ...
141
def to_values(self, columns: List[str] = None) -> List[List[Any]]: ...
142
```
143
144
[Querying Data](./querying-data.md)
145
146
### Client Management
147
148
Configure and manage InfluxDB client instances with support for both synchronous and asynchronous operations, connection pooling, authentication, and health monitoring.
149
150
```python { .api }
151
class InfluxDBClient:
152
def __init__(
153
self,
154
url: str,
155
token: str = None,
156
debug: bool = None,
157
timeout: int = 10000,
158
enable_gzip: bool = False,
159
org: str = None,
160
**kwargs
161
): ...
162
def write_api(self, write_options: WriteOptions = None, point_settings: PointSettings = None) -> WriteApi: ...
163
def query_api(self, query_options: QueryOptions = None) -> QueryApi: ...
164
def delete_api(self) -> DeleteApi: ...
165
def ping(self) -> bool: ...
166
def version(self) -> str: ...
167
def close(self) -> None: ...
168
@classmethod
169
def from_config_file(cls, config_file: str = "config.ini", **kwargs) -> 'InfluxDBClient': ...
170
@classmethod
171
def from_env_properties(cls, **kwargs) -> 'InfluxDBClient': ...
172
173
class InfluxDBClientAsync:
174
def __init__(
175
self,
176
url: str,
177
token: str = None,
178
org: str = None,
179
debug: bool = None,
180
timeout: int = 10000,
181
enable_gzip: bool = False,
182
**kwargs
183
): ...
184
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync: ...
185
def write_api(self, point_settings: PointSettings = PointSettings()) -> WriteApiAsync: ...
186
def delete_api(self) -> DeleteApiAsync: ...
187
async def ping(self) -> bool: ...
188
async def close(self) -> None: ...
189
```
190
191
[Client Management](./client-management.md)
192
193
### Resource Management
194
195
Manage InfluxDB resources including buckets, organizations, users, authorizations, tasks, and labels through dedicated API classes.
196
197
```python { .api }
198
class BucketsApi:
199
def create_bucket(
200
self,
201
bucket: Bucket = None,
202
bucket_name: str = None,
203
org_id: str = None,
204
retention_rules: List[RetentionRule] = None,
205
description: str = None,
206
org: str = None
207
) -> Bucket: ...
208
def find_bucket_by_name(self, bucket_name: str, org: str = None) -> Bucket: ...
209
def find_buckets(self, org: str = None, **kwargs) -> List[Bucket]: ...
210
def update_bucket(self, bucket: Bucket) -> Bucket: ...
211
def delete_bucket(self, bucket: Union[Bucket, str]) -> Bucket: ...
212
213
class AuthorizationsApi:
214
def create_authorization(
215
self,
216
org_id: str = None,
217
permissions: List[Permission] = None,
218
**kwargs
219
) -> Authorization: ...
220
def find_authorizations(self, **kwargs) -> List[Authorization]: ...
221
def delete_authorization(self, authorization: Union[Authorization, str]) -> Authorization: ...
222
223
class OrganizationsApi:
224
def create_organization(self, org: Organization = None, **kwargs) -> Organization: ...
225
def find_organizations(self, **kwargs) -> List[Organization]: ...
226
def find_organization_by_id(self, org_id: str) -> Organization: ...
227
228
class UsersApi:
229
def create_user(self, user: User = None, **kwargs) -> UserResponse: ...
230
def find_users(self, **kwargs) -> List[UserResponse]: ...
231
def me(self) -> UserResponse: ...
232
233
class TasksApi:
234
def create_task(self, task: Task = None, **kwargs) -> Task: ...
235
def find_tasks(self, **kwargs) -> List[Task]: ...
236
def run_manually(self, task_id: str, **kwargs) -> Run: ...
237
def find_runs(self, task_id: str, **kwargs) -> List[Run]: ...
238
239
class LabelsApi:
240
def create_label(self, label: Label = None, **kwargs) -> LabelResponse: ...
241
def find_labels(self, **kwargs) -> List[LabelResponse]: ...
242
```
243
244
[Resource Management](./resource-management.md)
245
246
### Advanced Operations
247
248
Perform advanced operations including data deletion, invokable scripts, and direct access to low-level service APIs for maximum control.
249
250
```python { .api }
251
class DeleteApi:
252
def delete(
253
self,
254
start: Union[str, datetime],
255
stop: Union[str, datetime],
256
predicate: str = "",
257
bucket: str = None,
258
org: str = None
259
) -> None: ...
260
261
class InvokableScriptsApi:
262
def create_script(self, script_create_request: ScriptCreateRequest) -> Script: ...
263
def find_scripts(self, **kwargs) -> Scripts: ...
264
def invoke_script(
265
self,
266
script_id: str,
267
params: dict = None
268
) -> str: ...
269
def update_script(
270
self,
271
script_id: str,
272
script_update_request: ScriptUpdateRequest
273
) -> Script: ...
274
def delete_script(self, script_id: str) -> None: ...
275
```
276
277
[Advanced Operations](./advanced-operations.md)
278
279
## Types
280
281
```python { .api }
282
class WritePrecision(Enum):
283
NS = "ns" # nanoseconds
284
US = "us" # microseconds
285
MS = "ms" # milliseconds
286
S = "s" # seconds
287
288
class WriteType(Enum):
289
batching = "batching"
290
asynchronous = "asynchronous"
291
synchronous = "synchronous"
292
293
class PointSettings:
294
def __init__(self, default_tags: dict = None): ...
295
296
class QueryOptions:
297
def __init__(
298
self,
299
profilers: List[str] = None,
300
profiler_callback: Callable = None
301
): ...
302
303
class Configuration:
304
def __init__(self): ...
305
# HTTP client configuration attributes
306
host: str
307
api_key: dict
308
username: str
309
password: str
310
debug: bool
311
verify_ssl: bool
312
timeout: int
313
314
# Domain model classes (300+ available)
315
class Bucket:
316
id: str
317
name: str
318
org_id: str
319
retention_rules: List[RetentionRule]
320
description: str
321
322
class Authorization:
323
id: str
324
token: str
325
status: str
326
org_id: str
327
permissions: List[Permission]
328
329
class Organization:
330
id: str
331
name: str
332
description: str
333
334
class User:
335
id: str
336
name: str
337
338
class Task:
339
id: str
340
name: str
341
flux: str
342
org_id: str
343
status: str
344
345
class Label:
346
id: str
347
name: str
348
org_id: str
349
properties: dict
350
351
class InfluxDBError(Exception):
352
def __init__(self, response: HTTPResponse = None, message: str = None, reason: str = None): ...
353
```