
# Types and Schemas

Comprehensive type system for BigQuery Storage operations, covering data format specifications, schema definitions, session configuration, stream management, and error handling. All types are based on Protocol Buffers and provide strong typing for BigQuery Storage API interactions.

## Capabilities

### Data Format Types

Core types for choosing how BigQuery data is serialized and deserialized.

```python { .api }
class DataFormat(enum.Enum):
    """Supported data serialization formats."""
    DATA_FORMAT_UNSPECIFIED = 0
    AVRO = 1   # Apache Avro format
    ARROW = 2  # Apache Arrow format
    PROTO = 3  # Protocol Buffer format
```

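The format selected for a session determines which schema and row fields the service populates. A minimal sketch of branching on the format (assuming `session` is a `ReadSession` returned by the service; the function name is illustrative):

```python
from google.cloud.bigquery_storage import types

def describe_format(session: types.ReadSession) -> str:
    # data_format tells you which schema field is populated on the session
    if session.data_format == types.DataFormat.AVRO:
        return f"Avro schema: {session.avro_schema.schema}"
    if session.data_format == types.DataFormat.ARROW:
        return f"Arrow schema: {len(session.arrow_schema.serialized_schema)} bytes"
    return "unspecified format"
```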

### Avro Format Types

Types for working with Apache Avro serialized data.

```python { .api }
class AvroSchema:
    """Avro schema definition for BigQuery data."""
    schema: str  # JSON schema string

class AvroRows:
    """Avro-encoded row data."""
    serialized_binary_rows: bytes  # Avro binary data
    row_count: int  # Number of rows encoded

class AvroSerializationOptions:
    """Options for Avro serialization."""
    enable_display_name_attribute: bool  # Use display names in schema
```

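A sketch of how `AvroRows` payloads are typically decoded, assuming the third-party `fastavro` package is installed (`pip install fastavro`); the helper name is illustrative:

```python
import io
import json

import fastavro

def decode_avro_rows(avro_schema, avro_rows):
    """Yield one dict per row from an AvroRows payload (illustrative helper)."""
    parsed_schema = fastavro.parse_schema(json.loads(avro_schema.schema))
    buffer = io.BytesIO(avro_rows.serialized_binary_rows)
    for _ in range(avro_rows.row_count):
        yield fastavro.schemaless_reader(buffer, parsed_schema)
```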

### Arrow Format Types

Types for working with Apache Arrow serialized data.

```python { .api }
class ArrowSchema:
    """Arrow schema definition for BigQuery data."""
    serialized_schema: bytes  # Serialized Arrow schema

class ArrowRecordBatch:
    """Arrow record batch data."""
    serialized_record_batch: bytes  # Serialized Arrow record batch
    row_count: int  # Number of rows in batch

class ArrowSerializationOptions:
    """Options for Arrow serialization."""
    buffer_compression: ArrowSerializationOptions.CompressionCodec

class ArrowSerializationOptions.CompressionCodec(enum.Enum):
    """Arrow compression codecs."""
    COMPRESSION_UNSPECIFIED = 0
    LZ4_FRAME = 1  # LZ4 frame compression
    ZSTD = 2  # Zstandard compression
```

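A sketch of decoding an `ArrowRecordBatch` with the third-party `pyarrow` package; the function name is illustrative:

```python
import pyarrow as pa
import pyarrow.ipc

def decode_arrow_batch(arrow_schema, record_batch):
    """Decode a serialized Arrow record batch into a column dict (illustrative)."""
    schema = pa.ipc.read_schema(pa.py_buffer(arrow_schema.serialized_schema))
    batch = pa.ipc.read_record_batch(
        pa.py_buffer(record_batch.serialized_record_batch), schema
    )
    return batch.to_pydict()
```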

### Protocol Buffer Types

Types for working with Protocol Buffer serialized data.

```python { .api }
class ProtoSchema:
    """Protocol Buffer schema definition."""
    proto_descriptor: DescriptorProto  # Protocol buffer descriptor

class ProtoRows:
    """Protocol Buffer encoded rows."""
    serialized_rows: List[bytes]  # List of serialized row messages
```

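A sketch of building a `ProtoSchema` and `ProtoRows` from a compiled protobuf message; `row_pb2` and its `RowMessage` class are hypothetical stand-ins for your own generated module:

```python
from google.protobuf import descriptor_pb2
from google.cloud.bigquery_storage import types

import row_pb2  # Hypothetical module generated by protoc from your .proto file

# Copy the generated message descriptor into a DescriptorProto
proto_descriptor = descriptor_pb2.DescriptorProto()
row_pb2.RowMessage.DESCRIPTOR.CopyToProto(proto_descriptor)
proto_schema = types.ProtoSchema(proto_descriptor=proto_descriptor)

# Serialize each row message into the rows payload
row = row_pb2.RowMessage(id=1, name="example")
proto_rows = types.ProtoRows(serialized_rows=[row.SerializeToString()])
```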

### Session and Stream Types

Types for configuring and managing read/write sessions and streams.

```python { .api }
class ReadSession:
    """Configuration and state for a BigQuery read session."""
    name: str  # Session resource name
    table: str  # Source table path
    data_format: DataFormat  # Output data format
    read_options: ReadSession.TableReadOptions
    streams: List[ReadStream]  # Available read streams
    estimated_total_bytes_scanned: int
    estimated_row_count: int
    avro_schema: AvroSchema  # Schema for Avro format
    arrow_schema: ArrowSchema  # Schema for Arrow format
    table_modifiers: ReadSession.TableModifiers

class ReadSession.TableReadOptions:
    """Options for reading table data."""
    selected_fields: List[str]  # Column names to read
    row_restriction: str  # SQL WHERE clause filter
    arrow_serialization_options: ArrowSerializationOptions
    avro_serialization_options: AvroSerializationOptions
    sample_percentage: float  # Percentage of data to sample

class ReadSession.TableModifiers:
    """Modifiers for table access."""
    snapshot_time: Timestamp  # Point-in-time snapshot

class ReadStream:
    """Individual read stream within a session."""
    name: str  # Stream resource name

class WriteStream:
    """Configuration and state for a BigQuery write stream."""
    name: str  # Stream resource name
    type_: WriteStream.Type  # Stream type
    create_time: Timestamp  # Creation timestamp
    commit_time: Timestamp  # Commit timestamp (if committed)
    table_schema: TableSchema  # Target table schema
    state: WriteStream.State  # Current stream state
    location: str  # Geographic location

class WriteStream.Type(enum.Enum):
    """Write stream types."""
    TYPE_UNSPECIFIED = 0
    COMMITTED = 1  # Default stream, auto-commits
    PENDING = 2  # Pending stream, requires explicit commit
    BUFFERED = 3  # Buffered stream for batch processing

class WriteStream.State(enum.Enum):
    """Write stream states."""
    STATE_UNSPECIFIED = 0
    CREATED = 1  # Stream created but not active
    RUNNING = 2  # Stream accepting data
    FINALIZED = 3  # Stream finalized, ready for commit
    COMMITTED = 4  # Stream data committed to table
    ABORTED = 5  # Stream aborted, data discarded

class WriteStream.WriteMode(enum.Enum):
    """Write stream modes."""
    WRITE_MODE_UNSPECIFIED = 0
    INSERT = 1  # Insert mode for appending rows

class WriteStreamView(enum.Enum):
    """Views for write stream information."""
    WRITE_STREAM_VIEW_UNSPECIFIED = 0
    BASIC = 1  # Basic stream information
    FULL = 2  # Full stream details including schema
```

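A sketch of how these types come together when creating a read session via `BigQueryReadClient.create_read_session`, assuming default credentials and a real table path:

```python
from google.cloud import bigquery_storage
from google.cloud.bigquery_storage import types

client = bigquery_storage.BigQueryReadClient()

session = client.create_read_session(
    parent="projects/my-project",
    read_session=types.ReadSession(
        table="projects/my-project/datasets/my_dataset/tables/my_table",
        data_format=types.DataFormat.ARROW,
    ),
    max_stream_count=4,  # Upper bound; the service may return fewer streams
)

# The service fills in session state, schema, and the available streams
for stream in session.streams:
    print(stream.name)
```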

### Table Schema Types

Types for representing BigQuery table schemas and field definitions.

```python { .api }
class TableSchema:
    """BigQuery table schema definition."""
    fields: List[TableFieldSchema]  # Table field definitions

class TableFieldSchema:
    """Individual table field schema."""
    name: str  # Field name
    type_: TableFieldSchema.Type  # Field data type
    mode: TableFieldSchema.Mode  # Field mode (nullable, required, repeated)
    fields: List[TableFieldSchema]  # Nested field schemas (for RECORD type)
    description: str  # Field description
    max_length: int  # Maximum length for STRING/BYTES
    precision: int  # Precision for NUMERIC/BIGNUMERIC
    scale: int  # Scale for NUMERIC/BIGNUMERIC
    default_value_expression: str  # Default value expression

class TableFieldSchema.Type(enum.Enum):
    """BigQuery field data types."""
    TYPE_UNSPECIFIED = 0
    STRING = 1
    INT64 = 2
    DOUBLE = 3
    NUMERIC = 4
    BOOL = 5
    TIMESTAMP = 6
    DATE = 7
    TIME = 8
    DATETIME = 9
    GEOGRAPHY = 10
    RECORD = 11  # Nested record/struct
    BYTES = 12
    JSON = 13
    BIGNUMERIC = 14
    INTERVAL = 15
    RANGE = 16

class TableFieldSchema.Mode(enum.Enum):
    """BigQuery field modes."""
    MODE_UNSPECIFIED = 0
    NULLABLE = 1  # Field can be null
    REQUIRED = 2  # Field cannot be null
    REPEATED = 3  # Field is an array
```

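A complete construction example, including nested RECORD fields, is shown under Working with Schema Types below.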

### Request and Response Types

Message types for BigQuery Storage API operations.

```python { .api }
class CreateReadSessionRequest:
    """Request to create a read session."""
    parent: str  # Project ID
    read_session: ReadSession  # Session configuration
    max_stream_count: int  # Maximum parallel streams

class ReadRowsRequest:
    """Request to read rows from a stream."""
    read_stream: str  # Stream name
    offset: int  # Starting offset

class ReadRowsResponse:
    """Response containing row data from a stream."""
    avro_rows: AvroRows  # Avro format data
    arrow_record_batch: ArrowRecordBatch  # Arrow format data
    row_count: int  # Number of rows in response
    stats: StreamStats  # Stream statistics
    throttle_state: ThrottleState  # Throttling information

class SplitReadStreamRequest:
    """Request to split a read stream."""
    name: str  # Stream to split
    fraction: float  # Split point (0.0 to 1.0)

class SplitReadStreamResponse:
    """Response with split stream information."""
    primary_stream: ReadStream  # First part of split
    remainder_stream: ReadStream  # Second part of split

class CreateWriteStreamRequest:
    """Request to create a write stream."""
    parent: str  # Table path
    write_stream: WriteStream  # Stream configuration

class AppendRowsRequest:
    """Request to append rows to a write stream."""
    write_stream: str  # Stream name
    offset: int  # Append offset
    proto_rows: AppendRowsRequest.ProtoData  # Protocol buffer data
    arrow_rows: AppendRowsRequest.ArrowData  # Arrow format data
    trace_id: str  # Request trace ID

class AppendRowsRequest.ProtoData:
    """Protocol buffer row data."""
    writer_schema: ProtoSchema  # Schema for data
    serialized_rows: List[bytes]  # Serialized row messages

class AppendRowsRequest.ArrowData:
    """Arrow format row data."""
    writer_schema: ArrowSchema  # Schema for data
    serialized_record_batch: bytes  # Serialized record batch

class AppendRowsResponse:
    """Response to append rows request."""
    append_result: AppendRowsResponse.AppendResult  # Success result
    error: Status  # Error information
    updated_schema: TableSchema  # Updated table schema
    row_errors: List[RowError]  # Individual row errors

class AppendRowsResponse.AppendResult:
    """Successful append result."""
    offset: int  # Offset of appended data
```

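A sketch of constructing read-side requests from these types; `stream_name` is a placeholder for a stream resource name taken from an existing session:

```python
from google.cloud.bigquery_storage import types

stream_name = "..."  # Placeholder: e.g. session.streams[0].name

# Read from the stream starting at the first row
read_request = types.ReadRowsRequest(
    read_stream=stream_name,
    offset=0,
)

# Split a long-running stream roughly in half
split_request = types.SplitReadStreamRequest(
    name=stream_name,
    fraction=0.5,
)
```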

### Error and Status Types

Types for error handling and operation status reporting.

```python { .api }
class StorageError:
    """Storage operation error information."""
    code: StorageError.StorageErrorCode  # Error code
    entity: str  # Affected entity
    error_message: str  # Error description

class StorageError.StorageErrorCode(enum.Enum):
    """Storage error codes."""
    STORAGE_ERROR_CODE_UNSPECIFIED = 0
    TABLE_NOT_FOUND = 1  # Table does not exist
    STREAM_ALREADY_COMMITTED = 2  # Stream already committed
    STREAM_NOT_FOUND = 3  # Stream does not exist
    INVALID_STREAM_TYPE = 4  # Invalid stream type for operation
    INVALID_STREAM_STATE = 5  # Stream in wrong state
    STREAM_FINALIZED = 6  # Stream already finalized

class RowError:
    """Error information for individual rows."""
    index: int  # Row index with error
    code: RowError.RowErrorCode  # Error code
    message: str  # Error message

class RowError.RowErrorCode(enum.Enum):
    """Row-level error codes."""
    ROW_ERROR_CODE_UNSPECIFIED = 0
    ROW_PARSE_ERROR = 1  # Row parsing error
    UNKNOWN_ERROR = 2  # Unknown error
    FIELDS_ERROR = 3  # Field validation error

class StreamStats:
    """Statistics for stream operations."""
    progress: StreamStats.Progress  # Progress information

class StreamStats.Progress:
    """Stream progress information."""
    at_response_start: float  # Progress at response start
    at_response_end: float  # Progress at response end

class ThrottleState:
    """Throttling state information."""
    throttle_percent: int  # Throttle percentage (0-100)
```

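A sketch of using `StreamStats` and `ThrottleState` while consuming a read stream, assuming `response` is a `ReadRowsResponse`; the function name is illustrative:

```python
def report_read_progress(response) -> None:
    # Fractional progress through the stream at the end of this response
    progress = response.stats.progress.at_response_end
    print(f"Stream progress: {progress:.1%}")

    # A nonzero throttle percentage means the server is slowing this reader
    if response.throttle_state.throttle_percent > 0:
        print(f"Throttled at {response.throttle_state.throttle_percent}%")
```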

### Utility Types

Common utility types used across BigQuery Storage operations.

```python { .api }
class Timestamp:
    """Timestamp representation."""
    seconds: int  # Seconds since Unix epoch
    nanos: int  # Nanoseconds within second

    def FromMilliseconds(self, millis: int):
        """Set timestamp from milliseconds."""

    def ToMilliseconds(self) -> int:
        """Convert timestamp to milliseconds."""

class Status:
    """Operation status information."""
    code: int  # Status code
    message: str  # Status message
    details: List[Any]  # Additional details

class AppendRowsFuture:
    """Future object for tracking append operation results."""

    def result(self, timeout: float = None) -> AppendRowsResponse:
        """
        Get the append operation result.

        Parameters:
        - timeout: Maximum time to wait for result

        Returns:
        AppendRowsResponse with operation result
        """

    def exception(self, timeout: float = None) -> Exception:
        """Get exception if operation failed."""

    def done(self) -> bool:
        """Check if operation is complete."""

class StreamClosedError(Exception):
    """Exception raised when operations are attempted on closed streams."""
```

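A sketch of waiting on an `AppendRowsFuture`, assuming `future` was returned by a prior append operation:

```python
# future is an AppendRowsFuture returned by a prior append call
if not future.done():
    print("Append still in flight")

try:
    response = future.result(timeout=30.0)  # Block for up to 30 seconds
    print(f"Rows appended at offset {response.append_result.offset}")
except StreamClosedError:
    print("The write stream was closed before the append completed")
```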

## Usage Examples

### Working with Schema Types

```python
from google.cloud.bigquery_storage import types

# Define table schema
schema = types.TableSchema(
    fields=[
        types.TableFieldSchema(
            name="id",
            type_=types.TableFieldSchema.Type.INT64,
            mode=types.TableFieldSchema.Mode.REQUIRED,
        ),
        types.TableFieldSchema(
            name="name",
            type_=types.TableFieldSchema.Type.STRING,
            mode=types.TableFieldSchema.Mode.NULLABLE,
            max_length=100,
        ),
        types.TableFieldSchema(
            name="scores",
            type_=types.TableFieldSchema.Type.DOUBLE,
            mode=types.TableFieldSchema.Mode.REPEATED,
        ),
        types.TableFieldSchema(
            name="metadata",
            type_=types.TableFieldSchema.Type.RECORD,
            mode=types.TableFieldSchema.Mode.NULLABLE,
            fields=[
                types.TableFieldSchema(
                    name="created_at",
                    type_=types.TableFieldSchema.Type.TIMESTAMP,
                    mode=types.TableFieldSchema.Mode.REQUIRED,
                ),
                types.TableFieldSchema(
                    name="tags",
                    type_=types.TableFieldSchema.Type.STRING,
                    mode=types.TableFieldSchema.Mode.REPEATED,
                ),
            ],
        ),
    ]
)
```

### Configuring Data Formats

```python
from google.cloud.bigquery_storage import types

# Arrow serialization with compression
arrow_options = types.ArrowSerializationOptions(
    buffer_compression=types.ArrowSerializationOptions.CompressionCodec.ZSTD
)

# Avro serialization with display names
avro_options = types.AvroSerializationOptions(
    enable_display_name_attribute=True
)

# Read session with format options
read_options = types.ReadSession.TableReadOptions(
    selected_fields=["id", "name", "metadata.created_at"],
    row_restriction="id > 1000 AND name IS NOT NULL",
    arrow_serialization_options=arrow_options,
    sample_percentage=10.0,  # Sample 10% of data
)

requested_session = types.ReadSession(
    table="projects/my-project/datasets/my_dataset/tables/my_table",
    data_format=types.DataFormat.ARROW,
    read_options=read_options,
)
```

### Working with Write Stream Types

```python
from google.cloud.bigquery_storage import types

# Create a pending write stream
write_stream = types.WriteStream(
    type_=types.WriteStream.Type.PENDING
)

# Check stream state
if write_stream.state == types.WriteStream.State.RUNNING:
    print("Stream is accepting data")
elif write_stream.state == types.WriteStream.State.FINALIZED:
    print("Stream is ready for commit")

# Create an append request with proto data; serialized_row_1,
# serialized_row_2, and stream_name are placeholders for pre-serialized
# protobuf messages and an existing stream resource name
proto_data = types.AppendRowsRequest.ProtoData()
proto_data.serialized_rows = [serialized_row_1, serialized_row_2]

request = types.AppendRowsRequest(
    write_stream=stream_name,
    proto_rows=proto_data,
    trace_id="my-trace-123",  # For debugging
)
```

### Error Handling with Types

465

466

```python

467

from google.cloud.bigquery_storage import types

468

from google.cloud import bigquery_storage

469

470

try:

471

# Perform append operation

472

response = client.append_rows([request])

473

474

except Exception as e:

475

# Handle storage errors

476

if hasattr(e, 'details'):

477

for detail in e.details:

478

if isinstance(detail, types.StorageError):

479

if detail.code == types.StorageError.StorageErrorCode.TABLE_NOT_FOUND:

480

print(f"Table not found: {detail.entity}")

481

elif detail.code == types.StorageError.StorageErrorCode.STREAM_FINALIZED:

482

print(f"Stream already finalized: {detail.entity}")

483

484

# Check for row-level errors in response

485

for response in response_stream:

486

if response.row_errors:

487

for row_error in response.row_errors:

488

print(f"Row {row_error.index} error: {row_error.message}")

489

```

### Time-based Operations

492

493

```python

494

from google.cloud.bigquery_storage import types

495

import time

496

497

# Create timestamp for snapshot

498

snapshot_time = types.Timestamp()

499

current_millis = int(time.time() * 1000)

500

snapshot_time.FromMilliseconds(current_millis)

501

502

# Use in table modifiers

503

table_modifiers = types.ReadSession.TableModifiers(

504

snapshot_time=snapshot_time

505

)

506

507

read_session = types.ReadSession(

508

table=table_path,

509

data_format=types.DataFormat.AVRO,

510

table_modifiers=table_modifiers

511

)

512

```