# Reading Data

High-performance streaming reads from BigQuery tables using the BigQuery Storage API. Supports parallel processing, column selection, row filtering, and multiple data formats, with direct conversion to pandas DataFrames and Apache Arrow tables.

## Capabilities

### BigQuery Read Client

Main client for reading data from BigQuery tables, with streaming capabilities and format flexibility.

```python { .api }
class BigQueryReadClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Read Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_read_session(
        self,
        parent: str,
        read_session: ReadSession,
        max_stream_count: int = None,
        **kwargs
    ) -> ReadSession:
        """
        Create a new read session for streaming data from BigQuery.

        Parameters:
        - parent: Project ID in the format "projects/{project_id}"
        - read_session: ReadSession configuration with table and options
        - max_stream_count: Maximum number of parallel streams (optional)

        Returns:
        ReadSession with stream information and metadata
        """

    def read_rows(
        self,
        name: str,
        offset: int = 0,
        **kwargs
    ) -> ReadRowsStream:
        """
        Read rows from a specific stream.

        Parameters:
        - name: Stream name from ReadSession.streams[].name
        - offset: Starting offset for reading (optional)

        Returns:
        ReadRowsStream iterator for processing messages
        """

    def split_read_stream(
        self,
        name: str,
        fraction: float = None,
        **kwargs
    ) -> SplitReadStreamResponse:
        """
        Split a read stream into two streams for parallel processing.

        Parameters:
        - name: Stream name to split
        - fraction: Split point as a fraction (0.0 to 1.0)

        Returns:
        SplitReadStreamResponse with primary and remainder streams
        """
```

### BigQuery Read Async Client

Async version of BigQueryReadClient for non-blocking operations.

```python { .api }
class BigQueryReadAsyncClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Read Async Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    async def create_read_session(
        self,
        parent: str,
        read_session: ReadSession,
        max_stream_count: int = None,
        **kwargs
    ) -> ReadSession:
        """
        Create a new read session for streaming data from BigQuery (async).

        Parameters:
        - parent: Project ID in the format "projects/{project_id}"
        - read_session: ReadSession configuration with table and options
        - max_stream_count: Maximum number of parallel streams (optional)

        Returns:
        ReadSession with stream information and metadata
        """

    def read_rows(
        self,
        name: str,
        offset: int = 0,
        **kwargs
    ) -> ReadRowsStream:
        """
        Read rows from a specific stream (sync method on the async client).

        Parameters:
        - name: Stream name from ReadSession.streams[].name
        - offset: Starting offset for reading (optional)

        Returns:
        ReadRowsStream iterator for processing messages
        """

    async def split_read_stream(
        self,
        name: str,
        fraction: float = None,
        **kwargs
    ) -> SplitReadStreamResponse:
        """
        Split a read stream into two streams for parallel processing (async).

        Parameters:
        - name: Stream name to split
        - fraction: Split point as a fraction (0.0 to 1.0)

        Returns:
        SplitReadStreamResponse with primary and remainder streams
        """
```
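
The usage examples later in this document are all synchronous; the async client follows the same flow with `await`. A minimal sketch, assuming `BigQueryReadAsyncClient` is importable from the same top-level package and that `your-project` is replaced with a real project ID:

```python
import asyncio

from google.cloud.bigquery_storage import BigQueryReadAsyncClient, types

async def main():
    client = BigQueryReadAsyncClient()

    requested_session = types.ReadSession(
        table="projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current",
        data_format=types.DataFormat.AVRO,
    )

    # create_read_session is a coroutine on the async client
    session = await client.create_read_session(
        parent="projects/your-project",  # replace with your project
        read_session=requested_session,
        max_stream_count=1,
    )
    print(f"Opened session {session.name} with {len(session.streams)} stream(s)")

asyncio.run(main())
```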

### Read Rows Stream

Helper class that wraps read stream responses and provides convenient data parsing methods.

```python { .api }
class ReadRowsStream:
    def __iter__(self) -> Iterator[ReadRowsResponse]:
        """Iterate over ReadRowsResponse messages."""

    def rows(self, read_session: ReadSession = None) -> Iterator[dict]:
        """
        Parse stream messages into row dictionaries.

        Parameters:
        - read_session: ReadSession for schema information (required for Avro)

        Returns:
        Iterator of row dictionaries

        Note: Requires fastavro for Avro format support
        """

    def to_arrow(self, read_session: ReadSession = None):
        """
        Convert stream to Apache Arrow format.

        Parameters:
        - read_session: ReadSession for schema information

        Returns:
        Apache Arrow Table

        Note: Requires pyarrow for Arrow format support
        """

    def to_dataframe(
        self,
        read_session: ReadSession = None,
        dtypes: dict = None
    ):
        """
        Convert stream to pandas DataFrame.

        Parameters:
        - read_session: ReadSession for schema information
        - dtypes: Column data type specifications

        Returns:
        pandas DataFrame

        Note: Requires pandas for DataFrame support
        """
```
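
`to_arrow` is the one parsing method not covered in the usage examples below, so here is a minimal sketch; it assumes pyarrow is installed and that the session was created with `DataFormat.ARROW`:

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()

requested_session = types.ReadSession(
    table="projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current",
    data_format=types.DataFormat.ARROW,  # Arrow format is required for to_arrow()
)
session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1,
)

# Materialize one stream as a pyarrow.Table
reader = client.read_rows(session.streams[0].name)
arrow_table = reader.to_arrow(session)
print(arrow_table.schema)
print(f"{arrow_table.num_rows} rows")
```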

### Path Helper Methods

Utilities for constructing and parsing BigQuery resource paths.

```python { .api }
class BigQueryReadClient:
    @staticmethod
    def read_session_path(project: str, location: str, session: str) -> str:
        """Construct read session resource path."""

    @staticmethod
    def parse_read_session_path(path: str) -> dict:
        """Parse read session path into components."""

    @staticmethod
    def read_stream_path(
        project: str,
        location: str,
        session: str,
        stream: str
    ) -> str:
        """Construct read stream resource path."""

    @staticmethod
    def parse_read_stream_path(path: str) -> dict:
        """Parse read stream path into components."""

    @staticmethod
    def table_path(project: str, dataset: str, table: str) -> str:
        """Construct BigQuery table resource path."""

    @staticmethod
    def parse_table_path(path: str) -> dict:
        """Parse table path into project, dataset, table components."""
```
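
These helpers avoid hand-formatting resource strings such as the `table` values used throughout the examples below. A quick sketch:

```python
from google.cloud.bigquery_storage import BigQueryReadClient

# Build the fully qualified table path used in ReadSession.table
table = BigQueryReadClient.table_path(
    "bigquery-public-data", "usa_names", "usa_1910_current"
)
print(table)
# projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current

# Round-trip the path back into its components
components = BigQueryReadClient.parse_table_path(table)
print(components["project"], components["dataset"], components["table"])
```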

## Usage Examples

### Basic Read Session

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

# Create client
client = BigQueryReadClient()

# Configure table and session
table = "projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current"
requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.AVRO
)

# Create session with single stream
session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1
)

# Read data
reader = client.read_rows(session.streams[0].name)
for row in reader.rows(session):
    print(f"Name: {row['name']}, State: {row['state']}")
```

### Column Selection and Filtering

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
table = "projects/your-project/datasets/your_dataset/tables/your_table"

# Configure read options
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["name", "age", "city"],
    row_restriction='age > 18 AND city = "New York"'
)

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.ARROW,
    read_options=read_options
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session
)

# Process each stream in turn (see the parallel example below for
# processing streams concurrently)
for stream in session.streams:
    reader = client.read_rows(stream.name)
    for row in reader.rows(session):
        print(row)
```

### Convert to DataFrame

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
table = "projects/bigquery-public-data/datasets/new_york_trees/tables/tree_species"

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.ARROW,
    read_options=types.ReadSession.TableReadOptions(
        selected_fields=["species_common_name", "fall_color"]
    )
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1
)

# Read into a pandas DataFrame using the to_dataframe method
# (pandas must be installed)
reader = client.read_rows(session.streams[0].name)
dataframe = reader.to_dataframe(session)

print(dataframe.head())
```

### Parallel Stream Processing

```python
import concurrent.futures

from google.cloud.bigquery_storage import BigQueryReadClient, types

def process_stream(client, stream_name, session):
    """Process a single stream."""
    reader = client.read_rows(stream_name)
    rows = []
    for row in reader.rows(session):
        rows.append(row)
    return rows

client = BigQueryReadClient()
table = "projects/your-project/datasets/large_dataset/tables/big_table"

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.AVRO
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=4  # Request multiple streams
)

# Process streams in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for stream in session.streams:
        future = executor.submit(process_stream, client, stream.name, session)
        futures.append(future)

    # Collect results
    all_rows = []
    for future in concurrent.futures.as_completed(futures):
        stream_rows = future.result()
        all_rows.extend(stream_rows)

print(f"Processed {len(all_rows)} total rows")
```

### Stream Splitting

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
# ... create session with single stream ...

original_stream = session.streams[0]

# Split stream at 50% mark
split_response = client.split_read_stream(
    name=original_stream.name,
    fraction=0.5
)

# Process both streams
if split_response.primary_stream:
    reader1 = client.read_rows(split_response.primary_stream.name)
    # Process first half...

if split_response.remainder_stream:
    reader2 = client.read_rows(split_response.remainder_stream.name)
    # Process second half...
```

## Error Handling

```python
from google.cloud.bigquery_storage import BigQueryReadClient
from google.api_core import exceptions

client = BigQueryReadClient()

try:
    # requested_session configured as in the examples above
    session = client.create_read_session(
        parent="projects/your-project",
        read_session=requested_session
    )

    reader = client.read_rows(session.streams[0].name)
    for row in reader.rows(session):
        # Process row
        pass

except exceptions.NotFound:
    print("Table not found")
except exceptions.PermissionDenied:
    print("Access denied to table or project")
except exceptions.ResourceExhausted:
    print("Quota exceeded")
except Exception as e:
    print(f"Unexpected error: {e}")
```
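
The `offset` parameter of `read_rows` pairs with `ReadRowsResponse.row_count` to resume a stream after a failure. `ReadRowsStream` already retries many transient errors internally, so treat this only as an illustrative sketch of how the two fields compose (it assumes a `session` created as in the examples above):

```python
from google.api_core import exceptions
from google.cloud.bigquery_storage import BigQueryReadClient

client = BigQueryReadClient()
stream_name = session.streams[0].name  # session created as above

rows_read = 0
while True:
    try:
        reader = client.read_rows(stream_name, offset=rows_read)
        for response in reader:
            # Track progress so a reopened stream skips rows already seen
            rows_read += response.row_count
            # ... decode response.avro_rows or response.arrow_record_batch ...
        break  # stream exhausted
    except exceptions.ServiceUnavailable:
        continue  # reopen the stream at the last observed offset
```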

## Types

### ReadSession

```python { .api }
class ReadSession:
    name: str
    table: str
    data_format: DataFormat
    read_options: TableReadOptions
    streams: List[ReadStream]
    estimated_total_bytes_scanned: int
    estimated_row_count: int
    avro_schema: AvroSchema
    arrow_schema: ArrowSchema
    table_modifiers: ReadSession.TableModifiers

class ReadSession.TableReadOptions:
    selected_fields: List[str]
    row_restriction: str
    arrow_serialization_options: ArrowSerializationOptions
    avro_serialization_options: AvroSerializationOptions

class ReadSession.TableModifiers:
    snapshot_time: Timestamp
```
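
`table_modifiers` enables snapshot (time-travel) reads. A minimal sketch, assuming the table's time-travel window still covers the chosen timestamp and that the proto layer accepts a timezone-aware `datetime` for the `Timestamp` field:

```python
from datetime import datetime, timedelta, timezone

from google.cloud.bigquery_storage import types

# Read the table as it existed ten minutes ago
snapshot_time = datetime.now(timezone.utc) - timedelta(minutes=10)

requested_session = types.ReadSession(
    table="projects/your-project/datasets/your_dataset/tables/your_table",
    data_format=types.DataFormat.AVRO,
    table_modifiers=types.ReadSession.TableModifiers(snapshot_time=snapshot_time),
)
```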

### ReadStream

```python { .api }
class ReadStream:
    name: str
```

### Request/Response Types

```python { .api }
class CreateReadSessionRequest:
    parent: str
    read_session: ReadSession
    max_stream_count: int

class ReadRowsRequest:
    read_stream: str
    offset: int

class ReadRowsResponse:
    avro_rows: AvroRows
    arrow_record_batch: ArrowRecordBatch
    row_count: int
    stats: StreamStats
    throttle_state: ThrottleState

class SplitReadStreamRequest:
    name: str
    fraction: float

class SplitReadStreamResponse:
    primary_stream: ReadStream
    remainder_stream: ReadStream
```
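
The client methods above can also accept these request types directly, as an alternative to flattened keyword arguments; a sketch, assuming the standard `request=` calling convention:

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()

request = types.CreateReadSessionRequest(
    parent="projects/your-project",
    read_session=types.ReadSession(
        table="projects/your-project/datasets/your_dataset/tables/your_table",
        data_format=types.DataFormat.ARROW,
    ),
    max_stream_count=2,
)

# Equivalent to passing parent/read_session/max_stream_count individually
session = client.create_read_session(request=request)
```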