Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities
Table creation, schema management, data operations, and metadata management. BigQuery tables store structured data with enforced schemas and support various configurations including partitioning, clustering, and expiration policies.
Create and configure BigQuery tables with schemas, partitioning, and metadata.
class Table:
    """API surface of a BigQuery table resource.

    A table stores structured rows under an enforced schema and exposes
    configuration such as partitioning, clustering, labels, and expiration.
    """

    def __init__(
        self,
        table_ref: Union[TableReference, str],
        schema: Optional[List[SchemaField]] = None,
    ):
        """
        Initialize a Table.

        Args:
            table_ref: Table reference string or TableReference object.
            schema: Table schema as list of SchemaField objects, or None to
                leave the schema unset.
        """

    @property
    def reference(self) -> TableReference:
        """TableReference identifying this table."""

    @property
    def table_id(self) -> str:
        """Table ID."""

    @property
    def dataset_id(self) -> str:
        """ID of the dataset containing the table."""

    @property
    def project(self) -> str:
        """ID of the project containing the table."""

    @property
    def schema(self) -> List[SchemaField]:
        """Table schema as a list of SchemaField objects."""

    @schema.setter
    def schema(self, value: List[SchemaField]): ...

    @property
    def friendly_name(self) -> str:
        """Human-readable table name."""

    @friendly_name.setter
    def friendly_name(self, value: str): ...

    @property
    def description(self) -> str:
        """Free-form table description."""

    @description.setter
    def description(self, value: str): ...

    @property
    def num_bytes(self) -> int:
        """Size of table in bytes (populated by the service)."""

    @property
    def num_rows(self) -> int:
        """Number of rows in table (populated by the service)."""

    @property
    def created(self) -> datetime.datetime:
        """Table creation timestamp."""

    @property
    def modified(self) -> datetime.datetime:
        """Table last modification timestamp."""

    @property
    def expires(self) -> datetime.datetime:
        """Timestamp at which the table expires."""

    @expires.setter
    def expires(self, value: datetime.datetime): ...

    @property
    def labels(self) -> Dict[str, str]:
        """Key/value labels for organizing tables."""

    @labels.setter
    def labels(self, value: Dict[str, str]): ...

    @property
    def time_partitioning(self) -> TimePartitioning:
        """Time-based partitioning configuration."""

    @time_partitioning.setter
    def time_partitioning(self, value: TimePartitioning): ...

    @property
    def range_partitioning(self) -> RangePartitioning:
        """Integer range partitioning configuration."""

    @range_partitioning.setter
    def range_partitioning(self, value: RangePartitioning): ...

    @property
    def clustering_fields(self) -> List[str]:
        """Ordered list of fields used for clustering."""

    @clustering_fields.setter
    def clustering_fields(self, value: List[str]): ...

    @property
    def require_partition_filter(self) -> bool:
        """Whether queries must include a partition filter."""

    @require_partition_filter.setter
    def require_partition_filter(self, value: bool): ...

Reference tables by project, dataset, and table ID for API operations.
class TableReference:
    """Immutable pointer to a table, identified by project, dataset, and table ID."""

    def __init__(self, dataset_ref: DatasetReference, table_id: str):
        """
        Reference to a BigQuery table.

        Args:
            dataset_ref: Dataset reference containing the table.
            table_id: Table ID.
        """

    @property
    def dataset_id(self) -> str:
        """Dataset ID."""

    @property
    def project(self) -> str:
        """Project ID."""

    @property
    def table_id(self) -> str:
        """Table ID."""

    @property
    def path(self) -> str:
        """Full table path (project:dataset.table)."""

    @classmethod
    def from_string(
        cls,
        table_id: str,
        default_project: Optional[str] = None,
    ) -> TableReference:
        """
        Create TableReference from string.

        Args:
            table_id: Table ID with optional project and dataset
                qualifiers.
            default_project: Project used when ``table_id`` does not
                specify one.

        Returns:
            TableReference: Table reference object.
        """

Configure time-based and range-based table partitioning for query performance and cost optimization.
class TimePartitioning:
    """Time-based partitioning configuration for a table."""

    def __init__(
        self,
        type_: Optional[str] = None,
        field: Optional[str] = None,
        expiration_ms: Optional[int] = None,
        require_partition_filter: Optional[bool] = None,
    ):
        """
        Time-based partitioning configuration.

        Args:
            type_: Partitioning granularity (DAY, HOUR, MONTH, YEAR).
            field: Field to partition by (None for ingestion time).
            expiration_ms: Partition expiration in milliseconds; None keeps
                partitions indefinitely.
            require_partition_filter: Require partition filter in queries.
        """

    @property
    def type_(self) -> str:
        """Partitioning granularity (DAY, HOUR, MONTH, YEAR)."""

    @property
    def field(self) -> str:
        """Field partitioned on (None for ingestion time)."""

    @property
    def expiration_ms(self) -> int:
        """Partition expiration in milliseconds."""

    @property
    def require_partition_filter(self) -> bool:
        """Whether queries must include a partition filter."""
