CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/pypi-google-cloud-bigquery

Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities

Pending
Overview
Eval results
Files

data-loading.mddocs/

Data Loading

Loading data from various sources including local files, Cloud Storage, and streaming inserts. BigQuery supports multiple data formats and provides comprehensive transformation and validation options during the loading process.

Capabilities

Load Job Execution

Load data into BigQuery tables from external sources with comprehensive job monitoring and error handling.

class LoadJob:
    def __init__(self, job_id: str, source_uris: List[str], destination: Table, client: Client): ...

    @property
    def state(self) -> str:
        """Current state of the job ('PENDING', 'RUNNING', 'DONE')."""

    @property
    def source_uris(self) -> List[str]:
        """Source URIs being loaded."""

    @property
    def destination(self) -> TableReference:
        """Destination table reference."""

    @property
    def input_files(self) -> int:
        """Number of source files processed."""

    @property
    def input_file_bytes(self) -> int:
        """Total bytes of source files."""

    @property
    def output_bytes(self) -> int:
        """Bytes written to destination table."""

    @property
    def output_rows(self) -> int:
        """Rows written to destination table."""

    @property
    def bad_records(self) -> int:
        """Number of bad records encountered."""

    def result(
        self,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
    ) -> LoadJob:
        """
        Wait for load job completion.

        Args:
            retry: Retry configuration for polling.
            timeout: Timeout in seconds for polling.

        Returns:
            LoadJob: The completed job instance.
        """

Load Job Configuration

Configure data loading behavior, format options, and schema handling.

class LoadJobConfig:
    def __init__(self, **kwargs): ...

    @property
    def source_format(self) -> str:
        """Source data format (CSV, JSON, AVRO, PARQUET, ORC)."""

    @source_format.setter
    def source_format(self, value: str): ...

    @property
    def schema(self) -> List[SchemaField]:
        """Target table schema."""

    @schema.setter
    def schema(self, value: List[SchemaField]): ...

    @property
    def create_disposition(self) -> str:
        """Action when destination table doesn't exist."""

    @create_disposition.setter
    def create_disposition(self, value: str): ...

    @property
    def write_disposition(self) -> str:
        """Action when destination table exists."""

    @write_disposition.setter
    def write_disposition(self, value: str): ...

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""

    @skip_leading_rows.setter
    def skip_leading_rows(self, value: int): ...

    @property
    def max_bad_records(self) -> int:
        """Maximum number of bad records to ignore."""

    @max_bad_records.setter
    def max_bad_records(self, value: int): ...

    @property
    def ignore_unknown_values(self) -> bool:
        """Ignore unknown values in input data."""

    @ignore_unknown_values.setter
    def ignore_unknown_values(self, value: bool): ...

    @property
    def autodetect(self) -> bool:
        """Auto-detect schema from source data."""

    @autodetect.setter
    def autodetect(self, value: bool): ...

    @property
    def encoding(self) -> str:
        """Character encoding of source data."""

    @encoding.setter
    def encoding(self, value: str): ...

    @property
    def field_delimiter(self) -> str:
        """Field delimiter for CSV files."""

    @field_delimiter.setter
    def field_delimiter(self, value: str): ...

    @property
    def quote_character(self) -> str:
        """Quote character for CSV files."""

    @quote_character.setter
    def quote_character(self, value: str): ...

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in CSV data."""

    @allow_quoted_newlines.setter
    def allow_quoted_newlines(self, value: bool): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow rows with missing trailing columns."""

    @allow_jagged_rows.setter
    def allow_jagged_rows(self, value: bool): ...

    @property
    def clustering_fields(self) -> List[str]:
        """Fields to cluster the table by."""

    @clustering_fields.setter
    def clustering_fields(self, value: List[str]): ...

    @property
    def time_partitioning(self) -> TimePartitioning:
        """Time partitioning configuration."""

    @time_partitioning.setter
    def time_partitioning(self, value: TimePartitioning): ...

    @property
    def range_partitioning(self) -> RangePartitioning:
        """Range partitioning configuration."""

    @range_partitioning.setter
    def range_partitioning(self, value: RangePartitioning): ...

Client Load Methods

Load data from various sources using the BigQuery client.

