# Writing Data

Streaming write operations to BigQuery tables using the BigQuery Storage API. Supports transactional semantics with multiple write stream types, batch commit operations, and multiple data formats including Protocol Buffers, Avro, and Arrow.

## Capabilities

### BigQuery Write Client

Main client for writing data to BigQuery tables with streaming capabilities and transactional guarantees.

```python { .api }
class BigQueryWriteClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Write Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_write_stream(
        self,
        parent: str,
        write_stream: WriteStream,
        **kwargs
    ) -> WriteStream:
        """
        Create a new write stream for appending data to BigQuery.

        Parameters:
        - parent: Table path in format "projects/{project}/datasets/{dataset}/tables/{table}"
        - write_stream: WriteStream configuration with type and options

        Returns:
        WriteStream with stream name and metadata
        """

    def append_rows(
        self,
        requests: Iterator[AppendRowsRequest],
        **kwargs
    ) -> Iterator[AppendRowsResponse]:
        """
        Append rows to a write stream (bidirectional streaming RPC).

        Parameters:
        - requests: Iterator of AppendRowsRequest messages with serialized data

        Returns:
        Iterator of AppendRowsResponse messages with append results
        """

    def get_write_stream(self, name: str, **kwargs) -> WriteStream:
        """
        Get write stream information and current state.

        Parameters:
        - name: Write stream name

        Returns:
        WriteStream with current state and metadata
        """

    def finalize_write_stream(
        self,
        name: str,
        **kwargs
    ) -> FinalizeWriteStreamResponse:
        """
        Finalize a write stream to prepare it for commit.

        Parameters:
        - name: Write stream name

        Returns:
        FinalizeWriteStreamResponse with the final row count
        """

    def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: List[str],
        **kwargs
    ) -> BatchCommitWriteStreamsResponse:
        """
        Atomically commit multiple write streams.

        Parameters:
        - parent: Table path
        - write_streams: List of write stream names to commit

        Returns:
        BatchCommitWriteStreamsResponse with commit timestamp and errors
        """

    def flush_rows(
        self,
        write_stream: str,
        offset: int = None,
        **kwargs
    ) -> FlushRowsResponse:
        """
        Flush buffered rows in a write stream.

        Parameters:
        - write_stream: Write stream name
        - offset: Offset to flush up to (optional)

        Returns:
        FlushRowsResponse with flush offset
        """
```
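
The `flush_rows` call only matters for BUFFERED streams, where appended rows stay invisible until explicitly flushed. A minimal sketch of that flow, following the signatures declared above (treat the flattened `offset` argument and the exact flush-to-offset semantics as assumptions):

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Rows appended to a BUFFERED stream are not visible to readers until flushed
stream = write_client.create_write_stream(
    parent=parent,
    write_stream=types.WriteStream(type_=types.WriteStream.Type.BUFFERED),
)

# ... append rows to stream.name as shown in the usage examples below ...

# Make rows up to the given offset visible (assumed semantics: flush-to-offset)
flush_response = write_client.flush_rows(write_stream=stream.name, offset=9)
print(f"Flushed through offset {flush_response.offset}")
```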

### BigQuery Write Async Client

Async version of BigQueryWriteClient that exposes the same methods through the async/await pattern.

```python { .api }
class BigQueryWriteAsyncClient:
    async def create_write_stream(
        self,
        parent: str,
        write_stream: WriteStream,
        **kwargs
    ) -> WriteStream: ...

    async def append_rows(
        self,
        requests: AsyncIterator[AppendRowsRequest],
        **kwargs
    ) -> AsyncIterator[AppendRowsResponse]: ...

    async def get_write_stream(self, name: str, **kwargs) -> WriteStream: ...

    async def finalize_write_stream(
        self,
        name: str,
        **kwargs
    ) -> FinalizeWriteStreamResponse: ...

    async def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: List[str],
        **kwargs
    ) -> BatchCommitWriteStreamsResponse: ...

    async def flush_rows(
        self,
        write_stream: str,
        offset: int = None,
        **kwargs
    ) -> FlushRowsResponse: ...
```
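
A minimal async sketch of the pending-stream lifecycle. The import path is assumed from the generated services layout, and `serialized_rows` stands in for a list of protobuf-encoded rows; in real use the first request must also carry a `writer_schema`:

```python
import asyncio

from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1.services.big_query_write import (
    BigQueryWriteAsyncClient,
)

async def append_pending(parent, serialized_rows):
    client = BigQueryWriteAsyncClient()
    stream = await client.create_write_stream(
        parent=parent,
        write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
    )

    async def requests():
        # Single request carrying the serialized rows; real code would also set
        # proto_rows.writer_schema on this first request.
        request = types.AppendRowsRequest(write_stream=stream.name)
        request.proto_rows = types.AppendRowsRequest.ProtoData(
            rows=types.ProtoRows(serialized_rows=serialized_rows)
        )
        yield request

    responses = await client.append_rows(requests())
    async for response in responses:
        print(f"Appended at offset {response.append_result.offset}")

    await client.finalize_write_stream(name=stream.name)
    await client.batch_commit_write_streams(parent=parent, write_streams=[stream.name])

# asyncio.run(append_pending("projects/p/datasets/d/tables/t", [row_bytes]))
```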

### Append Rows Stream

Helper class that wraps write stream operations and provides convenient data appending methods.

```python { .api }
class AppendRowsStream:
    def send(self, request: AppendRowsRequest) -> AppendRowsFuture:
        """
        Send append request and get future for response.

        Parameters:
        - request: AppendRowsRequest with serialized row data

        Returns:
        AppendRowsFuture for tracking append result
        """

    def close(self, reason: str = None):
        """
        Close the write stream.

        Parameters:
        - reason: Optional reason for closing
        """

    def is_active(self) -> bool:
        """Check if the write stream is still active."""

    def add_close_callback(self, callback: Callable):
        """
        Add callback to be called when stream closes.

        Parameters:
        - callback: Function to call on stream close
        """
```

### Path Helper Methods

Utilities for constructing and parsing BigQuery resource paths.

```python { .api }
class BigQueryWriteClient:
    @staticmethod
    def table_path(project: str, dataset: str, table: str) -> str:
        """Construct BigQuery table resource path."""

    @staticmethod
    def parse_table_path(path: str) -> dict:
        """Parse table path into project, dataset, table components."""

    @staticmethod
    def write_stream_path(
        project: str,
        dataset: str,
        table: str,
        stream: str
    ) -> str:
        """Construct write stream resource path."""

    @staticmethod
    def parse_write_stream_path(path: str) -> dict:
        """Parse write stream path into components."""
```
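
A quick round trip through the path helpers; the values shown in the comments are illustrative:

```python
from google.cloud import bigquery_storage_v1

client = bigquery_storage_v1.BigQueryWriteClient()

# "projects/my-project/datasets/my_dataset/tables/my_table"
table = client.table_path("my-project", "my_dataset", "my_table")

# {"project": "my-project", "dataset": "my_dataset", "table": "my_table"}
parts = bigquery_storage_v1.BigQueryWriteClient.parse_table_path(table)

# "projects/my-project/datasets/my_dataset/tables/my_table/streams/my-stream"
stream = client.write_stream_path("my-project", "my_dataset", "my_table", "my-stream")
```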

## Usage Examples

### Basic Write Stream (Pending Mode)

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

# Create client
write_client = bigquery_storage_v1.BigQueryWriteClient()

# Create write stream
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request with protocol buffer data
request = types.AppendRowsRequest()
request.write_stream = stream.name

# Add serialized row data (requires a protocol buffer schema; the first request
# on a stream must also set proto_data.writer_schema)
proto_rows = types.ProtoRows()
proto_rows.serialized_rows = [serialized_row_data]  # Your serialized data
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.rows = proto_rows
request.proto_rows = proto_data

# Append rows
response_stream = write_client.append_rows([request])
for response in response_stream:
    if "error" in response:
        print(f"Error: {response.error}")
    else:
        print(f"Rows appended at offset {response.append_result.offset}")

# Finalize and commit
write_client.finalize_write_stream(name=stream.name)
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=[stream.name]
)
print(f"Committed at: {commit_response.commit_time}")
```
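
The example above assumes `serialized_row_data` already exists. A sketch of building the required writer schema and serialized rows from a generated protobuf message, here a hypothetical `your_row_pb2.YourRow` produced by protoc from a .proto that mirrors the table schema:

```python
from google.protobuf import descriptor_pb2
from google.cloud.bigquery_storage_v1 import types

import your_row_pb2  # hypothetical generated module

# Describe the row message so BigQuery can decode the serialized rows
proto_descriptor = descriptor_pb2.DescriptorProto()
your_row_pb2.YourRow.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema = types.ProtoSchema()
proto_schema.proto_descriptor = proto_descriptor

proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = proto_schema  # required on the first request of a stream
proto_data.rows = types.ProtoRows(
    serialized_rows=[your_row_pb2.YourRow(id=1, name="Alice").SerializeToString()]
)
```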

### Committed Stream (Immediate Visibility)

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create a committed-type stream (rows are visible as soon as they are appended)
write_stream = types.WriteStream(type_=types.WriteStream.Type.COMMITTED)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Append data (no finalize or batch commit step is needed)
request = types.AppendRowsRequest(write_stream=stream.name)
# ... configure with data ...

response_stream = write_client.append_rows([request])
for response in response_stream:
    if "append_result" in response:
        print(f"Data committed at offset: {response.append_result.offset}")
```
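
For the table's built-in `_default` stream, no `create_write_stream` call is needed at all; rows are committed immediately and the stream never has to be finalized. A minimal sketch (the `_default` suffix is appended to the table path):

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# The default stream always exists and supports many concurrent writers
request = types.AppendRowsRequest(write_stream=f"{parent}/_default")
# ... attach proto_rows with a writer_schema as in the pending-mode example ...

for response in write_client.append_rows([request]):
    if "error" in response:
        print(f"Append failed: {response.error}")
```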

### Batch Commit Multiple Streams

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create multiple pending streams
streams = []
for i in range(3):
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)
    streams.append(stream)

    # Append data to each stream
    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... add data for stream i ...
    list(write_client.append_rows([request]))  # consume responses so the append completes

# Finalize all streams
for stream in streams:
    write_client.finalize_write_stream(name=stream.name)

# Atomic batch commit
stream_names = [stream.name for stream in streams]
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=stream_names
)

if commit_response.stream_errors:
    print("Some streams failed to commit")
else:
    print(f"All streams committed at: {commit_response.commit_time}")
```

### Using AppendRowsStream Helper

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import writer, types

# Create write stream
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Use helper class; the template request carries settings shared by every append,
# such as the stream name (and, in real use, the writer schema)
request_template = types.AppendRowsRequest(write_stream=stream.name)
append_stream = writer.AppendRowsStream(write_client, request_template)

# Send data using helper
request = types.AppendRowsRequest()
# ... configure request with this batch's rows ...

future = append_stream.send(request)
try:
    response = future.result(timeout=30)
    print(f"Append successful: {response.append_result.offset}")
except Exception as e:
    print(f"Append failed: {e}")

# Clean up
append_stream.close()
```

### Arrow Format Writing

```python
import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create Arrow schema and data
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("value", pa.float64())
])

# Create Arrow table
data = pa.table([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Charlie"]),
    pa.array([10.5, 20.3, 30.1])
], schema=schema)

# Convert to record batch
record_batch = data.to_batches()[0]

# Create write stream
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request; both the writer schema and the record batch travel
# as Arrow IPC-serialized bytes
request = types.AppendRowsRequest(write_stream=stream.name)
arrow_data = types.AppendRowsRequest.ArrowData(
    writer_schema=types.ArrowSchema(
        serialized_schema=schema.serialize().to_pybytes()
    ),
    rows=types.ArrowRecordBatch(
        serialized_record_batch=record_batch.serialize().to_pybytes()
    ),
)
request.arrow_rows = arrow_data

# Append and commit
list(write_client.append_rows([request]))  # consume responses
write_client.finalize_write_stream(name=stream.name)
write_client.batch_commit_write_streams(parent=parent, write_streams=[stream.name])
```

### Error Handling and Retry

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.api_core import exceptions
import time

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

def append_with_retry(request, max_retries=3):
    for attempt in range(max_retries):
        try:
            response_stream = write_client.append_rows([request])
            for response in response_stream:
                if "error" in response:
                    raise Exception(f"Append error: {response.error}")
                return response

        except exceptions.ResourceExhausted:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except exceptions.Aborted:
            if attempt < max_retries - 1:
                print(f"Request aborted, retrying attempt {attempt + 1}")
                time.sleep(1)
            else:
                raise

# Use retry wrapper
try:
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... configure request ...

    response = append_with_retry(request)
    print(f"Successfully appended rows at offset: {response.append_result.offset}")

except Exception as e:
    print(f"Final append failure: {e}")
```
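
For transient failures on the unary calls, the client methods also accept a `retry` argument from `google.api_core`, which replaces the hand-rolled loop above for those RPCs; a minimal sketch (the backoff values are illustrative):

```python
from google.api_core import exceptions, retry
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Retry transient errors with exponential backoff on unary RPCs such as
# create_write_stream, finalize_write_stream, and batch_commit_write_streams
transient_retry = retry.Retry(
    predicate=retry.if_exception_type(
        exceptions.ResourceExhausted,
        exceptions.ServiceUnavailable,
        exceptions.Aborted,
    ),
    initial=1.0,
    maximum=30.0,
    timeout=120.0,
)

stream = write_client.create_write_stream(
    parent=parent,
    write_stream=types.WriteStream(type_=types.WriteStream.Type.PENDING),
    retry=transient_retry,
)
```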

## Types

### WriteStream

```python { .api }
class WriteStream:
    name: str
    type_: WriteStream.Type
    create_time: Timestamp
    commit_time: Timestamp
    table_schema: TableSchema
    state: WriteStream.State
    location: str

class WriteStream.Type(enum.Enum):
    TYPE_UNSPECIFIED = 0
    COMMITTED = 1  # Rows are visible as soon as they are appended
    PENDING = 2    # Rows are buffered until the stream is finalized and committed
    BUFFERED = 3   # Rows become visible when explicitly flushed

class WriteStream.State(enum.Enum):
    STATE_UNSPECIFIED = 0
    CREATED = 1
    RUNNING = 2
    FINALIZED = 3
    COMMITTED = 4
    ABORTED = 5
```

### Request/Response Types

```python { .api }
class CreateWriteStreamRequest:
    parent: str
    write_stream: WriteStream

class AppendRowsRequest:
    write_stream: str
    offset: int
    proto_rows: AppendRowsRequest.ProtoData
    arrow_rows: AppendRowsRequest.ArrowData
    trace_id: str

class AppendRowsRequest.ProtoData:
    writer_schema: ProtoSchema
    rows: ProtoRows  # ProtoRows.serialized_rows holds the encoded row messages

class AppendRowsRequest.ArrowData:
    writer_schema: ArrowSchema
    rows: ArrowRecordBatch  # ArrowRecordBatch.serialized_record_batch holds IPC bytes

class AppendRowsResponse:
    append_result: AppendRowsResponse.AppendResult
    error: Status
    updated_schema: TableSchema
    row_errors: List[RowError]

class AppendRowsResponse.AppendResult:
    offset: int

class GetWriteStreamRequest:
    name: str
    view: WriteStreamView

class FinalizeWriteStreamRequest:
    name: str

class FinalizeWriteStreamResponse:
    row_count: int

class BatchCommitWriteStreamsRequest:
    parent: str
    write_streams: List[str]

class BatchCommitWriteStreamsResponse:
    commit_time: Timestamp
    stream_errors: List[StorageError]

class FlushRowsRequest:
    write_stream: str
    offset: int

class FlushRowsResponse:
    offset: int
```
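
The `offset` field on AppendRowsRequest gives exactly-once semantics on application-created streams: the server rejects an append whose offset does not match the next expected row index, so a retried request cannot write duplicate rows. A short sketch, assuming `stream` was created as in the usage examples:

```python
from google.cloud.bigquery_storage_v1 import types

# First batch starts at row 0 of the stream
first = types.AppendRowsRequest(write_stream=stream.name, offset=0)

# If the first batch contained 100 rows, the next batch must declare offset 100;
# resending either request after a timeout cannot introduce duplicates.
second = types.AppendRowsRequest(write_stream=stream.name, offset=100)
```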

### Error Types

```python { .api }
class StorageError:
    code: StorageError.StorageErrorCode
    entity: str
    error_message: str

class StorageError.StorageErrorCode(enum.Enum):
    STORAGE_ERROR_CODE_UNSPECIFIED = 0
    TABLE_NOT_FOUND = 1
    STREAM_ALREADY_COMMITTED = 2
    STREAM_NOT_FOUND = 3
    INVALID_STREAM_TYPE = 4
    INVALID_STREAM_STATE = 5
    STREAM_FINALIZED = 6

class RowError:
    index: int
    code: RowError.RowErrorCode
    message: str

class RowError.RowErrorCode(enum.Enum):
    ROW_ERROR_CODE_UNSPECIFIED = 0
    ROW_PARSE_ERROR = 1
    UNKNOWN_ERROR = 2
    FIELDS_ERROR = 3

class WriteStreamView(enum.Enum):
    """Views for write stream information."""
    WRITE_STREAM_VIEW_UNSPECIFIED = 0
    BASIC = 1  # Basic stream information
    FULL = 2   # Full stream details including schema

class AppendRowsFuture:
    """Future object for tracking append operation results."""
    def result(self, timeout: float = None) -> AppendRowsResponse:
        """
        Get the append operation result.

        Parameters:
        - timeout: Maximum time to wait for result

        Returns:
        AppendRowsResponse with operation result
        """

    def exception(self, timeout: float = None) -> Exception:
        """Get exception if operation failed."""

    def done(self) -> bool:
        """Check if operation is complete."""
```