class RangePartitioning:
    """Integer range-based partitioning configuration for a table."""

    def __init__(self, field: Optional[str] = None, range_: Optional[PartitionRange] = None):
        """
        Range-based partitioning configuration.

        Args:
            field: Field to partition by.
            range_: PartitionRange describing start, end, and interval.
        """

    @property
    def field(self) -> str:
        """Partitioning field."""

    @property
    def range_(self) -> PartitionRange:
        """Range configuration (start, end, interval)."""
class PartitionRange:
    """Numeric range definition used by RangePartitioning."""

    def __init__(self, start: Optional[int] = None, end: Optional[int] = None, interval: Optional[int] = None):
        """
        Range definition for range partitioning.

        Args:
            start: Start of range (inclusive).
            end: End of range (exclusive).
            interval: Width of each partition bucket within the range.
        """

    @property
    def start(self) -> int:
        """Range start (inclusive)."""

    @property
    def end(self) -> int:
        """Range end (exclusive)."""

    @property
    def interval(self) -> int:
        """Interval between partitions."""

Support for different table types including views, materialized views, and external tables.
# NOTE(review): these properties appear to belong to the Table class — the
# enclosing class header was lost in extraction; confirm against the library.

# View configuration
@property
def view_query(self) -> str:
    """SQL query defining the view."""

@view_query.setter
def view_query(self, value: str): ...

@property
def view_use_legacy_sql(self) -> bool:
    """Whether the view query uses legacy SQL."""

@view_use_legacy_sql.setter
def view_use_legacy_sql(self, value: bool): ...

# Materialized view configuration
@property
def mview_query(self) -> str:
    """SQL query defining the materialized view."""

@mview_query.setter
def mview_query(self, value: str): ...

@property
def mview_enable_refresh(self) -> bool:
    """Whether automatic refresh of the materialized view is enabled."""

@mview_enable_refresh.setter
def mview_enable_refresh(self, value: bool): ...

@property
def mview_refresh_interval_ms(self) -> int:
    """Automatic refresh interval in milliseconds."""

@mview_refresh_interval_ms.setter
def mview_refresh_interval_ms(self, value: int): ...

# External table configuration
@property
def external_data_configuration(self) -> ExternalConfig:
    """External data source configuration."""

@external_data_configuration.setter
def external_data_configuration(self, value: ExternalConfig): ...

from google.cloud import bigquery
# `datetime` was used below but never imported in the original example.
import datetime

client = bigquery.Client()

