Google BigQuery API client library for Python providing comprehensive data warehouse and analytics capabilities
—
SQL query execution with parameters, job configuration, and result processing. BigQuery supports both simple ad-hoc queries and complex analytical workloads with features like pagination, streaming, and integration with data science libraries.
Execute SQL queries asynchronously with comprehensive job monitoring and configuration options.
class QueryJob:
    """A single asynchronous BigQuery query job.

    Exposes job metadata (id, location, timestamps), execution statistics
    (bytes processed/billed, slot time, DML row counts), and methods to
    wait for results, convert them, or cancel the job.
    """

    def __init__(self, job_id: str, query: str, client: Client): ...

    @property
    def state(self) -> str:
        """Current state of the job ('PENDING', 'RUNNING', 'DONE')."""

    @property
    def query(self) -> str:
        """SQL query being executed."""

    @property
    def job_id(self) -> str:
        """Unique identifier for this job."""

    @property
    def location(self) -> str:
        """Location where the job is running."""

    @property
    def created(self) -> datetime.datetime:
        """Timestamp when the job was created."""

    @property
    def started(self) -> datetime.datetime:
        """Timestamp when the job started running."""

    @property
    def ended(self) -> datetime.datetime:
        """Timestamp when the job completed."""

    @property
    def total_bytes_processed(self) -> int:
        """Total bytes processed by the query."""

    @property
    def total_bytes_billed(self) -> int:
        """Total bytes billed for the query."""

    @property
    def slot_millis(self) -> int:
        """Slot milliseconds consumed by the query."""

    @property
    def num_dml_affected_rows(self) -> int:
        """Number of rows affected by DML statement."""

    def result(
        self,
        page_size: int = None,
        max_results: int = None,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
        start_index: int = None,
    ) -> google.cloud.bigquery.table.RowIterator:
        """
        Wait for query completion and return results.

        Blocks until the job reaches the 'DONE' state, polling with the
        given retry policy.

        Args:
            page_size: Number of rows per page.
            max_results: Maximum total rows to return.
            retry: Retry configuration for polling.
            timeout: Timeout in seconds for polling.
            start_index: Zero-based index of first row to return.

        Returns:
            RowIterator: Iterator over query results.
        """

    def to_dataframe(
        self,
        create_bqstorage_client: bool = True,
        dtypes: Dict[str, str] = None,
        progress_bar_type: str = None,
        **kwargs
    ) -> pandas.DataFrame:
        """
        Return query results as a pandas DataFrame.

        Args:
            create_bqstorage_client: Use BigQuery Storage API for faster downloads.
            dtypes: Pandas data types for specific columns.
            progress_bar_type: Type of progress bar ('tqdm', None).

        Returns:
            pandas.DataFrame: Query results as DataFrame.
        """

    def to_arrow(
        self,
        create_bqstorage_client: bool = True,
        progress_bar_type: str = None,
    ) -> pyarrow.Table:
        """
        Return query results as a PyArrow Table.

        Args:
            create_bqstorage_client: Use BigQuery Storage API for faster downloads.
            progress_bar_type: Type of progress bar ('tqdm', None).

        Returns:
            pyarrow.Table: Query results as PyArrow Table.
        """

    def cancel(
        self,
        retry: google.api_core.retry.Retry = DEFAULT_RETRY,
        timeout: float = None,
    ) -> bool:
