CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-apache-airflow-providers-google

Provider package for Google services integration with Apache Airflow, including Google Ads, Google Cloud (GCP), Google Firebase, Google LevelDB, Google Marketing Platform, and Google Workspace

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

docs/firebase.md

Firebase Integration

Google Firebase integration for Firestore database operations, enabling NoSQL database interactions in data pipelines for real-time applications and document-based data management.

Capabilities

Cloud Firestore

Google Cloud Firestore NoSQL document database integration for data storage, retrieval, and export operations.

class CloudFirestoreHook(GoogleBaseHook):
    """
    Hook for Google Cloud Firestore NoSQL document database.

    Provides methods for database operations including document management,
    collection queries, and database export functionality.

    All methods that accept ``database`` default to the project's
    "(default)" database; ``project_id=None`` presumably falls back to the
    connection's default project — TODO confirm against the provider source.
    """
    def __init__(
        self,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

    # Returns the underlying client/connection object used by the methods below.
    def get_conn(self): ...
    # Starts an export of documents; `body` is the export request payload
    # (e.g. outputUriPrefix / collectionIds per the operator example below).
    def export_documents(
        self,
        body: Dict[str, Any],
        database: str = "(default)",
        project_id: Optional[str] = None
    ): ...
    # Starts an import of previously exported documents; `body` is the
    # import request payload.
    def import_documents(
        self,
        body: Dict[str, Any],
        database: str = "(default)",
        project_id: Optional[str] = None
    ): ...
    # Lists collection IDs under `parent` (a document or database path).
    def list_collection_ids(
        self,
        parent: str,
        database: str = "(default)",
        project_id: Optional[str] = None
    ): ...
    # Lists documents in `collection_id` under `parent`, with optional
    # pagination (`page_size`) and ordering (`order_by`).
    def list_documents(
        self,
        parent: str,
        collection_id: str,
        page_size: Optional[int] = None,
        order_by: Optional[str] = None,
        database: str = "(default)",
        project_id: Optional[str] = None
    ): ...
    # Runs a structured query; `body` is the query request payload.
    def run_query(
        self,
        body: Dict[str, Any],
        database: str = "(default)",
        project_id: Optional[str] = None
    ): ...

class CloudFirestoreExportDatabaseOperator(BaseOperator):
    """
    Exports Cloud Firestore database to Google Cloud Storage.

    Args:
        project_id (Optional[str]): Google Cloud project ID; when None,
            presumably falls back to the connection's default project —
            TODO confirm against the provider source
        database_id (str): Firestore database ID (default: "(default)")
        body (Optional[Dict[str, Any]]): Export request body configuration
            (e.g. ``outputUriPrefix`` and optional ``collectionIds``)
        gcp_conn_id (str): Connection ID for Google Cloud Platform

    Returns:
        Operation result with export details and GCS output location
    """
    def __init__(
        self,
        project_id: Optional[str] = None,
        database_id: str = "(default)",
        # Optional[...]: the default is None, so the bare `Dict[str, Any]`
        # annotation was incorrect (None is not a Dict).
        body: Optional[Dict[str, Any]] = None,
        gcp_conn_id: str = "google_cloud_default",
        **kwargs
    ): ...

Usage Examples

Database Export to GCS

from airflow import DAG
from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator
from datetime import datetime

# Daily backup DAG: a single task that exports the Firestore database to GCS.
with DAG(
    'firestore_backup',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@daily',
    catchup=False
) as dag:
    # The {{ ds }} template puts each run's export under its own
    # date-stamped prefix in the backup bucket.
    export_firestore = CloudFirestoreExportDatabaseOperator(
        task_id='export_firestore',
        project_id='my-firebase-project',
        database_id='(default)',
        body={
            'outputUriPrefix': 'gs://firestore-backups/{{ ds }}/',
            'collectionIds': ['users', 'orders', 'products'],  # Optional: specific collections
        },
        gcp_conn_id='google_cloud_default',
    )

Complete Backup Pipeline

from airflow import DAG
from airflow.providers.google.firebase.operators.firestore import CloudFirestoreExportDatabaseOperator
from airflow.providers.google.cloud.operators.gcs import GCSCreateBucketOperator
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from datetime import datetime

# Weekly pipeline: create the backup bucket, export the database into it,
# then verify that the export output actually landed in GCS.
with DAG(
    'firestore_backup_pipeline',
    default_args={'start_date': datetime(2023, 1, 1)},
    schedule_interval='@weekly',
    catchup=False
) as dag:
    # Make sure the destination bucket exists before the export writes to it
    create_backup_bucket = GCSCreateBucketOperator(
        task_id='create_backup_bucket',
        bucket_name='firestore-backups-{{ ds_nodash }}',
        project_id='my-firebase-project',
        location='US',
    )

    # No collectionIds in the body: exports every collection in the database
    export_database = CloudFirestoreExportDatabaseOperator(
        task_id='export_database',
        project_id='my-firebase-project',
        body={
            'outputUriPrefix': 'gs://firestore-backups-{{ ds_nodash }}/full-backup/',
        },
    )

    # Poll every 60s until the export output appears; give up after 30 minutes
    verify_export = GCSObjectExistenceSensor(
        task_id='verify_export',
        bucket='firestore-backups-{{ ds_nodash }}',
        object='full-backup/',
        timeout=1800,
        poke_interval=60,
    )

    create_backup_bucket >> export_database >> verify_export

Types

# Shared type aliases for the Firestore integration's API surface.
from typing import Dict, List, Optional, Any
from airflow.models import BaseOperator

# Firestore types
DatabaseId = str  # database name, typically "(default)"
CollectionId = str  # collection identifier
DocumentId = str  # document identifier
FirestoreQuery = Dict[str, Any]  # request body for run_query
ExportRequest = Dict[str, Any]  # request body for export_documents / export operator
ImportRequest = Dict[str, Any]  # request body for import_documents
OperationResult = Dict[str, Any]  # long-running operation response payload

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-google

docs

common-utilities.md

data-transfers.md

firebase.md

gcp-services.md

google-ads.md

google-workspace.md

index.md

leveldb.md

marketing-platform.md

tile.json