CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-google-cloud-bigquery

Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities

Pending
Overview
Eval results
Files

docs/query-operations.md

Query Operations

SQL query execution with parameters, job configuration, and result processing. BigQuery supports both simple ad-hoc queries and complex analytical workloads with features like pagination, streaming, and integration with data science libraries.

Capabilities

Query Job Execution

Execute SQL queries asynchronously with comprehensive job monitoring and configuration options.

class QueryJob:
    """Handle to an asynchronously executing SQL query job.

    Job metadata and statistics are exposed as read-only properties; the
    methods wait for, download, or cancel the query.
    """

    def __init__(self, job_id: str, query: str, client: Client): ...

    @property
    def state(self) -> str:
        """Current state of the job ('PENDING', 'RUNNING', 'DONE')."""

    @property
    def query(self) -> str:
        """SQL query being executed."""

    @property
    def job_id(self) -> str:
        """Unique identifier for this job."""

    @property
    def location(self) -> str:
        """Location where the job is running."""

    @property
    def created(self) -> datetime.datetime:
        """Timestamp when the job was created."""

    @property
    def started(self) -> datetime.datetime:
        """Timestamp when the job started running."""

    @property
    def ended(self) -> datetime.datetime:
        """Timestamp when the job completed."""

    @property
    def total_bytes_processed(self) -> int:
        """Total bytes processed by the query."""

    @property
    def total_bytes_billed(self) -> int:
        """Total bytes billed for the query."""

    @property
    def slot_millis(self) -> int:
        """Slot milliseconds consumed by the query."""

    @property
    def num_dml_affected_rows(self) -> int:
        """Number of rows affected by DML statement."""

    @property
    def dml_stats(self) -> "DmlStats":
        """Row counts for a DML statement, broken down by operation.

        May be unset (falsy), so check it before reading
        ``inserted_row_count``, ``updated_row_count`` or
        ``deleted_row_count``.
        """

    def result(
        self,
        page_size: int = None,
        max_results: int = None,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
        start_index: int = None,
    ) -> google.cloud.bigquery.table.RowIterator:
        """
        Wait for query completion and return results.

        Args:
            page_size: Number of rows per page.
            max_results: Maximum total rows to return.
            retry: Retry configuration for polling.
            timeout: Timeout in seconds for polling.
            start_index: Zero-based index of first row to return.

        Returns:
            RowIterator: Iterator over query results.
        """

    def to_dataframe(
        self,
        create_bqstorage_client: bool = True,
        dtypes: Dict[str, str] = None,
        progress_bar_type: str = None,
        **kwargs
    ) -> pandas.DataFrame:
        """
        Return query results as a pandas DataFrame.

        Args:
            create_bqstorage_client: Use BigQuery Storage API for faster downloads.
            dtypes: Pandas data types for specific columns.
            progress_bar_type: Type of progress bar ('tqdm', None).

        Returns:
            pandas.DataFrame: Query results as DataFrame.
        """

    def to_arrow(
        self,
        create_bqstorage_client: bool = True,
        progress_bar_type: str = None,
    ) -> pyarrow.Table:
        """
        Return query results as a PyArrow Table.

        Args:
            create_bqstorage_client: Use BigQuery Storage API for faster downloads.
            progress_bar_type: Type of progress bar ('tqdm', None).

        Returns:
            pyarrow.Table: Query results as PyArrow Table.
        """

    def cancel(
        self,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
    ) -> bool:
        """
        Cancel the query job.

        Args:
            retry: Retry configuration.
            timeout: Timeout in seconds.

        Returns:
            bool: True if cancellation was successful.
        """

Query Job Configuration

Configure query behavior, performance, and output options.

class QueryJobConfig:
    """Settings that control how a query job runs and where its results go.

    Every option is a read/write property; options may also be passed as
    keyword arguments to the constructor.
    """

    def __init__(self, **kwargs): ...

    @property
    def allow_large_results(self) -> bool:
        """Allow large results that exceed response size limits."""

    @allow_large_results.setter
    def allow_large_results(self, value: bool): ...

    @property
    def create_disposition(self) -> str:
        """Action when destination table doesn't exist."""

    @create_disposition.setter
    def create_disposition(self, value: str): ...

    @property
    def default_dataset(self) -> DatasetReference:
        """Default dataset for unqualified table names."""

    @default_dataset.setter
    def default_dataset(self, value: DatasetReference): ...

    @property
    def destination(self) -> TableReference:
        """Table to store query results."""

    @destination.setter
    def destination(self, value: TableReference): ...

    @property
    def dry_run(self) -> bool:
        """Validate query without executing it."""

    @dry_run.setter
    def dry_run(self, value: bool): ...

    @property
    def maximum_bytes_billed(self) -> int:
        """Maximum bytes that can be billed."""

    @maximum_bytes_billed.setter
    def maximum_bytes_billed(self, value: int): ...

    @property
    def priority(self) -> str:
        """Query priority ('INTERACTIVE' or 'BATCH')."""

    @priority.setter
    def priority(self, value: str): ...

    @property
    def query_parameters(self) -> List[Union[ScalarQueryParameter, ArrayQueryParameter, StructQueryParameter]]:
        """Parameters for parameterized queries."""

    @query_parameters.setter
    def query_parameters(self, value: List): ...

    @property
    def use_legacy_sql(self) -> bool:
        """Use legacy SQL syntax instead of standard SQL."""

    @use_legacy_sql.setter
    def use_legacy_sql(self, value: bool): ...

    @property
    def use_query_cache(self) -> bool:
        """Enable query result caching."""

    @use_query_cache.setter
    def use_query_cache(self, value: bool): ...

    @property
    def write_disposition(self) -> str:
        """Action when destination table exists."""

    @write_disposition.setter
    def write_disposition(self, value: str): ...

    @property
    def labels(self) -> Dict[str, str]:
        """Labels for the query job."""

    @labels.setter
    def labels(self, value: Dict[str, str]): ...

    # NOTE(review): the upstream client appears to expose this setting as
    # `job_timeout_ms` (an int, in milliseconds) rather than `job_timeout`
    # (a timedelta) -- confirm against the installed library version.
    @property
    def job_timeout(self) -> datetime.timedelta:
        """Maximum time to wait for job completion."""

    @job_timeout.setter
    def job_timeout(self, value: datetime.timedelta): ...

    @property
    def script_options(self) -> "ScriptOptions":
        """Per-statement limits applied when the query is a multi-statement script."""

    @script_options.setter
    def script_options(self, value: "ScriptOptions"): ...

Result Processing

Process query results with support for pagination, type conversion, and data science integrations.

class RowIterator:
    """Iterable over the rows produced by a query.

    It can be looped over row by row, walked page by page through its
    ``pages`` attribute, or converted in bulk to a pandas DataFrame or a
    PyArrow Table.
    """

    def __init__(self, client: Client, query_job: QueryJob): ...

    @property
    def total_rows(self) -> int:
        """Total number of rows in the result set."""

    @property
    def schema(self) -> List[SchemaField]:
        """Schema of the result set, one ``SchemaField`` per column."""

    def to_dataframe(
        self,
        create_bqstorage_client: bool = True,
        dtypes: Dict[str, str] = None,
        progress_bar_type: str = None,
        **kwargs
    ) -> pandas.DataFrame:
        """
        Convert results to pandas DataFrame.

        Args:
            create_bqstorage_client: Use BigQuery Storage API.
            dtypes: Pandas data types for columns, keyed by column name.
            progress_bar_type: Progress bar type.

        Returns:
            pandas.DataFrame: Results as DataFrame.
        """

    def to_arrow(
        self,
        create_bqstorage_client: bool = True,
        progress_bar_type: str = None,
    ) -> pyarrow.Table:
        """
        Convert results to PyArrow Table.

        Args:
            create_bqstorage_client: Use BigQuery Storage API.
            progress_bar_type: Progress bar type.

        Returns:
            pyarrow.Table: Results as PyArrow Table.
        """

