or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

advanced-operations.md · client-management.md · index.md · querying-data.md · resource-management.md · writing-data.md

querying-data.mddocs/

0

# Querying Data

1

2

Comprehensive functionality for executing Flux queries against InfluxDB and processing results in various formats including streaming, materialized tables, CSV, pandas DataFrames, and raw HTTP responses. The querying system supports both synchronous and asynchronous operations with flexible result processing.

3

4

## Capabilities

5

6

### QueryApi

7

8

Main API for executing Flux queries against InfluxDB with support for multiple result formats and query profiling.

9

10

```python { .api }

11

class QueryApi:

12

def __init__(

13

self,

14

influxdb_client,

15

query_options: QueryOptions = QueryOptions()

16

): ...

17

18

def query(

19

self,

20

query: str,

21

org: str = None,

22

params: dict = None

23

) -> TableList:

24

"""

25

Execute Flux query and return materialized results as TableList.

26

27

Parameters:

28

- query (str): Flux query string

29

- org (str, optional): Organization name or ID

30

- params (dict, optional): Query parameters for parameterized queries

31

32

Returns:

33

TableList: Materialized query results

34

"""

35

36

def query_stream(

37

self,

38

query: str,

39

org: str = None,

40

params: dict = None

41

) -> Generator[FluxRecord, None, None]:

42

"""

43

Execute Flux query and return streaming results as generator.

44

45

Parameters:

46

- query (str): Flux query string

47

- org (str, optional): Organization name or ID

48

- params (dict, optional): Query parameters

49

50

Returns:

51

Generator[FluxRecord]: Streaming query results

52

"""

53

54

def query_csv(

55

self,

56

query: str,

57

org: str = None,

58

dialect: Dialect = None,

59

params: dict = None

60

) -> CSVIterator:

61

"""

62

Execute Flux query and return results as CSV iterator.

63

64

Parameters:

65

- query (str): Flux query string

66

- org (str, optional): Organization name or ID

67

- dialect (Dialect, optional): CSV format configuration

68

- params (dict, optional): Query parameters

69

70

Returns:

71

CSVIterator: CSV formatted query results

72

"""

73

74

def query_raw(

75

self,

76

query: str,

77

org: str = None,

78

dialect: Dialect = None,

79

params: dict = None

80

) -> HTTPResponse:

81

"""

82

Execute Flux query and return raw HTTP response.

83

84

Parameters:

85

- query (str): Flux query string

86

- org (str, optional): Organization name or ID

87

- dialect (Dialect, optional): Response format configuration

88

- params (dict, optional): Query parameters

89

90

Returns:

91

HTTPResponse: Raw HTTP response object

92

"""

93

94

def query_data_frame(

95

self,

96

query: str,

97

org: str = None,

98

data_frame_index: List[str] = None,

99

params: dict = None,

100

use_extension_dtypes: bool = False

101

):

102

"""

103

Execute Flux query and return results as pandas DataFrame.

104

105

Parameters:

106

- query (str): Flux query string

107

- org (str, optional): Organization name or ID

108

- data_frame_index (List[str], optional): Columns to use as DataFrame index

109

- params (dict, optional): Query parameters

110

- use_extension_dtypes (bool): Use pandas extension data types

111

112

Returns:

113

pandas.DataFrame: Query results as DataFrame (a list of DataFrames is returned when the result tables have differing schemas)

114

"""

115

```

116

117

#### QueryApi Usage Examples

118

119

**Basic query execution:**

120

```python

121

from influxdb_client import InfluxDBClient

122

123

client = InfluxDBClient(url="http://localhost:8086", token="token", org="org")

124

query_api = client.query_api()

125

126

# Basic Flux query

127

query = '''

128

from(bucket: "sensors")

129

|> range(start: -1h)

130

|> filter(fn: (r) => r._measurement == "temperature")

131

|> filter(fn: (r) => r.location == "room1")

132

'''

133

134

# Get materialized results

135

tables = query_api.query(query)

136

for table in tables:

137

for record in table.records:

138

print(f"Time: {record.get_time()}, Value: {record.get_value()}")

139

```

140

141

**Streaming query results:**

142

```python

143

# Stream results for large datasets

144

query = '''

145

from(bucket: "large_dataset")

146

|> range(start: -24h)

147

|> filter(fn: (r) => r._measurement == "metrics")

148

'''

149

150

for record in query_api.query_stream(query):

151

# Process records one at a time without loading all into memory

152

process_record(record)

153

```

154

155

**Parameterized queries:**

156

```python

157

# Use parameters for dynamic queries

158

parameterized_query = '''

159

from(bucket: params.bucket_name)

160

|> range(start: params.start_time)

161

|> filter(fn: (r) => r._measurement == params.measurement)

162

|> filter(fn: (r) => r.location == params.location_filter)

163

'''

164

165

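from datetime import timedelta

query_params = {

166

"bucket_name": "sensors",

167

"start_time": timedelta(hours=-2),  # durations must be a timedelta; a str would be sent as a Flux string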

168

"measurement": "temperature",

169

"location_filter": "datacenter1"

170

}

171

172

results = query_api.query(parameterized_query, params=query_params)

173

```

174

175

**DataFrame integration:**

176

```python

177

import pandas as pd

178

179

# Get results as pandas DataFrame

180

query = '''

181

from(bucket: "analytics")

182

|> range(start: -1d)

183

|> filter(fn: (r) => r._measurement == "sales")

184

|> aggregateWindow(every: 1h, fn: sum)

185

'''

186

187

df = query_api.query_data_frame(

188

query,

189

data_frame_index=["_time", "store_id"],

190

use_extension_dtypes=True

191

)

192

193

# Now use standard pandas operations

194

monthly_avg = df.groupby(df.index.get_level_values("_time").month).mean(numeric_only=True)

195

print(monthly_avg)

196

```

197

198

### QueryApiAsync

199

200

Asynchronous version of QueryApi for non-blocking query operations.

201

202

```python { .api }

203

class QueryApiAsync:

204

def __init__(

205

self,

206

influxdb_client,

207

query_options: QueryOptions = QueryOptions()

208

): ...

209

210

async def query(

211

self,

212

query: str,

213

org: str = None,

214

params: dict = None

215

) -> TableList:

216

"""

217

Asynchronously execute Flux query and return materialized results.

218

"""

219

220

async def query_stream(

221

self,

222

query: str,

223

org: str = None,

224

params: dict = None

225

) -> AsyncGenerator[FluxRecord, None]:

226

"""

227

Asynchronously execute Flux query and return streaming results.

228

"""

229

230

async def query_csv(

231

self,

232

query: str,

233

org: str = None,

234

dialect: Dialect = None,

235

params: dict = None

236

) -> CSVIterator:

237

"""

238

Asynchronously execute Flux query and return CSV results.

239

"""

240

241

async def query_raw(

242

self,

243

query: str,

244

org: str = None,

245

dialect: Dialect = None,

246

params: dict = None

247

) -> str:

248

"""

249

Asynchronously execute Flux query and return raw response.

250

"""

251

252

async def query_data_frame(

253

self,

254

query: str,

255

org: str = None,

256

data_frame_index: List[str] = None,

257

params: dict = None,

258

use_extension_dtypes: bool = False

259

):

260

"""

261

Asynchronously execute Flux query and return DataFrame.

262

"""

263

```

264

265

#### Async Query Usage Example

266

267

```python

268

import asyncio

269

from influxdb_client.client.influxdb_client_async import InfluxDBClientAsync

270

271

async def query_data():

272

async with InfluxDBClientAsync(url="http://localhost:8086", token="token", org="org") as client:

273

query_api = client.query_api()

274

275

query = '''

276

from(bucket: "sensors")

277

|> range(start: -1h)

278

|> filter(fn: (r) => r._measurement == "temperature")

279

'''

280

281

# Async materialized query

282

tables = await query_api.query(query)

283

for table in tables:

284

for record in table.records:

285

print(f"Value: {record.get_value()}")

286

287

# Async streaming query

288

async for record in await query_api.query_stream(query):

289

print(f"Streaming value: {record.get_value()}")

290

291

asyncio.run(query_data())

292

```

293

294

### FluxRecord

295

296

Represents individual records in query results with methods to access time series data fields.

297

298

```python { .api }

299

class FluxRecord:

300

def __init__(self, table: int, values: dict = None): ...

301

302

def get_start(self) -> datetime:

303

"""

304

Get the start time of the record's time range.

305

306

Returns:

307

datetime: Start time

308

"""

309

310

def get_stop(self) -> datetime:

311

"""

312

Get the stop time of the record's time range.

313

314

Returns:

315

datetime: Stop time

316

"""

317

318

def get_time(self) -> datetime:

319

"""

320

Get the timestamp of the record.

321

322

Returns:

323

datetime: Record timestamp

324

"""

325

326

def get_value(self) -> Any:

327

"""

328

Get the value field of the record.

329

330

Returns:

331

Any: Record value (int, float, str, bool)

332

"""

333

334

def get_field(self) -> str:

335

"""

336

Get the field name of the record.

337

338

Returns:

339

str: Field name

340

"""

341

342

def get_measurement(self) -> str:

343

"""

344

Get the measurement name of the record.

345

346

Returns:

347

str: Measurement name

348

"""

349

350

values: dict  # plain attribute (not a method): read it as record.values

351

"""

352

Get all column values as dictionary.

353

354

Returns:

355

dict: All record values keyed by column name

356

"""

357

358

def __getitem__(self, key: str) -> Any:

359

"""

360

Get value by column name using dict-like access.

361

362

Parameters:

363

- key (str): Column name

364

365

Returns:

366

Any: Column value

367

"""

368

369

def __str__(self) -> str: ...

370

def __repr__(self) -> str: ...

371

```

372

373

#### FluxRecord Usage Examples

374

375

**Accessing record data:**

376

```python

377

for table in query_api.query(flux_query):

378

for record in table.records:

379

# Access standard time series fields

380

timestamp = record.get_time()

381

value = record.get_value()

382

field_name = record.get_field()

383

measurement = record.get_measurement()

384

385

# Access custom tags and fields

386

location = record["location"] # Tag value

387

sensor_id = record["sensor_id"] # Tag value

388

389

# Check if column exists

390

if "quality" in record.values:

391

quality = record["quality"]

392

393

# Get all values as dictionary

394

all_values = record.values

395

print(f"Record: {all_values}")

396

```

397

398

**Processing different data types:**

399

```python

400

for record in query_api.query_stream(query):

401

value = record.get_value()

402

field = record.get_field()

403

404

# Handle different field types

405

if field == "temperature":

406

temperature = float(value)

407

print(f"Temperature: {temperature}°C")

408

elif field == "status":

409

status = str(value)

410

print(f"Status: {status}")

411

elif field == "count":

412

count = int(value)

413

print(f"Count: {count}")

414

elif field == "enabled":

415

enabled = bool(value)

416

print(f"Enabled: {enabled}")

417

```

418

419

### TableList

420

421

Container for multiple flux tables that extends Python list with additional utility methods.

422

423

```python { .api }

424

class TableList(list):

425

def __init__(self): ...

426

427

def to_json(self, indent: int = None) -> str:

428

"""

429

Convert all tables to JSON representation.

430

431

Parameters:

432

- indent (int, optional): JSON indentation for pretty printing

433

434

Returns:

435

str: JSON string representation

436

"""

437

438

def to_values(self, columns: List[str] = None) -> List[List[Any]]:

439

"""

440

Convert all table records to nested list of values.

441

442

Parameters:

443

- columns (List[str], optional): Specific columns to include

444

445

Returns:

446

List[List[Any]]: Nested list with all record values

447

"""

448

```

449

450

#### TableList Usage Examples

451

452

**Processing multiple tables:**

453

```python

454

tables = query_api.query('''

455

from(bucket: "sensors")

456

|> range(start: -1h)

457

|> filter(fn: (r) => r._measurement == "temperature")

458

|> group(columns: ["location"])

459

''')

460

461

# Iterate through all tables

462

for i, table in enumerate(tables):

463

print(f"Table {i}: {len(table.records)} records")

464

465

# Process records in each table

466

for record in table.records:

467

location = record["location"]

468

temp = record.get_value()

469

print(f" {location}: {temp}°C")

470

```

471

472

**Export to JSON:**

473

```python

474

# Convert results to JSON

475

json_output = tables.to_json(indent=2)

476

print(json_output)

477

478

# Save to file

479

with open("query_results.json", "w") as f:

480

f.write(json_output)

481

```

482

483

**Convert to values list:**

484

```python

485

# Get all values as nested list

486

all_values = tables.to_values()

487

print(f"Total records: {len(all_values)}")

488

489

# Get specific columns only

490

temp_values = tables.to_values(columns=["_time", "_value", "location"])

491

for time, value, location in temp_values:

492

print(f"{time}: {location} = {value}")

493

```

494

495

### CSVIterator

496

497

Iterator for processing CSV-formatted query results with support for custom dialects.

498

499

```python { .api }

500

class CSVIterator:

501

def __init__(self, response: HTTPResponse, dialect: Dialect = None): ...

502

503

def to_values(self) -> List[List[str]]:

504

"""

505

Convert CSV results to list of string lists.

506

507

Returns:

508

List[List[str]]: All CSV rows as nested string lists

509

"""

510

511

def __iter__(self) -> 'CSVIterator': ...

512

513

def __next__(self) -> List[str]:

514

"""

515

Get next CSV row as list of strings.

516

517

Returns:

518

List[str]: Next CSV row

519

"""

520

```

521

522

#### CSVIterator Usage Examples

523

524

**Processing CSV results:**

525

```python

526

from influxdb_client import Dialect

527

528

# Configure CSV dialect

529

csv_dialect = Dialect(

530

header=True,

531

delimiter=",",

532

comment_prefix="#",

533

annotations=["datatype", "group", "default"]

534

)

535

536

csv_results = query_api.query_csv(query, dialect=csv_dialect)

537

538

# Iterate through CSV rows

539

for row in csv_results:

540

# row is a list of string values

541

if len(row) >= 4:

542

timestamp = row[0]

543

measurement = row[1]

544

field = row[2]

545

value = row[3]

546

print(f"{timestamp}: {measurement}.{field} = {value}")

547

```

548

549

**Convert CSV to list:**

550

```python

551

csv_results = query_api.query_csv(query)

552

all_rows = csv_results.to_values()

553

554

# Process as regular list

555

headers = all_rows[0] if all_rows else []

556

data_rows = all_rows[1:] if len(all_rows) > 1 else []

557

558

print(f"Headers: {headers}")

559

for row in data_rows[:5]: # First 5 data rows

560

print(f"Data: {row}")

561

```

562

563

### QueryOptions

564

565

Configuration class for query behavior including profiling and custom callbacks.

566

567

```python { .api }

568

class QueryOptions:

569

def __init__(

570

self,

571

profilers: List[str] = None,

572

profiler_callback: Callable = None

573

):

574

"""

575

Configure query execution options.

576

577

Parameters:

578

- profilers (List[str]): List of profiler names to enable

579

- profiler_callback (Callable): Callback function for profiler results

580

581

Available profilers:

582

- "query": Query execution profiling

583

- "operator": Individual operator profiling

584

"""

585

```

586

587

#### QueryOptions Usage Examples

588

589

**Query profiling:**

590

```python

591

from influxdb_client import QueryOptions

592

593

def profiler_callback(flux_record):

594

print(f"Profiler record: {flux_record.values}")

595

596

# Configure query profiling

597

query_options = QueryOptions(

598

profilers=["query", "operator"],

599

profiler_callback=profiler_callback

600

)

601

602

# Use with QueryApi

603

query_api = client.query_api(query_options=query_options)

604

605

# Profiler results will be sent to callback during query execution

606

results = query_api.query('''

607

from(bucket: "performance_test")

608

|> range(start: -1h)

609

|> filter(fn: (r) => r._measurement == "cpu")

610

|> mean()

611

''')

612

```

613

614

### Dialect

615

616

Configuration class for CSV output formatting when using query_csv method.

617

618

```python { .api }

619

class Dialect:

620

def __init__(

621

self,

622

header: bool = True,

623

delimiter: str = ",",

624

comment_prefix: str = "#",

625

annotations: List[str] = None,

626

date_time_format: str = "RFC3339"

627

):

628

"""

629

Configure CSV output format.

630

631

Parameters:

632

- header (bool): Include column headers

633

- delimiter (str): CSV field delimiter

634

- comment_prefix (str): Prefix for comment lines

635

- annotations (List[str]): Metadata annotations to include

636

- date_time_format (str): DateTime format ("RFC3339" or "RFC3339Nano")

637

"""

638

```

639

640

#### Dialect Usage Example

641

642

```python

643

from influxdb_client import Dialect

644

645

# Custom CSV format

646

custom_dialect = Dialect(

647

header=True,

648

delimiter=";",

649

comment_prefix="//",

650

annotations=["datatype", "group"],

651

date_time_format="RFC3339Nano"

652

)

653

654

# Use with CSV query

655

csv_iterator = query_api.query_csv(query, dialect=custom_dialect)

656

```

657

658

## Types

659

660

```python { .api }

661

# Core result types

662

class FluxTable:

663

"""Represents a single result table from Flux query."""

664

columns: List[FluxColumn]

665

records: List[FluxRecord]

666

667

class FluxColumn:

668

"""Represents a column in a Flux table."""

669

index: int

670

label: str

671

data_type: str

672

group: bool

673

default_value: str

674

675

# Query parameter types

676

QueryParams = Dict[str, Any] # Parameters for parameterized queries

677

678

# Response types from various query methods

679

from typing import Generator, AsyncGenerator

680

from urllib3 import HTTPResponse

681

import pandas

682

683

QueryResult = TableList

684

QueryStreamResult = Generator[FluxRecord, None, None]

685

QueryAsyncStreamResult = AsyncGenerator[FluxRecord, None]

686

QueryCSVResult = CSVIterator

687

QueryRawResult = HTTPResponse

688

QueryDataFrameResult = pandas.DataFrame

689

690

# Exception types

691

class FluxQueryError(InfluxDBError):

692

"""Raised when Flux queries fail."""

693

pass

694

695

class FluxParseError(FluxQueryError):

696

"""Raised when Flux query parsing fails."""

697

pass

698

699

class FluxRuntimeError(FluxQueryError):

700

"""Raised when Flux query execution fails."""

701

pass

702

```