# DataFrame Client

The DataFrameClient extends InfluxDBClient to provide seamless integration with pandas DataFrames, enabling efficient data analysis workflows and simplified data exchange between InfluxDB and Python data science tools.

**Requirement**: This client requires the `pandas` library to be installed.

## Capabilities

### Client Initialization

DataFrameClient inherits all InfluxDBClient functionality while adding DataFrame-specific methods.

```python { .api }
class DataFrameClient(InfluxDBClient):
    def __init__(self, host='localhost', port=8086, username='root', password='root',
                 database=None, ssl=False, verify_ssl=False, timeout=None, retries=3,
                 use_udp=False, udp_port=4444, proxies=None, pool_size=10, path='',
                 cert=None, gzip=False, session=None, headers=None, socket_options=None):
        """
        Initialize DataFrame client with same parameters as InfluxDBClient.

        Raises:
            ImportError: If pandas is not installed
        """
```

#### Usage Example

```python
from influxdb import DataFrameClient
import pandas as pd

# Create DataFrame client
client = DataFrameClient(host='localhost', port=8086, database='mydb')

# Verify pandas is available
print("DataFrame client ready for pandas integration")
```

### DataFrame Writing

Write pandas DataFrames directly to InfluxDB with flexible column mapping and type handling.

```python { .api }
def write_points(self, dataframe, measurement, tags=None, tag_columns=None,
                 field_columns=None, time_precision=None, database=None,
                 retention_policy=None, batch_size=None, protocol='line',
                 numeric_precision=None):
    """
    Write pandas DataFrame to InfluxDB.

    Parameters:
    - dataframe (pandas.DataFrame): Data to write
    - measurement (str): Measurement name
    - tags (dict): Global tags for all points (default: None)
    - tag_columns (list): DataFrame columns to use as tags (default: None)
    - field_columns (list): DataFrame columns to use as fields (default: None)
    - time_precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
    - database (str): Database name override (default: None)
    - retention_policy (str): Retention policy name (default: None)
    - batch_size (int): Points per batch (default: None)
    - protocol (str): Write protocol ('line' recommended) (default: 'line')
    - numeric_precision (int): Decimal precision for floats (default: None)

    Returns:
        bool: True if successful

    Raises:
        InfluxDBClientError: On write errors
        TypeError: If dataframe is not a pandas DataFrame
    """
```

#### DataFrame Writing Examples

```python
import pandas as pd
from datetime import datetime, timezone

# Create sample DataFrame
df = pd.DataFrame({
    'timestamp': [
        datetime(2023, 9, 7, 7, 0, 0, tzinfo=timezone.utc),
        datetime(2023, 9, 7, 7, 1, 0, tzinfo=timezone.utc),
        datetime(2023, 9, 7, 7, 2, 0, tzinfo=timezone.utc)
    ],
    'host': ['server01', 'server01', 'server02'],
    'region': ['us-west', 'us-west', 'us-east'],
    'cpu_usage': [65.2, 70.1, 45.8],
    'memory_usage': [78.5, 82.3, 56.7],
    'disk_io': [1200, 1450, 890]
})

# Set timestamp as index
df.set_index('timestamp', inplace=True)

# Write DataFrame with automatic field detection
client.write_points(
    dataframe=df,
    measurement='system_metrics',
    tag_columns=['host', 'region']  # These columns become tags
    # cpu_usage, memory_usage, disk_io automatically become fields
)

# Write with explicit field selection
client.write_points(
    dataframe=df,
    measurement='cpu_metrics',
    tag_columns=['host', 'region'],
    field_columns=['cpu_usage']  # Only cpu_usage as field
)

# Write with global tags
client.write_points(
    dataframe=df,
    measurement='system_metrics',
    tags={'environment': 'production'},  # Added to all points
    tag_columns=['host'],
    field_columns=['cpu_usage', 'memory_usage']
)

# Batch writing for large DataFrames
large_df = pd.DataFrame(...)  # Large dataset
client.write_points(
    dataframe=large_df,
    measurement='bulk_data',
    batch_size=10000
)
```

### DataFrame Querying

Execute InfluxQL queries and receive results as pandas DataFrames for immediate analysis.

```python { .api }
def query(self, query, params=None, bind_params=None, epoch=None,
          expected_response_code=200, database=None, raise_errors=True,
          chunked=False, chunk_size=0, method="GET", dropna=True,
          data_frame_index=None):
    """
    Query InfluxDB and return results as pandas DataFrame.

    Parameters:
    - query (str): InfluxQL query string
    - params (dict): URL parameters (default: None)
    - bind_params (dict): Query parameter bindings (default: None)
    - epoch (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
    - expected_response_code (int): Expected HTTP status (default: 200)
    - database (str): Database name override (default: None)
    - raise_errors (bool): Raise exceptions on query errors (default: True)
    - chunked (bool): Enable chunked responses (default: False)
    - chunk_size (int): Chunk size for chunked responses (default: 0)
    - method (str): HTTP method ('GET' or 'POST') (default: 'GET')
    - dropna (bool): Drop rows with NaN values (default: True)
    - data_frame_index (str): Column to use as DataFrame index (default: None)

    Returns:
        dict: Dictionary mapping measurement names to pandas DataFrames

    Raises:
        InfluxDBClientError: On query errors
    """
```

#### DataFrame Querying Examples

