CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-bigquery

Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities

Pending
Overview
Eval results
Files

docs/table-operations.md

Table Operations

Table creation, schema management, data operations, and metadata management. BigQuery tables store structured data with enforced schemas and support various configurations including partitioning, clustering, and expiration policies.

Capabilities

Table Definition

Create and configure BigQuery tables with schemas, partitioning, and metadata.

class Table:
    """
    A BigQuery table: its identity, schema, descriptive metadata, and
    partitioning/clustering settings.

    This is a signature-only summary. Properties that are followed by a
    ``.setter`` below can be assigned before creating or updating the
    table; properties without one are listed here as read-only.
    """

    def __init__(
        self,
        table_ref: Union[TableReference, str],
        schema: List[SchemaField] = None
    ):
        """
        Initialize a Table.

        Args:
            table_ref: Table reference string or TableReference object.
                The usage examples pass a "project.dataset.table" string.
            schema: Table schema as list of SchemaField objects. Optional
                (defaults to None); the view examples omit it.
        """

    # --- Identity (no setters listed) ---

    @property
    def reference(self) -> TableReference:
        """TableReference object identifying this table."""

    @property
    def table_id(self) -> str:
        """Table ID."""

    @property
    def dataset_id(self) -> str:
        """ID of the dataset containing the table."""

    @property
    def project(self) -> str:
        """ID of the project containing the table."""

    # --- Schema and descriptive metadata ---

    @property
    def schema(self) -> List[SchemaField]:
        """Table schema as a list of SchemaField objects."""

    @schema.setter
    def schema(self, value: List[SchemaField]): ...

    @property
    def friendly_name(self) -> str:
        """Human-readable table name."""

    @friendly_name.setter
    def friendly_name(self, value: str): ...

    @property
    def description(self) -> str:
        """Free-text table description."""

    @description.setter
    def description(self, value: str): ...

    # --- Statistics and timestamps (no setters listed) ---

    @property
    def num_bytes(self) -> int:
        """Size of table in bytes."""
        # NOTE(review): presumably unset (None) in some cases — confirm
        # before doing arithmetic on the value.

    @property
    def num_rows(self) -> int:
        """Number of rows in table."""

    @property
    def created(self) -> datetime.datetime:
        """Table creation timestamp."""

    @property
    def modified(self) -> datetime.datetime:
        """Table last modification timestamp."""

    @property
    def expires(self) -> datetime.datetime:
        """Table expiration timestamp; assign a datetime to set it."""

    @expires.setter
    def expires(self, value: datetime.datetime): ...

    @property
    def labels(self) -> Dict[str, str]:
        """Labels for organizing tables, as a string-to-string mapping."""

    @labels.setter
    def labels(self, value: Dict[str, str]): ...

    # --- Partitioning and clustering ---

    @property
    def time_partitioning(self) -> TimePartitioning:
        """Time partitioning configuration (a TimePartitioning object)."""

    @time_partitioning.setter
    def time_partitioning(self, value: TimePartitioning): ...

    @property
    def range_partitioning(self) -> RangePartitioning:
        """Range partitioning configuration (a RangePartitioning object)."""

    @range_partitioning.setter
    def range_partitioning(self, value: RangePartitioning): ...

    @property
    def clustering_fields(self) -> List[str]:
        """Names of the fields used for clustering."""

    @clustering_fields.setter
    def clustering_fields(self, value: List[str]): ...

    @property
    def require_partition_filter(self) -> bool:
        """Whether queries must include a partition filter."""

    @require_partition_filter.setter
    def require_partition_filter(self, value: bool): ...

Table Reference

Reference tables by project, dataset, and table ID for API operations.

class TableReference:
    """Pointer to a BigQuery table, identified by project, dataset, and table ID."""

    def __init__(self, dataset_ref: DatasetReference, table_id: str):
        """
        Reference to a BigQuery table.

        Args:
            dataset_ref: Dataset reference containing the table.
            table_id: Table ID.
        """

    @property
    def dataset_id(self) -> str:
        """Dataset ID."""

    @property
    def project(self) -> str:
        """Project ID."""

    @property
    def table_id(self) -> str:
        """Table ID."""

    @property
    def path(self) -> str:
        """
        URL path of the table's API resource, in the form
        ``/projects/{project}/datasets/{dataset_id}/tables/{table_id}``.

        This is not a ``project:dataset.table`` or ``project.dataset.table``
        identifier; build those from ``project``, ``dataset_id`` and
        ``table_id`` when needed.
        """

    @classmethod
    def from_string(
        cls,
        table_id: str,
        default_project: str = None
    ) -> "TableReference":
        """
        Create TableReference from string.

        Args:
            table_id: Table ID in standard SQL format,
                ``"project.dataset.table"``. The project part may be
                omitted (``"dataset.table"``) when ``default_project``
                is given.
            default_project: Project to use when ``table_id`` does not
                include one. Optional (defaults to None).

        Returns:
            TableReference: Table reference object.

        Raises:
            ValueError: If ``table_id`` is not a fully-qualified table ID
                in standard SQL format.
        """

