Provider package for Microsoft Azure integrations with Apache Airflow
—

Quality: Pending — Does it follow best practices?

Impact: Pending — No eval scenarios have been run.
Comprehensive Azure Cosmos DB integration supporting database and collection management, document operations, and query execution across all Cosmos DB APIs. Provides full CRUD operations with support for multiple consistency levels and partitioning strategies.
Primary interface for Azure Cosmos DB operations, providing authenticated connections and database functionality.
class AzureCosmosDBHook(BaseHook):
    """
    Hook for Azure Cosmos DB.

    Wraps an authenticated ``CosmosClient`` and exposes database management,
    collection management, and document CRUD/query operations across the
    Cosmos DB APIs (SQL, MongoDB, Cassandra, Gremlin, Table).
    """

    def get_conn(self) -> CosmosClient:
        """Return an authenticated Azure Cosmos DB client."""

    def does_database_exist(self, database_name: str) -> bool:
        """
        Check whether a database exists.

        Args:
            database_name (str): Name of the database to look up.

        Returns:
            bool: True if the database exists, False otherwise.
        """

    def create_database(self, database_name: str) -> None:
        """
        Create a database.

        Args:
            database_name (str): Name of the database to create.
        """

    def delete_database(self, database_name: str) -> None:
        """
        Delete a database together with all of its collections.

        Args:
            database_name (str): Name of the database to delete.
        """

    def does_collection_exist(self, collection_name: str, database_name: str) -> bool:
        """
        Check whether a collection exists in a database.

        Args:
            collection_name (str): Name of the collection to look up.
            database_name (str): Database expected to contain the collection.

        Returns:
            bool: True if the collection exists, False otherwise.
        """

    def create_collection(
        self,
        collection_name: str,
        database_name: str | None = None,
        partition_key: str | None = None,
        throughput: int | None = None,
        **kwargs,
    ) -> None:
        """
        Create a collection.

        Args:
            collection_name (str): Name of the collection to create.
            database_name (str): Target database; the connection default is
                used when None.
            partition_key (str): Partition key path for the collection.
            throughput (int): Provisioned throughput in RU/s.
            **kwargs: Extra collection configuration options.
        """

    def delete_collection(
        self,
        collection_name: str,
        database_name: str | None = None,
    ) -> None:
        """
        Delete a collection.

        Args:
            collection_name (str): Name of the collection to delete.
            database_name (str): Target database; the connection default is
                used when None.
        """

    def upsert_document(
        self,
        document: dict,
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs,
    ) -> dict:
        """
        Insert a document, or update it when it already exists.

        Args:
            document (dict): Document payload to upsert.
            database_name (str): Target database; the connection default is
                used when None.
            collection_name (str): Target collection; the connection default
                is used when None.
            **kwargs: Extra upsert options.

        Returns:
            dict: The upserted document, including server metadata.
        """

    def insert_documents(
        self,
        documents: list[dict],
        database_name: str | None = None,
        collection_name: str | None = None,
        **kwargs,
    ) -> list[dict]:
        """
        Insert several documents.

        Args:
            documents (list[dict]): Documents to insert.
            database_name (str): Target database; the connection default is
                used when None.
            collection_name (str): Target collection; the connection default
                is used when None.
            **kwargs: Extra insert options.

        Returns:
            list[dict]: The inserted documents, including server metadata.
        """

    def delete_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs,
    ) -> None:
        """
        Delete a single document.

        Args:
            document_id (str): ID of the document to delete.
            database_name (str): Target database; the connection default is
                used when None.
            collection_name (str): Target collection; the connection default
                is used when None.
            partition_key: Partition key value of the document.
            **kwargs: Extra delete options.
        """

    def get_document(
        self,
        document_id: str,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs,
    ) -> dict:
        """
        Point-read a document by ID.

        Args:
            document_id (str): ID of the document to fetch.
            database_name (str): Target database; the connection default is
                used when None.
            collection_name (str): Target collection; the connection default
                is used when None.
            partition_key: Partition key value of the document.
            **kwargs: Extra retrieval options.

        Returns:
            dict: The requested document.
        """

    def get_documents(
        self,
        sql_string: str | None = None,
        database_name: str | None = None,
        collection_name: str | None = None,
        partition_key: Any = None,
        **kwargs,
    ) -> list[dict]:
        """
        Run a SQL query, or list every document when no query is given.

        Args:
            sql_string (str): SQL query text; None fetches all documents.
            database_name (str): Target database; the connection default is
                used when None.
            collection_name (str): Target collection; the connection default
                is used when None.
            partition_key: Partition key value to restrict results to.
            **kwargs: Extra query options (parameters, max_item_count, ...).

        Returns:
            list[dict]: Documents matching the query.
        """

    def test_connection(self) -> tuple[bool, str]:
        """
        Probe the Cosmos DB connection.

        Returns:
            tuple[bool, str]: (success, message) describing the outcome.
        """


# Operator for inserting documents into Azure Cosmos DB collections.
class AzureCosmosInsertDocumentOperator(BaseOperator):
    """
    Insert documents into an Azure Cosmos DB collection.

    Accepts either a single document or a list of documents, and creates the
    target collection automatically when it does not exist yet.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document: dict | list[dict],
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs,
    ):
        """
        Initialize the insert operator.

        Args:
            database_name (str): Cosmos DB database to write to.
            collection_name (str): Collection to insert into.
            document (dict | list[dict]): Document(s) to insert.
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB.
        """


# Sensor that waits for a document to exist in Azure Cosmos DB.
class AzureCosmosDocumentSensor(BaseSensorOperator):
    """
    Wait until a document exists in Azure Cosmos DB.

    Polls a collection for a specific document, located either by its ID or
    by a SQL query.
    """

    def __init__(
        self,
        database_name: str,
        collection_name: str,
        document_id: str | None = None,
        sql_string: str | None = None,
        azure_cosmos_conn_id: str = "azure_cosmos_default",
        **kwargs,
    ):
        """
        Initialize the document sensor.

        Args:
            database_name (str): Cosmos DB database to watch.
            collection_name (str): Collection to watch.
            document_id (str): ID of the specific document to wait for.
            sql_string (str): SQL query used to look for matching documents.
            azure_cosmos_conn_id (str): Airflow connection ID for Cosmos DB.
        """

    def poke(self, context: dict) -> bool:
        """Return True once the target document exists."""


# Usage examples
from airflow.providers.microsoft.azure.hooks.cosmos import AzureCosmosDBHook
# Set up the hook against the configured Airflow connection
cosmos_hook = AzureCosmosDBHook(azure_cosmos_conn_id='cosmos_default')

# Provision the database, then a partitioned collection inside it
cosmos_hook.create_database('my-database')
cosmos_hook.create_collection(
    collection_name='users',
    database_name='my-database',
    partition_key='/userId',
    throughput=400,
)

# Upsert a single user document
user_doc = {
    'id': 'user-123',
    'userId': 'user-123',
    'name': 'John Doe',
    'email': 'john@example.com',
    'age': 30,
}
result = cosmos_hook.upsert_document(
    document=user_doc,
    database_name='my-database',
    collection_name='users',
)

# SQL-style filter query over the collection
users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.age > 25",
    database_name='my-database',
    collection_name='users',
)

# Point-read a single document by id plus partition key
user = cosmos_hook.get_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123',
)

# Remove the document again (partition key required for the delete)
cosmos_hook.delete_document(
    document_id='user-123',
    database_name='my-database',
    collection_name='users',
    partition_key='user-123',
)

# Insert multiple documents
users_batch = [
    {'id': 'user-1', 'userId': 'user-1', 'name': 'Alice'},
    {'id': 'user-2', 'userId': 'user-2', 'name': 'Bob'},
    {'id': 'user-3', 'userId': 'user-3', 'name': 'Charlie'},
]
# Bulk insert: every document in the list is written to the collection
results = cosmos_hook.insert_documents(
    documents=users_batch,
    database_name='my-database',
    collection_name='users',
)

# Parameterized query: @city is bound through the `parameters` kwarg
users_in_city = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c WHERE c.city = @city",
    database_name='my-database',
    collection_name='users',
    parameters=[{'name': '@city', 'value': 'New York'}],
)

from airflow import DAG
from airflow.providers.microsoft.azure.operators.cosmos import AzureCosmosInsertDocumentOperator
from airflow.providers.microsoft.azure.sensors.cosmos import AzureCosmosDocumentSensor
from datetime import datetime, timedelta

# Daily DAG wiring an insert task to a sensor that waits for completion
dag = DAG(
    'cosmos_db_workflow',
    default_args={
        'owner': 'data-team',
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    },
    description='Cosmos DB operations',
    schedule_interval='@daily',
    start_date=datetime(2024, 1, 1),
)

# Write one summary document per run; '{{ ds }}' templates the logical date
insert_summary = AzureCosmosInsertDocumentOperator(
    task_id='insert_daily_summary',
    database_name='analytics',
    collection_name='daily_summaries',
    document={
        'id': '{{ ds }}',
        'date': '{{ ds }}',
        'summary_type': 'daily',
        'metrics': {
            'users_active': 1000,
            'orders_count': 50,
            'revenue': 5000.00,
        },
    },
    azure_cosmos_conn_id='cosmos_default',
    dag=dag,
)

# Block until a 'completed' status document shows up for this run's date
wait_for_completion = AzureCosmosDocumentSensor(
    task_id='wait_for_processing_complete',
    database_name='analytics',
    collection_name='processing_status',
    sql_string="SELECT * FROM c WHERE c.date = '{{ ds }}' AND c.status = 'completed'",
    timeout=1800,  # give up after 30 minutes
    poke_interval=60,  # re-check once a minute
    dag=dag,
)

insert_summary >> wait_for_completion

# Complex query with aggregation
# Aggregated monthly order statistics via parameterized SQL
monthly_stats = cosmos_hook.get_documents(
    sql_string="""
    SELECT
        COUNT(1) as total_orders,
        SUM(c.amount) as total_revenue,
        AVG(c.amount) as avg_order_value
    FROM c
    WHERE c.order_date >= @start_date
    AND c.order_date < @end_date
    """,
    database_name='ecommerce',
    collection_name='orders',
    parameters=[
        {'name': '@start_date', 'value': '2024-01-01'},
        {'name': '@end_date', 'value': '2024-02-01'}
    ]
)

# Cross-partition query. Fix: get_documents() returns a fully materialized
# list[dict] (per the hook's documented contract), so assign it directly.
# The original `all_users = []` was dead (never filled) and the name
# `query_iterator` wrongly suggested a lazy iterator with continuation
# tokens.
all_users = cosmos_hook.get_documents(
    sql_string="SELECT * FROM c",
    database_name='my-database',
    collection_name='users',
    enable_cross_partition=True,
    max_item_count=100
)

# Azure Cosmos DB connections support multiple authentication methods and API types.
Connection Type: azure_cosmos

Required Fields:
- endpoint_uri: Cosmos DB account endpoint URI
- database_name: Default database name (can be overridden)

Authentication Options (Connection Fields):
- master_key: Cosmos DB account primary/secondary key
- client_id: Service principal client ID
- client_secret: Service principal secret
- tenant_id: Azure tenant ID

Optional Configuration:
- collection_name: Default collection name
- consistency_level: Default consistency level (Strong, BoundedStaleness, Session, ConsistentPrefix, Eventual)

# Common Cosmos DB exceptions
class CosmosDBException(AirflowException):
    """Base class for all Cosmos DB operation errors."""


class DocumentNotFound(CosmosDBException):
    """The requested document does not exist."""


class CollectionNotFound(CosmosDBException):
    """The requested collection does not exist."""


class DatabaseNotFound(CosmosDBException):
    """The requested database does not exist."""


class PartitionKeyMismatch(CosmosDBException):
    """The supplied partition key does not match the document."""


# The Cosmos DB integration supports:
Azure Cosmos DB integration provides comprehensive NoSQL database capabilities with global distribution, multiple APIs, and automatic scaling suitable for mission-critical applications.
Install with Tessl CLI
npx tessl i tessl/pypi-apache-airflow-providers-microsoft-azure