class Row:
    """A single result row with dict-style accessors.

    Column values can also be read as attributes named after the column
    (for example ``row.name``).
    """

    def __init__(self, values: List[Any], field_to_index: Dict[str, int]): ...

    def values(self) -> List[Any]:
        """Return row values as a list."""

    def keys(self) -> List[str]:
        """Return column names."""

    def items(self) -> List[Tuple[str, Any]]:
        """Return (column_name, value) pairs."""

    def get(self, key: str, default: Any = None) -> Any:
        """Get value by column name, returning ``default`` when it is absent."""

DML Statistics

Access detailed statistics for Data Manipulation Language (INSERT, UPDATE, DELETE) operations.

class DmlStats:
    """Row counts for a DML statement, broken down by operation."""

    def __init__(self, **kwargs): ...

    @property
    def inserted_row_count(self) -> int:
        """Number of rows inserted."""

    @property
    def deleted_row_count(self) -> int:
        """Number of rows deleted."""

    @property
    def updated_row_count(self) -> int:
        """Number of rows updated."""

Script Execution

Execute multi-statement scripts with detailed execution statistics and error handling.

class ScriptOptions:
    """Per-statement limits for a multi-statement script."""

    def __init__(self, **kwargs): ...

    @property
    def statement_timeout_ms(self) -> int:
        """Timeout for individual statements in milliseconds."""

    @statement_timeout_ms.setter
    def statement_timeout_ms(self, value: int): ...

    @property
    def statement_byte_budget(self) -> int:
        """Maximum bytes processed per statement."""

    @statement_byte_budget.setter
    def statement_byte_budget(self, value: int): ...

class ScriptStatistics:
    """Execution statistics reported for a multi-statement script."""

    def __init__(self, **kwargs): ...

    @property
    def evaluation_kind(self) -> str:
        """Type of script evaluation."""

    @property
    def stack_frames(self) -> List[ScriptStackFrame]:
        """Execution stack frames, each a ``ScriptStackFrame``."""

class ScriptStackFrame:
    """One entry of ``ScriptStatistics.stack_frames``.

    A frame is described by a start and end position (line and column),
    a procedure identifier, and the frame's text.
    """

    def __init__(self, **kwargs): ...

    @property
    def start_line(self) -> int:
        """Starting line number."""

    @property
    def start_column(self) -> int:
        """Starting column number."""

    @property
    def end_line(self) -> int:
        """Ending line number."""

    @property
    def end_column(self) -> int:
        """Ending column number."""

    @property
    def procedure_id(self) -> str:
        """Procedure identifier."""

    @property
    def text(self) -> str:
        """Stack frame text."""

Usage Examples

Basic Query Execution

from google.cloud import bigquery

client = bigquery.Client()

# Ten most common names recorded for California in the public USA names table
query = """
    SELECT name, COUNT(*) as count
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = 'CA'
    GROUP BY name
    ORDER BY count DESC
    LIMIT 10
"""

# Submit the query; result() waits for completion before yielding rows
query_job = client.query(query)

# Each row exposes its columns as attributes
for row in query_job.result():
    print(f"{row.name}: {row.count}")

Parameterized Queries

from google.cloud.bigquery import ScalarQueryParameter

# The @state, @min_year and @limit placeholders are bound via the job config
query = """
    SELECT name, COUNT(*) as count
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = @state AND year >= @min_year
    GROUP BY name
    ORDER BY count DESC
    LIMIT @limit
"""

# One ScalarQueryParameter per placeholder: (name, BigQuery type, value)
query_parameters = [
    ScalarQueryParameter("state", "STRING", "TX"),
    ScalarQueryParameter("min_year", "INT64", 2000),
    ScalarQueryParameter("limit", "INT64", 5),
]
job_config = bigquery.QueryJobConfig(query_parameters=query_parameters)

query_job = client.query(query, job_config=job_config)

for row in query_job.result():
    print(f"{row.name}: {row.count}")

Query with Configuration

# Advanced query configuration

# Defined here so the example does not depend on a query from another
# snippet (a parameterized query would fail without its parameters).
query = """
    SELECT name, COUNT(*) as count
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = 'CA'
    GROUP BY name
"""

job_config = bigquery.QueryJobConfig(
    destination=f"{client.project}.my_dataset.my_table",
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    priority=bigquery.QueryPriority.BATCH,
    # Fail the job (without charge) rather than bill more than 1 GiB.
    # The cap must be at least 10 MiB (10485760 bytes): on-demand queries
    # are billed a 10 MiB minimum, so a smaller cap rejects every query.
    maximum_bytes_billed=1024**3,
    use_query_cache=True,
    labels={"team": "data-science", "env": "prod"},
)

query_job = client.query(query, job_config=job_config)
query_job.result()  # Wait for completion

print(f"Query processed {query_job.total_bytes_processed} bytes")
print(f"Query billed {query_job.total_bytes_billed} bytes")

Working with Large Results

# Query with pagination
query = "SELECT * FROM `bigquery-public-data.samples.wikipedia`"

query_job = client.query(query)

# Process in batches: page_size caps how many rows each page holds, so only
# one page of this very large table is in memory at a time
for page in query_job.result(page_size=10_000).pages:
    for row in page:
        # Process each row
        print(row.title)

# Convert to pandas DataFrame. max_results bounds the download; converting
# the whole table would have to fit every row in local memory.
df = query_job.result(max_results=10_000).to_dataframe()
print(df.head())

# Convert to PyArrow for efficient processing (bounded the same way)
arrow_table = query_job.result(max_results=10_000).to_arrow()
print(arrow_table.schema)

DML Operations

def run_dml(sql):
    """Submit a DML statement, wait for it to finish, and return its job."""
    job = client.query(sql)
    job.result()
    return job


# INSERT query
insert_query = """
    INSERT INTO `my_project.my_dataset.my_table` (name, age, city)
    VALUES 
        ('Alice', 30, 'New York'),
        ('Bob', 25, 'San Francisco')
"""

query_job = run_dml(insert_query)

# Check DML statistics
if query_job.dml_stats:
    print(f"Inserted {query_job.dml_stats.inserted_row_count} rows")

# UPDATE query
update_query = """
    UPDATE `my_project.my_dataset.my_table`
    SET age = age + 1
    WHERE city = 'New York'
"""

query_job = run_dml(update_query)

if query_job.dml_stats:
    print(f"Updated {query_job.dml_stats.updated_row_count} rows")

Dry Run and Cost Estimation

# Dry run to estimate cost

# Defined here so the example does not depend on a query from another snippet
query = """
    SELECT name, COUNT(*) as count
    FROM `bigquery-public-data.usa_names.usa_1910_2013`
    WHERE state = 'CA'
    GROUP BY name
"""

# dry_run validates the query without executing it. The cache is disabled,
# as in the official dry-run sample, so the estimate reflects a real scan.
job_config = bigquery.QueryJobConfig(dry_run=True, use_query_cache=False)

query_job = client.query(query, job_config=job_config)

# On-demand price per TiB in USD. This is the US list price at the time of
# writing; pricing changes and varies by region, so check the pricing page.
ON_DEMAND_USD_PER_TIB = 6.25

tib_processed = query_job.total_bytes_processed / 1024**4

print(f"This query will process {query_job.total_bytes_processed} bytes")
print(f"Estimated cost: ${tib_processed * ON_DEMAND_USD_PER_TIB:.2f}")

Install with Tessl CLI

npx tessl i tessl/pypi-google-cloud-bigquery

docs

client-operations.md

data-loading.md

database-api.md

dataset-management.md

index.md

models-routines.md

query-operations.md

query-parameters.md

schema-definition.md

table-operations.md

tile.json