# Reading Data

High-performance streaming reads from BigQuery tables using the BigQuery Storage API. Supports parallel processing, column selection, row filtering, and multiple data formats, with direct conversion to pandas DataFrames and Apache Arrow tables.

## Capabilities

### BigQuery Read Client

Main client for reading data from BigQuery tables with streaming capabilities and format flexibility.

```python { .api }
class BigQueryReadClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Read Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_read_session(
        self,
        parent: str,
        read_session: ReadSession,
        max_stream_count: int = None,
        **kwargs
    ) -> ReadSession:
        """
        Create a new read session for streaming data from BigQuery.

        Parameters:
        - parent: Project ID in format "projects/{project_id}"
        - read_session: ReadSession configuration with table and options
        - max_stream_count: Maximum number of parallel streams (optional)

        Returns:
        ReadSession with stream information and metadata
        """

    def read_rows(
        self,
        name: str,
        offset: int = 0,
        **kwargs
    ) -> ReadRowsStream:
        """
        Read rows from a specific stream.

        Parameters:
        - name: Stream name from ReadSession.streams[].name
        - offset: Starting offset for reading (optional)

        Returns:
        ReadRowsStream iterator for processing messages
        """

    def split_read_stream(
        self,
        name: str,
        fraction: float = None,
        **kwargs
    ) -> SplitReadStreamResponse:
        """
        Split a read stream into two streams for parallel processing.

        Parameters:
        - name: Stream name to split
        - fraction: Split point as fraction (0.0 to 1.0)

        Returns:
        SplitReadStreamResponse with primary and remainder streams
        """
```

### BigQuery Read Async Client

Async version of BigQueryReadClient with async methods for non-blocking operations.

```python { .api }
class BigQueryReadAsyncClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Read Async Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    async def create_read_session(
        self,
        parent: str,
        read_session: ReadSession,
        max_stream_count: int = None,
        **kwargs
    ) -> ReadSession:
        """
        Create a new read session for streaming data from BigQuery (async).

        Parameters:
        - parent: Project ID in format "projects/{project_id}"
        - read_session: ReadSession configuration with table and options
        - max_stream_count: Maximum number of parallel streams (optional)

        Returns:
        ReadSession with stream information and metadata
        """

    def read_rows(
        self,
        name: str,
        offset: int = 0,
        **kwargs
    ) -> ReadRowsStream:
        """
        Read rows from a specific stream (sync method on async client).

        Parameters:
        - name: Stream name from ReadSession.streams[].name
        - offset: Starting offset for reading (optional)

        Returns:
        ReadRowsStream iterator for processing messages
        """

    async def split_read_stream(
        self,
        name: str,
        fraction: float = None,
        **kwargs
    ) -> SplitReadStreamResponse:
        """
        Split a read stream into two streams for parallel processing (async).

        Parameters:
        - name: Stream name to split
        - fraction: Split point as fraction (0.0 to 1.0)

        Returns:
        SplitReadStreamResponse with primary and remainder streams
        """
```
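
A minimal sketch of using the async client to create a read session without blocking the event loop, then reading rows with the synchronous client. The async client's import path and the table path are assumptions and may vary by library version:

```python
import asyncio

from google.cloud.bigquery_storage import BigQueryReadClient, types
# Assumed import path for the async client; it may also be re-exported from
# google.cloud.bigquery_storage_v1, depending on the library version.
from google.cloud.bigquery_storage_v1.services.big_query_read import BigQueryReadAsyncClient


async def create_session():
    """Create the read session without blocking the event loop."""
    async_client = BigQueryReadAsyncClient()
    requested_session = types.ReadSession(
        table="projects/your-project/datasets/your_dataset/tables/your_table",
        data_format=types.DataFormat.AVRO,
    )
    return await async_client.create_read_session(
        parent="projects/your-project",
        read_session=requested_session,
        max_stream_count=1,
    )


session = asyncio.run(create_session())

# The row parsing helpers (rows(), to_dataframe(), ...) live on the synchronous
# client's ReadRowsStream, so the stream name is handed to BigQueryReadClient here.
reader = BigQueryReadClient().read_rows(session.streams[0].name)
for row in reader.rows(session):
    print(row)
```
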
### Read Rows Stream

Helper class that wraps read stream responses and provides convenient data parsing methods.

```python { .api }
class ReadRowsStream:
    def __iter__(self) -> Iterator[ReadRowsResponse]:
        """Iterate over ReadRowsResponse messages."""

    def rows(self, read_session: ReadSession = None) -> Iterator[dict]:
        """
        Parse stream messages into row dictionaries.

        Parameters:
        - read_session: ReadSession for schema information (required for Avro)

        Returns:
        Iterator of row dictionaries

        Note: Requires fastavro for Avro format support
        """

    def to_arrow(self, read_session: ReadSession = None):
        """
        Convert stream to Apache Arrow format.

        Parameters:
        - read_session: ReadSession for schema information

        Returns:
        Apache Arrow Table

        Note: Requires pyarrow for Arrow format support
        """

    def to_dataframe(
        self,
        read_session: ReadSession = None,
        dtypes: dict = None
    ):
        """
        Convert stream to pandas DataFrame.

        Parameters:
        - read_session: ReadSession for schema information
        - dtypes: Column data type specifications

        Returns:
        pandas DataFrame

        Note: Requires pandas for DataFrame support
        """
```
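
A brief sketch of `to_arrow()`, which materializes an entire stream as a `pyarrow.Table` (the table path below is a placeholder):

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()

requested_session = types.ReadSession(
    table="projects/your-project/datasets/your_dataset/tables/your_table",
    data_format=types.DataFormat.ARROW,
)
session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1,
)

reader = client.read_rows(session.streams[0].name)

# Materialize the whole stream as an Arrow table (requires pyarrow).
arrow_table = reader.to_arrow(session)
print(arrow_table.schema)
print(f"{arrow_table.num_rows} rows")
```
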
### Path Helper Methods

Utilities for constructing and parsing BigQuery resource paths.

```python { .api }
class BigQueryReadClient:
    @staticmethod
    def read_session_path(project: str, location: str, session: str) -> str:
        """Construct read session resource path."""

    @staticmethod
    def parse_read_session_path(path: str) -> dict:
        """Parse read session path into components."""

    @staticmethod
    def read_stream_path(
        project: str,
        location: str,
        session: str,
        stream: str
    ) -> str:
        """Construct read stream resource path."""

    @staticmethod
    def parse_read_stream_path(path: str) -> dict:
        """Parse read stream path into components."""

    @staticmethod
    def table_path(project: str, dataset: str, table: str) -> str:
        """Construct BigQuery table resource path."""

    @staticmethod
    def parse_table_path(path: str) -> dict:
        """Parse table path into project, dataset, table components."""
```
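
For example, the table helpers build and decompose the fully qualified table path expected by `ReadSession.table` (placeholder project, dataset, and table names):

```python
from google.cloud.bigquery_storage import BigQueryReadClient

# Build "projects/your-project/datasets/your_dataset/tables/your_table".
table = BigQueryReadClient.table_path("your-project", "your_dataset", "your_table")

# Recover the components as a dict: {"project": ..., "dataset": ..., "table": ...}.
components = BigQueryReadClient.parse_table_path(table)

print(table)
print(components)
```
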
## Usage Examples

### Basic Read Session

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

# Create client
client = BigQueryReadClient()

# Configure table and session
table = "projects/bigquery-public-data/datasets/usa_names/tables/usa_1910_current"
requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.AVRO
)

# Create session with single stream
session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1
)

# Read data
reader = client.read_rows(session.streams[0].name)
for row in reader.rows(session):
    print(f"Name: {row['name']}, State: {row['state']}")
```
### Column Selection and Filtering

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
table = "projects/your-project/datasets/your_dataset/tables/your_table"

# Configure read options
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["name", "age", "city"],
    row_restriction='age > 18 AND city = "New York"'
)

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.ARROW,
    read_options=read_options
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session
)

# Process each stream in turn (see Parallel Stream Processing below for a parallel version)
for stream in session.streams:
    reader = client.read_rows(stream.name)
    for row in reader.rows(session):
        print(row)
```
### Convert to DataFrame

```python
import pandas as pd
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
table = "projects/bigquery-public-data/datasets/new_york_trees/tables/tree_species"

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.ARROW,
    read_options=types.ReadSession.TableReadOptions(
        selected_fields=["species_common_name", "fall_color"]
    )
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=1
)

# Read into pandas DataFrame using to_dataframe method
reader = client.read_rows(session.streams[0].name)
dataframe = reader.to_dataframe(session)

print(dataframe.head())
```
### Parallel Stream Processing

```python
import concurrent.futures
from google.cloud.bigquery_storage import BigQueryReadClient, types

def process_stream(client, stream_name, session):
    """Process a single stream."""
    reader = client.read_rows(stream_name)
    rows = []
    for row in reader.rows(session):
        rows.append(row)
    return rows

client = BigQueryReadClient()
table = "projects/your-project/datasets/large_dataset/tables/big_table"

requested_session = types.ReadSession(
    table=table,
    data_format=types.DataFormat.AVRO
)

session = client.create_read_session(
    parent="projects/your-project",
    read_session=requested_session,
    max_stream_count=4  # Request multiple streams
)

# Process streams in parallel
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = []
    for stream in session.streams:
        future = executor.submit(process_stream, client, stream.name, session)
        futures.append(future)

    # Collect results
    all_rows = []
    for future in concurrent.futures.as_completed(futures):
        stream_rows = future.result()
        all_rows.extend(stream_rows)

print(f"Processed {len(all_rows)} total rows")
```
### Stream Splitting

```python
from google.cloud.bigquery_storage import BigQueryReadClient, types

client = BigQueryReadClient()
# ... create session with single stream ...

original_stream = session.streams[0]

# Split stream at 50% mark; name and fraction are fields of SplitReadStreamRequest.
split_response = client.split_read_stream(
    request=types.SplitReadStreamRequest(
        name=original_stream.name,
        fraction=0.5
    )
)

# Process both streams
if split_response.primary_stream:
    reader1 = client.read_rows(split_response.primary_stream.name)
    # Process first half...

if split_response.remainder_stream:
    reader2 = client.read_rows(split_response.remainder_stream.name)
    # Process second half...
```
## Error Handling

```python
from google.cloud.bigquery_storage import BigQueryReadClient
from google.api_core import exceptions

client = BigQueryReadClient()

# ... build requested_session as in the examples above ...

try:
    session = client.create_read_session(
        parent="projects/your-project",
        read_session=requested_session
    )

    reader = client.read_rows(session.streams[0].name)
    for row in reader.rows(session):
        # Process row
        pass

except exceptions.NotFound:
    print("Table not found")
except exceptions.PermissionDenied:
    print("Access denied to table or project")
except exceptions.ResourceExhausted:
    print("Quota exceeded")
except Exception as e:
    print(f"Unexpected error: {e}")
```
## Types

### ReadSession

```python { .api }
class ReadSession:
    name: str
    table: str
    data_format: DataFormat
    read_options: TableReadOptions
    streams: List[ReadStream]
    estimated_total_bytes_scanned: int
    estimated_row_count: int
    avro_schema: AvroSchema
    arrow_schema: ArrowSchema
    table_modifiers: ReadSession.TableModifiers

class ReadSession.TableReadOptions:
    selected_fields: List[str]
    row_restriction: str
    arrow_serialization_options: ArrowSerializationOptions
    avro_serialization_options: AvroSerializationOptions

class ReadSession.TableModifiers:
    snapshot_time: Timestamp
```
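
`TableModifiers.snapshot_time` lets a session read the table as of an earlier point in time (within BigQuery's time-travel window). A hedged sketch, using a placeholder table path:

```python
import datetime

from google.cloud.bigquery_storage import types
from google.protobuf import timestamp_pb2

# Snapshot the table as it existed one hour ago.
snapshot = timestamp_pb2.Timestamp()
snapshot.FromDatetime(
    datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
)

requested_session = types.ReadSession(
    table="projects/your-project/datasets/your_dataset/tables/your_table",
    data_format=types.DataFormat.ARROW,
    table_modifiers=types.ReadSession.TableModifiers(snapshot_time=snapshot),
)
# Pass requested_session to create_read_session as in the examples above.
```
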
### ReadStream

```python { .api }
class ReadStream:
    name: str
```
### Request/Response Types

```python { .api }
class CreateReadSessionRequest:
    parent: str
    read_session: ReadSession
    max_stream_count: int

class ReadRowsRequest:
    read_stream: str
    offset: int

class ReadRowsResponse:
    avro_rows: AvroRows
    arrow_record_batch: ArrowRecordBatch
    row_count: int
    stats: StreamStats
    throttle_state: ThrottleState

class SplitReadStreamRequest:
    name: str
    fraction: float

class SplitReadStreamResponse:
    primary_stream: ReadStream
    remainder_stream: ReadStream
```
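
When the parsing helpers are not needed, iterating a `ReadRowsStream` directly yields the raw `ReadRowsResponse` messages, which expose per-message metadata such as `row_count` and `throttle_state`. A short sketch (assumes a session has been created as in the examples above):

```python
from google.cloud.bigquery_storage import BigQueryReadClient

client = BigQueryReadClient()
# ... create a session as in the examples above ...

reader = client.read_rows(session.streams[0].name)

for response in reader:
    # Each message carries a batch of encoded rows plus metadata.
    print(f"rows in this message: {response.row_count}")
    if response.throttle_state.throttle_percent:
        print(f"server throttling: {response.throttle_state.throttle_percent}%")
```
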