Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities
—
Type-safe parameter binding for SQL queries supporting scalar, array, struct, and range parameter types with proper type validation. Parameters enable secure and efficient query execution while preventing SQL injection attacks.
Single-value parameters for basic data types in SQL queries.
class ScalarQueryParameter:
    """Named query parameter bound to a single scalar value.

    Args:
        name: Parameter name (without @ prefix).
        type_: BigQuery data type (STRING, INT64, FLOAT64, etc.).
        value: Parameter value.
    """

    def __init__(self, name: str, type_: str, value: Any):
        self._name = name
        self._type = type_
        self._value = value

    @property
    def name(self) -> str:
        """Parameter name."""
        return self._name

    @property
    def type_(self) -> str:
        """Parameter data type."""
        return self._type

    @property
    def value(self) -> Any:
        """Parameter value."""
        return self._value
class ScalarQueryParameterType:
    """Type descriptor for a scalar query parameter.

    Args:
        type_: BigQuery data type.
    """

    def __init__(self, type_: str):
        self._type = type_

    @property
    def type_(self) -> str:
        """Parameter data type."""
        return self._type


# Array-type parameters for passing lists of values to SQL queries.
class ArrayQueryParameter:
    """Named query parameter bound to a list of same-typed values.

    Args:
        name: Parameter name (without @ prefix).
        array_type: BigQuery data type for array elements.
        values: List of parameter values.
    """

    def __init__(self, name: str, array_type: str, values: List[Any]):
        self._name = name
        self._array_type = array_type
        # Copy so later mutation of the caller's list does not change the parameter.
        self._values = list(values)

    @property
    def name(self) -> str:
        """Parameter name."""
        return self._name

    @property
    def array_type(self) -> str:
        """Array element data type."""
        return self._array_type

    @property
    def values(self) -> List[Any]:
        """Array values."""
        return self._values
class ArrayQueryParameterType:
    """Type descriptor for an array query parameter.

    Args:
        array_type: Element type — a BigQuery type name or a nested
            scalar/struct type descriptor.
    """

    # Annotations are quoted: the referenced descriptor classes are defined
    # elsewhere in this module.
    def __init__(self, array_type: "Union[str, ScalarQueryParameterType, StructQueryParameterType]"):
        self._array_type = array_type

    @property
    def array_type(self) -> "Union[str, ScalarQueryParameterType, StructQueryParameterType]":
        """Array element type."""
        return self._array_type


# Structured parameters for passing complex nested data to SQL queries.
class StructQueryParameter:
    """Named struct (record) query parameter composed of sub-parameters.

    Args:
        name: Parameter name (without @ prefix).
        *sub_params: Scalar or array sub-parameters, one per struct field.
    """

    def __init__(self, name: str, *sub_params: "Union[ScalarQueryParameter, ArrayQueryParameter]"):
        self._name = name
        # Preserve declaration order; field order matters for STRUCT types.
        self._sub_params = tuple(sub_params)

    @property
    def name(self) -> str:
        """Parameter name."""
        return self._name

    @property
    def struct_types(self) -> Dict[str, Any]:
        """Mapping of struct field name to its declared type.

        Array sub-parameters report their element type (``array_type``);
        scalar sub-parameters report ``type_``. Nested struct sub-parameters
        have neither attribute and map to None here.
        """
        types = {}
        for sub in self._sub_params:
            if hasattr(sub, "array_type"):
                types[sub.name] = sub.array_type
            else:
                types[sub.name] = getattr(sub, "type_", None)
        return types

    @property
    def struct_values(self) -> Dict[str, Any]:
        """Mapping of struct field name to its bound value(s)."""
        values = {}
        for sub in self._sub_params:
            if hasattr(sub, "values"):
                values[sub.name] = sub.values
            else:
                values[sub.name] = getattr(sub, "value", None)
        return values