Partitioning Configuration

Configure time-based and range-based table partitioning for query performance and cost optimization.

class TimePartitioning:
    """Time-based partitioning settings for a table."""

    def __init__(
        self,
        type_: str = None,
        field: str = None,
        expiration_ms: int = None,
        require_partition_filter: bool = None,
    ):
        """
        Time-based partitioning configuration.

        Args:
            type_: Partitioning type (DAY, HOUR, MONTH, YEAR). Defaults to
                DAY when omitted.
            field: Field to partition by (None for ingestion time).
            expiration_ms: Partition expiration in milliseconds.
            require_partition_filter: Deprecated. Require partition filter
                in queries. Set ``Table.require_partition_filter`` on the
                table instead.
        """

    @property
    def type_(self) -> str:
        """Partitioning type."""

    @property
    def field(self) -> str:
        """Partitioning field (None means ingestion-time partitioning)."""

    @property
    def expiration_ms(self) -> int:
        """Partition expiration in milliseconds."""

    @property
    def require_partition_filter(self) -> bool:
        """
        Require partition filter in queries.

        Deprecated: use ``Table.require_partition_filter`` instead. The
        library emits a deprecation warning when this attribute is used.
        """

class RangePartitioning:
    """Integer-range partitioning settings for a table."""

    def __init__(self, field: str = None, range_: PartitionRange = None):
        """
        Range-based partitioning configuration.

        Args:
            field: Field to partition by.
            range_: Range configuration for partitioning
                (a PartitionRange with start, end, and interval).
        """
        # NOTE(review): pass both arguments by keyword, as the usage
        # example does — confirm the positional order against the
        # installed library before relying on it.

    @property
    def field(self) -> str:
        """Name of the field the table is partitioned by."""

    @property
    def range_(self) -> PartitionRange:
        """Range configuration (a PartitionRange object)."""

class PartitionRange:
    """Start, end, and step that define the buckets of a range-partitioned table."""

    def __init__(self, start: int = None, end: int = None, interval: int = None):
        """
        Range definition for range partitioning.

        Args:
            start: Start of range (inclusive).
            end: End of range (exclusive).
            interval: Interval between partitions, i.e. the width of each
                partition. The usage example passes start=0, end=1000,
                interval=100.
        """

    @property
    def start(self) -> int:
        """Range start (inclusive)."""

    @property
    def end(self) -> int:
        """Range end (exclusive)."""

    @property
    def interval(self) -> int:
        """Partition interval (width of each partition)."""

Table Types and Views

Support for different table types including views, materialized views, and external tables.

# View configuration
# These properties are assigned on a Table object in the usage examples
# (e.g. ``view.view_query = ...``); they are shown here without the
# enclosing class.
@property
def view_query(self) -> str:
    """SQL query defining the view."""

@view_query.setter
def view_query(self, value: str): ...

@property
def view_use_legacy_sql(self) -> bool:
    """Whether the view query is written in legacy SQL."""

@view_use_legacy_sql.setter
def view_use_legacy_sql(self, value: bool): ...

# Materialized view configuration
# These are properties of a Table object, shown without the enclosing class.
@property
def mview_query(self) -> str:
    """SQL query defining the materialized view."""

@mview_query.setter
def mview_query(self, value: str): ...

@property
def mview_enable_refresh(self) -> bool:
    """Enable automatic refresh of materialized view."""

@mview_enable_refresh.setter
def mview_enable_refresh(self, value: bool): ...

# The library exposes the refresh interval as a datetime.timedelta named
# ``mview_refresh_interval``; there is no ``mview_refresh_interval_ms``
# attribute, and assigning to one would be silently ignored.
@property
def mview_refresh_interval(self) -> datetime.timedelta:
    """Maximum frequency at which the materialized view is refreshed."""