# Define table schema
schema = [
    bigquery.SchemaField("transaction_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("user_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("amount", "NUMERIC", mode="REQUIRED", precision=10, scale=2),
    bigquery.SchemaField("currency", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("transaction_date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("created_at", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("metadata", "JSON", mode="NULLABLE"),
]

# Create table
table_id = f"{client.project}.transactions.daily_transactions"
table = bigquery.Table(table_id, schema=schema)

# Configure table properties
table.friendly_name = "Daily Transactions"
table.description = "Daily transaction records with user and payment information"
table.labels = {
    "environment": "production",
    "data_type": "financial"
}

# Set expiration (90 days). Use a timezone-aware UTC timestamp: the client
# serializes expiration as UTC and a naive datetime is ambiguous (and is
# rejected by the library's millisecond conversion helpers).
table.expires = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=90)

# Create the table (exists_ok avoids a Conflict error on re-runs).
table = client.create_table(table, exists_ok=True)
print(f"Created table {table.table_id}")
# Create time-partitioned table
# Schema for a daily-partitioned events table.
events_schema = [
    bigquery.SchemaField("event_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("user_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("event_type", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("event_timestamp", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("properties", "JSON", mode="NULLABLE"),
]
events_table = bigquery.Table(f"{client.project}.analytics.events", schema=events_schema)

# Partition by the event_timestamp column, one partition per day, with
# partitions auto-expiring after 30 days.
thirty_days_ms = 30 * 24 * 60 * 60 * 1000
events_table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="event_timestamp",
    expiration_ms=thirty_days_ms,  # 30 days
    require_partition_filter=True,
)

# Cluster within each partition for better query performance.
events_table.clustering_fields = ["event_type", "user_id"]

events_table = client.create_table(events_table, exists_ok=True)
print(f"Created partitioned table {events_table.table_id}")
# Create range-partitioned table
# Schema for a customers table partitioned by numeric region code.
customer_schema = [
    bigquery.SchemaField("customer_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("region_code", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("customer_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("signup_date", "DATE", mode="REQUIRED"),
]
customers_table = bigquery.Table(
    f"{client.project}.customers.customers_by_region", schema=customer_schema
)

# Buckets of 100 region codes each: [0, 100), [100, 200), ..., [900, 1000).
customers_table.range_partitioning = bigquery.RangePartitioning(
    field="region_code",
    range_=bigquery.PartitionRange(start=0, end=1000, interval=100),
)

customers_table = client.create_table(customers_table, exists_ok=True)
print(f"Created range-partitioned table {customers_table.table_id}")
# Create a view
# Logical view joining recent transactions with user profiles.
project = client.project
view_query = """
SELECT
t.transaction_id,
t.user_id,
t.amount,
t.currency,
t.transaction_date,
u.user_name,
u.email
FROM `{}.transactions.daily_transactions` t
JOIN `{}.users.user_profiles` u ON t.user_id = u.user_id
WHERE t.transaction_date >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY)
""".format(project, project)

view = bigquery.Table(f"{project}.reports.recent_transactions_with_users")
view.view_query = view_query
view.description = "Recent transactions joined with user information"
view = client.create_table(view, exists_ok=True)
print(f"Created view {view.table_id}")
# Create materialized view for aggregated data
# Materialized view aggregating transactions per day and currency.
summary_query = """
SELECT
DATE(created_at) as transaction_date,
currency,
COUNT(*) as transaction_count,
SUM(amount) as total_amount,
AVG(amount) as avg_amount
FROM `{}.transactions.daily_transactions`
GROUP BY DATE(created_at), currency
""".format(client.project)

summary_view = bigquery.Table(f"{client.project}.reports.daily_transaction_summary")
summary_view.mview_query = summary_query
summary_view.mview_enable_refresh = True
summary_view.mview_refresh_interval_ms = 60 * 60 * 1000  # 1 hour
summary_view = client.create_table(summary_view, exists_ok=True)
print(f"Created materialized view {summary_view.table_id}")
# Get existing table
table = client.get_table(f"{client.project}.transactions.daily_transactions")
current_schema = table.schema

# Add new fields to schema (schema updates may only append NULLABLE fields).
new_schema = list(current_schema)
new_schema.extend([
    bigquery.SchemaField("payment_method", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("merchant_id", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("risk_score", "FLOAT", mode="NULLABLE"),
])

# Update table schema
table.schema = new_schema
table = client.update_table(table, ["schema"])
print(f"Updated schema for {table.table_id}")

# Verify schema changes. Build the name lookup once: the original rebuilt
# `[f.name for f in current_schema]` on every iteration (O(n*m) and a list
# scan per membership test).
original_field_names = {f.name for f in current_schema}
updated_table = client.get_table(table.reference)
print("New schema fields:")
for field in updated_table.schema:
    if field.name not in original_field_names:
        print(f" {field.name}: {field.field_type} ({field.mode})")
# Get table metadata and statistics
# Inspect table metadata and statistics (populated server-side).
table = client.get_table(f"{client.project}.transactions.daily_transactions")
print(f"Table: {table.table_id}")
print(f"Size: {table.num_bytes:,} bytes ({table.num_bytes / 1024**3:.2f} GB)")
print(f"Rows: {table.num_rows:,}")
print(f"Created: {table.created}")
print(f"Modified: {table.modified}")

partitioning = table.time_partitioning
if partitioning:
    print(f"Partitioned by: {partitioning.field or 'ingestion time'}")
    print(f"Partition type: {partitioning.type_}")
if table.clustering_fields:
    print(f"Clustered by: {', '.join(table.clustering_fields)}")

# Copy table into a backup dataset, overwriting any previous backup.
source_table = f"{client.project}.transactions.daily_transactions"
dest_table = f"{client.project}.backups.daily_transactions_backup"
job_config = bigquery.CopyJobConfig(
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE
)
copy_job = client.copy_table(source_table, dest_table, job_config=job_config)
copy_job.result()  # Wait for completion
print(f"Copied {source_table} to {dest_table}")
# List tables in dataset with metadata
dataset_id = f"{client.project}.transactions"
tables = client.list_tables(dataset_id)
print("Tables in dataset:")
for table_item in tables:
table = client.get_table(table_item.reference)
size_gb = table.num_bytes / (1024**3) if table.num_bytes else 0
print(f" {table.table_id}")
print(f" Rows: {table.num_rows:,}")
print(f" Size: {size_gb:.2f} GB")
print(f" Modified: {table.modified}")
if table.expires:
print(f" Expires: {table.expires}")
# Clean up old tables
cutoff_date = datetime.datetime.now() - datetime.timedelta(days=30)
for table_item in tables:
table = client.get_table(table_item.reference)
if table.modified < cutoff_date and table.table_id.startswith("temp_"):
print(f"Deleting old temp table: {table.table_id}")
client.delete_table(table.reference, not_found_ok=True)
# Update table metadata
table = client.get_table(f"{client.project}.transactions.daily_transactions")
table.description = "Updated: Daily transaction records with enhanced metadata"
table.labels.update({"last_updated": "2023-12-01"})
table = client.update_table(table, ["description", "labels"])
print("Updated table metadata")Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery