tessl/pypi-apache-airflow-providers-microsoft-azure

Provider package for Microsoft Azure integrations with Apache Airflow

Azure Cosmos DB

Azure Cosmos DB integration supporting database and collection (container) management, document operations, and SQL query execution through the Cosmos DB Core (SQL) API. Provides full CRUD operations with support for configurable consistency levels and partitioning strategies.

Capabilities

Cosmos DB Hook

Primary interface for Azure Cosmos DB operations, providing authenticated connections and database functionality.

class AzureCosmosDBHook(BaseHook):
    """
    Hook for Azure Cosmos DB operations.
    
    Provides methods for database management, collection operations, and document
    manipulation through the Cosmos DB Core (SQL) API.
    """
    
    def get_conn(self) -> CosmosClient:
        """Get authenticated Azure Cosmos DB client."""
    
    def does_database_exist(self, database_name: str) -> bool:
        """
        Check if a database exists.
        
        Args:
            database_name (str): Name of database to check
            
        Returns:
            bool: True if database exists, False otherwise
        """
    
    def create_database(self, database_name: str) -> None:
        """
        Create a new database.
        
        Args:
            database_name (str): Name of database to create
        """
    
    def delete_database(self, database_name: str) -> None:
        """
        Delete a database and all its collections.
        
        Args:
            database_name (str): Name of database to delete
        """
    
    def does_collection_exist(self, collection_name: str, database_name: str) -> bool:
        """
        Check if a collection exists in the specified database.
        
        Args:
            collection_name (str): Name of collection to check
            database_name (str): Name of database containing collection
            
        Returns:
            bool: True if collection exists, False otherwise
        """
    
    def create_collection(
        self,
        collection_name: str,
        database_name: str | None = None,
        partition_key: str | None = None,
        throughput: int | None = None,
        **kwargs
    ) -> None:
        """
        Create a new collection in the database.
        
        Args:
            collection_name (str): Name of collection to create
            database_name (str): Name of database (uses default if None)
            partition_key (str): Partition key for the collection
            throughput (int): Provisioned throughput (RU/s)
            **kwargs: Additional collection configuration options
        """
    
    def delete_collection(
        self,
        collection_name: str,
        database_name: str | None = None
    ) -> None:
        """
        Delete a collection from the database.
        
        Args:
            collection_name (str): Name of collection to delete
            database_name (str): Name of database (uses default if None)
        """
    
    def upsert_document(
        self,
        document: dict,
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs
    ) -> dict:
        """
        Insert or update a document in the collection.
        
        Args:
            document (dict): Document data to upsert
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            **kwargs: Additional upsert options
            
        Returns:
            dict: Upserted document with metadata
        """
    
    def insert_documents(
        self,
        documents: list[dict],
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs
    ) -> list[dict]:
        """
        Insert multiple documents into the collection.
        
        Args:
            documents (list[dict]): List of documents to insert
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            **kwargs: Additional insert options
            
        Returns:
            list[dict]: List of inserted documents with metadata
        """
    
    def delete_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> None:
        """
        Delete a document from the collection.
        
        Args:
            document_id (str): ID of document to delete
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            partition_key: Partition key value for the document
            **kwargs: Additional delete options
        """
    
    def get_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> dict:
        """
        Retrieve a document by ID.
        
        Args:
            document_id (str): ID of document to retrieve
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            partition_key: Partition key value for the document
            **kwargs: Additional retrieval options
            
        Returns:
            dict: Retrieved document
        """
    
    def get_documents(
        self,
        sql_string: str | None = None,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs
    ) -> list[dict]:
        """
        Query documents using SQL syntax or retrieve all documents.
        
        Args:
            sql_string (str): SQL query string (None retrieves all)
            database_name (str): Name of database (uses default if None)
            collection_name (str): Name of collection (uses default if None)
            partition_key: Partition key value to filter by
            **kwargs: Additional query options (parameters, max_item_count, etc.)
            
        Returns:
            list[dict]: List of documents matching query
        """
    
    def test_connection(self) -> tuple[bool, str]:
        """
        Test the Cosmos DB connection.
        
        Returns:
            tuple[bool, str]: (success, message) indicating connection status
        """

Document Insert Operator

Operator for inserting documents into Azure Cosmos DB collections.

class AzureCosmosInsertDocumentOperator(BaseOperator):
    """
    Insert documents into Azure Cosmos DB collection.
    
    Supports inserting single documents or multiple documents with 
    automatic collection creation if needed.
    """
    
    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document: dict | list[dict],
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs
    ):
        """
        Initialize Cosmos DB document insert operator.
        
        Args:
            database_name (str): Name of Cosmos DB database
            collection_name (str): Name of collection to insert into
            document (dict | list[dict]): Document(s) to insert
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB
        """

Document Existence Sensor

Sensor that waits for a document to exist in Azure Cosmos DB.

class AzureCosmosDocumentSensor(BaseSensorOperator):
    """
    Sensor that waits for a document to exist in Azure Cosmos DB.
    
    Monitors a collection for the existence of a specific document
    by ID or query criteria.
    """
    
    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document_id: str | None = None,
        sql_string: str | None = None,
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs
    ):
        """
        Initialize Cosmos DB document sensor.
        
        Args:
            database_name (str): Name of Cosmos DB database
            collection_name (str): Name of collection to monitor
            document_id (str): Specific document ID to wait for
            sql_string (str): SQL query to check for documents
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB
        """
    
    def poke(self, context: dict) -> bool:
        """Check if the document exists."""

Usage Examples

Basic Document Operations

from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook

# Initialize hook
cosmos_hook = AzureCosmosDBHook(azure_cosmos_conn_id='cosmos_default')

# Create database
cosmos_hook.create_database('my-database')

# Create collection with partition key
cosmos_hook.create_collection(
    collection_name='users',
    database_name='my-database',
    partition_key='/userId',
    throughput=400
)

# Insert a document
user_doc = {
    'id': 'user-123',
    'userId': 'user-123',
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30
}

result = cosmos_hook.upsert_document(
    document=user_doc,
    database_name='my-database',
    collection_name='users'
)

# Query documents
users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.age > 25",
    database_name='my-database',
    collection_name='users'
)

# Get specific document
user = cosmos_hook.get_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123'
)

# Delete document
cosmos_hook.delete_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123'
)

Batch Document Operations

# Insert multiple documents
users_batch = [
    {'id': 'user-1', 'userId': 'user-1', 'name': 'Alice'},
    {'id': 'user-2', 'userId': 'user-2', 'name': 'Bob'},
    {'id': 'user-3', 'userId': 'user-3', 'name': 'Charlie'}
]

results = cosmos_hook.insert_documents(
    documents=users_batch,
    database_name='my-database',
    collection_name='users'
)

# Query with parameters
users_in_city = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.city = @city",
    database_name='my-database',
    collection_name='users',
    parameters=[{'name': '@city', 'value': 'New York'}]
)

Using in Airflow DAGs

from airflow import DAG
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator
from airflow.providers.microsoft.azure.sensors.cosmos import AzureCosmosDocumentSensor
from datetime import datetime, timedelta

dag = DAG(
    'cosmos_db_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5)
    },
    description='Cosmos DB operations',
    schedule='@daily',  # 'schedule_interval' on Airflow < 2.4
    start_date=datetime(2024, 1, 1)
)

# Insert daily summary document
insert_summary = AzureCosmosInsertDocumentOperator(
    task_id='insert_daily_summary',
    database_name='analytics',
    collection_name='daily_summaries',
    document={
        'id': '{{ ds }}',
        'date': '{{ ds }}',
        'summary_type': 'daily',
        'metrics': {
            'users_active': 1000,
            'orders_count': 50,
            'revenue': 5000.00
        }
    },
    azure_cosmos_conn_id='cosmos_default',
    dag=dag
)

# Wait for processing completion indicator
wait_for_completion = AzureCosmosDocumentSensor(
    task_id='wait_for_processing_complete',
    database_name='analytics',
    collection_name='processing_status',
    sql_string="SELECT * FROM c WHERE c.date = '{{ ds }}' AND c.status = 'completed'",
    timeout=1800,  # 30 minutes
    poke_interval=60,  # Check every minute
    dag=dag
)

insert_summary >> wait_for_completion

Advanced Querying

# Complex query with aggregation
monthly_stats = cosmos_hook.get_documents(
    sql_string="""
        SELECT 
            COUNT(1) as total_orders,
            SUM(c.amount) as total_revenue,
            AVG(c.amount) as avg_order_value
        FROM c 
        WHERE c.order_date >= @start_date 
        AND c.order_date < @end_date
    """,
    database_name='ecommerce',
    collection_name='orders',
    parameters=[
        {'name': '@start_date', 'value': '2024-01-01'},
        {'name': '@end_date', 'value': '2024-02-01'}
    ]
)

# Cross-partition query; get_documents returns the full result list,
# so page continuation is handled for you
all_users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c",
    database_name='my-database',
    collection_name='users',
    enable_cross_partition_query=True,  # forwarded to the SDK query
    max_item_count=100
)

Connection Configuration

Azure Cosmos DB connections support multiple authentication methods.

Connection Type: azure_cosmos

Required Fields:

  • endpoint_uri: Cosmos DB account endpoint URI
  • database_name: Default database name (can be overridden)

Authentication Options:

  • Primary/Secondary Key: Use account key
  • Service Principal: Use client credentials
  • Managed Identity: Use Azure managed identity

Connection Fields:

  • master_key: Cosmos DB account primary/secondary key
  • client_id: Service principal client ID
  • client_secret: Service principal secret
  • tenant_id: Azure tenant ID

Optional Configuration:

  • collection_name: Default collection name
  • consistency_level: Default consistency level (Strong, BoundedStaleness, Session, ConsistentPrefix, Eventual)
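One way to wire up these fields is a JSON-serialized connection in an `AIRFLOW_CONN_*` environment variable (supported since Airflow 2.3). A hedged sketch, assuming the field mapping described above (endpoint URI in `login`, master key in `password`); verify the mapping against your provider version:

```python
import json
import os

# JSON-serialized Airflow connection. 'login' carries the Cosmos DB
# endpoint URI and 'password' the account master key; the default
# database/collection go in 'extra'.
conn = {
    "conn_type": "azure_cosmos",
    "login": "https://my-account.documents.azure.com:443/",
    "password": "<master-key>",
    "extra": {
        "database_name": "my-database",
        "collection_name": "users",
    },
}

# Airflow resolves conn_id 'azure_cosmos_default' from this variable.
os.environ["AIRFLOW_CONN_AZURE_COSMOS_DEFAULT"] = json.dumps(conn)
```

Connections can equally be created through the Airflow UI or `airflow connections add`; the environment-variable form is convenient for containerized deployments.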

Error Handling

# The provider does not define its own exception hierarchy; errors
# surface as azure-cosmos SDK exceptions, plus AirflowBadRequest from
# the hook when a required database/collection name is missing.
from airflow.exceptions import AirflowBadRequest
from azure.cosmos.exceptions import (
    CosmosHttpResponseError,      # base class for service errors
    CosmosResourceNotFoundError,  # database, container, or document missing (HTTP 404)
    CosmosResourceExistsError,    # resource already exists (HTTP 409)
)
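Operationally, Cosmos DB failures carry HTTP status codes, and only some of them are worth retrying (notably 429, returned when the provisioned RU budget is exhausted). A small stdlib retry-policy sketch; the helper names are hypothetical, not part of the provider:

```python
import time

def classify_status(status_code: int) -> str:
    """Map a Cosmos DB HTTP status code to a coarse action category."""
    if status_code == 404:
        return "not_found"   # database, container, or document missing
    if status_code == 409:
        return "conflict"    # resource already exists / id collision
    if status_code == 429:
        return "retry"       # throttled: RU budget exhausted, back off
    if 500 <= status_code < 600:
        return "retry"       # transient service error
    return "fail"

def call_with_retry(fn, retries: int = 3, backoff_s: float = 0.0):
    """Call fn() -> (status, result), retrying retryable statuses
    with exponential back-off."""
    for attempt in range(retries):
        status, result = fn()
        if status < 400:
            return result
        if classify_status(status) != "retry" or attempt == retries - 1:
            raise RuntimeError(f"Cosmos DB request failed: HTTP {status}")
        time.sleep(backoff_s * (2 ** attempt))
```

In production, prefer honoring the `x-ms-retry-after-ms` header the service returns with 429 responses over a fixed back-off.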

Performance Considerations

The Cosmos DB integration supports:

  • Partitioning: Efficient partition key usage for scale
  • Throughput Management: Configurable RU/s provisioning
  • Batch Operations: Bulk document operations for efficiency
  • Query Optimization: Parameterized queries and indexing hints
  • Consistency Levels: Configurable consistency vs. performance trade-offs
  • Cross-Partition Queries: Support for queries spanning partitions
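The RU/s figures above can be sanity-checked with simple arithmetic: Microsoft's published baseline is roughly 1 RU for a 1 KB point read and about 5 RU for a 1 KB write. A rough estimator (actual charges vary with indexing policy and item shape):

```python
# Rough RU cost baseline per Microsoft's guidance: ~1 RU per 1 KB
# point read, ~5 RU per 1 KB write. Treat as an estimate only.
READ_RU_PER_KB = 1
WRITE_RU_PER_KB = 5

def required_rus(reads_per_s: int, writes_per_s: int, item_kb: float = 1.0) -> float:
    """Estimate the steady-state RU/s a workload needs."""
    return (reads_per_s * READ_RU_PER_KB + writes_per_s * WRITE_RU_PER_KB) * item_kb

# The example collection earlier was provisioned at throughput=400 RU/s:
print(required_rus(reads_per_s=100, writes_per_s=50))  # → 350.0, fits in 400
```

Queries, larger items, and heavy indexing cost more; when estimates run close to the provisioned budget, expect 429 (throttling) responses under load.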

The Azure Cosmos DB integration provides NoSQL document-database capabilities with global distribution, tunable consistency, and automatic scaling, suitable for mission-critical workloads.

Install with Tessl CLI

npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure
