CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-google-cloud-bigquery

Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities

Pending
Overview
Eval results
Files

docs/query-parameters.md

Query Parameters

Type-safe parameter binding for SQL queries supporting scalar, array, struct, and range parameter types with proper type validation. Parameters enable secure and efficient query execution while preventing SQL injection attacks.

Capabilities

Scalar Parameters

Single-value parameters for basic data types in SQL queries.

class ScalarQueryParameter:
    def __init__(self, name: str, type_: str, value: Any):
        """
        Scalar query parameter for single values.

        Referenced in SQL as ``@name``. Valid ``type_`` strings are the
        constants listed in ``SqlParameterScalarTypes``.

        Args:
            name: Parameter name (without @ prefix).
            type_: BigQuery data type (STRING, INT64, FLOAT64, etc.).
            value: Parameter value.
        """

    @property
    def name(self) -> str:
        """Parameter name."""

    @property
    def type_(self) -> str:
        """Parameter data type."""

    @property
    def value(self) -> Any:
        """Parameter value."""

class ScalarQueryParameterType:
    def __init__(self, type_: str):
        """
        Type definition for scalar parameters.

        Used where only a type (not a value) is required, e.g. as the
        element type of an ``ArrayQueryParameterType``.

        Args:
            type_: BigQuery data type.
        """

    @property
    def type_(self) -> str:
        """Parameter data type."""

Array Parameters

Array-type parameters for passing lists of values to SQL queries.

class ArrayQueryParameter:
    def __init__(self, name: str, array_type: str, values: List[Any]):
        """
        Array query parameter for multiple values.

        Typically consumed in SQL via ``IN UNNEST(@name)`` (see the usage
        examples below).

        Args:
            name: Parameter name (without @ prefix).
            array_type: BigQuery data type for array elements.
            values: List of parameter values.
        """

    @property
    def name(self) -> str:
        """Parameter name."""

    @property
    def array_type(self) -> str:
        """Array element data type."""

    @property
    def values(self) -> List[Any]:
        """Array values."""

class ArrayQueryParameterType:
    def __init__(self, array_type: Union[str, ScalarQueryParameterType, StructQueryParameterType]):
        """
        Type definition for array parameters.

        The element type may be given as a plain type string or as a nested
        ``ScalarQueryParameterType`` / ``StructQueryParameterType`` object.

        Args:
            array_type: Type of array elements.
        """

    @property
    def array_type(self) -> Union[str, ScalarQueryParameterType, StructQueryParameterType]:
        """Array element type."""

Struct Parameters

Structured parameters for passing complex nested data to SQL queries.

class StructQueryParameter:
    def __init__(self, name: str, *sub_params: Union[ScalarQueryParameter, ArrayQueryParameter]):
        """
        Struct query parameter for nested data.

        Struct fields are addressed in SQL with dot notation
        (``@name.field``); each sub-parameter's own name becomes a field
        name of the struct.

        Args:
            name: Parameter name (without @ prefix).
            *sub_params: Sub-parameters for struct fields.
        """

    @property
    def name(self) -> str:
        """Parameter name."""

    @property
    def struct_types(self) -> Dict[str, Any]:
        """Struct field types."""

    @property
    def struct_values(self) -> Dict[str, Any]:
        """Struct field values."""

class StructQueryParameterType:
    def __init__(self, *sub_types: Union[ScalarQueryParameterType, ArrayQueryParameterType]):
        """
        Type definition for struct parameters.

        Composed from per-field scalar/array type objects; ``struct_types``
        presumably preserves the order of ``*sub_types``.

        Args:
            *sub_types: Types for struct fields.
        """

    @property
    def struct_types(self) -> List[Union[ScalarQueryParameterType, ArrayQueryParameterType]]:
        """Struct field types."""

Range Parameters

Range parameters for passing range values (intervals) to SQL queries.

class RangeQueryParameter:
    def __init__(self, name: str, range_element_type: str, start: Any, end: Any):
        """
        Range query parameter for interval values.

        The start and end values should match ``range_element_type``.

        Args:
            name: Parameter name (without @ prefix).
            range_element_type: Type of range bounds (DATE, DATETIME, TIMESTAMP).
            start: Range start value (inclusive).
            end: Range end value (exclusive).
        """

    @property
    def name(self) -> str:
        """Parameter name."""

    @property
    def range_element_type(self) -> str:
        """Range element data type."""

    @property
    def start(self) -> Any:
        """Range start value (inclusive)."""

    @property
    def end(self) -> Any:
        """Range end value (exclusive)."""