```python
# Basic query returning DataFrame
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')

# result is a dict: {'cpu_usage': DataFrame}
df = result['cpu_usage']
print(df.head())
print(df.describe())

# Query with time grouping
result = client.query("""
    SELECT mean(value) as avg_cpu, max(value) as max_cpu
    FROM cpu_usage
    WHERE time >= now() - 24h
    GROUP BY time(1h), host
""")

df = result['cpu_usage']
# DataFrame with time-based grouping

# Multiple measurements in one query
result = client.query("""
    SELECT * FROM cpu_usage;
    SELECT * FROM memory_usage
""")

cpu_df = result['cpu_usage']
memory_df = result['memory_usage']

# Set custom DataFrame index
result = client.query(
    'SELECT * FROM system_metrics ORDER BY time DESC LIMIT 1000',
    data_frame_index='time'  # Use time column as index
)

df = result['system_metrics']
# DataFrame indexed by time for time series analysis

# Query with parameters
result = client.query(
    'SELECT * FROM metrics WHERE host = $host AND time >= $start_time',
    bind_params={
        'host': 'server01',
        'start_time': '2023-09-07T00:00:00Z'
    }
)
```

### Time Series Analysis Integration

The DataFrame integration enables seamless use of pandas time series analysis tools.

#### Analysis Examples

```python
# Query data for analysis
result = client.query("""
    SELECT mean(cpu_usage) as cpu, mean(memory_usage) as memory
    FROM system_metrics
    WHERE time >= now() - 7d
    GROUP BY time(1h)
""")

df = result['system_metrics']

# Time series analysis with pandas
df['time'] = pd.to_datetime(df['time'])
df.set_index('time', inplace=True)

# Rolling averages
df['cpu_rolling_mean'] = df['cpu'].rolling(window=6).mean()  # 6-hour window
df['memory_rolling_mean'] = df['memory'].rolling(window=6).mean()

# Resampling
daily_avg = df.resample('D').mean()
hourly_max = df.resample('H').max()

# Statistical analysis
correlation = df['cpu'].corr(df['memory'])
cpu_stats = df['cpu'].describe()

# Plotting with matplotlib
import matplotlib.pyplot as plt

df[['cpu', 'cpu_rolling_mean']].plot(figsize=(12, 6))
plt.title('CPU Usage Over Time')
plt.ylabel('CPU Usage (%)')
plt.show()
```

### DataFrame Data Types and Conversion

Handle data type mapping between InfluxDB and pandas effectively.

```python
import numpy as np

# Specify data types when writing
df = pd.DataFrame({
    'time': pd.date_range('2023-09-07', periods=100, freq='1min'),
    'sensor_id': ['sensor_' + str(i % 10) for i in range(100)],
    'temperature': np.random.normal(25.0, 2.0, 100),
    'humidity': np.random.normal(60.0, 10.0, 100),
    'is_active': [True] * 50 + [False] * 50
})

df.set_index('time', inplace=True)

# Write with type preservation
client.write_points(
    dataframe=df,
    measurement='sensor_data',
    tag_columns=['sensor_id'],  # String tags
    field_columns=['temperature', 'humidity', 'is_active'],  # Mixed field types
    numeric_precision=2  # Round floats to 2 decimal places
)

# Query back with proper types
result = client.query('SELECT * FROM sensor_data LIMIT 10')
df_result = result['sensor_data']

# Verify data types
print(df_result.dtypes)
print(df_result['is_active'].unique())  # Boolean values preserved
```

## Error Handling

DataFrameClient inherits all InfluxDBClient error handling plus pandas-specific errors.

```python
from influxdb import DataFrameClient
from influxdb.exceptions import InfluxDBClientError

try:
    # Constructing the client raises ImportError if pandas is not installed
    client = DataFrameClient()

    result = client.query('SELECT * FROM measurement')

except ImportError as e:
    print("pandas is required for DataFrameClient:", e)

except InfluxDBClientError as e:
    print("InfluxDB error:", e)

# Validate DataFrame before writing
def safe_write_dataframe(client, df, measurement):
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if df.empty:
        print("Warning: DataFrame is empty, skipping write")
        return

    try:
        client.write_points(df, measurement=measurement)
        print(f"Successfully wrote {len(df)} points to {measurement}")
    except Exception as e:
        print(f"Failed to write DataFrame: {e}")
```

## Performance Tips

### Optimize DataFrame Operations

```python
# Use line protocol for better performance
client.write_points(df, measurement='metrics', protocol='line')

# Batch large DataFrames
large_df = pd.read_csv('large_dataset.csv')
client.write_points(large_df, measurement='bulk_data', batch_size=10000)

# Pre-process DataFrames for efficiency
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df = df.sort_index()  # Pre-sort for better write performance

# Use appropriate data types
df = df.astype({
    'sensor_id': 'category',  # Use category for repeated strings
    'value': 'float32'  # Use float32 if precision allows
})
```

### Memory Management

```python
# Process large datasets in chunks
def write_large_csv(client, filepath, measurement, chunk_size=10000):
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # Process chunk
        chunk['timestamp'] = pd.to_datetime(chunk['timestamp'])
        chunk.set_index('timestamp', inplace=True)

        # Write chunk
        client.write_points(
            dataframe=chunk,
            measurement=measurement,
            protocol='line'
        )

        print(f"Processed {len(chunk)} records")

# Use context manager for automatic cleanup
with DataFrameClient(database='large_db') as client:
    write_large_csv(client, 'massive_dataset.csv', 'sensor_readings')
```