@mview_refresh_interval.setter
def mview_refresh_interval(self, value: datetime.timedelta): ...

# External table configuration
# Listed alongside the other table-type properties; shown without the
# enclosing class.
@property
def external_data_configuration(self) -> ExternalConfig:
    """External data source configuration (an ExternalConfig object)."""

@external_data_configuration.setter
def external_data_configuration(self, value: ExternalConfig): ...

Usage Examples

Create Standard Table

import datetime

from google.cloud import bigquery

client = bigquery.Client()

# Define table schema
schema = [
    bigquery.SchemaField("transaction_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("user_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("amount", "NUMERIC", mode="REQUIRED", precision=10, scale=2),
    bigquery.SchemaField("currency", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("transaction_date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
]

# Create table
table_id = f"{client.project}.transactions.daily_transactions"
table = bigquery.Table(table_id, schema=schema)

# Configure table properties
table.friendly_name = "Daily Transactions"
table.description = "Daily transaction records with user and payment information"
table.labels = {
    "environment": "production",
    "data_type": "financial",
}

# Set expiration (90 days). Use a timezone-aware UTC timestamp: the client
# interprets naive datetimes as UTC, so a naive local-time now() would shift
# the expiry by the machine's UTC offset.
table.expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=90)

# Create the table (exists_ok=True makes the call a no-op failure-wise if it already exists)
table = client.create_table(table, exists_ok=True)
print(f"Created table {table.table_id}")

Create Partitioned Table

# Create time-partitioned table
schema = [
    bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("user_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("event_type", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("event_timestamp", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("properties", "JSON", mode="NULLABLE"),
]

table = bigquery.Table(f"{client.project}.analytics.events", schema=schema)

# Configure daily time partitioning by the event_timestamp field
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="event_timestamp",
    expiration_ms=30 * 24 * 60 * 60 * 1000,  # 30 days
)

# Require a partition filter in queries. This is set on the table itself:
# the require_partition_filter argument of TimePartitioning is deprecated.
table.require_partition_filter = True

# Add clustering for better query performance
table.clustering_fields = ["event_type", "user_id"]

table = client.create_table(table, exists_ok=True)
print(f"Created partitioned table {table.table_id}")

Create Range-Partitioned Table

# Range-partitioned table: customers bucketed by integer region code.
# Every column is REQUIRED, so the schema is built from (name, type) pairs.
customer_columns = [
    ("customer_id", "INTEGER"),
    ("region_code", "INTEGER"),
    ("customer_name", "STRING"),
    ("signup_date", "DATE"),
]
schema = [
    bigquery.SchemaField(column_name, column_type, mode="REQUIRED")
    for column_name, column_type in customer_columns
]

# Region codes 0 (inclusive) to 1000 (exclusive), in steps of 100
region_buckets = bigquery.PartitionRange(start=0, end=1000, interval=100)

table = bigquery.Table(
    f"{client.project}.customers.customers_by_region",
    schema=schema,
)
table.range_partitioning = bigquery.RangePartitioning(
    range_=region_buckets,
    field="region_code",
)

table = client.create_table(table, exists_ok=True)
print(f"Created range-partitioned table {table.table_id}")

Create View

# Define a view that joins the last 30 days of transactions to user profiles
project = client.project
view_query = f"""
SELECT 
    t.transaction_id,
    t.user_id,
    t.amount,
    t.currency,
    t.transaction_date,
    u.user_name,
    u.email
FROM `{project}.transactions.daily_transactions` t
JOIN `{project}.users.user_profiles` u ON t.user_id = u.user_id
WHERE t.transaction_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
"""

# A view is a Table with no schema and a view_query assigned
view = bigquery.Table(f"{project}.reports.recent_transactions_with_users")
view.description = "Recent transactions joined with user information"
view.view_query = view_query

view = client.create_table(view, exists_ok=True)
print(f"Created view {view.table_id}")

Create Materialized View

import datetime

# Create materialized view for aggregated data
mview_query = """
SELECT 
    DATE(created_at) as transaction_date,
    currency,
    COUNT(*) as transaction_count,
    SUM(amount) as total_amount,
    AVG(amount) as avg_amount
FROM `{}.transactions.daily_transactions`
GROUP BY DATE(created_at), currency
""".format(client.project)

mview = bigquery.Table(f"{client.project}.reports.daily_transaction_summary")
mview.mview_query = mview_query
mview.mview_enable_refresh = True
# The refresh interval is a datetime.timedelta assigned to
# mview_refresh_interval. Assigning to a misspelled or non-existent attribute
# would just create a stray Python attribute and leave the default interval.
mview.mview_refresh_interval = datetime.timedelta(hours=1)

mview = client.create_table(mview, exists_ok=True)
print(f"Created materialized view {mview.table_id}")

Table Schema Evolution

# Get existing table
table = client.get_table(f"{client.project}.transactions.daily_transactions")
current_schema = table.schema

# Remember the existing column names once, so the verification loop below
# does a set lookup instead of rebuilding a list for every field.
existing_names = {field.name for field in current_schema}

# Add new fields to schema (new columns are NULLABLE)
new_schema = list(current_schema)
new_schema.extend([
    bigquery.SchemaField("payment_method", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("merchant_id", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("risk_score", "FLOAT", mode="NULLABLE"),
])

# Update table schema; only the "schema" field is sent in the update
table.schema = new_schema
table = client.update_table(table, ["schema"])
print(f"Updated schema for {table.table_id}")

# Verify schema changes by re-fetching the table and listing the added fields
updated_table = client.get_table(table.reference)
print("New schema fields:")
for field in updated_table.schema:
    if field.name not in existing_names:
        print(f"  {field.name}: {field.field_type} ({field.mode})")

Table Data Operations

# Get table metadata and statistics
table = client.get_table(f"{client.project}.transactions.daily_transactions")

# The size/row statistics may be unset (None); fall back to 0 so the
# arithmetic and the "," format spec below never fail.
num_bytes = table.num_bytes or 0
num_rows = table.num_rows or 0

print(f"Table: {table.table_id}")
print(f"Size: {num_bytes:,} bytes ({num_bytes / 1024**3:.2f} GB)")
print(f"Rows: {num_rows:,}")
print(f"Created: {table.created}")
print(f"Modified: {table.modified}")

if table.time_partitioning:
    # A partitioning config without a field means ingestion-time partitioning
    print(f"Partitioned by: {table.time_partitioning.field or 'ingestion time'}")
    print(f"Partition type: {table.time_partitioning.type_}")

if table.clustering_fields:
    print(f"Clustered by: {', '.join(table.clustering_fields)}")

# Back up the transactions table into the backups dataset
source_table = f"{client.project}.transactions.daily_transactions"
dest_table = f"{client.project}.backups.daily_transactions_backup"

# WRITE_TRUNCATE as the write disposition for the destination table
job_config = bigquery.CopyJobConfig()
job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

copy_job = client.copy_table(source_table, dest_table, job_config=job_config)
copy_job.result()  # Block until the copy job has finished

print(f"Copied {source_table} to {dest_table}")

Table Maintenance

import datetime

# List tables in dataset with metadata.
# list_tables() returns a one-shot page iterator, so fetch each full Table
# once and keep the results in a list that both loops below can reuse.
dataset_id = f"{client.project}.transactions"
tables = [
    client.get_table(table_item.reference)
    for table_item in client.list_tables(dataset_id)
]

print("Tables in dataset:")
for table in tables:
    size_gb = table.num_bytes / (1024**3) if table.num_bytes else 0

    print(f"  {table.table_id}")
    print(f"    Rows: {table.num_rows or 0:,}")  # num_rows may be unset (None)
    print(f"    Size: {size_gb:.2f} GB")
    print(f"    Modified: {table.modified}")

    if table.expires:
        print(f"    Expires: {table.expires}")

# Clean up old tables.
# table.modified is timezone-aware (UTC), so the cutoff must be aware too;
# comparing it with a naive datetime raises TypeError.
cutoff_date = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(days=30)

for table in tables:
    if table.table_id.startswith("temp_") and table.modified < cutoff_date:
        print(f"Deleting old temp table: {table.table_id}")
        client.delete_table(table.reference, not_found_ok=True)

# Refresh the description and stamp a label on the transactions table
table = client.get_table(f"{client.project}.transactions.daily_transactions")
table.labels["last_updated"] = "2023-12-01"
table.description = "Updated: Daily transaction records with enhanced metadata"

# Only the listed fields are sent in the update request
changed_fields = ["description", "labels"]
table = client.update_table(table, changed_fields)
print("Updated table metadata")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-bigquery

docs

client-operations.md

data-loading.md

database-api.md

dataset-management.md

index.md

models-routines.md

query-operations.md

query-parameters.md

schema-definition.md

table-operations.md

tile.json