# DataFrame Client

The DataFrameClient extends InfluxDBClient to provide seamless integration with pandas DataFrames, enabling efficient data analysis workflows and simplified data exchange between InfluxDB and Python data science tools.

**Requirement**: This client requires the `pandas` library to be installed.
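
Since the client raises `ImportError` at construction time when pandas is missing, it can help to probe for the dependency up front. A minimal sketch (the `HAS_PANDAS` flag is illustrative, not part of the library):

```python
# Probe for pandas before constructing a DataFrameClient;
# DataFrameClient.__init__ raises ImportError when pandas is missing.
try:
    import pandas  # noqa: F401
    HAS_PANDAS = True
except ImportError:
    HAS_PANDAS = False

print("pandas available:", HAS_PANDAS)
```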

## Capabilities

### Client Initialization

DataFrameClient inherits all InfluxDBClient functionality while adding DataFrame-specific methods.

```python { .api }
class DataFrameClient(InfluxDBClient):
    def __init__(self, host='localhost', port=8086, username='root', password='root',
                 database=None, ssl=False, verify_ssl=False, timeout=None, retries=3,
                 use_udp=False, udp_port=4444, proxies=None, pool_size=10, path='',
                 cert=None, gzip=False, session=None, headers=None, socket_options=None):
        """
        Initialize a DataFrame client with the same parameters as InfluxDBClient.

        Raises:
            ImportError: If pandas is not installed
        """
```

#### Usage Example

```python
from influxdb import DataFrameClient
import pandas as pd

# Create DataFrame client; construction itself raises ImportError
# if pandas is not installed
client = DataFrameClient(host='localhost', port=8086, database='mydb')

print("DataFrame client ready for pandas integration")
```

### DataFrame Writing

Write pandas DataFrames directly to InfluxDB with flexible column mapping and type handling.

```python { .api }
def write_points(self, dataframe, measurement, tags=None, tag_columns=None,
                 field_columns=None, time_precision=None, database=None,
                 retention_policy=None, batch_size=None, protocol='line',
                 numeric_precision=None):
    """
    Write a pandas DataFrame to InfluxDB.

    Parameters:
    - dataframe (pandas.DataFrame): Data to write; must be indexed by
      timestamp (a DatetimeIndex or PeriodIndex)
    - measurement (str): Measurement name
    - tags (dict): Global tags for all points (default: None)
    - tag_columns (list): DataFrame columns to use as tags (default: None)
    - field_columns (list): DataFrame columns to use as fields (default: None)
    - time_precision (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
    - database (str): Database name override (default: None)
    - retention_policy (str): Retention policy name (default: None)
    - batch_size (int): Points per batch (default: None)
    - protocol (str): Write protocol ('line' recommended) (default: 'line')
    - numeric_precision (int): Decimal precision for floats (default: None)

    Returns:
    bool: True if successful

    Raises:
    InfluxDBClientError: On write errors
    TypeError: If dataframe is not a pandas DataFrame
    """
```

#### DataFrame Writing Examples

```python
import pandas as pd
from datetime import datetime, timezone

# Create sample DataFrame
df = pd.DataFrame({
    'timestamp': [
        datetime(2023, 9, 7, 7, 0, 0, tzinfo=timezone.utc),
        datetime(2023, 9, 7, 7, 1, 0, tzinfo=timezone.utc),
        datetime(2023, 9, 7, 7, 2, 0, tzinfo=timezone.utc)
    ],
    'host': ['server01', 'server01', 'server02'],
    'region': ['us-west', 'us-west', 'us-east'],
    'cpu_usage': [65.2, 70.1, 45.8],
    'memory_usage': [78.5, 82.3, 56.7],
    'disk_io': [1200, 1450, 890]
})

# Set timestamp as index (write_points requires a time-based index)
df.set_index('timestamp', inplace=True)

# Write DataFrame with automatic field detection
client.write_points(
    dataframe=df,
    measurement='system_metrics',
    tag_columns=['host', 'region']  # These columns become tags
    # cpu_usage, memory_usage, disk_io automatically become fields
)

# Write with explicit field selection
client.write_points(
    dataframe=df,
    measurement='cpu_metrics',
    tag_columns=['host', 'region'],
    field_columns=['cpu_usage']  # Only cpu_usage as field
)

# Write with global tags
client.write_points(
    dataframe=df,
    measurement='system_metrics',
    tags={'environment': 'production'},  # Added to all points
    tag_columns=['host'],
    field_columns=['cpu_usage', 'memory_usage']
)

# Batch writing for large DataFrames
large_df = pd.DataFrame(...)  # Large dataset
client.write_points(
    dataframe=large_df,
    measurement='bulk_data',
    batch_size=10000
)
```
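
To make the tag/field mapping concrete, here is a rough pure-pandas sketch of how one row of the sample DataFrame above becomes a line-protocol point (the `row_to_line` helper is illustrative, not part of the library):

```python
import pandas as pd

# One row of the sample DataFrame above, indexed by a UTC timestamp
df = pd.DataFrame(
    {'host': ['server01'], 'region': ['us-west'], 'cpu_usage': [65.2]},
    index=pd.to_datetime(['2023-09-07T07:00:00Z']),
)

def row_to_line(measurement, row, ts, tag_columns, field_columns):
    # Tags join the measurement with commas; fields follow after a space;
    # the trailing integer is the timestamp in nanoseconds
    tags = ','.join(f'{c}={row[c]}' for c in tag_columns)
    fields = ','.join(f'{c}={row[c]}' for c in field_columns)
    return f'{measurement},{tags} {fields} {ts.value}'

line = row_to_line('system_metrics', df.iloc[0], df.index[0],
                   ['host', 'region'], ['cpu_usage'])
print(line)
# system_metrics,host=server01,region=us-west cpu_usage=65.2 1694070000000000000
```

Real line protocol additionally escapes commas and spaces and quotes string field values; the client handles that for you.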

### DataFrame Querying

Execute InfluxQL queries and receive results as pandas DataFrames for immediate analysis.

```python { .api }
def query(self, query, params=None, bind_params=None, epoch=None,
          expected_response_code=200, database=None, raise_errors=True,
          chunked=False, chunk_size=0, method="GET", dropna=True,
          data_frame_index=None):
    """
    Query InfluxDB and return results as pandas DataFrames.

    Parameters:
    - query (str): InfluxQL query string
    - params (dict): URL parameters (default: None)
    - bind_params (dict): Query parameter bindings (default: None)
    - epoch (str): Time precision ('s', 'ms', 'u', 'ns') (default: None)
    - expected_response_code (int): Expected HTTP status (default: 200)
    - database (str): Database name override (default: None)
    - raise_errors (bool): Raise exceptions on query errors (default: True)
    - chunked (bool): Enable chunked responses (default: False)
    - chunk_size (int): Chunk size for chunked responses (default: 0)
    - method (str): HTTP method ('GET' or 'POST') (default: 'GET')
    - dropna (bool): Drop rows with NaN values (default: True)
    - data_frame_index (str): Column to use as DataFrame index (default: None)

    Returns:
    dict: Dictionary mapping measurement names to pandas DataFrames

    Raises:
    InfluxDBClientError: On query errors
    """
```

#### DataFrame Querying Examples

```python
# Basic query returning DataFrame
result = client.query('SELECT * FROM cpu_usage WHERE time >= now() - 1h')

# result is a dict: {'cpu_usage': DataFrame}
df = result['cpu_usage']
print(df.head())
print(df.describe())

# Query with time grouping
result = client.query("""
    SELECT mean(value) as avg_cpu, max(value) as max_cpu
    FROM cpu_usage
    WHERE time >= now() - 24h
    GROUP BY time(1h), host
""")

# When grouping by tags, result keys are (measurement, ((tag, value), ...))
# tuples, with one DataFrame per tag combination
for key, df in result.items():
    print(key, len(df))

# Multiple measurements in one query
result = client.query("""
    SELECT * FROM cpu_usage;
    SELECT * FROM memory_usage
""")

cpu_df = result['cpu_usage']
memory_df = result['memory_usage']

# Set custom DataFrame index
result = client.query(
    'SELECT * FROM system_metrics ORDER BY time DESC LIMIT 1000',
    data_frame_index='time'  # Use time column as index
)

df = result['system_metrics']
# DataFrame indexed by time for time series analysis

# Query with parameters
result = client.query(
    'SELECT * FROM metrics WHERE host = $host AND time >= $start_time',
    bind_params={
        'host': 'server01',
        'start_time': '2023-09-07T00:00:00Z'
    }
)
```
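
The `dropna=True` default strips rows containing NaN, which is common when sparse series end up in one result frame. A minimal pandas sketch of the effect (the data is made up):

```python
import pandas as pd

# Sparse series merged into one frame often leave NaN holes
raw = pd.DataFrame({'cpu': [1.0, None, 3.0], 'mem': [2.0, 2.5, None]})

cleaned = raw.dropna()  # what the client applies when dropna=True
print(len(raw), len(cleaned))  # 3 1
```

Pass `dropna=False` to `query()` when you need the gaps preserved, e.g. for interpolation.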

### Time Series Analysis Integration

The DataFrame integration enables seamless use of pandas time series analysis tools.

#### Analysis Examples

```python
# Query data for analysis
result = client.query("""
    SELECT mean(cpu_usage) as cpu, mean(memory_usage) as memory
    FROM system_metrics
    WHERE time >= now() - 7d
    GROUP BY time(1h)
""")

df = result['system_metrics']

# DataFrameClient already returns time-indexed DataFrames,
# so pandas time series tools work directly

# Rolling averages
df['cpu_rolling_mean'] = df['cpu'].rolling(window=6).mean()  # 6-hour window
df['memory_rolling_mean'] = df['memory'].rolling(window=6).mean()

# Resampling
daily_avg = df.resample('D').mean()
hourly_max = df.resample('H').max()

# Statistical analysis
correlation = df['cpu'].corr(df['memory'])
cpu_stats = df['cpu'].describe()

# Plotting with matplotlib
import matplotlib.pyplot as plt

df[['cpu', 'cpu_rolling_mean']].plot(figsize=(12, 6))
plt.title('CPU Usage Over Time')
plt.ylabel('CPU Usage (%)')
plt.show()
```
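
To experiment with these transforms without a running server, a synthetic time-indexed frame behaves the same way (the values here are made up):

```python
import pandas as pd
import numpy as np

# 48 hourly points spanning two calendar days
idx = pd.date_range('2023-09-07', periods=48, freq='h')
df = pd.DataFrame({'cpu': np.arange(48.0)}, index=idx)

# Same rolling and resampling operations as above
df['cpu_rolling_mean'] = df['cpu'].rolling(window=6).mean()
daily_avg = df.resample('D').mean()

print(len(daily_avg))                  # 2 (one row per calendar day)
print(df['cpu_rolling_mean'].iloc[5])  # mean of 0..5 -> 2.5
```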

### DataFrame Data Types and Conversion

Handle data type mapping between InfluxDB and pandas effectively.

```python
import numpy as np
import pandas as pd

# Specify data types when writing
df = pd.DataFrame({
    'time': pd.date_range('2023-09-07', periods=100, freq='1min'),
    'sensor_id': ['sensor_' + str(i % 10) for i in range(100)],
    'temperature': np.random.normal(25.0, 2.0, 100),
    'humidity': np.random.normal(60.0, 10.0, 100),
    'is_active': [True] * 50 + [False] * 50
})

df.set_index('time', inplace=True)

# Write with type preservation
client.write_points(
    dataframe=df,
    measurement='sensor_data',
    tag_columns=['sensor_id'],  # String tags
    field_columns=['temperature', 'humidity', 'is_active'],  # Mixed field types
    numeric_precision=2  # Round floats to 2 decimal places
)

# Query back with proper types
result = client.query('SELECT * FROM sensor_data LIMIT 10')
df_result = result['sensor_data']

# Verify data types
print(df_result.dtypes)
print(df_result['is_active'].unique())  # Boolean values preserved
```
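
One caveat worth knowing before asserting that dtypes round-trip: in pandas, an integer column that acquires NaN (for example via `dropna=False` query results or a reindex) is silently promoted to `float64`. A minimal sketch:

```python
import pandas as pd

# An integer series is promoted to float64 as soon as a NaN appears
s = pd.Series([1, 2, 3])
print(s.dtype)  # int64

s_with_gap = s.reindex(range(4))  # introduces a NaN at the new position
print(s_with_gap.dtype)  # float64
```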

## Error Handling

DataFrameClient inherits all InfluxDBClient error handling plus pandas-specific errors.

```python
import pandas as pd

from influxdb import DataFrameClient
from influxdb.exceptions import InfluxDBClientError

try:
    # Raises ImportError at construction time if pandas is not installed
    client = DataFrameClient()

    result = client.query('SELECT * FROM measurement')

except ImportError as e:
    print("pandas is required for DataFrameClient:", e)

except InfluxDBClientError as e:
    print("InfluxDB error:", e)

# Validate DataFrame before writing
def safe_write_dataframe(client, df, measurement):
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")

    if df.empty:
        print("Warning: DataFrame is empty, skipping write")
        return

    try:
        client.write_points(df, measurement=measurement)
        print(f"Successfully wrote {len(df)} points to {measurement}")
    except Exception as e:
        print(f"Failed to write DataFrame: {e}")
```
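
The validation helper above can be exercised without a server by stubbing the client. A sketch using a bool-returning variant of the helper for easy testing (the `StubClient` class is purely illustrative):

```python
import pandas as pd

def safe_write_dataframe(client, df, measurement):
    # Variant of the helper above that returns a bool instead of printing
    if not isinstance(df, pd.DataFrame):
        raise TypeError("Input must be a pandas DataFrame")
    if df.empty:
        return False  # nothing to write
    client.write_points(df, measurement=measurement)
    return True

class StubClient:
    """Stands in for DataFrameClient; records the last write."""
    def write_points(self, df, measurement):
        self.last = (len(df), measurement)
        return True

stub = StubClient()
empty_ok = safe_write_dataframe(stub, pd.DataFrame(), 'metrics')
wrote_ok = safe_write_dataframe(stub, pd.DataFrame({'v': [1]}), 'metrics')
print(empty_ok, wrote_ok)  # False True
```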

## Performance Tips

### Optimize DataFrame Operations

```python
# Use line protocol for better performance
client.write_points(df, measurement='metrics', protocol='line')

# Batch large DataFrames
large_df = pd.read_csv('large_dataset.csv')
client.write_points(large_df, measurement='bulk_data', batch_size=10000)

# Pre-process DataFrames for efficiency
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
df = df.sort_index()  # Pre-sort for better write performance

# Use appropriate data types
df = df.astype({
    'sensor_id': 'category',  # Use category for repeated strings
    'value': 'float32'  # Use float32 if precision allows
})
```
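
The `category` tip is worth quantifying: for heavily repeated strings, category storage is typically far smaller than plain object storage, because each distinct value is stored once. A quick illustrative check:

```python
import pandas as pd

# 10,000 rows drawn from only 10 distinct sensor IDs
s = pd.Series(['sensor_%d' % (i % 10) for i in range(10_000)])

obj_bytes = s.memory_usage(deep=True)
cat_bytes = s.astype('category').memory_usage(deep=True)
print(cat_bytes < obj_bytes)  # True
```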

### Memory Management

```python
# Process large datasets in chunks
def write_large_csv(client, filepath, measurement, chunk_size=10000):
    for chunk in pd.read_csv(filepath, chunksize=chunk_size):
        # Process chunk
        chunk['timestamp'] = pd.to_datetime(chunk['timestamp'])
        chunk.set_index('timestamp', inplace=True)

        # Write chunk
        client.write_points(
            dataframe=chunk,
            measurement=measurement,
            protocol='line'
        )

        print(f"Processed {len(chunk)} records")

# Use context manager for automatic cleanup
with DataFrameClient(database='large_db') as client:
    write_large_csv(client, 'massive_dataset.csv', 'sensor_readings')
```