"""
Cancel the query job.
Args:
retry: Retry configuration.
timeout: Timeout in seconds.
Returns:
bool: True if cancellation was successful.
"""Configure query behavior, performance, and output options.
class QueryJobConfig:
    """Configuration options controlling how a query job executes.

    Passed to ``client.query(query, job_config=...)``; every option is
    exposed as a read/write property.
    """

    def __init__(self, **kwargs): ...

    @property
    def allow_large_results(self) -> bool:
        """Allow large results that exceed response size limits."""

    @allow_large_results.setter
    def allow_large_results(self, value: bool): ...

    @property
    def create_disposition(self) -> str:
        """Action when destination table doesn't exist."""

    @create_disposition.setter
    def create_disposition(self, value: str): ...

    @property
    def default_dataset(self) -> DatasetReference:
        """Default dataset for unqualified table names."""

    @default_dataset.setter
    def default_dataset(self, value: DatasetReference): ...

    @property
    def destination(self) -> TableReference:
        """Table to store query results."""

    @destination.setter
    def destination(self, value: TableReference): ...

    @property
    def dry_run(self) -> bool:
        """Validate query without executing it."""

    @dry_run.setter
    def dry_run(self, value: bool): ...

    @property
    def maximum_bytes_billed(self) -> int:
        """Maximum bytes that can be billed."""

    @maximum_bytes_billed.setter
    def maximum_bytes_billed(self, value: int): ...

    @property
    def priority(self) -> str:
        """Query priority ('INTERACTIVE' or 'BATCH')."""

    @priority.setter
    def priority(self, value: str): ...

    @property
    def query_parameters(self) -> List[Union[ScalarQueryParameter, ArrayQueryParameter, StructQueryParameter]]:
        """Parameters for parameterized queries."""

    @query_parameters.setter
    def query_parameters(self, value: List): ...

    @property
    def use_legacy_sql(self) -> bool:
        """Use legacy SQL syntax instead of standard SQL."""

    @use_legacy_sql.setter
    def use_legacy_sql(self, value: bool): ...

    @property
    def use_query_cache(self) -> bool:
        """Enable query result caching."""

    @use_query_cache.setter
    def use_query_cache(self, value: bool): ...

    @property
    def write_disposition(self) -> str:
        """Action when destination table exists."""

    @write_disposition.setter
    def write_disposition(self, value: str): ...

    @property
    def labels(self) -> Dict[str, str]:
        """Labels for the query job."""

    @labels.setter
    def labels(self, value: Dict[str, str]): ...

    @property
    def job_timeout(self) -> datetime.timedelta:
        """Maximum time to wait for job completion."""
    @job_timeout.setter
    def job_timeout(self, value: datetime.timedelta): ...

Process query results with support for pagination, type conversion, and data science integrations.
class RowIterator:
    """Iterator over query result rows.

    Provides the result-set schema and row count, plus conversion to
    pandas and PyArrow structures.
    """

    def __init__(self, client: Client, query_job: QueryJob): ...

    @property
    def total_rows(self) -> int:
        """Total number of rows in the result set."""

    @property
    def schema(self) -> List[SchemaField]:
        """Schema of the result set."""

    def to_dataframe(
        self,
        create_bqstorage_client: bool = True,
        dtypes: Dict[str, str] = None,
        progress_bar_type: str = None,
        **kwargs
    ) -> pandas.DataFrame:
        """
        Convert results to pandas DataFrame.

        Args:
            create_bqstorage_client: Use BigQuery Storage API.
            dtypes: Pandas data types for columns.
            progress_bar_type: Progress bar type.

        Returns:
            pandas.DataFrame: Results as DataFrame.
        """

    def to_arrow(
        self,
        create_bqstorage_client: bool = True,
        progress_bar_type: str = None,
    ) -> pyarrow.Table:
        """
        Convert results to PyArrow Table.

        Args:
            create_bqstorage_client: Use BigQuery Storage API.
            progress_bar_type: Progress bar type.

        Returns:
            pyarrow.Table: Results as PyArrow Table.
        """
class Row:
    """A single result row with dict-like access to column values."""

    def __init__(self, values: List[Any], field_to_index: Dict[str, int]): ...

    def values(self) -> List[Any]:
        """Return row values as a list."""

    def keys(self) -> List[str]:
        """Return column names."""

    def items(self) -> List[Tuple[str, Any]]:
        """Return (column_name, value) pairs."""

    def get(self, key: str, default: Any = None) -> Any:
"""Get value by column name with optional default."""Access detailed statistics for Data Manipulation Language (INSERT, UPDATE, DELETE) operations.
class DmlStats:
    """Row counts produced by a DML statement (INSERT/UPDATE/DELETE)."""

    def __init__(self, **kwargs): ...

    @property
    def inserted_row_count(self) -> int:
        """Number of rows inserted."""

    @property
    def deleted_row_count(self) -> int:
        """Number of rows deleted."""

    @property
    def updated_row_count(self) -> int:
"""Number of rows updated."""Execute multi-statement scripts with detailed execution statistics and error handling.
class ScriptOptions:
    """Execution options for multi-statement scripts."""

    def __init__(self, **kwargs): ...

    @property
    def statement_timeout_ms(self) -> int:
        """Timeout for individual statements in milliseconds."""

    @statement_timeout_ms.setter
    def statement_timeout_ms(self, value: int): ...

    @property
    def statement_byte_budget(self) -> int:
        """Maximum bytes processed per statement."""

    @statement_byte_budget.setter
    def statement_byte_budget(self, value: int): ...
