# Types and Schemas

Comprehensive type system for BigQuery Storage operations, including data format specifications, schema definitions, session configuration, stream management, and error handling. All types are based on Protocol Buffers and provide strong typing for BigQuery Storage API interactions.

## Capabilities

### Data Format Types

Core data format types for serializing and deserializing BigQuery data.

```python { .api }
class DataFormat(enum.Enum):
    """Supported data serialization formats."""
    DATA_FORMAT_UNSPECIFIED = 0
    AVRO = 1   # Apache Avro format
    ARROW = 2  # Apache Arrow format
    PROTO = 3  # Protocol Buffer format
```

### Avro Format Types

Types for working with Apache Avro serialized data.

```python { .api }
class AvroSchema:
    """Avro schema definition for BigQuery data."""
    schema: str  # JSON schema string

class AvroRows:
    """Avro-encoded row data."""
    serialized_binary_rows: bytes  # Avro binary data
    row_count: int                 # Number of rows encoded

class AvroSerializationOptions:
    """Options for Avro serialization."""
    enable_display_name_attribute: bool  # Use display names in schema
```
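
For illustration, a minimal sketch of decoding `AvroRows` payloads with the third-party `fastavro` package. This is an assumption about tooling (the client library can also decode rows for you); `session` and `reader` stand in for an existing read session and row stream.

```python
# Sketch: decode AvroRows blocks with fastavro (assumes `pip install fastavro`
# and an existing `session` / `reader` from a BigQuery Storage read).
import io
import json

import fastavro

# The session carries the writer schema as a JSON string.
avro_schema = json.loads(session.avro_schema.schema)

for response in reader:
    buffer = io.BytesIO(response.avro_rows.serialized_binary_rows)
    # Each block contains exactly `row_count` schemaless-encoded records.
    for _ in range(response.row_count):
        row = fastavro.schemaless_reader(buffer, avro_schema)
        print(row)
```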

### Arrow Format Types

Types for working with Apache Arrow serialized data.

```python { .api }
class ArrowSchema:
    """Arrow schema definition for BigQuery data."""
    serialized_schema: bytes  # Serialized Arrow schema

class ArrowRecordBatch:
    """Arrow record batch data."""
    serialized_record_batch: bytes  # Serialized Arrow record batch
    row_count: int                  # Number of rows in batch

class ArrowSerializationOptions:
    """Options for Arrow serialization."""
    buffer_compression: ArrowSerializationOptions.CompressionCodec

class ArrowSerializationOptions.CompressionCodec(enum.Enum):
    """Arrow compression codecs."""
    COMPRESSION_UNSPECIFIED = 0
    LZ4_FRAME = 1  # LZ4 frame compression
    ZSTD = 2       # Zstandard compression
```
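
As a sketch, `ArrowSchema` and `ArrowRecordBatch` payloads can be reassembled with `pyarrow` (assumed installed; `session` and `reader` are placeholders for an existing read session and response stream):

```python
# Sketch: rebuild pyarrow RecordBatches from the serialized Arrow payloads.
# Assumes `pip install pyarrow` and an existing `session` / `reader`.
import pyarrow as pa
import pyarrow.ipc

schema = pa.ipc.read_schema(pa.py_buffer(session.arrow_schema.serialized_schema))

batches = []
for response in reader:
    batch = pa.ipc.read_record_batch(
        pa.py_buffer(response.arrow_record_batch.serialized_record_batch),
        schema,
    )
    batches.append(batch)

table = pa.Table.from_batches(batches, schema=schema)
```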

### Protocol Buffer Types

Types for working with Protocol Buffer serialized data.

```python { .api }
class ProtoSchema:
    """Protocol Buffer schema definition."""
    proto_descriptor: DescriptorProto  # Protocol buffer descriptor

class ProtoRows:
    """Protocol Buffer encoded rows."""
    serialized_rows: List[bytes]  # List of serialized row messages
```
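
A minimal sketch of populating a `ProtoSchema` from a compiled protobuf message class. `MyRowMessage` is a hypothetical `protoc`-generated class standing in for your row layout:

```python
# Sketch: build a ProtoSchema and ProtoRows from a generated message class.
# `MyRowMessage` is a hypothetical protoc-generated class for your row layout.
from google.protobuf import descriptor_pb2

from google.cloud.bigquery_storage import types

proto_descriptor = descriptor_pb2.DescriptorProto()
MyRowMessage.DESCRIPTOR.CopyToProto(proto_descriptor)  # copy the message descriptor

proto_schema = types.ProtoSchema(proto_descriptor=proto_descriptor)

# Serialize individual rows for ProtoRows / AppendRowsRequest.ProtoData.
rows = types.ProtoRows(
    serialized_rows=[
        MyRowMessage(id=1, name="first").SerializeToString(),
        MyRowMessage(id=2, name="second").SerializeToString(),
    ]
)
```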

### Session and Stream Types

Types for configuring and managing read/write sessions and streams.

```python { .api }
class ReadSession:
    """Configuration and state for a BigQuery read session."""
    name: str                  # Session resource name
    table: str                 # Source table path
    data_format: DataFormat    # Output data format
    read_options: ReadSession.TableReadOptions
    streams: List[ReadStream]  # Available read streams
    estimated_total_bytes_scanned: int
    estimated_row_count: int
    avro_schema: AvroSchema    # Schema for Avro format
    arrow_schema: ArrowSchema  # Schema for Arrow format
    table_modifiers: ReadSession.TableModifiers

class ReadSession.TableReadOptions:
    """Options for reading table data."""
    selected_fields: List[str]  # Column names to read
    row_restriction: str        # SQL predicate used to filter rows (WHERE-clause syntax)
    arrow_serialization_options: ArrowSerializationOptions
    avro_serialization_options: AvroSerializationOptions
    sample_percentage: float    # Percentage of data to sample

class ReadSession.TableModifiers:
    """Modifiers for table access."""
    snapshot_time: Timestamp  # Point-in-time snapshot

class ReadStream:
    """Individual read stream within a session."""
    name: str  # Stream resource name

class WriteStream:
    """Configuration and state for a BigQuery write stream."""
    name: str                  # Stream resource name
    type_: WriteStream.Type    # Stream type
    create_time: Timestamp     # Creation timestamp
    commit_time: Timestamp     # Commit timestamp (if committed)
    table_schema: TableSchema  # Target table schema
    state: WriteStream.State   # Current stream state
    location: str              # Geographic location

class WriteStream.Type(enum.Enum):
    """Write stream types."""
    TYPE_UNSPECIFIED = 0
    COMMITTED = 1  # Rows become visible as soon as an append succeeds (default stream behavior)
    PENDING = 2    # Rows buffered until the stream is finalized and explicitly committed
    BUFFERED = 3   # Rows buffered until flushed up to a given offset

class WriteStream.State(enum.Enum):
    """Write stream states."""
    STATE_UNSPECIFIED = 0
    CREATED = 1    # Stream created but not active
    RUNNING = 2    # Stream accepting data
    FINALIZED = 3  # Stream finalized, ready for commit
    COMMITTED = 4  # Stream data committed to table
    ABORTED = 5    # Stream aborted, data discarded

class WriteStream.WriteMode(enum.Enum):
    """Write stream modes."""
    WRITE_MODE_UNSPECIFIED = 0
    INSERT = 1  # Insert mode for appending rows

class WriteStreamView(enum.Enum):
    """Views for write stream information."""
    WRITE_STREAM_VIEW_UNSPECIFIED = 0
    BASIC = 1  # Basic stream information
    FULL = 2   # Full stream details including schema
```
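
A hedged sketch of putting these types together with `BigQueryReadClient`: create a session, then iterate its streams and rows. Project and table paths are placeholders, and the `rows()` helper is part of the handwritten client surface:

```python
# Sketch: create a read session and iterate its streams.
# Assumes google-cloud-bigquery-storage is installed; paths are placeholders.
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types

client = bigquery_storage.BigQueryReadClient()

requested_session = types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=types.DataFormat.ARROW,
)

session = client.create_read_session(
    parent="projects/my-project",
    read_session=requested_session,
    max_stream_count=4,
)

for stream in session.streams:
    reader = client.read_rows(stream.name)
    for row in reader.rows(session):
        print(row)
```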

### Table Schema Types

Types for representing BigQuery table schemas and field definitions.

```python { .api }
class TableSchema:
    """BigQuery table schema definition."""
    fields: List[TableFieldSchema]  # Table field definitions

class TableFieldSchema:
    """Individual table field schema."""
    name: str                       # Field name
    type_: TableFieldSchema.Type    # Field data type
    mode: TableFieldSchema.Mode     # Field mode (nullable, required, repeated)
    fields: List[TableFieldSchema]  # Nested field schemas (for RECORD type)
    description: str                # Field description
    max_length: int                 # Maximum length for STRING/BYTES
    precision: int                  # Precision for NUMERIC/BIGNUMERIC
    scale: int                      # Scale for NUMERIC/BIGNUMERIC
    default_value_expression: str   # Default value expression

class TableFieldSchema.Type(enum.Enum):
    """BigQuery field data types."""
    TYPE_UNSPECIFIED = 0
    STRING = 1
    INT64 = 2
    DOUBLE = 3
    NUMERIC = 4
    BOOL = 5
    TIMESTAMP = 6
    DATE = 7
    TIME = 8
    DATETIME = 9
    GEOGRAPHY = 10
    RECORD = 11  # Nested record/struct
    BYTES = 12
    JSON = 13
    BIGNUMERIC = 14
    INTERVAL = 15
    RANGE = 16

class TableFieldSchema.Mode(enum.Enum):
    """BigQuery field modes."""
    MODE_UNSPECIFIED = 0
    NULLABLE = 1  # Field can be null
    REQUIRED = 2  # Field cannot be null
    REPEATED = 3  # Field is an array
```

### Request and Response Types

Message types for BigQuery Storage API operations.

```python { .api }
class CreateReadSessionRequest:
    """Request to create a read session."""
    parent: str                # Parent project resource name (projects/{project_id})
    read_session: ReadSession  # Session configuration
    max_stream_count: int      # Maximum parallel streams

class ReadRowsRequest:
    """Request to read rows from a stream."""
    read_stream: str  # Stream name
    offset: int       # Starting offset

class ReadRowsResponse:
    """Response containing row data from a stream."""
    avro_rows: AvroRows                   # Avro format data
    arrow_record_batch: ArrowRecordBatch  # Arrow format data
    row_count: int                        # Number of rows in response
    stats: StreamStats                    # Stream statistics
    throttle_state: ThrottleState         # Throttling information

class SplitReadStreamRequest:
    """Request to split a read stream."""
    name: str        # Stream to split
    fraction: float  # Split point (0.0 to 1.0)

class SplitReadStreamResponse:
    """Response with split stream information."""
    primary_stream: ReadStream    # First part of split
    remainder_stream: ReadStream  # Second part of split

class CreateWriteStreamRequest:
    """Request to create a write stream."""
    parent: str                # Target table resource path
    write_stream: WriteStream  # Stream configuration

class AppendRowsRequest:
    """Request to append rows to a write stream."""
    write_stream: str                        # Stream name
    offset: int                              # Append offset
    proto_rows: AppendRowsRequest.ProtoData  # Protocol buffer data
    arrow_rows: AppendRowsRequest.ArrowData  # Arrow format data
    trace_id: str                            # Request trace ID

class AppendRowsRequest.ProtoData:
    """Protocol buffer row data."""
    writer_schema: ProtoSchema    # Schema for data
    serialized_rows: List[bytes]  # Serialized row messages

class AppendRowsRequest.ArrowData:
    """Arrow format row data."""
    writer_schema: ArrowSchema      # Schema for data
    serialized_record_batch: bytes  # Serialized record batch

class AppendRowsResponse:
    """Response to append rows request."""
    append_result: AppendRowsResponse.AppendResult  # Success result
    error: Status                                   # Error information
    updated_schema: TableSchema                     # Updated table schema
    row_errors: List[RowError]                      # Individual row errors

class AppendRowsResponse.AppendResult:
    """Successful append result."""
    offset: int  # Offset of appended data
```
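
For example, a hedged sketch of splitting a read stream roughly in half. It assumes an existing `client` (a `BigQueryReadClient`) and a `session` returned by `create_read_session`:

```python
# Sketch: split the first stream of a session at the 50% mark.
# Assumes `client` is a BigQueryReadClient and `session` has at least one stream.
from google.cloud.bigquery_storage import types

split_request = types.SplitReadStreamRequest(
    name=session.streams[0].name,
    fraction=0.5,
)
split_response = client.split_read_stream(request=split_request)

print("primary:", split_response.primary_stream.name)
print("remainder:", split_response.remainder_stream.name)
```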

### Error and Status Types

Types for error handling and operation status reporting.

```python { .api }
class StorageError:
    """Storage operation error information."""
    code: StorageError.StorageErrorCode  # Error code
    entity: str                          # Affected entity
    error_message: str                   # Error description

class StorageError.StorageErrorCode(enum.Enum):
    """Storage error codes."""
    STORAGE_ERROR_CODE_UNSPECIFIED = 0
    TABLE_NOT_FOUND = 1           # Table does not exist
    STREAM_ALREADY_COMMITTED = 2  # Stream already committed
    STREAM_NOT_FOUND = 3          # Stream does not exist
    INVALID_STREAM_TYPE = 4       # Invalid stream type for operation
    INVALID_STREAM_STATE = 5      # Stream in wrong state
    STREAM_FINALIZED = 6          # Stream already finalized

class RowError:
    """Error information for individual rows."""
    index: int                   # Row index with error
    code: RowError.RowErrorCode  # Error code
    message: str                 # Error message

class RowError.RowErrorCode(enum.Enum):
    """Row-level error codes."""
    ROW_ERROR_CODE_UNSPECIFIED = 0
    ROW_PARSE_ERROR = 1  # Row parsing error
    UNKNOWN_ERROR = 2    # Unknown error
    FIELDS_ERROR = 3     # Field validation error

class StreamStats:
    """Statistics for stream operations."""
    progress: StreamStats.Progress  # Progress information

class StreamStats.Progress:
    """Stream progress information."""
    at_response_start: float  # Progress at response start
    at_response_end: float    # Progress at response end

class ThrottleState:
    """Throttling state information."""
    throttle_percent: int  # Throttle percentage (0-100)
```
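
As a small sketch, `StreamStats` and `ThrottleState` can be inspected on each `ReadRowsResponse` to monitor a long-running read. Here `reader` is a placeholder for an existing response stream:

```python
# Sketch: report read progress and throttling per response.
# Assumes `reader` iterates ReadRowsResponse messages (e.g. from client.read_rows).
for response in reader:
    progress = response.stats.progress
    print(
        f"rows={response.row_count} "
        f"progress={progress.at_response_end:.1%} "
        f"throttle={response.throttle_state.throttle_percent}%"
    )
```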

### Utility Types

Common utility types used across BigQuery Storage operations.

```python { .api }
class Timestamp:
    """Timestamp representation."""
    seconds: int  # Seconds since Unix epoch
    nanos: int    # Nanoseconds within second

    def FromMilliseconds(self, millis: int):
        """Set timestamp from milliseconds since Unix epoch."""

    def ToMilliseconds(self) -> int:
        """Convert timestamp to milliseconds since Unix epoch."""

class Status:
    """Operation status information."""
    code: int           # Status code
    message: str        # Status message
    details: List[Any]  # Additional details

class AppendRowsFuture:
    """Future object for tracking append operation results."""

    def result(self, timeout: float = None) -> AppendRowsResponse:
        """
        Get the append operation result.

        Parameters:
        - timeout: Maximum time in seconds to wait for the result

        Returns:
        AppendRowsResponse with operation result
        """

    def exception(self, timeout: float = None) -> Exception:
        """Get the exception if the operation failed."""

    def done(self) -> bool:
        """Check whether the operation is complete."""

class StreamClosedError(Exception):
    """Exception raised when operations are attempted on a closed stream."""
```
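
A hedged sketch of consuming an `AppendRowsFuture`. It assumes an append stream manager whose `send()` returns these futures (such as `google.cloud.bigquery_storage_v1.writer.AppendRowsStream`); the manager itself is outside this section:

```python
# Sketch: wait on an AppendRowsFuture returned by an append stream manager.
# Assumes `append_rows_stream.send(request)` returns an AppendRowsFuture.
future = append_rows_stream.send(request)

try:
    response = future.result(timeout=60)  # block up to 60 seconds
    print("appended at offset:", response.append_result.offset)
except Exception:
    # done() is True once the future has resolved, successfully or not.
    if future.done():
        print("append failed:", future.exception())
    raise
```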

## Usage Examples

### Working with Schema Types

```python
from google.cloud.bigquery_storage import types

# Define table schema
schema = types.TableSchema(
    fields=[
        types.TableFieldSchema(
            name="id",
            type_=types.TableFieldSchema.Type.INT64,
            mode=types.TableFieldSchema.Mode.REQUIRED
        ),
        types.TableFieldSchema(
            name="name",
            type_=types.TableFieldSchema.Type.STRING,
            mode=types.TableFieldSchema.Mode.NULLABLE,
            max_length=100
        ),
        types.TableFieldSchema(
            name="scores",
            type_=types.TableFieldSchema.Type.DOUBLE,
            mode=types.TableFieldSchema.Mode.REPEATED
        ),
        types.TableFieldSchema(
            name="metadata",
            type_=types.TableFieldSchema.Type.RECORD,
            mode=types.TableFieldSchema.Mode.NULLABLE,
            fields=[
                types.TableFieldSchema(
                    name="created_at",
                    type_=types.TableFieldSchema.Type.TIMESTAMP,
                    mode=types.TableFieldSchema.Mode.REQUIRED
                ),
                types.TableFieldSchema(
                    name="tags",
                    type_=types.TableFieldSchema.Type.STRING,
                    mode=types.TableFieldSchema.Mode.REPEATED
                )
            ]
        )
    ]
)
```

### Configuring Data Formats

```python
from google.cloud.bigquery_storage import types

# Arrow serialization with compression
arrow_options = types.ArrowSerializationOptions(
    buffer_compression=types.ArrowSerializationOptions.CompressionCodec.ZSTD
)

# Avro serialization with display names
avro_options = types.AvroSerializationOptions(
    enable_display_name_attribute=True
)

# Read session with format options
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["id", "name", "metadata.created_at"],
    row_restriction='id > 1000 AND name IS NOT NULL',
    arrow_serialization_options=arrow_options,
    sample_percentage=10.0  # Sample 10% of data
)

requested_session = types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=types.DataFormat.ARROW,
    read_options=read_options
)
```
436
437
### Working with Write Stream Types
438
439
```python
440
from google.cloud.bigquery_storage import types
441
442
# Create pending write stream
443
write_stream = types.WriteStream(
444
type_=types.WriteStream.Type.PENDING
445
)
446
447
# Check stream state
448
if write_stream.state == types.WriteStream.State.RUNNING:
449
print("Stream is accepting data")
450
elif write_stream.state == types.WriteStream.State.FINALIZED:
451
print("Stream is ready for commit")
452
453
# Create append request with proto data
454
proto_data = types.AppendRowsRequest.ProtoData()
455
proto_data.serialized_rows = [serialized_row_1, serialized_row_2]
456
457
request = types.AppendRowsRequest(
458
write_stream=stream_name,
459
proto_rows=proto_data,
460
trace_id="my-trace-123" # For debugging
461
)
462
```
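
To round out the pending-stream flow above, a hedged sketch of finalizing and committing so the appended rows become visible. It assumes `client` is a `BigQueryWriteClient`, and `table_path` / `stream_name` are placeholders for the target table and the pending stream:

```python
# Sketch: finalize a pending stream and commit it so rows become visible.
# Assumes `client` is a bigquery_storage.BigQueryWriteClient and the stream
# identified by `stream_name` has already received its appends.
client.finalize_write_stream(name=stream_name)

commit_response = client.batch_commit_write_streams(
    request={
        "parent": table_path,  # projects/{p}/datasets/{d}/tables/{t}
        "write_streams": [stream_name],
    }
)
print("committed at:", commit_response.commit_time)

for stream_error in commit_response.stream_errors:
    print("commit error:", stream_error.error_message)
```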

### Error Handling with Types

```python
from google.cloud.bigquery_storage import types
from google.cloud import bigquery_storage

try:
    # Perform append operation; append_rows consumes an iterable of requests
    response_stream = client.append_rows(iter([request]))

    # Check for row-level errors in each response
    for response in response_stream:
        if response.row_errors:
            for row_error in response.row_errors:
                print(f"Row {row_error.index} error: {row_error.message}")

except Exception as e:
    # Handle storage errors attached to the exception
    if hasattr(e, 'details'):
        for detail in e.details:
            if isinstance(detail, types.StorageError):
                if detail.code == types.StorageError.StorageErrorCode.TABLE_NOT_FOUND:
                    print(f"Table not found: {detail.entity}")
                elif detail.code == types.StorageError.StorageErrorCode.STREAM_FINALIZED:
                    print(f"Stream already finalized: {detail.entity}")
```

### Time-based Operations

```python
from google.cloud.bigquery_storage import types
import time

# Create timestamp for snapshot
snapshot_time = types.Timestamp()
current_millis = int(time.time() * 1000)
snapshot_time.FromMilliseconds(current_millis)

# Use in table modifiers
table_modifiers = types.ReadSession.TableModifiers(
    snapshot_time=snapshot_time
)

read_session = types.ReadSession(
    table=table_path,
    data_format=types.DataFormat.AVRO,
    table_modifiers=table_modifiers
)
```