
# Writing Data

Streaming write operations to BigQuery tables using the BigQuery Storage API. Supports transactional semantics with multiple write stream types, batch commit operations, and multiple data formats, including Protocol Buffers and Apache Arrow.

## Capabilities

### BigQuery Write Client

Main client for writing data to BigQuery tables with streaming capabilities and transactional guarantees.

```python { .api }
class BigQueryWriteClient:
    def __init__(self, **kwargs):
        """
        Initialize BigQuery Write Client.

        Parameters:
        - credentials: Google Cloud credentials
        - project: Default project ID
        - client_info: Client library information
        """

    def create_write_stream(
        self,
        parent: str,
        write_stream: WriteStream,
        **kwargs
    ) -> WriteStream:
        """
        Create a new write stream for appending data to BigQuery.

        Parameters:
        - parent: Table path in format "projects/{project}/datasets/{dataset}/tables/{table}"
        - write_stream: WriteStream configuration with type and options

        Returns:
        WriteStream with stream name and metadata
        """

    def append_rows(
        self,
        requests: Iterator[AppendRowsRequest],
        **kwargs
    ) -> Iterator[AppendRowsResponse]:
        """
        Append rows to a write stream (bidirectional streaming RPC).

        Parameters:
        - requests: Iterator of AppendRowsRequest messages with serialized data

        Returns:
        Iterator of AppendRowsResponse messages with append results
        """

    def get_write_stream(self, name: str, **kwargs) -> WriteStream:
        """
        Get write stream information and current state.

        Parameters:
        - name: Write stream name

        Returns:
        WriteStream with current state and metadata
        """

    def finalize_write_stream(
        self,
        name: str,
        **kwargs
    ) -> FinalizeWriteStreamResponse:
        """
        Finalize a write stream to prepare it for commit.

        Parameters:
        - name: Write stream name

        Returns:
        FinalizeWriteStreamResponse with row count and state
        """

    def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: List[str],
        **kwargs
    ) -> BatchCommitWriteStreamsResponse:
        """
        Atomically commit multiple write streams.

        Parameters:
        - parent: Table path
        - write_streams: List of write stream names to commit

        Returns:
        BatchCommitWriteStreamsResponse with commit timestamp and errors
        """

    def flush_rows(
        self,
        write_stream: str,
        offset: int = None,
        **kwargs
    ) -> FlushRowsResponse:
        """
        Flush buffered rows in a write stream.

        Parameters:
        - write_stream: Write stream name
        - offset: Offset to flush up to (optional)

        Returns:
        FlushRowsResponse with flush offset
        """
```
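
The constructor parameters above can be passed as keyword arguments. A minimal construction sketch, assuming either Application Default Credentials or an explicit service-account key (the key path below is a placeholder):

```python
from google.cloud import bigquery_storage_v1
from google.oauth2 import service_account

# Explicit credentials are optional; without them the client falls back to
# Application Default Credentials from the environment.
credentials = service_account.Credentials.from_service_account_file(
    "path/to/key.json"  # placeholder path
)
write_client = bigquery_storage_v1.BigQueryWriteClient(credentials=credentials)
```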

### BigQuery Write Async Client

Async version of BigQueryWriteClient that exposes the same methods using the async/await pattern.

```python { .api }
class BigQueryWriteAsyncClient:
    async def create_write_stream(
        self,
        parent: str,
        write_stream: WriteStream,
        **kwargs
    ) -> WriteStream: ...

    async def append_rows(
        self,
        requests: AsyncIterator[AppendRowsRequest],
        **kwargs
    ) -> AsyncIterator[AppendRowsResponse]: ...

    async def get_write_stream(self, name: str, **kwargs) -> WriteStream: ...

    async def finalize_write_stream(
        self,
        name: str,
        **kwargs
    ) -> FinalizeWriteStreamResponse: ...

    async def batch_commit_write_streams(
        self,
        parent: str,
        write_streams: List[str],
        **kwargs
    ) -> BatchCommitWriteStreamsResponse: ...

    async def flush_rows(
        self,
        write_stream: str,
        offset: int = None,
        **kwargs
    ) -> FlushRowsResponse: ...
```
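
A minimal asyncio sketch of the pending-stream lifecycle with the async client. The import path below assumes the generated service-module layout; adjust it if your package version re-exports the client elsewhere:

```python
import asyncio

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1.services.big_query_write import (
    BigQueryWriteAsyncClient,  # assumed generated-module location
)


async def main():
    client = BigQueryWriteAsyncClient()
    parent = bigquery_storage_v1.BigQueryWriteClient.table_path(
        "your-project", "your_dataset", "your_table"
    )

    # Create a pending stream, append data elsewhere, then finalize and commit.
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = await client.create_write_stream(parent=parent, write_stream=write_stream)

    await client.finalize_write_stream(name=stream.name)
    await client.batch_commit_write_streams(parent=parent, write_streams=[stream.name])


asyncio.run(main())
```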

### Append Rows Stream

Helper class that wraps write stream operations and provides convenient data appending methods.

```python { .api }
class AppendRowsStream:
    def send(self, request: AppendRowsRequest) -> AppendRowsFuture:
        """
        Send append request and get future for response.

        Parameters:
        - request: AppendRowsRequest with serialized row data

        Returns:
        AppendRowsFuture for tracking append result
        """

    def close(self, reason: str = None):
        """
        Close the write stream.

        Parameters:
        - reason: Optional reason for closing
        """

    def is_active(self) -> bool:
        """Check if the write stream is still active."""

    def add_close_callback(self, callback: Callable):
        """
        Add callback to be called when stream closes.

        Parameters:
        - callback: Function to call on stream close
        """
```

### Path Helper Methods

Utilities for constructing and parsing BigQuery resource paths.

```python { .api }
class BigQueryWriteClient:
    @staticmethod
    def table_path(project: str, dataset: str, table: str) -> str:
        """Construct BigQuery table resource path."""

    @staticmethod
    def parse_table_path(path: str) -> dict:
        """Parse table path into project, dataset, table components."""

    @staticmethod
    def write_stream_path(
        project: str,
        dataset: str,
        table: str,
        stream: str
    ) -> str:
        """Construct write stream resource path."""

    @staticmethod
    def parse_write_stream_path(path: str) -> dict:
        """Parse write stream path into components."""
```
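
A quick illustration of the helpers; the exact keys returned by parse_table_path ("project", "dataset", "table") are assumed here from the path format shown above:

```python
from google.cloud import bigquery_storage_v1

# Build "projects/your-project/datasets/your_dataset/tables/your_table"
parent = bigquery_storage_v1.BigQueryWriteClient.table_path(
    "your-project", "your_dataset", "your_table"
)

# Parse it back into its components
parts = bigquery_storage_v1.BigQueryWriteClient.parse_table_path(parent)
print(parts["project"], parts["dataset"], parts["table"])  # assumed key names
```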

## Usage Examples

### Basic Write Stream (Pending Mode)

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

# Create client
write_client = bigquery_storage_v1.BigQueryWriteClient()

# Create write stream
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request with protocol buffer data
request = types.AppendRowsRequest()
request.write_stream = stream.name

# Add serialized row data (requires a protocol buffer schema)
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.serialized_rows = [serialized_row_data]  # Your serialized data
request.proto_rows = proto_data

# Append rows
response_stream = write_client.append_rows([request])
for response in response_stream:
    if response.HasField('error'):
        print(f"Error: {response.error}")
    else:
        print(f"Append result offset: {response.append_result.offset}")

# Finalize and commit
write_client.finalize_write_stream(name=stream.name)
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=[stream.name]
)
print(f"Committed at: {commit_response.commit_time}")
```
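
The first request on a stream must also carry the writer schema describing the serialized rows. A sketch of attaching one, assuming YourRowMessage is a compiled protobuf message class matching the table schema (the field names used here are illustrative):

```python
from google.protobuf import descriptor_pb2
from google.cloud.bigquery_storage_v1 import types

# Describe the row message so BigQuery can decode the serialized bytes.
proto_descriptor = descriptor_pb2.DescriptorProto()
YourRowMessage.DESCRIPTOR.CopyToProto(proto_descriptor)  # YourRowMessage: your generated class

proto_data = types.AppendRowsRequest.ProtoData()
proto_data.writer_schema = types.ProtoSchema(proto_descriptor=proto_descriptor)
proto_data.serialized_rows = [YourRowMessage(id=1, name="Alice").SerializeToString()]

request = types.AppendRowsRequest(write_stream=stream.name, proto_rows=proto_data)
```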

### Committed Stream (Immediate Commit)

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create a committed stream (rows become visible as soon as each append succeeds)
write_stream = types.WriteStream(type_=types.WriteStream.Type.COMMITTED)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Append data (no finalize/commit step is needed)
request = types.AppendRowsRequest(write_stream=stream.name)
# ... configure with data ...

response_stream = write_client.append_rows([request])
for response in response_stream:
    if response.append_result:
        print(f"Data committed at offset: {response.append_result.offset}")
```
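
### Buffered Stream (Explicit Flush)

Not shown above: BUFFERED streams hold appended rows until flush_rows makes them visible. A minimal sketch using only the methods documented in this file:

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create a buffered stream; appended rows stay invisible until flushed
write_stream = types.WriteStream(type_=types.WriteStream.Type.BUFFERED)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

request = types.AppendRowsRequest(write_stream=stream.name)
# ... configure with data ...
responses = list(write_client.append_rows([request]))
last_offset = responses[-1].append_result.offset

# Make all rows up to last_offset visible
flush_response = write_client.flush_rows(write_stream=stream.name, offset=last_offset)
print(f"Flushed up to offset: {flush_response.offset}")
```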

### Batch Commit Multiple Streams

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create multiple pending streams
streams = []
for i in range(3):
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)
    streams.append(stream)

    # Append data to each stream
    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... add data for stream i ...
    write_client.append_rows([request])

# Finalize all streams
for stream in streams:
    write_client.finalize_write_stream(name=stream.name)

# Atomic batch commit
stream_names = [stream.name for stream in streams]
commit_response = write_client.batch_commit_write_streams(
    parent=parent,
    write_streams=stream_names
)

if commit_response.stream_errors:
    print("Some streams failed to commit")
else:
    print(f"All streams committed at: {commit_response.commit_time}")
```
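
When a batch commit partially fails, each entry in stream_errors identifies the failing stream. A short sketch using the StorageError fields listed under Types:

```python
for stream_error in commit_response.stream_errors:
    # entity names the stream that failed; code and error_message explain why
    print(f"{stream_error.entity}: {stream_error.code.name} - {stream_error.error_message}")
```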

### Using AppendRowsStream Helper

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import writer, types

# Create write stream
write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Use helper class
append_stream = writer.AppendRowsStream(write_client, stream.name)

# Send data using helper
request = types.AppendRowsRequest()
# ... configure request ...

future = append_stream.send(request)
try:
    response = future.result(timeout=30)
    print(f"Append successful: {response.append_result.offset}")
except Exception as e:
    print(f"Append failed: {e}")

# Clean up
append_stream.close()
```
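
The helper also exposes is_active and add_close_callback for monitoring the underlying connection. The callback's argument contract is not specified above, so this sketch accepts anything:

```python
def on_close(*args, **kwargs):
    # Invoked when the append stream shuts down
    print("Append stream closed", args, kwargs)

append_stream.add_close_callback(on_close)

if append_stream.is_active():
    print("Stream is still accepting requests")
```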

### Arrow Format Writing

```python
import pyarrow as pa
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

# Create Arrow schema and data
schema = pa.schema([
    pa.field("id", pa.int64()),
    pa.field("name", pa.string()),
    pa.field("value", pa.float64())
])

# Create Arrow table
data = pa.table([
    pa.array([1, 2, 3]),
    pa.array(["Alice", "Bob", "Charlie"]),
    pa.array([10.5, 20.3, 30.1])
], schema=schema)

# Convert to record batch
record_batch = data.to_batches()[0]

# Create write stream
write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

# Prepare append request
request = types.AppendRowsRequest(write_stream=stream.name)
arrow_data = types.AppendRowsRequest.ArrowData()
arrow_data.serialized_record_batch = record_batch.serialize().to_pybytes()
request.arrow_rows = arrow_data

# Append and commit
write_client.append_rows([request])
write_client.finalize_write_stream(name=stream.name)
write_client.batch_commit_write_streams(parent=parent, write_streams=[stream.name])
```
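
As with protocol buffer writes, the first Arrow request can also carry a writer_schema (see ArrowData under Types). A sketch, assuming ArrowSchema wraps the IPC-serialized Arrow schema bytes:

```python
# Assumption: ArrowSchema carries the IPC-serialized schema bytes
arrow_schema = types.ArrowSchema(serialized_schema=schema.serialize().to_pybytes())

arrow_data = types.AppendRowsRequest.ArrowData()
arrow_data.writer_schema = arrow_schema
arrow_data.serialized_record_batch = record_batch.serialize().to_pybytes()
```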

### Error Handling and Retry

```python
from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.api_core import exceptions
import time

write_client = bigquery_storage_v1.BigQueryWriteClient()
parent = write_client.table_path("your-project", "your_dataset", "your_table")

def append_with_retry(request, max_retries=3):
    for attempt in range(max_retries):
        try:
            response_stream = write_client.append_rows([request])
            for response in response_stream:
                if response.HasField('error'):
                    raise Exception(f"Append error: {response.error}")
                return response

        except exceptions.ResourceExhausted:
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Rate limited, waiting {wait_time}s...")
                time.sleep(wait_time)
            else:
                raise
        except exceptions.Aborted:
            if attempt < max_retries - 1:
                print(f"Request aborted, retrying attempt {attempt + 1}")
                time.sleep(1)
            else:
                raise

# Use retry wrapper
try:
    write_stream = types.WriteStream(type_=types.WriteStream.Type.PENDING)
    stream = write_client.create_write_stream(parent=parent, write_stream=write_stream)

    request = types.AppendRowsRequest(write_stream=stream.name)
    # ... configure request ...

    response = append_with_retry(request)
    print(f"Successfully appended rows: {response.append_result.offset}")

except Exception as e:
    print(f"Final append failure: {e}")
```

## Types

### WriteStream

```python { .api }
class WriteStream:
    name: str
    type_: WriteStream.Type
    create_time: Timestamp
    commit_time: Timestamp
    table_schema: TableSchema
    state: WriteStream.State
    location: str

class WriteStream.Type(enum.Enum):
    TYPE_UNSPECIFIED = 0
    COMMITTED = 1  # Rows are visible as soon as each append succeeds
    PENDING = 2    # Rows are buffered until the stream is finalized and committed
    BUFFERED = 3   # Rows are buffered until explicitly flushed with flush_rows

class WriteStream.State(enum.Enum):
    STATE_UNSPECIFIED = 0
    CREATED = 1
    RUNNING = 2
    FINALIZED = 3
    COMMITTED = 4
    ABORTED = 5
```
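
For example, a stream's type and state can be checked against these enums after fetching it with get_write_stream (stream_name is a placeholder):

```python
stream = write_client.get_write_stream(name=stream_name)
if stream.state == types.WriteStream.State.FINALIZED:
    print(f"{stream.name} is finalized and ready for batch_commit_write_streams")
elif stream.type_ == types.WriteStream.Type.COMMITTED:
    print(f"{stream.name} commits rows as they are appended")
```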

### Request/Response Types

```python { .api }
class CreateWriteStreamRequest:
    parent: str
    write_stream: WriteStream

class AppendRowsRequest:
    write_stream: str
    offset: int
    proto_rows: AppendRowsRequest.ProtoData
    arrow_rows: AppendRowsRequest.ArrowData
    trace_id: str

class AppendRowsRequest.ProtoData:
    writer_schema: ProtoSchema
    serialized_rows: List[bytes]

class AppendRowsRequest.ArrowData:
    writer_schema: ArrowSchema
    serialized_record_batch: bytes

class AppendRowsResponse:
    append_result: AppendRowsResponse.AppendResult
    error: Status
    updated_schema: TableSchema
    row_errors: List[RowError]

class AppendRowsResponse.AppendResult:
    offset: int

class GetWriteStreamRequest:
    name: str
    view: WriteStreamView

class FinalizeWriteStreamRequest:
    name: str

class FinalizeWriteStreamResponse:
    row_count: int

class BatchCommitWriteStreamsRequest:
    parent: str
    write_streams: List[str]

class BatchCommitWriteStreamsResponse:
    commit_time: Timestamp
    stream_errors: List[StorageError]

class FlushRowsRequest:
    write_stream: str
    offset: int

class FlushRowsResponse:
    offset: int
```

### Error Types

```python { .api }
class StorageError:
    code: StorageError.StorageErrorCode
    entity: str
    error_message: str

class StorageError.StorageErrorCode(enum.Enum):
    STORAGE_ERROR_CODE_UNSPECIFIED = 0
    TABLE_NOT_FOUND = 1
    STREAM_ALREADY_COMMITTED = 2
    STREAM_NOT_FOUND = 3
    INVALID_STREAM_TYPE = 4
    INVALID_STREAM_STATE = 5
    STREAM_FINALIZED = 6

class RowError:
    index: int
    code: RowError.RowErrorCode
    message: str

class RowError.RowErrorCode(enum.Enum):
    ROW_ERROR_CODE_UNSPECIFIED = 0
    ROW_PARSE_ERROR = 1
    UNKNOWN_ERROR = 2
    FIELDS_ERROR = 3

class WriteStreamView(enum.Enum):
    """Views for write stream information."""
    WRITE_STREAM_VIEW_UNSPECIFIED = 0
    BASIC = 1  # Basic stream information
    FULL = 2   # Full stream details including schema

class AppendRowsFuture:
    """Future object for tracking append operation results."""
    def result(self, timeout: float = None) -> AppendRowsResponse:
        """
        Get the append operation result.

        Parameters:
        - timeout: Maximum time to wait for result

        Returns:
        AppendRowsResponse with operation result
        """

    def exception(self, timeout: float = None) -> Exception:
        """Get exception if operation failed."""

    def done(self) -> bool:
        """Check if operation is complete."""
```