class RangeQueryParameterType:
    def __init__(self, element_type: str):
        """
        Type definition for range parameters.

        Valid element types are DATE, DATETIME, and TIMESTAMP (see
        ``RangeQueryParameter``).

        Args:
            element_type: Type of range bounds.
        """

    @property
    def element_type(self) -> str:
        """Range element type."""

Connection Properties

Connection-level properties for query execution context.

class ConnectionProperty:
    def __init__(self, key: str, value: str):
        """
        Connection property for query context.

        A simple key/value pair supplied alongside a query to set
        connection-level execution context.

        Args:
            key: Property key.
            value: Property value.
        """

    @property
    def key(self) -> str:
        """Property key."""

    @property
    def value(self) -> str:
        """Property value."""

Parameter Type Constants

class SqlParameterScalarTypes:
    """Constants for SQL parameter scalar types.

    These are the valid ``type_`` strings accepted by
    ``ScalarQueryParameter``.
    """

    # Boolean and numeric types
    BOOL: str = "BOOL"
    INT64: str = "INT64"
    FLOAT64: str = "FLOAT64"
    NUMERIC: str = "NUMERIC"
    BIGNUMERIC: str = "BIGNUMERIC"
    # String and binary types
    STRING: str = "STRING"
    BYTES: str = "BYTES"
    # Date and time types
    DATE: str = "DATE"
    DATETIME: str = "DATETIME"
    TIME: str = "TIME"
    TIMESTAMP: str = "TIMESTAMP"
    # Other types
    GEOGRAPHY: str = "GEOGRAPHY"
    JSON: str = "JSON"

Usage Examples

Basic Scalar Parameters

from google.cloud import bigquery
from google.cloud.bigquery import ScalarQueryParameter

client = bigquery.Client()

# Query with scalar parameters.
# Each @placeholder in the SQL is matched by name to a ScalarQueryParameter
# in query_parameters below; binding values this way prevents SQL injection.
query = """
    SELECT name, age, city
    FROM `my_project.my_dataset.users`
    WHERE age >= @min_age 
    AND city = @target_city
    AND created_date >= @start_date
    LIMIT @max_results
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        ScalarQueryParameter("min_age", "INT64", 25),
        ScalarQueryParameter("target_city", "STRING", "San Francisco"),
        ScalarQueryParameter("start_date", "DATE", "2023-01-01"),
        ScalarQueryParameter("max_results", "INT64", 100),
    ]
)

query_job = client.query(query, job_config=job_config)
results = query_job.result()  # blocks until the query job completes

for row in results:
    print(f"{row.name}, {row.age}, {row.city}")

Array Parameters

# Query with array parameters.
# Array parameters are expanded in SQL with IN UNNEST(@param), which tests
# membership against the bound list of values.
query = """
    SELECT product_id, product_name, category, price
    FROM `my_project.my_dataset.products`
    WHERE category IN UNNEST(@categories)
    AND product_id IN UNNEST(@product_ids)
    ORDER BY price DESC
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ArrayQueryParameter(
            "categories", 
            "STRING", 
            ["electronics", "computers", "smartphones"]
        ),
        bigquery.ArrayQueryParameter(
            "product_ids",
            "INT64",
            [1001, 1002, 1003, 1004, 1005]
        ),
    ]
)

query_job = client.query(query, job_config=job_config)
results = query_job.result()

for row in results:
    print(f"{row.product_name}: ${row.price}")

Struct Parameters

# Query with struct parameters.
# Struct fields are accessed in SQL with dot notation (@struct_name.field);
# the field names come from the names of the nested sub-parameters.
query = """
    SELECT user_id, event_type, event_timestamp
    FROM `my_project.my_dataset.events`
    WHERE event_timestamp BETWEEN @date_range.start_date AND @date_range.end_date
    AND user_id = @user_filter.user_id
    AND event_type IN UNNEST(@user_filter.event_types)
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.StructQueryParameter(
            "date_range",
            ScalarQueryParameter("start_date", "TIMESTAMP", "2023-01-01 00:00:00"),
            ScalarQueryParameter("end_date", "TIMESTAMP", "2023-01-31 23:59:59"),
        ),
        bigquery.StructQueryParameter(
            "user_filter",
            ScalarQueryParameter("user_id", "INT64", 12345),
            # Struct fields may themselves be arrays:
            bigquery.ArrayQueryParameter("event_types", "STRING", ["login", "purchase", "logout"]),
        ),
    ]
)

query_job = client.query(query, job_config=job_config)
results = query_job.result()

for row in results:
    print(f"User {row.user_id}: {row.event_type} at {row.event_timestamp}")

Range Parameters

# Query with range parameters (for date/time ranges)
# NOTE(review): this SQL reads @date_range.start / @date_range.end with dot
# notation. Whether a RANGE parameter exposes .start/.end fields this way is
# not shown elsewhere in this document — confirm against BigQuery's RANGE
# documentation (RANGE_START()/RANGE_END() functions may be required instead).
query = """
    SELECT order_id, customer_id, order_date, total_amount
    FROM `my_project.my_dataset.orders`
    WHERE order_date IN UNNEST(GENERATE_DATE_ARRAY(@date_range.start, @date_range.end))
    ORDER BY order_date DESC
"""

from datetime import date

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.RangeQueryParameter(
            "date_range",
            "DATE",
            date(2023, 12, 1),   # Start date (inclusive)
            date(2023, 12, 31)   # End date (exclusive)
        ),
    ]
)

query_job = client.query(query, job_config=job_config)
results = query_job.result()

for row in results:
    print(f"Order {row.order_id}: ${row.total_amount} on {row.order_date}")

Complex Nested Parameters

# Advanced query with nested struct and array parameters.
# The CTE narrows events using fields of the @filters struct (including the
# nested @filters.time_range struct); the outer query then aggregates per
# user and applies @filters.min_events / @output.max_results.
query = """
    WITH filtered_events AS (
        SELECT 
            user_id,
            event_type,
            event_timestamp,
            properties
        FROM `my_project.my_dataset.user_events`
        WHERE user_id IN UNNEST(@filters.user_ids)
        AND event_type IN UNNEST(@filters.event_types)
        AND event_timestamp BETWEEN @filters.time_range.start_time AND @filters.time_range.end_time
    )
    SELECT 
        user_id,
        COUNT(*) as event_count,
        ARRAY_AGG(event_type ORDER BY event_timestamp) as event_sequence
    FROM filtered_events
    GROUP BY user_id
    HAVING COUNT(*) >= @filters.min_events
    ORDER BY event_count DESC
    LIMIT @output.max_results
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.StructQueryParameter(
            "filters",
            bigquery.ArrayQueryParameter("user_ids", "INT64", [100, 101, 102, 103]),
            bigquery.ArrayQueryParameter("event_types", "STRING", ["page_view", "click", "purchase"]),
            # Structs can nest other structs:
            bigquery.StructQueryParameter(
                "time_range",
                ScalarQueryParameter("start_time", "TIMESTAMP", "2023-12-01 00:00:00"),
                ScalarQueryParameter("end_time", "TIMESTAMP", "2023-12-31 23:59:59"),
            ),
            ScalarQueryParameter("min_events", "INT64", 5),
        ),
        bigquery.StructQueryParameter(
            "output",
            ScalarQueryParameter("max_results", "INT64", 50),
        ),
    ]
)

query_job = client.query(query, job_config=job_config)
results = query_job.result()

for row in results:
    print(f"User {row.user_id}: {row.event_count} events")
    print(f"  Sequence: {', '.join(row.event_sequence)}")

Dynamic Parameter Building

def build_filter_query(user_ids=None, event_types=None, date_range=None, min_count=1,
                       table="`my_project.my_dataset.user_events`"):
    """Build a parameterized query dynamically based on provided filters.

    Only the filters actually supplied produce WHERE clauses and bound
    parameters, so the query stays minimal.

    Args:
        user_ids: Optional list of INT64 user IDs; adds an IN UNNEST filter.
        event_types: Optional list of event-type strings; adds an IN UNNEST filter.
        date_range: Optional (start, end) pair of TIMESTAMP values; adds a
            BETWEEN filter on event_timestamp.
        min_count: Minimum group size enforced by the HAVING clause.
        table: Fully-qualified, backtick-quoted table to query. Defaults to
            the previously hard-coded table, so existing callers are unaffected.

    Returns:
        A (query_string, parameters) tuple ready for
        bigquery.QueryJobConfig(query_parameters=parameters).
    """

    conditions = []
    parameters = []

    # Base query; "WHERE 1=1" is always true so every optional filter can be
    # appended uniformly as an "AND ..." clause.
    query_parts = [
        "SELECT user_id, event_type, event_timestamp, properties",
        f"FROM {table}",
        "WHERE 1=1",
    ]

    # Add user ID filter
    if user_ids:
        conditions.append("AND user_id IN UNNEST(@user_ids)")
        parameters.append(
            bigquery.ArrayQueryParameter("user_ids", "INT64", user_ids)
        )

    # Add event type filter
    if event_types:
        conditions.append("AND event_type IN UNNEST(@event_types)")
        parameters.append(
            bigquery.ArrayQueryParameter("event_types", "STRING", event_types)
        )

    # Add date range filter
    if date_range:
        conditions.append("AND event_timestamp BETWEEN @start_date AND @end_date")
        parameters.extend([
            ScalarQueryParameter("start_date", "TIMESTAMP", date_range[0]),
            ScalarQueryParameter("end_date", "TIMESTAMP", date_range[1]),
        ])

    # Add aggregation and filtering
    query_parts.extend(conditions)
    query_parts.extend([
        "GROUP BY user_id, event_type, event_timestamp, properties",
        "HAVING COUNT(*) >= @min_count",
        "ORDER BY event_timestamp DESC"
    ])

    # min_count is always bound, since the HAVING clause is always emitted.
    parameters.append(ScalarQueryParameter("min_count", "INT64", min_count))

    return " ".join(query_parts), parameters

# Use the dynamic query builder.
# Only the filters passed here produce WHERE clauses and bound parameters.
query, params = build_filter_query(
    user_ids=[100, 101, 102],
    event_types=["login", "purchase"],
    date_range=("2023-12-01 00:00:00", "2023-12-31 23:59:59"),
    min_count=2
)

job_config = bigquery.QueryJobConfig(query_parameters=params)
query_job = client.query(query, job_config=job_config)
results = query_job.result()

for row in results:
    print(f"User {row.user_id}: {row.event_type} at {row.event_timestamp}")

Parameter Validation and Error Handling

def validate_and_execute_query(client, query, parameters):
    """Execute a parameterized query with basic validation and error handling.

    Args:
        client: A bigquery.Client used to run the query.
        query: SQL string containing @-prefixed named parameters.
        parameters: List of query parameter objects (scalar/array/struct).

    Returns:
        The query results iterator on success.

    Raises:
        ValueError: If a non-STRING scalar parameter has a None value, or an
            array parameter has no values.
        Exception: Any error raised during query execution is logged and
            re-raised unchanged.
    """

    try:
        # Validate parameters before submitting the job.
        for param in parameters:
            if isinstance(param, ScalarQueryParameter):
                # None is only tolerated for STRING parameters here.
                if param.value is None and param.type_ != "STRING":
                    raise ValueError(f"Parameter {param.name} cannot be None for type {param.type_}")
            elif isinstance(param, bigquery.ArrayQueryParameter):
                if not param.values:
                    raise ValueError(f"Array parameter {param.name} cannot be empty")

        # Create job config
        job_config = bigquery.QueryJobConfig(query_parameters=parameters)

        # Execute query
        query_job = client.query(query, job_config=job_config)

        # Wait for completion with timeout
        results = query_job.result(timeout=30)  # 30 second timeout

        # Fixed: was an f-string with no placeholders.
        print("Query executed successfully")
        print(f"Processed {query_job.total_bytes_processed:,} bytes")
        print(f"Returned {results.total_rows:,} rows")

        return results

    except Exception as e:
        # Broad catch is acceptable at this boundary: the error is logged and
        # re-raised, never swallowed.
        print(f"Query execution failed: {e}")
        if hasattr(e, 'errors') and e.errors:
            for error in e.errors:
                print(f"  Error: {error}")
        raise

# Example usage with validation.
# These parameters satisfy the checks in validate_and_execute_query:
# non-None scalar values and a non-empty array.
parameters = [
    ScalarQueryParameter("user_id", "INT64", 12345),
    ScalarQueryParameter("start_date", "DATE", "2023-01-01"),
    bigquery.ArrayQueryParameter("categories", "STRING", ["electronics", "books"]),
]

query = """
    SELECT * FROM `my_project.my_dataset.purchases`
    WHERE user_id = @user_id
    AND purchase_date >= @start_date
    AND category IN UNNEST(@categories)
"""

try:
    results = validate_and_execute_query(client, query, parameters)
    # Process results...
except Exception as e:
    print(f"Failed to execute query: {e}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-bigquery

docs

client-operations.md

data-loading.md

database-api.md

dataset-management.md

index.md

models-routines.md

query-operations.md

query-parameters.md

schema-definition.md

table-operations.md

tile.json