Google Cloud BigQuery Connection API client library for managing external data source connections and credentials
npx @tessl/cli install tessl/pypi-google-cloud-bigquery-connection@1.18.0A Python client library for Google Cloud BigQuery Connection API that enables management of external data source connections and credentials. The library provides programmatic access to create, read, update, and delete BigQuery connections to various external data sources including Cloud SQL, AWS, Azure, Cloud Spanner, and more.
pip install google-cloud-bigquery-connectionfrom google.cloud.bigquery_connection import (
ConnectionServiceClient,
ConnectionServiceAsyncClient,
Connection,
CreateConnectionRequest,
ListConnectionsRequest,
UpdateConnectionRequest,
DeleteConnectionRequest,
GetConnectionRequest,
)For message types and connection properties:
from google.cloud.bigquery_connection import (
CloudSqlProperties,
AwsProperties,
AzureProperties,
CloudSpannerProperties,
CloudResourceProperties,
SparkProperties,
SalesforceDataCloudProperties,
)from google.cloud.bigquery_connection import ConnectionServiceClient, Connection
# Initialize the client (uses default credentials from environment)
client = ConnectionServiceClient()
# Define the parent resource (project and location)
parent = "projects/my-project/locations/us-central1"
# Create a connection configuration
connection = Connection()
connection.friendly_name = "My Test Connection"
connection.description = "Connection to external data source"
# For Cloud SQL connection
from google.cloud.bigquery_connection import CloudSqlProperties, CloudSqlCredential
connection.cloud_sql = CloudSqlProperties()
connection.cloud_sql.instance_id = "project:region:instance"
connection.cloud_sql.database = "my_database"
connection.cloud_sql.type_ = CloudSqlProperties.DatabaseType.POSTGRES
connection.cloud_sql.credential = CloudSqlCredential(
username="db_user",
password="db_password"
)
# Create the connection
created_connection = client.create_connection(
parent=parent,
connection=connection,
connection_id="my-connection-id"
)
print(f"Created connection: {created_connection.name}")
# List all connections
connections = client.list_connections(parent=parent)
for conn in connections:
print(f"Connection: {conn.name}, Type: {conn.friendly_name}")
# Get a specific connection
connection_name = f"{parent}/connections/my-connection-id"
retrieved_connection = client.get_connection(name=connection_name)
# Update a connection
retrieved_connection.description = "Updated description"
from google.protobuf import field_mask_pb2
update_mask = field_mask_pb2.FieldMask(paths=["description"])
updated_connection = client.update_connection(
connection=retrieved_connection,
update_mask=update_mask
)
# Delete a connection
client.delete_connection(name=connection_name)The BigQuery Connection API follows Google Cloud's standard client library patterns:
ConnectionServiceClient) and asynchronous (ConnectionServiceAsyncClient) clients for different use casesCore CRUD operations for managing BigQuery connections to external data sources. Supports creation, retrieval, listing, updating, and deletion of connection resources.
def create_connection(
request: CreateConnectionRequest = None,
*,
parent: str = None,
connection: Connection = None,
connection_id: str = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Connection: ...
def get_connection(
request: GetConnectionRequest = None,
*,
name: str = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Connection: ...
def list_connections(
request: ListConnectionsRequest = None,
*,
parent: str = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> ListConnectionsPager: ...
def update_connection(
request: UpdateConnectionRequest = None,
*,
name: str = None,
connection: Connection = None,
update_mask: FieldMask = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Connection: ...
def delete_connection(
request: DeleteConnectionRequest = None,
*,
name: str = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> None: ...Support for multiple external data source connection types including Cloud SQL, AWS, Azure, Cloud Spanner, Cloud Resource, Spark, and Salesforce Data Cloud. Each connection type provides specific configuration options for its respective data source.
class Connection:
name: str
friendly_name: str
description: str
creation_time: int
last_modified_time: int
has_credential: bool
# OneOf connection type fields
cloud_sql: CloudSqlProperties
aws: AwsProperties
azure: AzureProperties
cloud_spanner: CloudSpannerProperties
cloud_resource: CloudResourceProperties
spark: SparkProperties
salesforce_data_cloud: SalesforceDataCloudPropertiesOptions for configuring client behavior including credentials, transport selection, retry policies, timeouts, and metadata handling. Supports both synchronous and asynchronous operation modes.
class ConnectionServiceClient:
def __init__(
self,
*,
credentials: Optional[Credentials] = None,
transport: Optional[Union[str, ConnectionServiceTransport]] = None,
client_options: Optional[Union[ClientOptions, dict]] = None,
client_info: ClientInfo = DEFAULT_CLIENT_INFO,
) -> None: ...
class ConnectionServiceAsyncClient:
def __init__(
self,
*,
credentials: Optional[Credentials] = None,
transport: Optional[Union[str, ConnectionServiceTransport]] = "grpc_asyncio",
client_options: Optional[ClientOptions] = None,
client_info: ClientInfo = DEFAULT_CLIENT_INFO,
) -> None: ...Identity and Access Management operations for connection resources including getting, setting, and testing IAM policies and permissions.
def get_iam_policy(
request: GetIamPolicyRequest = None,
*,
resource: str = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Policy: ...
def set_iam_policy(
request: SetIamPolicyRequest = None,
*,
resource: str = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Policy: ...
def test_iam_permissions(
request: TestIamPermissionsRequest = None,
*,
resource: str = None,
permissions: Optional[MutableSequence[str]] = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> TestIamPermissionsResponse: ...Utility methods for constructing and parsing Google Cloud resource paths for connections, clusters, services, and common resources like projects, locations, and billing accounts.
@classmethod
def connection_path(cls, project: str, location: str, connection: str) -> str: ...
@classmethod
def parse_connection_path(cls, path: str) -> Dict[str, str]: ...
@classmethod
def cluster_path(cls, project: str, region: str, cluster: str) -> str: ...
@classmethod
def service_path(cls, project: str, location: str, service: str) -> str: ...
@classmethod
def common_project_path(cls, project: str) -> str: ...
@classmethod
def common_location_path(cls, project: str, location: str) -> str: ...from typing import Optional, Union, Sequence, Tuple, MutableSequence
from google.api_core import retry as retries
from google.api_core import gapic_v1
from google.protobuf import field_mask_pb2
from google.iam.v1 import policy_pb2, iam_policy_pb2
OptionalRetry = Union[retries.Retry, gapic_v1.method._MethodDefault, None]class CreateConnectionRequest:
parent: str
connection_id: str
connection: Connection
class GetConnectionRequest:
name: str
class ListConnectionsRequest:
parent: str
page_size: int
page_token: str
class UpdateConnectionRequest:
name: str
connection: Connection
update_mask: field_mask_pb2.FieldMask
class DeleteConnectionRequest:
name: strclass ListConnectionsResponse:
next_page_token: str
connections: MutableSequence[Connection]
@property
def raw_page(self) -> "ListConnectionsResponse": ...class ListConnectionsPager:
def __iter__(self) -> Iterator[Connection]: ...
@property
def pages(self) -> Iterator[ListConnectionsResponse]: ...
class ListConnectionsAsyncPager:
def __aiter__(self) -> AsyncIterator[Connection]: ...
@property
def pages(self) -> AsyncIterator[ListConnectionsResponse]: ...