Google Cloud BigQuery Connection API client library for managing external data source connections and credentials
—
Configuration options for BigQuery Connection API clients including credentials, transport selection, retry policies, timeouts, and metadata handling. The library provides both synchronous and asynchronous clients with flexible configuration options.
The ConnectionServiceClient provides synchronous operations for connection management.
class ConnectionServiceClient:
"""Manages connections to external data source connections and credentials."""
def __init__(
self,
*,
credentials: Optional[ga_credentials.Credentials] = None,
transport: Optional[Union[str, ConnectionServiceTransport, Callable]] = None,
client_options: Optional[Union[client_options_lib.ClientOptions, dict]] = None,
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
) -> None:
"""
Initialize the Connection Service client.
Parameters:
- credentials: The authorization credentials to attach to requests
- transport: The transport to use for communication with the service
- client_options: Custom options for the client
- client_info: Client information used to generate the user-agent string
"""Usage Examples:
from google.cloud.bigquery_connection import ConnectionServiceClient
from google.oauth2 import service_account
from google.api_core import client_options
# Default initialization (uses Application Default Credentials)
client = ConnectionServiceClient()
# With explicit service account credentials
credentials = service_account.Credentials.from_service_account_file(
"/path/to/service-account-key.json"
)
client = ConnectionServiceClient(credentials=credentials)
# With custom client options
options = client_options.ClientOptions(
api_endpoint="custom-bigqueryconnection.googleapis.com"
)
client = ConnectionServiceClient(client_options=options)
# With specific transport
client = ConnectionServiceClient(transport="grpc")
client = ConnectionServiceClient(transport="rest")The ConnectionServiceAsyncClient provides asynchronous operations for connection management.
class ConnectionServiceAsyncClient:
"""Manages connections to external data source connections and credentials asynchronously."""
def __init__(
self,
*,
credentials: Optional[ga_credentials.Credentials] = None,
transport: Optional[Union[str, ConnectionServiceTransport, Callable]] = "grpc_asyncio",
client_options: Optional[client_options_lib.ClientOptions] = None,
client_info: gapic_v1.client_info.ClientInfo = DEFAULT_CLIENT_INFO,
) -> None:
"""
Initialize the Connection Service async client.
Parameters:
- credentials: The authorization credentials to attach to requests
- transport: The transport to use (defaults to "grpc_asyncio")
- client_options: Custom options for the client
- client_info: Client information used to generate the user-agent string
"""Usage Examples:
import asyncio
from google.cloud.bigquery_connection import ConnectionServiceAsyncClient
async def main():
# Default async initialization
async_client = ConnectionServiceAsyncClient()
# List connections asynchronously
parent = "projects/my-project/locations/us-central1"
connections = async_client.list_connections(parent=parent)
async for connection in connections:
print(f"Connection: {connection.friendly_name}")
# Clean up
await async_client.transport.close()
# Run async code
asyncio.run(main())Both client classes provide factory methods for convenient initialization from service account files.
@classmethod
def from_service_account_info(
cls,
info: dict,
*args,
**kwargs
) -> "ConnectionServiceClient":
"""
Creates a client instance from parsed service account info.
Parameters:
- info: Service account info in JSON format
- args: Additional arguments to pass to the constructor
- kwargs: Additional keyword arguments to pass to the constructor
Returns:
ConnectionServiceClient: The constructed client
"""
@classmethod
def from_service_account_file(
cls,
filename: str,
*args,
**kwargs
) -> "ConnectionServiceClient":
"""
Creates a client instance from a service account JSON file.
Parameters:
- filename: Path to the service account JSON file
- args: Additional arguments to pass to the constructor
- kwargs: Additional keyword arguments to pass to the constructor
Returns:
ConnectionServiceClient: The constructed client
"""
# Alias for from_service_account_file
from_service_account_json = from_service_account_fileUsage Examples:
# From service account file
client = ConnectionServiceClient.from_service_account_file(
"/path/to/service-account-key.json"
)
# From service account info dict
import json
with open("/path/to/service-account-key.json") as f:
service_account_info = json.load(f)
client = ConnectionServiceClient.from_service_account_info(
service_account_info
)
# With additional client options
client = ConnectionServiceClient.from_service_account_file(
"/path/to/service-account-key.json",
client_options={"api_endpoint": "custom-endpoint.googleapis.com"}
)Configure the underlying transport layer for communication with the BigQuery Connection API.
@classmethod
def get_transport_class(
cls,
label: Optional[str] = None,
) -> Type[ConnectionServiceTransport]:
"""
Returns an appropriate transport class.
Parameters:
- label: The name of the desired transport ("grpc", "grpc_asyncio", "rest")
Returns:
Type[ConnectionServiceTransport]: The transport class
"""Available Transport Options:
Usage Examples:
# Explicitly specify transport
client = ConnectionServiceClient(transport="grpc")
client = ConnectionServiceClient(transport="rest")
# Get transport class
transport_class = ConnectionServiceClient.get_transport_class("grpc")
custom_transport = transport_class(credentials=credentials)
client = ConnectionServiceClient(transport=custom_transport)
# For async client
async_client = ConnectionServiceAsyncClient(transport="grpc_asyncio")Customize client behavior using ClientOptions.
from google.api_core import client_options as client_options_lib
class ClientOptions:
"""Options for configuring the client."""
api_endpoint: str # API endpoint to use
client_cert_source: Callable # Client certificate source
client_encrypted_cert_source: Callable # Encrypted client certificate source
quota_project_id: str # Project ID for quota and billing
credentials_file: str # Path to credentials file
scopes: List[str] # OAuth2 scopes for authentication
default_scopes: List[str] # Default OAuth2 scopesUsage Examples:
from google.api_core import client_options
# Custom API endpoint
options = client_options.ClientOptions(
api_endpoint="custom-bigqueryconnection.googleapis.com"
)
client = ConnectionServiceClient(client_options=options)
# Quota project ID
options = client_options.ClientOptions(
quota_project_id="my-billing-project"
)
client = ConnectionServiceClient(client_options=options)
# Multiple options
options = client_options.ClientOptions(
api_endpoint="custom-endpoint.googleapis.com",
quota_project_id="my-billing-project"
)
client = ConnectionServiceClient(client_options=options)
# From dictionary
options_dict = {
"api_endpoint": "custom-endpoint.googleapis.com",
"quota_project_id": "my-billing-project"
}
client = ConnectionServiceClient(client_options=options_dict)Configure individual request behavior using retry, timeout, and metadata parameters.
from google.api_core import retry as retries
from google.api_core import gapic_v1
from typing import Union, Sequence, Tuple
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]
# All client methods support these parameters:
# - retry: OptionalRetry = gapic_v1.method.DEFAULT
# - timeout: Union[float, object] = gapic_v1.method.DEFAULT
# - metadata: Sequence[Tuple[str, Union[str, bytes]]] = ()Usage Examples:
from google.api_core import retry as retries
import time
# Custom retry configuration
retry_config = retries.Retry(
initial=1.0, # Initial delay in seconds
maximum=60.0, # Maximum delay in seconds
multiplier=2.0, # Delay multiplier
predicate=retries.if_transient_error, # Retry on transient errors
deadline=300.0 # Total deadline in seconds
)
# Use custom retry
connection = client.get_connection(
name="projects/my-project/locations/us-central1/connections/my-conn",
retry=retry_config
)
# Custom timeout (30 seconds)
connection = client.get_connection(
name="projects/my-project/locations/us-central1/connections/my-conn",
timeout=30.0
)
# Custom metadata
metadata = [
("x-custom-header", "custom-value"),
("x-request-id", "req-12345")
]
connection = client.get_connection(
name="projects/my-project/locations/us-central1/connections/my-conn",
metadata=metadata
)
# Disable retry
connection = client.get_connection(
name="projects/my-project/locations/us-central1/connections/my-conn",
retry=None
)Both client classes support context manager protocols for automatic resource cleanup.
class ConnectionServiceClient:
def __enter__(self) -> "ConnectionServiceClient":
"""Enter context manager."""
return self
def __exit__(self, type, value, traceback) -> None:
"""Exit context manager and clean up resources."""
class ConnectionServiceAsyncClient:
async def __aenter__(self) -> "ConnectionServiceAsyncClient":
"""Enter async context manager."""
return self
async def __aexit__(self, exc_type, exc, tb) -> None:
"""Exit async context manager and clean up resources."""Usage Examples:
# Synchronous context manager
with ConnectionServiceClient() as client:
connections = client.list_connections(
parent="projects/my-project/locations/us-central1"
)
for connection in connections:
print(connection.friendly_name)
# Client automatically cleaned up
# Asynchronous context manager
async def async_example():
async with ConnectionServiceAsyncClient() as async_client:
connections = async_client.list_connections(
parent="projects/my-project/locations/us-central1"
)
async for connection in connections:
print(connection.friendly_name)
# Async client automatically cleaned upAccess client configuration and service information.
class ConnectionServiceClient:
@property
def transport(self) -> ConnectionServiceTransport:
"""The transport used by the client instance."""
@property
def api_endpoint(self) -> str:
"""The API endpoint for the service."""
@property
def universe_domain(self) -> str:
"""The universe domain used by the client instance."""
class ConnectionServiceAsyncClient:
@property
def transport(self) -> ConnectionServiceTransport:
"""The transport used by the client instance."""
@property
def api_endpoint(self) -> str:
"""The API endpoint for the service."""
@property
def universe_domain(self) -> str:
"""The universe domain used by the client instance."""Usage Examples:
client = ConnectionServiceClient()
print(f"API Endpoint: {client.api_endpoint}")
print(f"Universe Domain: {client.universe_domain}")
print(f"Transport Type: {type(client.transport).__name__}")
# Check transport configuration
if hasattr(client.transport, '_credentials'):
print("Using custom credentials")
else:
print("Using default credentials")from google.auth import credentials as ga_credentials
from google.api_core import client_options as client_options_lib
from google.api_core import gapic_v1
# Credential types
Credentials = ga_credentials.Credentials
# Client configuration
ClientOptions = client_options_lib.ClientOptions
ClientInfo = gapic_v1.client_info.ClientInfo
# Transport types
ConnectionServiceTransport = Any # Base transport class
ConnectionServiceGrpcTransport = Any # gRPC transport
ConnectionServiceGrpcAsyncIOTransport = Any # Async gRPC transport
ConnectionServiceRestTransport = Any # REST transport
# Request configuration types
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]# Default values used by the client
DEFAULT_ENDPOINT = "bigqueryconnection.googleapis.com"
DEFAULT_MTLS_ENDPOINT = "bigqueryconnection.mtls.googleapis.com"
DEFAULT_UNIVERSE = "googleapis.com"
# Default client info
DEFAULT_CLIENT_INFO = gapic_v1.client_info.ClientInfo(
gapic_version=package_version.__version__
)Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery-connection