class ScriptStatistics:
    """Statistics describing how a script statement was evaluated."""

    def __init__(self, **kwargs): ...

    @property
    def evaluation_kind(self) -> str:
        """Type of script evaluation."""

    @property
    def stack_frames(self) -> List[ScriptStackFrame]:
        """Execution stack frames."""
class ScriptStackFrame:
    """Source location of one frame in a script's execution stack."""

    def __init__(self, **kwargs): ...

    @property
    def start_line(self) -> int:
        """Starting line number."""

    @property
    def start_column(self) -> int:
        """Starting column number."""

    @property
    def end_line(self) -> int:
        """Ending line number."""

    @property
    def end_column(self) -> int:
        """Ending column number."""

    @property
    def procedure_id(self) -> str:
        """Procedure identifier."""

    @property
    def text(self) -> str:
"""Stack frame text."""from google.cloud import bigquery
# Create a client; uses Application Default Credentials and the default project.
client = bigquery.Client()

# Simple query
query = """
SELECT name, COUNT(*) as count
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = 'CA'
GROUP BY name
ORDER BY count DESC
LIMIT 10
"""
# query() starts the job asynchronously; result() blocks until it is DONE.
query_job = client.query(query)
results = query_job.result()

# Process results
for row in results:
print(f"{row.name}: {row.count}")from google.cloud.bigquery import ScalarQueryParameter
# Query with parameters
# Named parameters (@state, @min_year, @limit) are bound via QueryJobConfig,
# avoiding string interpolation into SQL.
query = """
SELECT name, COUNT(*) as count
FROM `bigquery-public-data.usa_names.usa_1910_2013`
WHERE state = @state AND year >= @min_year
GROUP BY name
ORDER BY count DESC
LIMIT @limit
"""
job_config = bigquery.QueryJobConfig(
    query_parameters=[
        # Each parameter carries its name, BigQuery type, and value.
        ScalarQueryParameter("state", "STRING", "TX"),
        ScalarQueryParameter("min_year", "INT64", 2000),
        ScalarQueryParameter("limit", "INT64", 5),
    ]
)
query_job = client.query(query, job_config=job_config)
results = query_job.result()
for row in results:
print(f"{row.name}: {row.count}")# Advanced query configuration
# Write results to a table, run at batch priority, and cap billable bytes.
job_config = bigquery.QueryJobConfig(
    destination=f"{client.project}.my_dataset.my_table",
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    priority=bigquery.QueryPriority.BATCH,
    maximum_bytes_billed=1000000,  # 1MB limit
    use_query_cache=True,
    labels={"team": "data-science", "env": "prod"}
)
query_job = client.query(query, job_config=job_config)
query_job.result()  # Wait for completion
# Job statistics are populated once the job is DONE.
print(f"Query processed {query_job.total_bytes_processed} bytes")
print(f"Query billed {query_job.total_bytes_billed} bytes")# Query with pagination
query = "SELECT * FROM `bigquery-public-data.samples.wikipedia`"
query_job = client.query(query)
# Process in batches
for page in query_job.result().pages:
for row in page:
# Process each row
print(row.title)
# Convert to pandas DataFrame
df = query_job.to_dataframe()
print(df.head())
# Convert to PyArrow for efficient processing
arrow_table = query_job.to_arrow()
print(arrow_table.schema)

# INSERT query
insert_query = """
INSERT INTO `my_project.my_dataset.my_table` (name, age, city)
VALUES
('Alice', 30, 'New York'),
('Bob', 25, 'San Francisco')
"""
query_job = client.query(insert_query)
query_job.result()
# Check DML statistics
if query_job.dml_stats:
print(f"Inserted {query_job.dml_stats.inserted_row_count} rows")
# UPDATE query
update_query = """
UPDATE `my_project.my_dataset.my_table`
SET age = age + 1
WHERE city = 'New York'
"""
query_job = client.query(update_query)
query_job.result()
if query_job.dml_stats:
print(f"Updated {query_job.dml_stats.updated_row_count} rows")# Dry run to estimate cost
# A dry run validates the query and reports bytes scanned without executing it.
job_config = bigquery.QueryJobConfig(dry_run=True)
query_job = client.query(query, job_config=job_config)
print(f"This query will process {query_job.total_bytes_processed} bytes")
print(f"Estimated cost: ${query_job.total_bytes_processed / (1024**4) * 5:.2f}")Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery