# Data Loading

Loading data into BigQuery from local files and Cloud Storage, as well as real-time streaming inserts. BigQuery supports multiple data formats and provides transformation and validation options during the loading process.

## Capabilities

### Load Job Execution

Load data into BigQuery tables from external sources with comprehensive job monitoring and error handling.

```python { .api }
class LoadJob:
    def __init__(self, job_id: str, source_uris: List[str], destination: Table, client: Client): ...

    @property
    def state(self) -> str:
        """Current state of the job ('PENDING', 'RUNNING', 'DONE')."""

    @property
    def source_uris(self) -> List[str]:
        """Source URIs being loaded."""

    @property
    def destination(self) -> TableReference:
        """Destination table reference."""

    @property
    def input_files(self) -> int:
        """Number of source files processed."""

    @property
    def input_file_bytes(self) -> int:
        """Total bytes of source files."""

    @property
    def output_bytes(self) -> int:
        """Bytes written to destination table."""

    @property
    def output_rows(self) -> int:
        """Rows written to destination table."""

    @property
    def bad_records(self) -> int:
        """Number of bad records encountered."""

    def result(
        self,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
    ) -> LoadJob:
        """
        Wait for load job completion.

        Args:
            retry: Retry configuration for polling.
            timeout: Timeout in seconds for polling.

        Returns:
            LoadJob: The completed job instance.
        """
```
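
For orientation, a minimal sketch of driving a load job to completion and reading its statistics; the bucket, project, dataset, and table names are placeholders, and `load_table_from_uri` is documented under Client Load Methods below.

```python
from google.cloud import bigquery

client = bigquery.Client()

# Placeholder source and destination, for illustration only.
job = client.load_table_from_uri(
    "gs://example-bucket/data.csv",
    "example-project.example_dataset.example_table",
    job_config=bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        autodetect=True,
    ),
)

job.result(timeout=300)  # Block until the job is DONE (or the timeout expires)

# The statistics properties above are populated once the job finishes.
print(job.state, job.input_files, job.output_rows, job.bad_records)
```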

### Load Job Configuration

Configure data loading behavior, format options, and schema handling.

```python { .api }
class LoadJobConfig:
    def __init__(self, **kwargs): ...

    @property
    def source_format(self) -> str:
        """Source data format (CSV, JSON, AVRO, PARQUET, ORC)."""

    @source_format.setter
    def source_format(self, value: str): ...

    @property
    def schema(self) -> List[SchemaField]:
        """Target table schema."""

    @schema.setter
    def schema(self, value: List[SchemaField]): ...

    @property
    def create_disposition(self) -> str:
        """Action when destination table doesn't exist."""

    @create_disposition.setter
    def create_disposition(self, value: str): ...

    @property
    def write_disposition(self) -> str:
        """Action when destination table exists."""

    @write_disposition.setter
    def write_disposition(self, value: str): ...

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""

    @skip_leading_rows.setter
    def skip_leading_rows(self, value: int): ...

    @property
    def max_bad_records(self) -> int:
        """Maximum number of bad records to ignore."""

    @max_bad_records.setter
    def max_bad_records(self, value: int): ...

    @property
    def ignore_unknown_values(self) -> bool:
        """Ignore unknown values in input data."""

    @ignore_unknown_values.setter
    def ignore_unknown_values(self, value: bool): ...

    @property
    def autodetect(self) -> bool:
        """Auto-detect schema from source data."""

    @autodetect.setter
    def autodetect(self, value: bool): ...

    @property
    def encoding(self) -> str:
        """Character encoding of source data."""

    @encoding.setter
    def encoding(self, value: str): ...

    @property
    def field_delimiter(self) -> str:
        """Field delimiter for CSV files."""

    @field_delimiter.setter
    def field_delimiter(self, value: str): ...

    @property
    def quote_character(self) -> str:
        """Quote character for CSV files."""

    @quote_character.setter
    def quote_character(self, value: str): ...

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in CSV data."""

    @allow_quoted_newlines.setter
    def allow_quoted_newlines(self, value: bool): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow rows with missing trailing columns."""

    @allow_jagged_rows.setter
    def allow_jagged_rows(self, value: bool): ...

    @property
    def clustering_fields(self) -> List[str]:
        """Fields to cluster the table by."""

    @clustering_fields.setter
    def clustering_fields(self, value: List[str]): ...

    @property
    def time_partitioning(self) -> TimePartitioning:
        """Time partitioning configuration."""

    @time_partitioning.setter
    def time_partitioning(self, value: TimePartitioning): ...

    @property
    def range_partitioning(self) -> RangePartitioning:
        """Range partitioning configuration."""

    @range_partitioning.setter
    def range_partitioning(self, value: RangePartitioning): ...
```
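
Because the constructor accepts keyword arguments and every setting is also exposed as a writable property, the same configuration can be built either way. A short sketch:

```python
from google.cloud import bigquery

# Keyword arguments at construction time...
config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# ...or property assignment after construction.
config = bigquery.LoadJobConfig()
config.source_format = bigquery.SourceFormat.CSV
config.skip_leading_rows = 1
config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND
config.max_bad_records = 10
config.autodetect = True
```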

### Client Load Methods

Load data from various sources using the BigQuery client.

```python { .api }
def load_table_from_uri(
    self,
    source_uris: Union[str, List[str]],
    destination: Union[Table, TableReference, str],
    job_config: LoadJobConfig = None,
    job_id: str = None,
    job_retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from Cloud Storage URIs.

    Args:
        source_uris: Cloud Storage URIs (gs://bucket/file).
        destination: Destination table.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        job_retry: Retry configuration for job creation.
        timeout: Timeout in seconds for job creation.
        location: Location where job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_file(
    self,
    file_obj: typing.BinaryIO,
    destination: Union[Table, TableReference, str],
    rewind: bool = False,
    size: int = None,
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from a file object.

    Args:
        file_obj: File-like object to load from.
        destination: Destination table.
        rewind: Whether to rewind file before loading.
        size: Number of bytes to load.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_dataframe(
    self,
    dataframe: pandas.DataFrame,
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
    parquet_compression: str = "snappy",
) -> LoadJob:
    """
    Load data from a pandas DataFrame.

    Args:
        dataframe: DataFrame to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where job should run.
        project: Project ID for the job.
        parquet_compression: Parquet compression type.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_json(
    self,
    json_rows: List[Dict[str, Any]],
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    ignore_unknown_values: bool = False,
    **kwargs
) -> LoadJob:
    """
    Load data from JSON rows.

    Args:
        json_rows: List of JSON objects to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        ignore_unknown_values: Ignore unknown values.

    Returns:
        LoadJob: Job instance for the load operation.
    """
```
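
`load_table_from_json` is the one loader not shown in the Usage Examples below. A minimal sketch, assuming a placeholder destination table and the signature above:

```python
from google.cloud import bigquery

client = bigquery.Client()
table_id = "example-project.example_dataset.events"  # placeholder destination

rows = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
]

# Loads the rows as a batch load job (unlike insert_rows_json, which streams).
job = client.load_table_from_json(
    rows,
    table_id,
    job_config=bigquery.LoadJobConfig(autodetect=True),
)
job.result()  # Wait for the load job to complete
```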

### Streaming Inserts

Insert data into BigQuery tables in real time with streaming inserts.

```python { .api }
def insert_rows_json(
    self,
    table: Union[Table, TableReference, str],
    json_rows: List[Dict[str, Any]],
    row_ids: List[str] = None,
    skip_invalid_rows: bool = False,
    ignore_unknown_values: bool = False,
    template_suffix: str = None,
    retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
) -> List[Dict[str, Any]]:
    """
    Insert JSON rows via streaming API.

    Args:
        table: Target table for inserts.
        json_rows: List of JSON objects to insert.
        row_ids: Unique IDs for deduplication.
        skip_invalid_rows: Skip rows that don't match schema.
        ignore_unknown_values: Ignore unknown fields.
        template_suffix: Suffix for table template.
        retry: Retry configuration.
        timeout: Timeout in seconds.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """

def insert_rows(
    self,
    table: Union[Table, TableReference, str],
    rows: Union[List[Tuple[Any, ...]], List[Dict[str, Any]]],
    selected_fields: List[SchemaField] = None,
    **kwargs
) -> List[Dict[str, Any]]:
    """
    Insert rows via streaming API.

    Args:
        table: Target table for inserts.
        rows: Rows to insert as tuples or dictionaries.
        selected_fields: Schema fields for tuple rows.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """
```
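
A brief sketch of `insert_rows` with tuple rows, which need `selected_fields` (or a `Table` that already carries a schema) so the values can be mapped to columns; the table ID is a placeholder:

```python
from google.cloud import bigquery

client = bigquery.Client()
table_id = "example-project.example_dataset.users"  # placeholder destination

# Field order in each tuple must match the order of selected_fields.
schema = [
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("age", "INTEGER"),
]

errors = client.insert_rows(
    table_id,
    [("Alice", 25), ("Bob", 30)],
    selected_fields=schema,
)
if errors:
    print(f"Errors occurred: {errors}")
```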

## Format-Specific Options

### CSV Options

```python { .api }
class CSVOptions:
    def __init__(self, **kwargs): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow missing trailing optional columns."""

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in data."""

    @property
    def encoding(self) -> str:
        """Character encoding (UTF-8, ISO-8859-1)."""

    @property
    def field_delimiter(self) -> str:
        """Field separator character."""

    @property
    def quote_character(self) -> str:
        """Quote character."""

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""
```
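
For load jobs, these CSV settings correspond to the `LoadJobConfig` properties listed earlier (`CSVOptions` itself is typically attached to external table configurations). A short sketch of the equivalent load-job configuration:

```python
from google.cloud import bigquery

# The CSV knobs above map onto LoadJobConfig settings when running a load job.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter=";",
    quote_character='"',
    encoding="ISO-8859-1",
    allow_quoted_newlines=True,
    allow_jagged_rows=True,
)
```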

### Avro and Parquet Options

```python { .api }
class AvroOptions:
    def __init__(self, **kwargs): ...

    @property
    def use_avro_logical_types(self) -> bool:
        """Use Avro logical types for conversion."""

class ParquetOptions:
    def __init__(self, **kwargs): ...

    @property
    def enum_as_string(self) -> bool:
        """Convert Parquet enums to strings."""

    @property
    def enable_list_inference(self) -> bool:
        """Enable list type inference."""
```
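
A hedged sketch of applying these options to a load job: recent client versions expose `LoadJobConfig.parquet_options` for Parquet, while Avro logical-type handling is a flag directly on `LoadJobConfig`; the import path below is the one used by current releases and may differ in older versions.

```python
from google.cloud import bigquery
from google.cloud.bigquery.format_options import ParquetOptions

# Parquet: attach ParquetOptions to the load configuration
# (assumes LoadJobConfig.parquet_options is available, as in recent releases).
parquet_options = ParquetOptions()
parquet_options.enable_list_inference = True
parquet_options.enum_as_string = True

parquet_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
parquet_config.parquet_options = parquet_options

# Avro: logical-type handling is a flag on LoadJobConfig itself.
avro_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.AVRO)
avro_config.use_avro_logical_types = True
```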

## Usage Examples

### Load from Cloud Storage

```python
from google.cloud import bigquery

client = bigquery.Client()

# Load CSV from Cloud Storage
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # Skip header row
    autodetect=True,  # Auto-detect schema
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

uri = "gs://my-bucket/data.csv"
table_id = f"{client.project}.my_dataset.my_table"

load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # Wait for completion

print(f"Loaded {load_job.output_rows} rows")
```

### Load with Explicit Schema

```python
# Define schema explicitly
schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
]

job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter=',',
    quote_character='"',
    max_bad_records=10,  # Allow up to 10 bad records
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/users.csv",
    "my_project.my_dataset.users",
    job_config=job_config
)
load_job.result()
```

### Load from Local File

```python
# Load from local file
with open("data.json", "rb") as source_file:
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )

    load_job = client.load_table_from_file(
        source_file,
        table_id,
        job_config=job_config
    )

load_job.result()
print(f"Loaded {load_job.output_rows} rows from local file")
```

480

481

### Load from pandas DataFrame

482

483

```python

484

import pandas as pd

485

486

# Create sample DataFrame

487

df = pd.DataFrame({

488

'name': ['Alice', 'Bob', 'Charlie'],

489

'age': [25, 30, 35],

490

'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com'],

491

'created_at': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03'])

492

})

493

494

# Load DataFrame to BigQuery

495

job_config = bigquery.LoadJobConfig(

496

write_disposition=bigquery.WriteDisposition.WRITE_APPEND,

497

schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],

498

)

499

500

load_job = client.load_table_from_dataframe(

501

df,

502

table_id,

503

job_config=job_config

504

)

505

load_job.result()

506

507

print(f"Loaded {len(df)} rows from DataFrame")

508

```

509

510

### Streaming Inserts

511

512

```python

513

# Stream individual records

514

rows_to_insert = [

515

{"name": "Alice", "age": 25, "email": "alice@example.com"},

516

{"name": "Bob", "age": 30, "email": "bob@example.com"},

517

]

518

519

# Insert with error handling

520

errors = client.insert_rows_json(table_id, rows_to_insert)

521

if errors:

522

print(f"Errors occurred: {errors}")

523

else:

524

print("Rows inserted successfully")

525

526

# Insert with deduplication IDs

527

import uuid

528

529

row_ids = [str(uuid.uuid4()) for _ in rows_to_insert]

530

errors = client.insert_rows_json(

531

table_id,

532

rows_to_insert,

533

row_ids=row_ids,

534

ignore_unknown_values=True

535

)

536

```

537

538

### Partitioned Table Loading

539

540

```python

541

# Load into partitioned table

542

from datetime import datetime, timedelta

543

544

job_config = bigquery.LoadJobConfig(

545

source_format=bigquery.SourceFormat.JSON,

546

time_partitioning=bigquery.TimePartitioning(

547

type_=bigquery.TimePartitioningType.DAY,

548

field="created_at", # Partition by this field

549

expiration_ms=7 * 24 * 60 * 60 * 1000, # 7 days retention

550

),

551

clustering_fields=["user_id", "category"], # Add clustering

552

write_disposition=bigquery.WriteDisposition.WRITE_APPEND,

553

)

554

555

load_job = client.load_table_from_uri(

556

"gs://my-bucket/events.json",

557

"my_project.my_dataset.events",

558

job_config=job_config

559

)

560

load_job.result()

561

```

562

563

### Error Handling and Monitoring

564

565

```python

566

# Load with comprehensive error handling

567

try:

568

load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)

569

570

# Monitor progress

571

while load_job.state != 'DONE':

572

print(f"Job state: {load_job.state}")

573

time.sleep(1)

574

load_job.reload()

575

576

# Check for errors

577

if load_job.errors:

578

print(f"Job completed with errors: {load_job.errors}")

579

else:

580

print(f"Job completed successfully")

581

print(f" Input files: {load_job.input_files}")

582

print(f" Input bytes: {load_job.input_file_bytes:,}")

583

print(f" Output rows: {load_job.output_rows:,}")

584

print(f" Bad records: {load_job.bad_records}")

585

586

except Exception as e:

587

print(f"Load job failed: {e}")

588

```