def load_table_from_uri(
    self,
    source_uris: Union[str, List[str]],
    destination: Union[Table, TableReference, str],
    job_config: LoadJobConfig = None,
    job_id: str = None,
    retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from Cloud Storage URIs.

    Args:
        source_uris: Cloud Storage URIs (gs://bucket/file).
        destination: Destination table.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        retry: Retry configuration for job creation.
        timeout: Timeout in seconds for job creation.
        location: Location where job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_file(
    self,
    file_obj: typing.BinaryIO,
    destination: Union[Table, TableReference, str],
    rewind: bool = False,
    size: int = None,
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
) -> LoadJob:
    """
    Load data from a file object.

    Args:
        file_obj: File-like object to load from.
        destination: Destination table.
        rewind: Whether to rewind file before loading.
        size: Number of bytes to load.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where job should run.
        project: Project ID for the job.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_dataframe(
    self,
    dataframe: pandas.DataFrame,
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    job_id: str = None,
    location: str = None,
    project: str = None,
    parquet_compression: str = "snappy",
) -> LoadJob:
    """
    Load data from a pandas DataFrame.

    Args:
        dataframe: DataFrame to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        job_id: Unique identifier for the job.
        location: Location where job should run.
        project: Project ID for the job.
        parquet_compression: Parquet compression type.

    Returns:
        LoadJob: Job instance for the load operation.
    """

def load_table_from_json(
    self,
    json_rows: List[Dict[str, Any]],
    destination: Union[Table, TableReference, str],
    num_retries: int = 6,
    job_config: LoadJobConfig = None,
    ignore_unknown_values: bool = False,
    **kwargs
) -> LoadJob:
    """
    Load data from JSON rows.

    Args:
        json_rows: List of JSON objects to load.
        destination: Destination table.
        num_retries: Number of upload retries.
        job_config: Configuration for the load job.
        ignore_unknown_values: Ignore unknown values.

    Returns:
        LoadJob: Job instance for the load operation.
    """

Streaming Inserts

Insert data into BigQuery tables in real-time with streaming inserts.

def insert_rows_json(
    self,
    table: Union[Table, TableReference, str],
    json_rows: List[Dict[str, Any]],
    row_ids: List[str] = None,
    skip_invalid_rows: bool = False,
    ignore_unknown_values: bool = False,
    template_suffix: str = None,
    retry: google.api_core.retry.Retry = DEFAULT_RETRY,
    timeout: float = None,
) -> List[Dict[str, Any]]:
    """
    Insert JSON rows via streaming API.

    Args:
        table: Target table for inserts.
        json_rows: List of JSON objects to insert.
        row_ids: Unique IDs for deduplication.
        skip_invalid_rows: Skip rows that don't match schema.
        ignore_unknown_values: Ignore unknown fields.
        template_suffix: Suffix for table template.
        retry: Retry configuration.
        timeout: Timeout in seconds.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """

def insert_rows(
    self,
    table: Union[Table, TableReference, str],
    rows: Union[List[Tuple[Any, ...]], List[Dict[str, Any]]],
    selected_fields: List[SchemaField] = None,
    **kwargs
) -> List[Dict[str, Any]]:
    """
    Insert rows via streaming API.

    Args:
        table: Target table for inserts.
        rows: Rows to insert as tuples or dictionaries.
        selected_fields: Schema fields for tuple rows.

    Returns:
        List[Dict]: List of insertion errors, empty if successful.
    """

Format-Specific Options

CSV Options

class CSVOptions:
    def __init__(self, **kwargs): ...

    @property
    def allow_jagged_rows(self) -> bool:
        """Allow missing trailing optional columns."""

    @property
    def allow_quoted_newlines(self) -> bool:
        """Allow quoted newlines in data."""

    @property
    def encoding(self) -> str:
        """Character encoding (UTF-8, ISO-8859-1)."""

    @property
    def field_delimiter(self) -> str:
        """Field separator character."""

    @property
    def quote_character(self) -> str:
        """Quote character."""

    @property
    def skip_leading_rows(self) -> int:
        """Number of header rows to skip."""

Avro and Parquet Options

class AvroOptions:
    def __init__(self, **kwargs): ...

    @property
    def use_avro_logical_types(self) -> bool:
        """Use Avro logical types for conversion."""

class ParquetOptions:
    def __init__(self, **kwargs): ...

    @property
    def enum_as_string(self) -> bool:
        """Convert Parquet enums to strings."""

    @property
    def enable_list_inference(self) -> bool:
        """Enable list type inference."""

Usage Examples

Load from Cloud Storage

from google.cloud import bigquery

client = bigquery.Client()

# Load CSV from Cloud Storage
job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,  # Skip header row
    autodetect=True,      # Auto-detect schema
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

uri = "gs://my-bucket/data.csv"
table_id = f"{client.project}.my_dataset.my_table"

load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
load_job.result()  # Wait for completion

print(f"Loaded {load_job.output_rows} rows")

Load with Explicit Schema

# Define schema explicitly
schema = [
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("age", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("email", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
]

job_config = bigquery.LoadJobConfig(
    schema=schema,
    source_format=bigquery.SourceFormat.CSV,
    skip_leading_rows=1,
    field_delimiter=',',
    quote_character='"',
    max_bad_records=10,  # Allow up to 10 bad records
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/users.csv",
    "my_project.my_dataset.users",
    job_config=job_config
)
load_job.result()

Load from Local File

# Load from local file
with open("data.json", "rb") as source_file:
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        autodetect=True,
    )
    
    load_job = client.load_table_from_file(
        source_file, 
        table_id,
        job_config=job_config
    )

load_job.result()
print(f"Loaded {load_job.output_rows} rows from local file")

Load from pandas DataFrame

import pandas as pd

# Create sample DataFrame
df = pd.DataFrame({
    'name': ['Alice', 'Bob', 'Charlie'],
    'age': [25, 30, 35],
    'email': ['alice@example.com', 'bob@example.com', 'charlie@example.com'],
    'created_at': pd.to_datetime(['2023-01-01', '2023-01-02', '2023-01-03'])
})

# Load DataFrame to BigQuery
job_config = bigquery.LoadJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    schema_update_options=[bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION],
)

load_job = client.load_table_from_dataframe(
    df, 
    table_id, 
    job_config=job_config
)
load_job.result()

print(f"Loaded {len(df)} rows from DataFrame")

Streaming Inserts

# Stream individual records
rows_to_insert = [
    {"name": "Alice", "age": 25, "email": "alice@example.com"},
    {"name": "Bob", "age": 30, "email": "bob@example.com"},
]

# Insert with error handling
errors = client.insert_rows_json(table_id, rows_to_insert)
if errors:
    print(f"Errors occurred: {errors}")
else:
    print("Rows inserted successfully")

# Insert with deduplication IDs
import uuid

row_ids = [str(uuid.uuid4()) for _ in rows_to_insert]
errors = client.insert_rows_json(
    table_id, 
    rows_to_insert, 
    row_ids=row_ids,
    ignore_unknown_values=True
)

Partitioned Table Loading

# Load into partitioned table
from datetime import datetime, timedelta

job_config = bigquery.LoadJobConfig(
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    time_partitioning=bigquery.TimePartitioning(
        type_=bigquery.TimePartitioningType.DAY,
        field="created_at",  # Partition by this field
        expiration_ms=7 * 24 * 60 * 60 * 1000,  # 7 days retention
    ),
    clustering_fields=["user_id", "category"],  # Add clustering
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)

load_job = client.load_table_from_uri(
    "gs://my-bucket/events.json",
    "my_project.my_dataset.events",
    job_config=job_config
)
load_job.result()

Error Handling and Monitoring

# Load with comprehensive error handling
import time

try:
    load_job = client.load_table_from_uri(uri, table_id, job_config=job_config)
    
    # Monitor progress
    while load_job.state != 'DONE':
        print(f"Job state: {load_job.state}")
        time.sleep(1)
        load_job.reload()
    
    # Check for errors
    if load_job.errors:
        print(f"Job completed with errors: {load_job.errors}")
    else:
        print(f"Job completed successfully")
        print(f"  Input files: {load_job.input_files}")
        print(f"  Input bytes: {load_job.input_file_bytes:,}")
        print(f"  Output rows: {load_job.output_rows:,}")
        print(f"  Bad records: {load_job.bad_records}")

except Exception as e:
    print(f"Load job failed: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-bigquery

docs

client-operations.md

data-loading.md

database-api.md

dataset-management.md

index.md

models-routines.md

query-operations.md

query-parameters.md

schema-definition.md

table-operations.md

tile.json