Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities
---
Loading data from various sources including local files, Cloud Storage, and streaming inserts. BigQuery supports multiple data formats and provides comprehensive transformation and validation options during the loading process.
Load data into BigQuery tables from external sources with comprehensive job monitoring and error handling.
class LoadJob:
    """Asynchronous job that loads data into a BigQuery table.

    Exposes the job's lifecycle state while running and input/output
    statistics once the load has completed. Call ``result()`` to block
    until the job is done.
    """

    def __init__(self, job_id: str, source_uris: List[str], destination: Table, client: Client): ...

    @property
    def state(self) -> str:
        """Current state of the job ('PENDING', 'RUNNING', 'DONE')."""

    @property
    def source_uris(self) -> List[str]:
        """Source URIs being loaded."""

    @property
    def destination(self) -> TableReference:
        """Destination table reference."""

    @property
    def input_files(self) -> int:
        """Number of source files processed."""

    @property
    def input_file_bytes(self) -> int:
        """Total bytes of source files."""

    @property
    def output_bytes(self) -> int:
        """Bytes written to destination table."""

    @property
    def output_rows(self) -> int:
        """Rows written to destination table."""

    @property
    def bad_records(self) -> int:
        """Number of bad records encountered."""

    def result(
        self,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
    ) -> LoadJob:
        """
        Wait for load job completion.

        Args:
            retry: Retry configuration for polling.
            timeout: Timeout in seconds for polling.

        Returns:
            LoadJob: The completed job instance.
        """

# Configure data loading behavior, format options, and schema handling.
class LoadJobConfig:
    """Configuration for a load job.

    Covers source format selection, schema handling, CSV parsing options,
    bad-record tolerance, and destination partitioning/clustering. All
    options are settable properties; pass them as keyword arguments to the
    constructor or assign after construction.
    """

    def __init__(self, **kwargs): ...

    @property
    def source_format(self) -> str:
        """Source data format (CSV, JSON, AVRO, PARQUET, ORC)."""
    @source_format.setter
    def source_format(self, value: str): ...

    @property
    def schema(self) -> List[SchemaField]:
        """Target table schema."""
    @schema.setter
    def schema(self, value: List[SchemaField]): ...

    @property
    def create_disposition(self) -> str:
        """Action when destination table doesn't exist."""
    @create_disposition.setter
    def create_disposition(self, value: str): ...

    @property
    def write_disposition(self) -> str:
        """Action when destination table exists."""
    @write_disposition.setter
    def write_disposition(self, value: str): ...

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""
    @skip_leading_rows.setter
    def skip_leading_rows(self, value: int): ...

    @property
    def max_bad_records(self) -> int:
        """Maximum number of bad records to ignore."""
    @max_bad_records.setter
    def max_bad_records(self, value: int): ...

    @property
    def ignore_unknown_values(self) -> bool:
        """Ignore unknown values in input data."""
    @ignore_unknown_values.setter
    def ignore_unknown_values(self, value: bool): ...

    @property
    def autodetect(self) -> bool:
        """Auto-detect schema from source data."""
    @autodetect.setter
    def autodetect(self, value: bool): ...

    @property
    def encoding(self) -> str:
        """Character encoding of source data."""
    @encoding.setter
    def encoding(self, value: str): ...

    @property
    def field_delimiter(self) -> str:
        """Field delimiter for CSV files."""
    @field_delimiter.setter
    def field_delimiter(self, value: str): ...

    @property
    def quote_character(self) -> str:
        """Quote character for CSV files."""
    @quote_character.setter
    def quote_character(self, value: str): ...

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in CSV data."""
    @allow_quoted_newlines.setter
    def allow_quoted_newlines(self, value: bool): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow rows with missing trailing columns."""
    @allow_jagged_rows.setter
    def allow_jagged_rows(self, value: bool): ...

    @property
    def clustering_fields(self) -> List[str]:
        """Fields to cluster the table by."""
    @clustering_fields.setter
    def clustering_fields(self, value: List[str]): ...

    @property
    def time_partitioning(self) -> TimePartitioning:
        """Time partitioning configuration."""
    @time_partitioning.setter
    def time_partitioning(self, value: TimePartitioning): ...

    @property
    def range_partitioning(self) -> RangePartitioning:
        """Range partitioning configuration."""
    @range_partitioning.setter
    def range_partitioning(self, value: RangePartitioning): ...

# Load data from various sources using the BigQuery client.
def load_table_from_uri(
    self,
    source_uris: Union[str, List[str]],
    destination: Union[Table, TableReference, str],
    job_config: LoadJobConfig = None,
    job_id: str = None,
    job_retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from Cloud Storage URIs.

    Starts the job and returns immediately; call ``result()`` on the
    returned job to wait for completion.

    Args:
        source_uris: Cloud Storage URIs (gs://bucket/file).
        destination: Destination table.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        job_retry: Retry configuration for job creation.
        timeout: Timeout in seconds for job creation.
        location: Location where job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """
def load_table_from_file(
    self,
    file_obj: typing.BinaryIO,
    destination: Union[Table, TableReference, str],
    rewind: bool = False,
    size: int = None,
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from a file object.

    Uploads the file contents and starts a load job; call ``result()``
    on the returned job to wait for completion.

    Args:
        file_obj: File-like object to load from.
        destination: Destination table.
        rewind: Whether to rewind file before loading.
        size: Number of bytes to load.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """
def load_table_from_dataframe(
    self,
    dataframe: pandas.DataFrame,
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
    parquet_compression: str = "snappy",
) -> LoadJob:
    """
    Load data from a pandas DataFrame.

    Args:
        dataframe: DataFrame to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where job should run.
        project: Project ID for the job.
        parquet_compression: Parquet compression type (default "snappy";
            presumably the DataFrame is serialized via Parquet for upload —
            confirm against client docs).

    Returns:
        LoadJob: Job instance for the load operation.
    """
def load_table_from_json(
    self,
    json_rows: List[Dict[str, Any]],
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    ignore_unknown_values: bool = False,
    **kwargs
) -> LoadJob:
    """
    Load data from JSON rows.

    Args:
        json_rows: List of JSON objects to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        ignore_unknown_values: Ignore unknown values.
        **kwargs: Additional keyword options (semantics not shown here;
            see the client library documentation).

    Returns:
        LoadJob: Job instance for the load operation.
    """

# Insert data into BigQuery tables in real-time with streaming inserts.
def insert_rows_json(
    self,
    table: Union[Table, TableReference, str],
    json_rows: List[Dict[str, Any]],
    row_ids: List[str] = None,
    skip_invalid_rows: bool = False,
    ignore_unknown_values: bool = False,
    template_suffix: str = None,
    retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
) -> List[Dict[str, Any]]:
    """
    Insert JSON rows via streaming API.

    Note: per-row failures are reported in the return value rather than
    raised as exceptions.

    Args:
        table: Target table for inserts.
        json_rows: List of JSON objects to insert.
        row_ids: Unique IDs for deduplication.
        skip_invalid_rows: Skip rows that don't match schema.
        ignore_unknown_values: Ignore unknown fields.
        template_suffix: Suffix for table template.
        retry: Retry configuration.
        timeout: Timeout in seconds.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """
def insert_rows(
    self,
    table: Union[Table, TableReference, str],
    rows: Union[List[Tuple[Any, ...]], List[Dict[str, Any]]],
    selected_fields: List[SchemaField] = None,
    **kwargs
) -> List[Dict[str, Any]]:
    """
    Insert rows via streaming API.

    Args:
        table: Target table for inserts.
        rows: Rows to insert as tuples or dictionaries.
        selected_fields: Schema fields for tuple rows.
        **kwargs: Additional keyword options (semantics not shown here).

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """

class CSVOptions:
    """Parsing options for CSV source data."""

    def __init__(self, **kwargs): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow missing trailing optional columns."""

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in data."""

    @property
    def encoding(self) -> str:
        """Character encoding (UTF-8, ISO-8859-1)."""

    @property
    def field_delimiter(self) -> str:
        """Field separator character."""

    @property
    def quote_character(self) -> str:
        """Quote character."""

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""

class AvroOptions:
    """Options for Avro source data."""

    def __init__(self, **kwargs): ...

    @property
    def use_avro_logical_types(self) -> bool:
        """Use Avro logical types for conversion."""

class ParquetOptions:
    """Options for Parquet source data."""

    def __init__(self, **kwargs): ...

    @property
    def enum_as_string(self) -> bool:
        """Convert Parquet enums to strings."""

    @property
    def enable_list_inference(self) -> bool:
        """Enable list type inference."""

from google.cloud import bigquery
# Example: load a CSV file from Cloud Storage into a table.
client = bigquery.Client()

uri = "gs://my-bucket/data.csv"
table_id = f"{client.project}.my_dataset.my_table"

# Overwrite the destination, auto-detecting the schema and skipping the header.
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    autodetect=True,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # Block until the job finishes
print(f"Loaded {load_job.output_rows} rows")

# Define schema explicitly
# Example: load CSV with an explicit schema instead of autodetect.
schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
]

job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter=',',
    quote_character='"',
    max_bad_records=10,  # Tolerate up to 10 malformed records before failing
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/users.csv", "my_project.my_dataset.users", job_config=job_config
)
load_job.result()
# Load from local file
# Example: load newline-delimited JSON from a local file.
with open("data.json", "rb") as source_file:
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )
    load_job = client.load_table_from_file(
        source_file, table_id, job_config=job_config
    )
load_job.result()
print(f"Loaded {load_job.output_rows} rows from local file")

import pandas as pd
# Example: load an in-memory pandas DataFrame.
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com'],
    'created_at': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03']),
})

# Append to the table, allowing new columns to be added to its schema.
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

load_job = client.load_table_from_dataframe(df, table_id, job_config=job_config)
load_job.result()
print(f"Loaded {len(df)} rows from DataFrame")
# Stream individual records
# Example: streaming inserts. The API reports per-row errors in the
# return value instead of raising.
rows_to_insert = [
    {"name": "Alice", "age": 25, "email": "alice@example.com"},
    {"name": "Bob", "age": 30, "email": "bob@example.com"},
]

errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors occurred: {errors}")
else:
    print("Rows inserted successfully")

# Supply unique row IDs so retried inserts can be deduplicated.
import uuid

row_ids = [str(uuid.uuid4()) for _ in rows_to_insert]
errors = client.insert_rows_json(
    table_id, rows_to_insert, row_ids=row_ids, ignore_unknown_values=True
)
# Load into partitioned table
# Example: load into a time-partitioned, clustered table.
from datetime import timedelta

job_config = bigquery.LoadJobConfig(
    # NEWLINE_DELIMITED_JSON is the correct constant for JSON source data;
    # bigquery.SourceFormat has no plain JSON member.
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="created_at",  # Partition by this field
        # 7-day partition retention, expressed in milliseconds.
        expiration_ms=int(timedelta(days=7).total_seconds() * 1000),
    ),
    clustering_fields=["user_id", "category"],  # Add clustering
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
load_job = client.load_table_from_uri(
    "gs://my-bucket/events.json",
    "my_project.my_dataset.events",
    job_config=job_config
)
load_job.result()
# Load with comprehensive error handling
# time was used below via time.sleep() but never imported in the examples.
import time

try:
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    # Poll until the job finishes; reload() refreshes state from the server.
    while load_job.state != 'DONE':
        print(f"Job state: {load_job.state}")
        time.sleep(1)
        load_job.reload()
    # Check for errors
    if load_job.errors:
        print(f"Job completed with errors: {load_job.errors}")
    else:
        print("Job completed successfully")
        print(f" Input files: {load_job.input_files}")
        print(f" Input bytes: {load_job.input_file_bytes:,}")
        print(f" Output rows: {load_job.output_rows:,}")
        print(f" Bad records: {load_job.bad_records}")
except Exception as e:
    # Top-level example boundary: report and continue.
    print(f"Load job failed: {e}")

# Install with Tessl CLI:
#   npx tessl i tessl/pypi-google-cloud-bigquery