class StructQueryParameterType:
    """Type descriptor for a struct query parameter.

    Args:
        *sub_types: Type descriptors for the struct's fields, in order.
    """

    def __init__(self, *sub_types: "Union[ScalarQueryParameterType, ArrayQueryParameterType]"):
        self._sub_types = list(sub_types)

    @property
    def struct_types(self) -> "List[Union[ScalarQueryParameterType, ArrayQueryParameterType]]":
        """Field type descriptors in declaration order (returned as a copy)."""
        return list(self._sub_types)


# Range parameters for passing range values (intervals) to SQL queries.
class RangeQueryParameter:
    """Named query parameter bound to a half-open range (interval).

    Args:
        name: Parameter name (without @ prefix).
        range_element_type: Type of range bounds (DATE, DATETIME, TIMESTAMP).
        start: Range start value (inclusive).
        end: Range end value (exclusive).
    """

    def __init__(self, name: str, range_element_type: str, start: Any, end: Any):
        self._name = name
        self._range_element_type = range_element_type
        self._start = start
        self._end = end

    @property
    def name(self) -> str:
        """Parameter name."""
        return self._name

    @property
    def range_element_type(self) -> str:
        """Range element data type."""
        return self._range_element_type

    @property
    def start(self) -> Any:
        """Range start value (inclusive)."""
        return self._start

    @property
    def end(self) -> Any:
        """Range end value (exclusive)."""
        return self._end
class RangeQueryParameterType:
    """Type descriptor for a range query parameter.

    Args:
        element_type: Type of range bounds.
    """

    def __init__(self, element_type: str):
        self._element_type = element_type

    @property
    def element_type(self) -> str:
        """Range element type."""
        return self._element_type


# Connection-level properties for query execution context.
class ConnectionProperty:
    """Key/value connection property applied to a query's execution context.

    Args:
        key: Property key.
        value: Property value.
    """

    def __init__(self, key: str, value: str):
        self._key = key
        self._value = value

    @property
    def key(self) -> str:
        """Property key."""
        return self._key

    @property
    def value(self) -> str:
        """Property value."""
        return self._value


class SqlParameterScalarTypes:
    """Constants for SQL parameter scalar types."""

    BOOL: str = "BOOL"
    INT64: str = "INT64"
    FLOAT64: str = "FLOAT64"
    NUMERIC: str = "NUMERIC"
    BIGNUMERIC: str = "BIGNUMERIC"
    STRING: str = "STRING"
    BYTES: str = "BYTES"
    DATE: str = "DATE"
    DATETIME: str = "DATETIME"
    TIME: str = "TIME"
    TIMESTAMP: str = "TIMESTAMP"
    GEOGRAPHY: str = "GEOGRAPHY"
    JSON: str = "JSON"

from google.cloud import bigquery
from google.cloud.bigquery import ScalarQueryParameter

client = bigquery.Client()

# Query with scalar parameters: each @name placeholder in the SQL is bound
# by a ScalarQueryParameter with the matching name and declared type.
query = """
SELECT name, age, city
FROM `my_project.my_dataset.users`
WHERE age >= @min_age
AND city = @target_city
AND created_date >= @start_date
LIMIT @max_results
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("min_age", "INT64", 25),
        ScalarQueryParameter("target_city", "STRING", "San Francisco"),
        ScalarQueryParameter("start_date", "DATE", "2023-01-01"),
        ScalarQueryParameter("max_results", "INT64", 100),
    ]
)
query_job = client.query(query, job_config=job_config)
results = query_job.result()  # blocks until the query job completes
for row in results:
    print(f"{row.name}, {row.age}, {row.city}")
# Query with array parameters
# Array parameters are expanded in SQL with UNNEST(@param) so IN can match
# any element of the bound list.
query = """
SELECT product_id, product_name, category, price
FROM `my_project.my_dataset.products`
WHERE category IN UNNEST(@categories)
AND product_id IN UNNEST(@product_ids)
ORDER BY price DESC
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter(
            "categories",
            "STRING",
            ["electronics", "computers", "smartphones"]
        ),
        bigquery.ArrayQueryParameter(
            "product_ids",
            "INT64",
            [1001, 1002, 1003, 1004, 1005]
        ),
    ]
)
query_job = client.query(query, job_config=job_config)
results = query_job.result()
for row in results:
    print(f"{row.product_name}: ${row.price}")
