CtrlK
Blog | Docs | Log in | Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/common-utilities.md

Common Utilities and Base Classes

Shared utilities, authentication backends, base classes, and helper functions used across all Google service integrations. Together they provide the foundation for authentication, operation management, and service discovery.

Capabilities

Base Hook Classes

Foundation classes that all Google service hooks inherit from, providing common authentication and connection management.

class GoogleBaseHook(BaseHook):
    """Common parent of every Google Cloud service hook.

    Supplies the authentication, connection-management and
    credential-handling functionality shared by all Google service
    integrations.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ):
        ...

    def get_connection(self, conn_id: str):
        ...

    def get_credentials_and_project_id(self):
        ...

    def get_credentials(self):
        ...

    def quota_retry(self, *args, **kwargs):
        ...

    def fallback_to_default_project_id(self, project_id: Optional[str]):
        ...

class GoogleBaseAsyncHook(BaseHook):
    """Parent class for asynchronous Google Cloud service hooks.

    Adds support for asynchronous operations so that long-running Google
    Cloud processes can be used with deferrable operator patterns.
    """

    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        impersonation_chain: Optional[Union[str, Sequence[str]]] = None,
        **kwargs,
    ):
        ...

    async def get_sync_hook(self):
        ...

    def sync_hook(self):
        ...

class GoogleDiscoveryApiHook(GoogleBaseHook):
    """Hook giving access to Google APIs via the Discovery service.

    Going through the Discovery service enables API clients to be
    generated dynamically.
    """

    def __init__(
        self,
        api_service_name: str,
        api_version: str,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs,
    ):
        ...

    def get_conn(self):
        ...

    def build_service(self):
        ...

Operation Helpers

Utilities for managing long-running Google Cloud operations with polling and status checking.

class OperationHelper:
    """Helper for working with Google Cloud long-running operations.

    Covers polling, status checks and result extraction for asynchronous
    operations across Google Cloud services.
    """

    def __init__(
        self,
        operation: Dict[str, Any],
        project_id: Optional[str] = None,
        location: Optional[str] = None,
        **kwargs,
    ):
        ...

    def wait_for_operation(
        self,
        timeout: Optional[float] = None,
        retry: Optional[Retry] = None,
    ):
        ...

    def is_done(self) -> bool:
        ...

    def get_error_message(self) -> Optional[str]:
        ...

    def get_result(self) -> Dict[str, Any]:
        ...

    def cancel_operation(self):
        ...

Authentication Backend

Google OpenID authentication backend for Airflow web server integration.

class GoogleOpenIdAuthBackend:
    """Airflow authentication backend based on Google OpenID Connect.

    Lets users sign in to the Airflow web interface through Google OAuth,
    with support for domain restrictions and role mapping.
    """

    def __init__(self):
        ...

    def authenticate(self, request):
        ...

    def login_required(self, function):
        ...

    def has_access(self, action: str, resource: str, user) -> bool:
        ...

ID Token Credentials

Adapter for Google ID token credentials used in service-to-service authentication.

class IDTokenCredentialsAdapter:
    """Adapter around Google ID token credentials.

    Acts as a compatibility layer for ID token-based authentication in
    Google Cloud service-to-service communication.
    """

    def __init__(self, credentials, target_audience: str, **kwargs):
        ...

    def refresh(self, request):
        ...

    def apply(self, headers, token=None):
        ...

    def before_request(self, request, method, url, headers):
        ...

def get_default_id_token_credentials(
    target_audience: str,
    request: Optional[Any] = None
):
    """
    Get default ID token credentials for target audience.

    Args:
        target_audience (str): Target service URL for ID token
        request (Optional[Any]): HTTP request object

    Returns:
        ID token credentials for service-to-service auth
    """
    # Signature-only stub: the docstring lives inside the body so the
    # snippet is valid Python and the text is reachable via __doc__.
    ...

def impersonated_id_token_credentials(
    credentials,
    target_audience: str,
    target_principal: str,
    delegates: Optional[List[str]] = None,
    **kwargs
):
    """
    Create impersonated ID token credentials.

    Args:
        credentials: Source credentials for impersonation
        target_audience (str): Target service URL
        target_principal (str): Service account to impersonate
        delegates (Optional[List[str]]): Delegation chain

    Returns:
        Impersonated ID token credentials
    """
    # Signature-only stub: the docstring lives inside the body so the
    # snippet is valid Python and the text is reachable via __doc__.
    ...

Console Links

Link generators for navigating to the Google Cloud Console from the Airflow web interface.

class StorageLink(BaseOperatorLink):
    """Operator link pointing at the Google Cloud Storage console.

    Produces direct links to GCS resources (a bucket or an object) in the
    Google Cloud Console so they can be reached easily from Airflow task
    instances.
    """

    name: str = "Google Cloud Storage"

    def get_link(self, operator: BaseOperator, dttm: datetime, **kwargs) -> str:
        ...

class FileDetailsLink(BaseOperatorLink):
    """Operator link to one file's details in the Cloud Storage console.

    Navigates straight to the properties and metadata of an individual
    file in the Google Cloud Console.
    """

    name: str = "File Details"

    def get_link(self, operator: BaseOperator, dttm: datetime, **kwargs) -> str:
        ...

Constants and Configuration

Common constants and configuration values used across the provider.

# Method name used by default for deferrable operations
GOOGLE_DEFAULT_DEFERRABLE_METHOD_NAME: str = "execute_complete"

# Client information attached to API requests
CLIENT_INFO: Dict[str, str] = dict(
    client_library_name="airflow",
    client_library_version="airflow_version",
)

# Scopes requested by default for Google Cloud APIs
DEFAULT_SCOPES: List[str] = ["https://www.googleapis.com/auth/cloud-platform"]

# Default timeout and retry values
DEFAULT_TIMEOUT: float = 60.0
DEFAULT_RETRY_ATTEMPTS: int = 3

# Frequently used HTTP status codes
HTTP_OK: int = 200
HTTP_CREATED: int = 201
HTTP_NO_CONTENT: int = 204
HTTP_NOT_FOUND: int = 404

Usage Examples

Custom Hook Implementation

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from typing import Optional, Dict, Any

class CustomGoogleServiceHook(GoogleBaseHook):
    """Example hook wrapping one particular Google service."""

    def __init__(self, api_version: str = "v1", gcp_conn_id: str = "google_cloud_default", **kwargs):
        super().__init__(gcp_conn_id=gcp_conn_id, **kwargs)
        self.api_version = api_version

    def get_service_client(self):
        """Return a service client built from this hook's credentials."""
        creds, _project = self.get_credentials_and_project_id()
        return build_service_client(credentials=creds, api_version=self.api_version)

    def create_resource(self, resource_config: Dict[str, Any]) -> Dict[str, Any]:
        """Create a resource through the service API and wait for the result."""
        pending = self.get_service_client().create_resource(body=resource_config)

        # The creation call returns a long-running operation; OperationHelper
        # is used to wait for it (up to 300 seconds).
        from airflow.providers.google.common.hooks.operation_helpers import OperationHelper

        waiter = OperationHelper(operation=pending, project_id=self.project_id)
        return waiter.wait_for_operation(timeout=300)

