Google Cloud BigQuery Connection API client library for managing external data source connections and credentials
—
Core CRUD operations for managing BigQuery connections to external data sources. These operations allow you to create, retrieve, list, update, and delete connection resources programmatically.
Creates a new BigQuery connection to an external data source. The connection configuration depends on the target data source type.
def create_connection(
    request: CreateConnectionRequest = None,
    *,
    parent: str = None,
    connection: Connection = None,
    connection_id: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: Union[float, object] = DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Connection:
    """
    Creates a new connection in the given project and location.

    Every argument after `request` is keyword-only.

    Parameters:
    - request: The request object containing connection details (a
      CreateConnectionRequest, which carries the same parent / connection /
      connection_id fields listed below)
    - parent: Required. Parent resource name in format 'projects/{project_id}/locations/{location_id}'
    - connection: Required. Connection configuration object
    - connection_id: Optional. Connection ID to assign to the created connection
    - retry: Retry configuration for the request
    - timeout: Timeout for the request in seconds
    - metadata: Additional metadata to send with the request, given as
      (key, value) pairs

    Returns:
    Connection: The created connection object with server-generated fields populated

    Raises:
    google.api_core.exceptions.GoogleAPICallError: If the request fails
    """

Usage Example:
from google.cloud.bigquery_connection import (
    ConnectionServiceClient,
    Connection,
    CloudSqlProperties,
    CloudSqlCredential
)

# The later examples on this page reuse this `client` object.
client = ConnectionServiceClient()

# Configure a Cloud SQL connection
connection = Connection()
connection.friendly_name = "Production Database"
connection.description = "Connection to production PostgreSQL database"
connection.cloud_sql = CloudSqlProperties()
# The sample instance ID follows a '<project>:<region>:<instance>' pattern.
connection.cloud_sql.instance_id = "my-project:us-central1:prod-db"
connection.cloud_sql.database = "analytics"
connection.cloud_sql.type_ = CloudSqlProperties.DatabaseType.POSTGRES
# Placeholder credentials for illustration only; load real values from a
# secret store instead of hard-coding them in source.
connection.cloud_sql.credential = CloudSqlCredential(
    username="bigquery_user",
    password="secure_password"
)

# Create the connection
parent = "projects/my-project/locations/us-central1"
created_connection = client.create_connection(
    parent=parent,
    connection=connection,
    connection_id="prod-analytics-db"
)
# The returned object carries the server-generated fields, such as the
# full resource name printed below.
print(f"Created connection: {created_connection.name}")
print(f"Service account: {created_connection.cloud_sql.service_account_id}")

Retrieves a specific connection by its resource name.
def get_connection(
    request: GetConnectionRequest = None,
    *,
    name: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: Union[float, object] = DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Connection:
    """
    Returns the specified connection.

    Every argument after `request` is keyword-only.

    Parameters:
    - request: The request object containing the connection name (a
      GetConnectionRequest)
    - name: Required. Name of the connection in format 'projects/{project_id}/locations/{location_id}/connections/{connection_id}'
    - retry: Retry configuration for the request
    - timeout: Timeout for the request in seconds
    - metadata: Additional metadata to send with the request, given as
      (key, value) pairs

    Returns:
    Connection: The requested connection object

    Raises:
    google.api_core.exceptions.NotFound: If the connection does not exist
    google.api_core.exceptions.GoogleAPICallError: If the request fails
    """

Usage Example:
# Get a specific connection
# (`client` is the ConnectionServiceClient built in the create_connection example.)
connection_name = "projects/my-project/locations/us-central1/connections/prod-analytics-db"
connection = client.get_connection(name=connection_name)
print(f"Connection: {connection.friendly_name}")
print(f"Created: {connection.creation_time}")
print(f"Has credential: {connection.has_credential}")

Lists all connections in a given project and location with pagination support.
def list_connections(
    request: ListConnectionsRequest = None,
    *,
    parent: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: Union[float, object] = DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> ListConnectionsPager:
    """
    Returns a list of connections in the given project and location.

    Every argument after `request` is keyword-only. The pagination fields
    (page_size, page_token) are only available on the request object, not
    as keyword arguments.

    Parameters:
    - request: The request object containing parent and pagination parameters
      (a ListConnectionsRequest)
    - parent: Required. Parent resource name in format 'projects/{project_id}/locations/{location_id}'
    - retry: Retry configuration for the request
    - timeout: Timeout for the request in seconds
    - metadata: Additional metadata to send with the request, given as
      (key, value) pairs

    Returns:
    ListConnectionsPager: A pager for iterating through connections, either
    one connection at a time or page by page through its `pages` property

    Raises:
    google.api_core.exceptions.GoogleAPICallError: If the request fails
    """

Usage Examples:
# List all connections
parent = "projects/my-project/locations/us-central1"
connections = client.list_connections(parent=parent)
# Iterate through all connections
for connection in connections:
print(f"Connection: {connection.name}")
print(f" Name: {connection.friendly_name}")
print(f" Description: {connection.description}")
if connection.cloud_sql:
print(f" Type: Cloud SQL ({connection.cloud_sql.database})")
elif connection.aws:
print(f" Type: AWS")
elif connection.azure:
print(f" Type: Azure")
# Iterate through pages manually
for page in connections.pages:
print(f"Page with {len(page.connections)} connections")
for connection in page.connections:
print(f" - {connection.friendly_name}")Updates an existing connection. Use field masks to specify which fields to update.
def update_connection(
request: UpdateConnectionRequest = None,
*,
connection: Connection = None,
update_mask: FieldMask = None,
retry: OptionalRetry = DEFAULT,
timeout: Union[float, object] = DEFAULT,
metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> Connection:
"""
Updates a connection.
Parameters:
- request: The request object containing update details
- connection: Required. Connection object with updated field values (connection.name specifies which connection to update)
- update_mask: Required. Field mask specifying which fields to update
- retry: Retry configuration for the request
- timeout: Timeout for the request in seconds
- metadata: Additional metadata to send with the request
Returns:
Connection: The updated connection object
Raises:
google.api_core.exceptions.NotFound: If the connection does not exist
google.api_core.exceptions.GoogleAPICallError: If the request fails
"""Usage Example:
from google.protobuf import field_mask_pb2
# Get the existing connection
connection_name = "projects/my-project/locations/us-central1/connections/prod-analytics-db"
connection = client.get_connection(name=connection_name)
# Update the description and friendly name
connection.friendly_name = "Production Analytics Database"
connection.description = "Updated: Connection to production PostgreSQL for analytics workloads"
# Create update mask specifying which fields to update
update_mask = field_mask_pb2.FieldMask(
paths=["friendly_name", "description"]
)
# Perform the update
updated_connection = client.update_connection(
connection=connection,
update_mask=update_mask
)
print(f"Updated connection: {updated_connection.friendly_name}")Deletes a connection permanently. This operation cannot be undone.
def delete_connection(
    request: DeleteConnectionRequest = None,
    *,
    name: str = None,
    retry: OptionalRetry = DEFAULT,
    timeout: Union[float, object] = DEFAULT,
    metadata: Sequence[Tuple[str, Union[str, bytes]]] = (),
) -> None:
    """
    Deletes connection and associated credential.

    Every argument after `request` is keyword-only.

    Parameters:
    - request: The request object containing the connection name (a
      DeleteConnectionRequest)
    - name: Required. Name of the connection to delete in format 'projects/{project_id}/locations/{location_id}/connections/{connection_id}'
    - retry: Retry configuration for the request
    - timeout: Timeout for the request in seconds
    - metadata: Additional metadata to send with the request, given as
      (key, value) pairs

    Returns:
    None

    Raises:
    google.api_core.exceptions.NotFound: If the connection does not exist
    google.api_core.exceptions.GoogleAPICallError: If the request fails
    """

Usage Example:
# Delete a connection
# (`client` is the ConnectionServiceClient built in the create_connection example.)
connection_name = "projects/my-project/locations/us-central1/connections/prod-analytics-db"
try:
    client.delete_connection(name=connection_name)
    print(f"Deleted connection: {connection_name}")
except Exception as e:
    # NOTE(review): broad catch kept for brevity. delete_connection documents
    # google.api_core.exceptions.NotFound and GoogleAPICallError; real code
    # should catch those specifically.
    print(f"Failed to delete connection: {e}")


class CreateConnectionRequest:
    """Request message for creating a connection."""

    # Required. Parent resource name 'projects/{project_id}/locations/{location_id}'
    parent: str
    # Optional. Connection ID to assign
    connection_id: str
    # Required. Connection configuration
    connection: Connection
class GetConnectionRequest:
    """Request message for getting a connection."""

    # Required. Connection resource name, in format
    # 'projects/{project_id}/locations/{location_id}/connections/{connection_id}'
    name: str
class ListConnectionsRequest:
    """Request message for listing connections."""

    # Required. Parent resource name 'projects/{project_id}/locations/{location_id}'
    parent: str
    # Required. Maximum number of connections to return
    page_size: int
    # Page token for pagination
    # NOTE(review): presumably the next_page_token of a previous
    # ListConnectionsResponse - confirm against the API reference.
    page_token: str
class UpdateConnectionRequest:
    """Request message for updating a connection."""

    # Required. Connection resource name
    name: str
    # Required. Updated connection object
    connection: Connection
    # Required. Fields to update
    update_mask: field_mask_pb2.FieldMask
class DeleteConnectionRequest:
    """Request message for deleting a connection."""

    # Required. Connection resource name to delete
    name: str


class ListConnectionsResponse:
    """Response message for listing connections."""

    # Token for next page of results
    next_page_token: str
    # List of connection objects
    connections: MutableSequence[Connection]

    @property
    def raw_page(self) -> "ListConnectionsResponse":
        """Returns the raw page for pagination support (this response object itself)."""
        return self


class ListConnectionsPager:
    """Synchronous pager for iterating through list_connections results.

    Only the interface is shown here; method bodies are omitted.
    """

    def __iter__(self) -> Iterator[Connection]:
        """Iterate through individual connections."""

    @property
    def pages(self) -> Iterator[ListConnectionsResponse]:
        """Iterate through response pages."""
class ListConnectionsAsyncPager:
    """Asynchronous pager for iterating through list_connections results.

    Only the interface is shown here; method bodies are omitted.
    """

    def __aiter__(self) -> AsyncIterator[Connection]:
        """Async iterate through individual connections."""

    @property
    def pages(self) -> AsyncIterator[ListConnectionsResponse]:
        """Async iterate through response pages."""

Install with Tessl CLI
npx tessl i tessl/pypi-google-cloud-bigquery-connection