# Query with struct parameters
# Struct parameter fields are addressed in SQL as @struct_name.field_name.
query = """
SELECT user_id, event_type, event_timestamp
FROM `my_project.my_dataset.events`
WHERE event_timestamp BETWEEN @date_range.start_date AND @date_range.end_date
AND user_id = @user_filter.user_id
AND event_type IN UNNEST(@user_filter.event_types)
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.StructQueryParameter(
            "date_range",
            ScalarQueryParameter("start_date", "TIMESTAMP", "2023-01-01 00:00:00"),
            ScalarQueryParameter("end_date", "TIMESTAMP", "2023-01-31 23:59:59"),
        ),
        # A struct may mix scalar and array sub-parameters.
        bigquery.StructQueryParameter(
            "user_filter",
            ScalarQueryParameter("user_id", "INT64", 12345),
            bigquery.ArrayQueryParameter("event_types", "STRING", ["login", "purchase", "logout"]),
        ),
    ]
)
query_job = client.query(query, job_config=job_config)
results = query_job.result()
for row in results:
    print(f"User {row.user_id}: {row.event_type} at {row.event_timestamp}")
# Query with range parameters (for date/time ranges)
# The range parameter's start/end are referenced as @date_range.start and
# @date_range.end in the SQL below.
query = """
SELECT order_id, customer_id, order_date, total_amount
FROM `my_project.my_dataset.orders`
WHERE order_date IN UNNEST(GENERATE_DATE_ARRAY(@date_range.start, @date_range.end))
ORDER BY order_date DESC
"""
from datetime import date
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.RangeQueryParameter(
            "date_range",
            "DATE",
            date(2023, 12, 1),   # Start date (inclusive)
            date(2023, 12, 31)   # End date (exclusive)
        ),
    ]
)
query_job = client.query(query, job_config=job_config)
results = query_job.result()
for row in results:
    print(f"Order {row.order_id}: ${row.total_amount} on {row.order_date}")
# Advanced query with nested struct and array parameters
# Structs can nest: @filters.time_range.start_time drills two levels deep.
query = """
WITH filtered_events AS (
SELECT
user_id,
event_type,
event_timestamp,
properties
FROM `my_project.my_dataset.user_events`
WHERE user_id IN UNNEST(@filters.user_ids)
AND event_type IN UNNEST(@filters.event_types)
AND event_timestamp BETWEEN @filters.time_range.start_time AND @filters.time_range.end_time
)
SELECT
user_id,
COUNT(*) as event_count,
ARRAY_AGG(event_type ORDER BY event_timestamp) as event_sequence
FROM filtered_events
GROUP BY user_id
HAVING COUNT(*) >= @filters.min_events
ORDER BY event_count DESC
LIMIT @output.max_results
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.StructQueryParameter(
            "filters",
            bigquery.ArrayQueryParameter("user_ids", "INT64", [100, 101, 102, 103]),
            bigquery.ArrayQueryParameter("event_types", "STRING", ["page_view", "click", "purchase"]),
            # Nested struct sub-parameter for the time window bounds.
            bigquery.StructQueryParameter(
                "time_range",
                ScalarQueryParameter("start_time", "TIMESTAMP", "2023-12-01 00:00:00"),
                ScalarQueryParameter("end_time", "TIMESTAMP", "2023-12-31 23:59:59"),
            ),
            ScalarQueryParameter("min_events", "INT64", 5),
        ),
        bigquery.StructQueryParameter(
            "output",
            ScalarQueryParameter("max_results", "INT64", 50),
        ),
    ]
)
query_job = client.query(query, job_config=job_config)
results = query_job.result()
for row in results:
    print(f"User {row.user_id}: {row.event_count} events")
    print(f"  Sequence: {', '.join(row.event_sequence)}")