Authentication Configuration

# Service account key file authentication
gcp_connection = {
    "conn_id": "google_cloud_custom",
    # Airflow registers Google Cloud connections under the type
    # "google_cloud_platform"; "gcp" is not a recognised connection type.
    "conn_type": "google_cloud_platform",
    "extra": {
        "key_path": "/path/to/service-account.json",
        "scope": "https://www.googleapis.com/auth/cloud-platform",
        "project": "my-gcp-project",
    },
}

# Service account impersonation
gcp_impersonation = {
    "conn_id": "google_cloud_impersonated",
    "conn_type": "google_cloud_platform",
    "extra": {
        "impersonation_chain": [
            "service-account@project.iam.gserviceaccount.com",
        ],
        "project": "target-project",
    },
}

Error Handling Patterns

from airflow.providers.google.common.hooks.base_google import GoogleBaseHook
from google.api_core.exceptions import GoogleAPICallError, RetryError
from airflow.exceptions import AirflowException

def safe_google_api_call(hook: GoogleBaseHook, operation_func, **kwargs):
    """Call ``operation_func(**kwargs)`` and convert failures to AirflowException.

    Args:
        hook: Hook associated with the call. Accepted for the caller's
            convenience; this function does not use it.
        operation_func: Callable to invoke.
        **kwargs: Keyword arguments forwarded unchanged to ``operation_func``.

    Returns:
        Whatever ``operation_func`` returns.

    Raises:
        AirflowException: If ``operation_func`` raises anything. The original
            exception is attached as ``__cause__`` so its traceback is kept.
    """
    # Message prefix for the status codes that get a dedicated message;
    # every other code falls back to the generic prefix.
    status_prefixes = {
        404: "Resource not found",
        403: "Permission denied",
        429: "Rate limit exceeded",
    }
    try:
        return operation_func(**kwargs)
    except GoogleAPICallError as e:
        prefix = status_prefixes.get(e.code, "Google API error")
        raise AirflowException(f"{prefix}: {e.message}") from e
    except RetryError as e:
        raise AirflowException(f"Operation timed out after retries: {e}") from e
    except Exception as e:
        # Boundary handler: wrap anything else, but keep the cause chained.
        raise AirflowException(f"Unexpected error: {str(e)}") from e

Types

from typing import Dict, List, Optional, Any, Union, Sequence
from google.oauth2.credentials import Credentials
from google.auth.credentials import Credentials as BaseCredentials
from airflow.models import BaseOperator
from airflow.models.baseoperatorlink import BaseOperatorLink

# Credential types
# Either of the two imported credential classes.
GoogleCredentials = Union[Credentials, BaseCredentials]
# String-keyed mapping with arbitrary values.
ServiceAccountInfo = Dict[str, Any]
# A single string or a sequence of strings.
ImpersonationChain = Union[str, Sequence[str]]

# Operation types
# String-keyed mapping with arbitrary values.
OperationResult = Dict[str, Any]
OperationStatus = str
# String-to-string mapping.
ErrorInfo = Dict[str, str]

# Connection types
# String-keyed mapping with arbitrary values.
ConnectionConfig = Dict[str, Any]
# List of strings.
AuthScopes = List[str]
ProjectId = str
LocationId = str

# Link types
# Both link aliases are plain strings.
ConsoleUrl = str
ResourceLink = str

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs

common-utilities.md

data-transfers.md

firebase.md

gcp-services.md

google-ads.md

google-workspace.md

index.md

leveldb.md

marketing-platform.md

tile.json