# Data Loading

Load data into BigQuery from a variety of sources, including local files, Cloud Storage, pandas DataFrames, and in-memory JSON rows, or stream rows in real time with streaming inserts. BigQuery supports multiple data formats and provides transformation and validation options during the loading process.

## Capabilities

### Load Job Execution

Load data into BigQuery tables from external sources with comprehensive job monitoring and error handling.

```python { .api }
class LoadJob:
    def __init__(self, job_id: str, source_uris: List[str], destination: Table, client: Client): ...

    @property
    def state(self) -> str:
        """Current state of the job ('PENDING', 'RUNNING', 'DONE')."""

    @property
    def source_uris(self) -> List[str]:
        """Source URIs being loaded."""

    @property
    def destination(self) -> TableReference:
        """Destination table reference."""

    @property
    def input_files(self) -> int:
        """Number of source files processed."""

    @property
    def input_file_bytes(self) -> int:
        """Total bytes of source files."""

    @property
    def output_bytes(self) -> int:
        """Bytes written to destination table."""

    @property
    def output_rows(self) -> int:
        """Rows written to destination table."""

    @property
    def bad_records(self) -> int:
        """Number of bad records encountered."""

    def result(
        self,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
    ) -> LoadJob:
        """
        Wait for load job completion.

        Args:
            retry: Retry configuration for polling.
            timeout: Timeout in seconds for polling.

        Returns:
            LoadJob: The completed job instance.
        """
```
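
For orientation, here is a minimal sketch of reading these `LoadJob` properties after a job finishes. The bucket, project, and dataset names are placeholders, and the destination table is assumed to already exist; the Usage Examples below show realistic configurations.

```python
from google.cloud import bigquery

client = bigquery.Client()

# Placeholder URI and table ID; the destination table is assumed to exist
# with a schema that matches the CSV file.
job = client.load_table_from_uri(
    "gs://example-bucket/data.csv",
    "example_project.example_dataset.example_table",
)

job.result()  # Block until the job reaches the DONE state.

# Inspect the statistics exposed by LoadJob.
print(job.state)             # 'DONE'
print(job.input_files)       # number of source files read
print(job.input_file_bytes)  # bytes read from the sources
print(job.output_rows)       # rows written to the destination table
print(job.bad_records)       # rows BigQuery could not parse
```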

### Load Job Configuration

Configure data loading behavior, format options, and schema handling.

```python { .api }
class LoadJobConfig:
    def __init__(self, **kwargs): ...

    @property
    def source_format(self) -> str:
        """Source data format (CSV, NEWLINE_DELIMITED_JSON, AVRO, PARQUET, ORC)."""

    @source_format.setter
    def source_format(self, value: str): ...

    @property
    def schema(self) -> List[SchemaField]:
        """Target table schema."""

    @schema.setter
    def schema(self, value: List[SchemaField]): ...

    @property
    def create_disposition(self) -> str:
        """Action when destination table doesn't exist."""

    @create_disposition.setter
    def create_disposition(self, value: str): ...

    @property
    def write_disposition(self) -> str:
        """Action when destination table exists."""

    @write_disposition.setter
    def write_disposition(self, value: str): ...

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""

    @skip_leading_rows.setter
    def skip_leading_rows(self, value: int): ...

    @property
    def max_bad_records(self) -> int:
        """Maximum number of bad records to ignore."""

    @max_bad_records.setter
    def max_bad_records(self, value: int): ...

    @property
    def ignore_unknown_values(self) -> bool:
        """Ignore unknown values in input data."""

    @ignore_unknown_values.setter
    def ignore_unknown_values(self, value: bool): ...

    @property
    def autodetect(self) -> bool:
        """Auto-detect schema from source data."""

    @autodetect.setter
    def autodetect(self, value: bool): ...

    @property
    def encoding(self) -> str:
        """Character encoding of source data."""

    @encoding.setter
    def encoding(self, value: str): ...

    @property
    def field_delimiter(self) -> str:
        """Field delimiter for CSV files."""

    @field_delimiter.setter
    def field_delimiter(self, value: str): ...

    @property
    def quote_character(self) -> str:
        """Quote character for CSV files."""

    @quote_character.setter
    def quote_character(self, value: str): ...

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in CSV data."""

    @allow_quoted_newlines.setter
    def allow_quoted_newlines(self, value: bool): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow rows with missing trailing columns."""

    @allow_jagged_rows.setter
    def allow_jagged_rows(self, value: bool): ...

    @property
    def clustering_fields(self) -> List[str]:
        """Fields to cluster the table by."""

    @clustering_fields.setter
    def clustering_fields(self, value: List[str]): ...

    @property
    def time_partitioning(self) -> TimePartitioning:
        """Time partitioning configuration."""

    @time_partitioning.setter
    def time_partitioning(self, value: TimePartitioning): ...

    @property
    def range_partitioning(self) -> RangePartitioning:
        """Range partitioning configuration."""

    @range_partitioning.setter
    def range_partitioning(self, value: RangePartitioning): ...
```
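
The disposition properties take the string constants exposed by `bigquery.CreateDisposition` and `bigquery.WriteDisposition`. A minimal sketch of a CSV-oriented configuration, built either through constructor keywords or attribute assignment (the specific values here are illustrative):

```python
from google.cloud import bigquery

# Constructor keywords mirror the properties documented above.
config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter=",",
    quote_character='"',
    max_bad_records=5,
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

# The same options can be set after construction via attribute assignment.
config.autodetect = True
config.ignore_unknown_values = True
```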

### Client Load Methods

Load data from various sources using the BigQuery client.

```python { .api }
def load_table_from_uri(
    self,
    source_uris: Union[str, List[str]],
    destination: Union[Table, TableReference, str],
    job_config: LoadJobConfig = None,
    job_id: str = None,
    retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from Cloud Storage URIs.

    Args:
        source_uris: Cloud Storage URIs (gs://bucket/file).
        destination: Destination table.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        retry: Retry configuration for job creation.
        timeout: Timeout in seconds for job creation.
        location: Location where the job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_file(
    self,
    file_obj: typing.BinaryIO,
    destination: Union[Table, TableReference, str],
    rewind: bool = False,
    size: int = None,
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from a file object.

    Args:
        file_obj: File-like object to load from.
        destination: Destination table.
        rewind: Whether to rewind the file before loading.
        size: Number of bytes to load.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where the job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_dataframe(
    self,
    dataframe: pandas.DataFrame,
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
    parquet_compression: str = "snappy",
) -> LoadJob:
    """
    Load data from a pandas DataFrame.

    Args:
        dataframe: DataFrame to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where the job should run.
        project: Project ID for the job.
        parquet_compression: Parquet compression type.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_json(
    self,
    json_rows: List[Dict[str, Any]],
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    ignore_unknown_values: bool = False,
    **kwargs
) -> LoadJob:
    """
    Load data from JSON rows.

    Args:
        json_rows: List of JSON objects to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        ignore_unknown_values: Ignore unknown values.

    Returns:
        LoadJob: Job instance for the load operation.
    """
```
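
`load_table_from_json` is the one method here without a dedicated example in the Usage Examples section below. A minimal sketch, assuming a `client` created as in those examples and a placeholder table ID:

```python
# Rows are uploaded as a load job (not a streaming insert), so they land in
# the destination table atomically once the job completes.
rows = [
    {"name": "Alice", "age": 25},
    {"name": "Bob", "age": 30},
]

job_config = bigquery.LoadJobConfig(autodetect=True)

load_job = client.load_table_from_json(
    rows,
    "my_project.my_dataset.people",  # placeholder table ID
    job_config=job_config,
)
load_job.result()
```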

### Streaming Inserts

Insert data into BigQuery tables in real time with streaming inserts.

```python { .api }
def insert_rows_json(
    self,
    table: Union[Table, TableReference, str],
    json_rows: List[Dict[str, Any]],
    row_ids: List[str] = None,
    skip_invalid_rows: bool = False,
    ignore_unknown_values: bool = False,
    template_suffix: str = None,
    retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
) -> List[Dict[str, Any]]:
    """
    Insert JSON rows via the streaming API.

    Args:
        table: Target table for inserts.
        json_rows: List of JSON objects to insert.
        row_ids: Unique IDs for deduplication.
        skip_invalid_rows: Skip rows that don't match the schema.
        ignore_unknown_values: Ignore unknown fields.
        template_suffix: Suffix for table template.
        retry: Retry configuration.
        timeout: Timeout in seconds.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """

def insert_rows(
    self,
    table: Union[Table, TableReference, str],
    rows: Union[List[Tuple[Any, ...]], List[Dict[str, Any]]],
    selected_fields: List[SchemaField] = None,
    **kwargs
) -> List[Dict[str, Any]]:
    """
    Insert rows via the streaming API.

    Args:
        table: Target table for inserts.
        rows: Rows to insert as tuples or dictionaries.
        selected_fields: Schema fields for tuple rows.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """
```
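
`insert_rows` accepts tuples as well as dictionaries, but tuple rows need schema information to map positions to columns, either from a fetched `Table` or via `selected_fields`. A minimal sketch with a placeholder table ID:

```python
# Fetch the table so the client knows the schema; tuple values must follow
# the column order of that schema.
table = client.get_table("my_project.my_dataset.people")  # placeholder table ID

rows = [
    ("Alice", 25),
    ("Bob", 30),
]

errors = client.insert_rows(table, rows)
if errors:
    print(f"Errors: {errors}")
```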

## Format-Specific Options

### CSV Options

```python { .api }
class CSVOptions:
    def __init__(self, **kwargs): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow missing trailing optional columns."""

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in data."""

    @property
    def encoding(self) -> str:
        """Character encoding (UTF-8, ISO-8859-1)."""

    @property
    def field_delimiter(self) -> str:
        """Field separator character."""

    @property
    def quote_character(self) -> str:
        """Quote character."""

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""
```
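
For load jobs, the same CSV settings are available directly on `LoadJobConfig` (see Load Job Configuration above). A minimal sketch loading a pipe-delimited, Latin-1 encoded file, with placeholder bucket and table names:

```python
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    field_delimiter="|",       # pipe-delimited file
    encoding="ISO-8859-1",     # Latin-1 source data
    skip_leading_rows=1,
    allow_quoted_newlines=True,
    allow_jagged_rows=True,
)

load_job = client.load_table_from_uri(
    "gs://example-bucket/legacy_export.csv",   # placeholder URI
    "my_project.my_dataset.legacy_data",       # placeholder table ID
    job_config=job_config,
)
load_job.result()
```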

### Avro and Parquet Options

```python { .api }
class AvroOptions:
    def __init__(self, **kwargs): ...

    @property
    def use_avro_logical_types(self) -> bool:
        """Use Avro logical types for conversion."""


class ParquetOptions:
    def __init__(self, **kwargs): ...

    @property
    def enum_as_string(self) -> bool:
        """Convert Parquet enums to strings."""

    @property
    def enable_list_inference(self) -> bool:
        """Enable list type inference."""
```
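
Recent versions of the client library let you attach these options to a load job through `LoadJobConfig.parquet_options` (and, for Avro, `use_avro_logical_types` directly on the config). A minimal sketch for Parquet, assuming a library version that exposes `ParquetOptions`, with placeholder bucket and table names:

```python
from google.cloud.bigquery.format_options import ParquetOptions

parquet_options = ParquetOptions()
parquet_options.enable_list_inference = True  # infer Parquet LIST types as REPEATED fields
parquet_options.enum_as_string = True         # load Parquet ENUM values as STRING

job_config = bigquery.LoadJobConfig(source_format=bigquery.SourceFormat.PARQUET)
job_config.parquet_options = parquet_options

load_job = client.load_table_from_uri(
    "gs://example-bucket/data.parquet",       # placeholder URI
    "my_project.my_dataset.parquet_data",     # placeholder table ID
    job_config=job_config,
)
load_job.result()
```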

## Usage Examples

### Load from Cloud Storage

```python
from google.cloud import bigquery

client = bigquery.Client()

# Load CSV from Cloud Storage
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # Skip header row
    autodetect=True,  # Auto-detect schema
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

uri = "gs://my-bucket/data.csv"
table_id = f"{client.project}.my_dataset.my_table"

load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # Wait for completion

print(f"Loaded {load_job.output_rows} rows")
```

### Load with Explicit Schema

```python
# Define schema explicitly
schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
]

job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter=',',
    quote_character='"',
    max_bad_records=10,  # Allow up to 10 bad records
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/users.csv",
    "my_project.my_dataset.users",
    job_config=job_config,
)
load_job.result()
```

### Load from Local File

```python
# Load from a local newline-delimited JSON file
with open("data.json", "rb") as source_file:
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )

    load_job = client.load_table_from_file(
        source_file,
        table_id,
        job_config=job_config,
    )

load_job.result()
print(f"Loaded {load_job.output_rows} rows from local file")
```

### Load from pandas DataFrame

```python
import pandas as pd

# Create sample DataFrame
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com'],
    'created_at': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03']),
})

# Load DataFrame to BigQuery
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

load_job = client.load_table_from_dataframe(
    df,
    table_id,
    job_config=job_config,
)
load_job.result()

print(f"Loaded {len(df)} rows from DataFrame")
```

### Streaming Inserts

```python
# Stream individual records
rows_to_insert = [
    {"name": "Alice", "age": 25, "email": "alice@example.com"},
    {"name": "Bob", "age": 30, "email": "bob@example.com"},
]

# Insert with error handling
errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors occurred: {errors}")
else:
    print("Rows inserted successfully")

# Insert with deduplication IDs
import uuid

row_ids = [str(uuid.uuid4()) for _ in rows_to_insert]
errors = client.insert_rows_json(
    table_id,
    rows_to_insert,
    row_ids=row_ids,
    ignore_unknown_values=True,
)
```

### Partitioned Table Loading

```python
# Load into a time-partitioned, clustered table
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="created_at",  # Partition by this field
        expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days retention
    ),
    clustering_fields=["user_id", "category"],  # Add clustering
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/events.json",
    "my_project.my_dataset.events",
    job_config=job_config,
)
load_job.result()
```

### Error Handling and Monitoring

```python
import time

# Load with comprehensive error handling
try:
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)

    # Monitor progress
    while load_job.state != 'DONE':
        print(f"Job state: {load_job.state}")
        time.sleep(1)
        load_job.reload()

    # Check for errors
    if load_job.errors:
        print(f"Job completed with errors: {load_job.errors}")
    else:
        print("Job completed successfully")
        print(f"  Input files: {load_job.input_files}")
        print(f"  Input bytes: {load_job.input_file_bytes:,}")
        print(f"  Output rows: {load_job.output_rows:,}")
        print(f"  Bad records: {load_job.bad_records}")

except Exception as e:
    print(f"Load job failed: {e}")
```