def build_filter_query(user_ids=None, event_types=None, date_range=None, min_count=1):
"""Build parameterized query dynamically based on provided filters."""
conditions = []
parameters = []
# Base query
query_parts = [
"SELECT user_id, event_type, event_timestamp, properties",
"FROM `my_project.my_dataset.user_events`",
"WHERE 1=1" # Always true condition to simplify adding AND clauses
]
# Add user ID filter
if user_ids:
conditions.append("AND user_id IN UNNEST(@user_ids)")
parameters.append(
bigquery.ArrayQueryParameter("user_ids", "INT64", user_ids)
)
# Add event type filter
if event_types:
conditions.append("AND event_type IN UNNEST(@event_types)")
parameters.append(
bigquery.ArrayQueryParameter("event_types", "STRING", event_types)
)
# Add date range filter
if date_range:
conditions.append("AND event_timestamp BETWEEN @start_date AND @end_date")
parameters.extend([
ScalarQueryParameter("start_date", "TIMESTAMP", date_range[0]),
ScalarQueryParameter("end_date", "TIMESTAMP", date_range[1]),
])
# Add aggregation and filtering
query_parts.extend(conditions)
query_parts.extend([
"GROUP BY user_id, event_type, event_timestamp, properties",
"HAVING COUNT(*) >= @min_count",
"ORDER BY event_timestamp DESC"
])
parameters.append(ScalarQueryParameter("min_count", "INT64", min_count))
return " ".join(query_parts), parameters
# Use the dynamic query builder: only the filters passed here end up in the
# generated SQL and parameter list.
query, params = build_filter_query(
    user_ids=[100, 101, 102],
    event_types=["login", "purchase"],
    date_range=("2023-12-01 00:00:00", "2023-12-31 23:59:59"),
    min_count=2
)
job_config = bigquery.QueryJobConfig(query_parameters=params)
query_job = client.query(query, job_config=job_config)
results = query_job.result()
for row in results:
    print(f"User {row.user_id}: {row.event_type} at {row.event_timestamp}")

def validate_and_execute_query(client, query, parameters):
"""Execute query with parameter validation and error handling."""
try:
# Validate parameters
for param in parameters:
if isinstance(param, ScalarQueryParameter):
if param.value is None and param.type_ != "STRING":
raise ValueError(f"Parameter {param.name} cannot be None for type {param.type_}")
elif isinstance(param, bigquery.ArrayQueryParameter):
if not param.values:
raise ValueError(f"Array parameter {param.name} cannot be empty")
# Create job config
job_config = bigquery.QueryJobConfig(query_parameters=parameters)
# Execute query
query_job = client.query(query, job_config=job_config)
# Wait for completion with timeout
results = query_job.result(timeout=30) # 30 second timeout
print(f"Query executed successfully")
print(f"Processed {query_job.total_bytes_processed:,} bytes")
print(f"Returned {results.total_rows:,} rows")
return results
except Exception as e:
print(f"Query execution failed: {e}")
if hasattr(e, 'errors') and e.errors:
for error in e.errors:
print(f" Error: {error}")
raise
# Example usage with validation
parameters = [
    ScalarQueryParameter("user_id", "INT64", 12345),
    ScalarQueryParameter("start_date", "DATE", "2023-01-01"),
    bigquery.ArrayQueryParameter("categories", "STRING", ["electronics", "books"]),
]
query = """
SELECT * FROM `my_project.my_dataset.purchases`
WHERE user_id = @user_id
AND purchase_date >= @start_date
AND category IN UNNEST(@categories)
"""
try:
    results = validate_and_execute_query(client, query, parameters)
    # Process results...
except Exception as e:
    print(f"Failed to execute query: {e}")

Install with the Tessl CLI:
npx tessl i tessl/pypi-google-cloud-bigquery