InfluxDB client library for time series database operations with comprehensive API for data management and querying
—
The InfluxDBClient is the primary interface for interacting with InfluxDB servers. It provides comprehensive functionality for database operations, data querying and writing, connection management, and administrative tasks.
Create and configure InfluxDB client connections with extensive customization options.
class InfluxDBClient:
def __init__(self, host='localhost', port=8086, username='root', password='root',
database=None, ssl=False, verify_ssl=False, timeout=None, retries=3,
use_udp=False, udp_port=4444, proxies=None, pool_size=10, path='',
cert=None, gzip=False, session=None, headers=None, socket_options=None):
"""
Initialize InfluxDB client.
Parameters:
- host (str): Hostname (default: 'localhost')
- port (int): Port number (default: 8086)
- username (str): Username (default: 'root')
- password (str): Password (default: 'root')
- database (str): Database name (default: None)
- ssl (bool): Use SSL (default: False)
- verify_ssl (bool): Verify SSL certificates (default: False)
- timeout (int): Request timeout in seconds (default: None)
- retries (int): Number of retries (default: 3)
- use_udp (bool): Use UDP for writes (default: False)
- udp_port (int): UDP port (default: 4444)
- proxies (dict): HTTP proxies (default: None)
- pool_size (int): Connection pool size (default: 10)
- path (str): URL path prefix (default: '')
- cert (str): Client certificate path (default: None)
- gzip (bool): Use gzip compression (default: False)
- session (requests.Session): Custom HTTP session (default: None)
- headers (dict): Custom HTTP headers (default: None)
- socket_options (list): Socket options (default: None)
"""
@classmethod
def from_dsn(cls, dsn, **kwargs):
"""
Create client from data source name (DSN).
Parameters:
- dsn (str): Data source name URL
- **kwargs: Additional client parameters
Returns:
InfluxDBClient instance
"""
# Basic connection
client = InfluxDBClient()
# Production configuration
client = InfluxDBClient(
host='influxdb.example.com',
port=8086,
username='myuser',
password='mypass',
database='production',
ssl=True,
verify_ssl=True,
timeout=30,
retries=5,
pool_size=20
)
# DSN connection
client = InfluxDBClient.from_dsn('https://user:pass@influxdb.example.com:8086/mydb')
Manage client connections and authentication dynamically.
def ping(self):
"""
Check connectivity to InfluxDB server.
Returns:
str: Version of the InfluxDB server the client is connected to
Raises:
InfluxDBClientError: If connection fails
"""
def switch_database(self, database):
"""
Change the client's default database.
Parameters:
- database (str): Database name to switch to
"""
def switch_user(self, username, password):
"""
Change the client's authentication credentials.
Parameters:
- username (str): New username
- password (str): New password
"""
def close(self):
"""
Close the HTTP session and release resources.
"""
Execute queries and write data to InfluxDB with flexible options.
def query(self, query, params=None, bind_params=None, epoch=None,
expected_response_code=200, database=None, raise_errors=True,
chunked=False, chunk_size=0, method="GET"):
"""
Send query to InfluxDB and return results.
Parameters:
- query (str): InfluxQL query string
- params (dict): URL parameters (default: None)
- bind_params (dict): Query parameter bindings (default: None)
- epoch (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
- expected_response_code (int): Expected HTTP status (default: 200)
- database (str): Database name override (default: None)
- raise_errors (bool): Raise exceptions on query errors (default: True)
- chunked (bool): Enable chunked responses (default: False)
- chunk_size (int): Chunk size for chunked responses (default: 0)
- method (str): HTTP method ('GET' or 'POST') (default: 'GET')
Returns:
ResultSet: Query results wrapper
Raises:
InfluxDBClientError: On client errors
InfluxDBServerError: On server errors
"""
def write_points(self, points, time_precision=None, database=None,
retention_policy=None, tags=None, batch_size=None,
protocol='json', consistency=None):
"""
Write multiple time series points to InfluxDB.
Parameters:
- points (list): List of point dictionaries
- time_precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
- database (str): Database name override (default: None)
- retention_policy (str): Retention policy name (default: None)
- tags (dict): Global tags to add to all points (default: None)
- batch_size (int): Points per batch for large writes (default: None)
- protocol (str): Write protocol ('json' or 'line') (default: 'json')
- consistency (str): Write consistency level (default: None)
Returns:
bool: True if successful
Raises:
InfluxDBClientError: On write errors
"""
def write(self, data, params=None, expected_response_code=204, protocol='json'):
"""
Low-level write method for raw data.
Parameters:
- data (str or dict): Data to write
- params (dict): URL parameters (default: None)
- expected_response_code (int): Expected HTTP status (default: 204)
- protocol (str): Write protocol (default: 'json')
Returns:
bool: True if successful
"""
# Simple query
result = client.query('SELECT * FROM cpu_usage')
# Query with time range
result = client.query("""
SELECT mean(value) as avg_cpu
FROM cpu_usage
WHERE time >= now() - 1h
GROUP BY time(10m)
""")
# Parameterized query
result = client.query(
'SELECT * FROM measurement WHERE host = $host',
bind_params={'host': 'server01'}
)
# Chunked query for large results
result = client.query(
'SELECT * FROM large_measurement',
chunked=True,
chunk_size=10000
)
# Write single point
json_body = [
{
"measurement": "cpu_load_short",
"tags": {
"host": "server01",
"region": "us-west"
},
"time": "2023-09-07T07:18:24Z",
"fields": {
"value": 0.64
}
}
]
client.write_points(json_body)
# Write multiple points with global tags
points = [
{
"measurement": "temperature",
"fields": {"value": 23.5},
"time": "2023-09-07T07:18:24Z"
},
{
"measurement": "temperature",
"fields": {"value": 24.1},
"time": "2023-09-07T07:19:24Z"
}
]
client.write_points(points, tags={"location": "room1"})
# Write with line protocol
client.write_points(points, protocol='line')
# Batch write for large datasets
large_points = [...] # Many points
client.write_points(large_points, batch_size=5000)
Direct access to InfluxDB HTTP API for advanced operations.
def request(self, url, method='GET', params=None, data=None, stream=False,
expected_response_code=200, headers=None):
"""
Make direct HTTP request to InfluxDB API.
Parameters:
- url (str): API endpoint URL
- method (str): HTTP method (default: 'GET')
- params (dict): URL parameters (default: None)
- data (str or dict): Request body data (default: None)
- stream (bool): Stream response (default: False)
- expected_response_code (int): Expected HTTP status (default: 200)
- headers (dict): Additional HTTP headers (default: None)
Returns:
requests.Response: HTTP response object
Raises:
InfluxDBClientError: On request errors
"""
High-throughput data writing using UDP protocol.
def send_packet(self, packet, protocol='json', time_precision=None):
"""
Send data packet via UDP for high-throughput writes.
Parameters:
- packet (str or dict): Data packet to send
- protocol (str): Packet protocol ('json' or 'line') (default: 'json')
- time_precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
Note: UDP writes are fire-and-forget with no error handling
"""
# Configure client for UDP
client = InfluxDBClient(use_udp=True, udp_port=4444)
# Send UDP packets
packet = {
"measurement": "cpu_load_short",
"tags": {"host": "server01"},
"fields": {"value": 0.64}
}
client.send_packet(packet)
# Line protocol UDP
line_data = "cpu_load_short,host=server01 value=0.64"
client.send_packet(line_data, protocol='line')
Automatic resource management using Python context managers.
def __enter__(self):
"""Enter context manager."""
return self
def __exit__(self, _exc_type, _exc_value, _traceback):
"""Exit context manager and clean up resources."""
# Automatic cleanup
with InfluxDBClient(host='localhost', database='mydb') as client:
    client.write_points([{"measurement": "test", "fields": {"value": 1}}])
    results = client.query('SELECT * FROM test')
# Client automatically closed when exiting context
# SSL connection with a client certificate
client = InfluxDBClient(
host='secure.influxdb.com',
port=8086,
ssl=True,
verify_ssl=True,
cert='/path/to/client.pem'
)
# Connection through HTTP/HTTPS proxies
client = InfluxDBClient(
host='influxdb.internal.com',
proxies={
'http': 'http://proxy.company.com:8080',
'https': 'https://proxy.company.com:8080'
}
)
# Connection with custom HTTP headers
client = InfluxDBClient(
host='influxdb.com',
headers={
'Authorization': 'Bearer your-jwt-token',
'Custom-Header': 'value'
}
)
Install with Tessl CLI
npx tessl i tessl/pypi-influxdb