0
# Client Management
1
2
This section covers configuring and managing InfluxDB client instances: synchronous and asynchronous clients, connection pooling, authentication, health monitoring, and flexible configuration through objects, files, or environment variables.
3
4
## Capabilities
5
6
### InfluxDBClient
7
8
Main synchronous client class for connecting to InfluxDB 2.x with comprehensive configuration options and API access.
9
10
```python { .api }
11
class InfluxDBClient:
12
def __init__(
13
self,
14
url: str,
15
token: str = None,
16
debug: bool = None,
17
timeout: int = 10000,
18
enable_gzip: bool = False,
19
org: str = None,
20
default_tags: dict = None,
21
verify_ssl: bool = True,
22
ssl_ca_cert: str = None,
23
cert_file: str = None,
24
cert_key_file: str = None,
25
cert_key_password: Union[str, Callable] = None,
26
ssl_context: Any = None,
27
proxy: str = None,
28
proxy_headers: dict = None,
29
connection_pool_maxsize: int = None,
30
retries: Any = None,
31
auth_basic: bool = False,
32
username: str = None,
33
password: str = None,
34
profilers: List[str] = None,
35
**kwargs
36
):
37
"""
38
Initialize InfluxDB client.
39
40
Parameters:
41
- url (str): InfluxDB server URL (e.g., "http://localhost:8086")
42
- token (str, optional): Authentication token
43
- debug (bool, optional): Enable debug logging
44
- timeout (int): Request timeout in milliseconds (default: 10000)
45
- enable_gzip (bool): Enable gzip compression (default: False)
46
- org (str, optional): Default organization name or ID
47
- default_tags (dict, optional): Tags to add to all points
48
- verify_ssl (bool): Verify SSL certificates (default: True)
49
- ssl_ca_cert (str, optional): Path to custom CA certificate file
50
- cert_file (str, optional): Path to client certificate for mTLS
51
- cert_key_file (str, optional): Path to client certificate key for mTLS
52
- cert_key_password (str/callable, optional): Password for mTLS private key
53
- ssl_context (ssl.SSLContext, optional): Custom SSL context
54
- proxy (str, optional): HTTP proxy URL (e.g., "http://proxy:8080")
55
- proxy_headers (dict, optional): Headers for proxy authentication
56
- connection_pool_maxsize (int, optional): Maximum connection pool size
57
- retries (urllib3.util.retry.Retry, optional): Retry strategy configuration
58
- auth_basic (bool): Enable basic authentication for InfluxDB 1.8.x
59
- username (str, optional): Username for credential authentication
60
- password (str, optional): Password for credential authentication
61
- profilers (List[str], optional): List of enabled Flux profilers
62
- **kwargs: Additional HTTP client configuration options
63
"""
64
65
def write_api(
66
self,
67
write_options: WriteOptions = WriteOptions(),
68
point_settings: PointSettings = PointSettings(),
69
**kwargs
70
) -> WriteApi:
71
"""
72
Get WriteApi instance for writing data.
73
74
Parameters:
75
- write_options (WriteOptions): Write behavior configuration
76
- point_settings (PointSettings): Default point settings
77
78
Returns:
79
WriteApi: API for writing data
80
"""
81
82
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApi:
83
"""
84
Get QueryApi instance for querying data.
85
86
Parameters:
87
- query_options (QueryOptions): Query behavior configuration
88
89
Returns:
90
QueryApi: API for querying data
91
"""
92
93
def delete_api(self) -> DeleteApi:
94
"""
95
Get DeleteApi instance for deleting data.
96
97
Returns:
98
DeleteApi: API for deleting data
99
"""
100
101
def buckets_api(self) -> BucketsApi:
102
"""
103
Get BucketsApi instance for managing buckets.
104
105
Returns:
106
BucketsApi: API for bucket management
107
"""
108
109
def organizations_api(self) -> OrganizationsApi:
110
"""
111
Get OrganizationsApi instance for managing organizations.
112
113
Returns:
114
OrganizationsApi: API for organization management
115
"""
116
117
def users_api(self) -> UsersApi:
118
"""
119
Get UsersApi instance for managing users.
120
121
Returns:
122
UsersApi: API for user management
123
"""
124
125
def authorizations_api(self) -> AuthorizationsApi:
126
"""
127
Get AuthorizationsApi instance for managing authorizations.
128
129
Returns:
130
AuthorizationsApi: API for authorization management
131
"""
132
133
def tasks_api(self) -> TasksApi:
134
"""
135
Get TasksApi instance for managing tasks.
136
137
Returns:
138
TasksApi: API for task management
139
"""
140
141
def labels_api(self) -> LabelsApi:
142
"""
143
Get LabelsApi instance for managing labels.
144
145
Returns:
146
LabelsApi: API for label management
147
"""
148
149
def invokable_scripts_api(self) -> InvokableScriptsApi:
150
"""
151
Get InvokableScriptsApi instance for managing scripts.
152
153
Returns:
154
InvokableScriptsApi: API for script management
155
"""
156
157
def ping(self) -> bool:
158
"""
159
Check if InfluxDB server is reachable.
160
161
Returns:
162
bool: True if server is reachable
163
"""
164
165
def version(self) -> str:
166
"""
167
Get InfluxDB server version.
168
169
Returns:
170
str: Server version string
171
"""
172
173
def build(self) -> str:
174
"""
175
Get InfluxDB server build information.
176
177
Returns:
178
str: Build information
179
"""
180
181
def ready(self) -> Ready:
182
"""
183
Check if InfluxDB server is ready to accept requests.
184
185
Returns:
186
Ready: Server readiness status
187
"""
188
189
def health(self) -> HealthCheck:
190
"""
191
Get InfluxDB server health status (deprecated, use ping()).
192
193
Returns:
194
HealthCheck: Server health information
195
"""
196
197
def close(self) -> None:
198
"""
199
Close the client and clean up resources.
200
"""
201
202
def __enter__(self) -> 'InfluxDBClient': ...
203
def __exit__(self, exc_type, exc_val, exc_tb) -> None: ...
204
205
@classmethod
206
def from_config_file(
207
cls,
208
config_file: str = "config.ini",
209
debug: bool = None,
210
enable_gzip: bool = False,
211
**kwargs
212
) -> 'InfluxDBClient':
213
"""
214
Create client from configuration file.
215
216
Parameters:
217
- config_file (str): Path to INI configuration file (default: "config.ini")
218
- debug (bool, optional): Enable debug logging
219
- enable_gzip (bool): Enable gzip compression
220
- **kwargs: Additional configuration options
221
222
Returns:
223
InfluxDBClient: Configured client instance
224
"""
225
226
@classmethod
227
def from_env_properties(
228
cls,
229
debug: bool = None,
230
enable_gzip: bool = False,
231
**kwargs
232
) -> 'InfluxDBClient':
233
"""
234
Create client from environment variables.
235
236
Parameters:
237
- debug (bool, optional): Enable debug logging
238
- enable_gzip (bool): Enable gzip compression
239
- **kwargs: Additional configuration options
240
241
Returns:
242
InfluxDBClient: Configured client instance
243
244
Environment variables:
245
- INFLUXDB_V2_URL: InfluxDB server URL
246
- INFLUXDB_V2_TOKEN: Authentication token
247
- INFLUXDB_V2_ORG: Default organization
248
- INFLUXDB_V2_TIMEOUT: Request timeout in milliseconds
249
"""
250
```
251
252
#### InfluxDBClient Usage Examples
253
254
**Basic client initialization:**
255
```python
256
from influxdb_client import InfluxDBClient
257
258
# Direct initialization
259
client = InfluxDBClient(
260
url="http://localhost:8086",
261
token="your-auth-token",
262
org="my-organization",
263
timeout=30000, # 30 seconds
264
enable_gzip=True
265
)
266
267
# Use context manager for automatic cleanup
268
with InfluxDBClient(url="http://localhost:8086", token="token") as client:
269
# Use client APIs
270
write_api = client.write_api()
271
query_api = client.query_api()
272
# Client automatically closed when exiting context
273
274
client.close() # Manual cleanup if not using context manager
275
```
276
277
**Configuration from file:**
278
```python
279
# config.ini file content:
280
# [influx2]
281
# url=http://localhost:8086
282
# token=your-token-here
283
# org=my-org
284
# timeout=20000
285
# verify_ssl=false
286
287
client = InfluxDBClient.from_config_file("config.ini")
288
289
# Custom config file location
290
client = InfluxDBClient.from_config_file("/path/to/custom-config.ini")
291
```
292
293
**Configuration from environment:**
294
```python
295
import os
296
297
# Set environment variables
298
os.environ['INFLUXDB_V2_URL'] = 'http://localhost:8086'
299
os.environ['INFLUXDB_V2_TOKEN'] = 'your-token'
300
os.environ['INFLUXDB_V2_ORG'] = 'my-org'
301
os.environ['INFLUXDB_V2_TIMEOUT'] = '15000'
302
303
# Create client from environment
304
client = InfluxDBClient.from_env_properties()
305
```
306
307
**Health and version checking:**
308
```python
309
# Check server connectivity
310
if client.ping():
311
print("Server is reachable")
312
313
# Get version information
314
version = client.version()
315
build = client.build()
316
print(f"InfluxDB version: {version}")
317
print(f"Build: {build}")
318
319
# Check readiness
320
ready_status = client.ready()
321
print(f"Server ready: {ready_status}")
322
else:
323
print("Cannot reach InfluxDB server")
324
```
325
326
**API access patterns:**
327
```python
328
# Get different API instances
329
write_api = client.write_api(
330
write_options=WriteOptions(write_type=WriteType.batching, batch_size=1000)
331
)
332
333
query_api = client.query_api(
334
query_options=QueryOptions(profilers=["query"])
335
)
336
337
# Resource management APIs
338
buckets_api = client.buckets_api()
339
users_api = client.users_api()
340
orgs_api = client.organizations_api()
341
342
# Advanced APIs
343
delete_api = client.delete_api()
344
scripts_api = client.invokable_scripts_api()
345
```
346
347
### InfluxDBClientAsync
348
349
Asynchronous version of InfluxDBClient for non-blocking operations with async/await patterns.
350
351
```python { .api }
352
class InfluxDBClientAsync:
353
def __init__(
354
self,
355
url: str,
356
token: str = None,
357
org: str = None,
358
debug: bool = None,
359
timeout: int = 10000,
360
enable_gzip: bool = False,
361
**kwargs
362
):
363
"""
364
Initialize async InfluxDB client.
365
366
Parameters:
367
- url (str): InfluxDB server URL
368
- token (str, optional): Authentication token
369
- org (str, optional): Default organization name or ID
370
- debug (bool, optional): Enable debug logging
371
- timeout (int): Request timeout in milliseconds
372
- enable_gzip (bool): Enable gzip compression
373
- **kwargs: Additional HTTP client configuration
374
"""
375
376
def query_api(self, query_options: QueryOptions = QueryOptions()) -> QueryApiAsync:
377
"""
378
Get async QueryApi instance.
379
380
Parameters:
381
- query_options (QueryOptions): Query configuration
382
383
Returns:
384
QueryApiAsync: Async API for querying data
385
"""
386
387
def write_api(self, point_settings: PointSettings = PointSettings()) -> WriteApiAsync:
388
"""
389
Get async WriteApi instance.
390
391
Parameters:
392
- point_settings (PointSettings): Default point settings
393
394
Returns:
395
WriteApiAsync: Async API for writing data
396
"""
397
398
def delete_api(self) -> DeleteApiAsync:
399
"""
400
Get async DeleteApi instance.
401
402
Returns:
403
DeleteApiAsync: Async API for deleting data
404
"""
405
406
async def ping(self) -> bool:
407
"""
408
Asynchronously check if server is reachable.
409
410
Returns:
411
bool: True if server is reachable
412
"""
413
414
async def version(self) -> str:
415
"""
416
Asynchronously get server version.
417
418
Returns:
419
str: Server version string
420
"""
421
422
async def build(self) -> str:
423
"""
424
Asynchronously get server build information.
425
426
Returns:
427
str: Build information
428
"""
429
430
async def close(self) -> None:
431
"""
432
Close the async client and clean up resources.
433
"""
434
435
async def __aenter__(self) -> 'InfluxDBClientAsync': ...
436
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None: ...
437
438
@classmethod
439
def from_config_file(
440
cls,
441
config_file: str = "config.ini",
442
debug: bool = None,
443
enable_gzip: bool = False,
444
**kwargs
445
) -> 'InfluxDBClientAsync':
446
"""
447
Create async client from configuration file.
448
"""
449
450
@classmethod
451
def from_env_properties(
452
cls,
453
debug: bool = None,
454
enable_gzip: bool = False,
455
**kwargs
456
) -> 'InfluxDBClientAsync':
457
"""
458
Create async client from environment variables.
459
"""
460
```
461
462
#### InfluxDBClientAsync Usage Examples
463
464
**Basic async client usage:**
465
```python
466
import asyncio
467
from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync
468
469
async def main():
470
# Use async context manager
471
async with InfluxDBClientAsync(
472
url="http://localhost:8086",
473
token="your-token",
474
org="your-org"
475
) as client:
476
# Check connectivity
477
if await client.ping():
478
print(f"Connected to InfluxDB version: {await client.version()}")
479
480
# Get async APIs
481
query_api = client.query_api()
482
write_api = client.write_api()
483
484
# Use async operations
485
await write_api.write(bucket="test", record=point)
486
results = await query_api.query(flux_query)
487
488
asyncio.run(main())
489
```
490
491
**Concurrent async operations:**
492
```python
493
async def concurrent_operations():
494
async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
495
query_api = client.query_api()
496
497
# Execute multiple queries concurrently
498
queries = [
499
'from(bucket: "bucket1") |> range(start: -1h)',
500
'from(bucket: "bucket2") |> range(start: -1h)',
501
'from(bucket: "bucket3") |> range(start: -1h)'
502
]
503
504
# Run all queries concurrently
505
results = await asyncio.gather(*[
506
query_api.query(query) for query in queries
507
])
508
509
for i, result in enumerate(results):
510
print(f"Query {i+1}: {len(result)} tables")
511
512
asyncio.run(concurrent_operations())
513
```
514
515
### Configuration
516
517
Low-level HTTP client configuration class for advanced customization of client behavior.
518
519
```python { .api }
520
class Configuration:
521
def __init__(self):
522
"""Initialize configuration with default settings."""
523
524
# Connection settings
525
host: str # InfluxDB server host
526
temp_folder_path: str # Temporary file storage path
527
528
# Authentication settings
529
api_key: dict # API key configuration
530
api_key_prefix: dict # API key prefix mapping
531
username: str # Basic auth username
532
password: str # Basic auth password
533
534
# HTTP client settings
535
debug: bool # Enable debug output
536
verify_ssl: bool # Verify SSL certificates
537
ssl_ca_cert: str # Path to CA certificate file
538
cert_file: str # Path to client certificate file
539
cert_key_file: str # Path to client certificate key file
540
cert_key_password: str # Client certificate key password
541
ssl_context: Any # Custom SSL context
542
543
# Proxy settings
544
proxy: str # Proxy server URL
545
proxy_headers: dict # Additional proxy headers
546
547
# Connection pool settings
548
connection_pool_maxsize: int # Maximum connection pool size
549
timeout: int # Request timeout in milliseconds
550
551
def to_debug_report(self) -> str:
552
"""
553
Generate debug report of configuration.
554
555
Returns:
556
str: Debug report string
557
"""
558
```
559
560
#### Configuration Usage Examples
561
562
**Custom HTTP client configuration:**
563
```python
564
from influxdb_client import Configuration, InfluxDBClient
565
566
# Create custom configuration
567
config = Configuration()
568
config.host = "https://my-influxdb.example.com"
569
config.verify_ssl = True
570
config.ssl_ca_cert = "/path/to/ca.crt"
571
config.timeout = 30000  # milliseconds (30 seconds)
572
config.connection_pool_maxsize = 20
573
config.debug = True
574
575
# Use with client
576
client = InfluxDBClient(
577
url="https://my-influxdb.example.com",
578
token="token",
579
configuration=config
580
)
581
```
582
583
**Proxy configuration:**
584
```python
585
config = Configuration()
586
config.proxy = "http://proxy.company.com:8080"
587
config.proxy_headers = {
588
"Proxy-Authorization": "Basic dXNlcjpwYXNz",
589
"Custom-Header": "value"
590
}
591
592
client = InfluxDBClient(
593
url="http://localhost:8086",
594
token="token",
595
configuration=config
596
)
597
```
598
599
**SSL/TLS configuration:**
600
```python
601
config = Configuration()
602
config.verify_ssl = True
603
config.ssl_ca_cert = "/etc/ssl/certs/ca-bundle.crt"
604
config.cert_file = "/path/to/client.crt"
605
config.cert_key_file = "/path/to/client.key"
606
config.cert_key_password = "key-password"
607
608
# For custom SSL context
609
import ssl
610
ssl_context = ssl.create_default_context()
611
ssl_context.check_hostname = False
612
config.ssl_context = ssl_context
613
```
614
615
### Health and Status Types
616
617
Response types for server health and status monitoring.
618
619
```python { .api }
620
class Ready:
621
"""Server readiness status."""
622
status: str # "ready" or other status
623
started: str # Server start timestamp
624
up: str # Server uptime
625
626
class HealthCheck:
627
"""Server health check response (deprecated)."""
628
name: str # Service name
629
status: str # Health status
630
version: str # Service version
631
commit: str # Git commit hash
632
633
class Routes:
634
"""Available API routes information."""
635
query: QueryRoute
636
write: WriteRoute
637
ping: PingRoute
638
# Additional route definitions
639
```
640
641
#### Health Monitoring Usage Examples
642
643
**Server health monitoring:**
644
```python
645
def monitor_influxdb_health(client):
646
try:
647
# Basic connectivity check
648
if client.ping():
649
print("✓ InfluxDB server is reachable")
650
651
# Get detailed server information
652
version = client.version()
653
build = client.build()
654
ready_status = client.ready()
655
656
print(f"✓ Version: {version}")
657
print(f"✓ Build: {build}")
658
print(f"✓ Status: {ready_status.status}")
659
print(f"✓ Started: {ready_status.started}")
660
print(f"✓ Uptime: {ready_status.up}")
661
662
return True
663
else:
664
print("✗ InfluxDB server is not reachable")
665
return False
666
667
except Exception as e:
668
print(f"✗ Health check failed: {e}")
669
return False
670
671
# Usage
672
client = InfluxDBClient(url="http://localhost:8086", token="token")
673
is_healthy = monitor_influxdb_health(client)
674
```
675
676
**Async health monitoring:**
677
```python
678
async def async_health_check():
679
async with InfluxDBClientAsync(url="http://localhost:8086", token="token") as client:
680
if await client.ping():
681
version = await client.version()
682
build = await client.build()
683
print(f"Async health check: OK (v{version}, {build})")
684
else:
685
print("Async health check: FAILED")
686
687
asyncio.run(async_health_check())
688
```
689
690
## Types
691
692
```python { .api }
693
# Client configuration types
694
ClientConfig = Dict[str, Any] # Generic client configuration
695
ConnectionConfig = Dict[str, Any] # HTTP connection settings
696
697
# Authentication types
698
AuthToken = str # Authentication token
699
AuthCredentials = Dict[str, str] # Username/password auth
700
701
# Timeout and retry types
702
TimeoutConfig = Union[int, float] # Timeout value (client timeouts are given in milliseconds)
703
RetryConfig = Dict[str, Any] # Retry policy configuration
704
705
# Server response types
706
ServerVersion = str # Version string like "2.7.1"
707
ServerBuild = str # Build information string
708
ServerStatus = str # Status indicator
709
710
# Configuration file sections
711
ConfigSection = Dict[str, str] # INI file section
712
ConfigFile = Dict[str, ConfigSection] # Full INI configuration
713
714
# Environment variable names (constants)
715
ENV_URL = "INFLUXDB_V2_URL"
716
ENV_TOKEN = "INFLUXDB_V2_TOKEN"
717
ENV_ORG = "INFLUXDB_V2_ORG"
718
ENV_TIMEOUT = "INFLUXDB_V2_TIMEOUT"
719
720
# Exception types
721
class ConnectionError(InfluxDBError):
722
"""Raised when connection to InfluxDB fails."""
723
pass
724
725
class AuthenticationError(InfluxDBError):
726
"""Raised when authentication fails."""
727
pass
728
729
class ConfigurationError(InfluxDBError):
730
"""Raised when client configuration is invalid."""
731
pass
732
```