tessl/pypi-apache-airflow-providers-mongo

MongoDB provider for Apache Airflow enabling database connections, queries, and workflow monitoring

Workspace: tessl
Visibility: Public
Describes: pypipkg:pypi/apache-airflow-providers-mongo@5.2.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-mongo@5.2.0

docs/index.md

Apache Airflow MongoDB Provider

Apache Airflow MongoDB Provider enables data engineers to build workflows that interact with MongoDB databases. It provides MongoDB connectivity, database operations, and monitoring capabilities within Airflow DAGs through hooks and sensors.

Package Information

  • Package Name: apache-airflow-providers-mongo
  • Package Type: pypi
  • Language: Python
  • Installation: pip install apache-airflow-providers-mongo
  • Requirements: Apache Airflow 2.10.0+, pymongo>=4.13.2, dnspython>=1.13.0

Core Imports

from airflow.providers.mongo.hooks.mongo import MongoHook
from airflow.providers.mongo.sensors.mongo import MongoSensor

Basic Usage

from airflow import DAG
from airflow.providers.mongo.hooks.mongo import MongoHook
from airflow.providers.mongo.sensors.mongo import MongoSensor
from airflow.operators.python import PythonOperator
from datetime import datetime

def process_mongo_data():
    hook = MongoHook(mongo_conn_id="mongo_default")
    
    # Insert a document
    result = hook.insert_one("my_collection", {"key": "value", "timestamp": datetime.now()})
    print(f"Inserted document with ID: {result.inserted_id}")
    
    # Query documents
    documents = hook.find("my_collection", {"key": "value"})
    for doc in documents:
        print(doc)

dag = DAG(
    'mongo_example_dag',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule='@daily'  # 'schedule' supersedes the deprecated 'schedule_interval'
)

# Wait for a document to exist
wait_for_document = MongoSensor(
    task_id='wait_for_data',
    collection='my_collection',
    query={'status': 'ready'},
    mongo_conn_id='mongo_default',
    poke_interval=30,
    timeout=300,
    dag=dag
)

# Process data when available
process_data = PythonOperator(
    task_id='process_data',
    python_callable=process_mongo_data,
    dag=dag
)

wait_for_document >> process_data

Capabilities

MongoDB Hook

Database connectivity through MongoHook for CRUD operations, aggregations, and collection management.

class MongoHook(BaseHook):
    """PyMongo wrapper to interact with MongoDB."""
    
    def __init__(self, mongo_conn_id: str = "mongo_default", *args, **kwargs) -> None:
        """Initialize MongoDB hook with connection ID."""
    
    def get_conn(self) -> MongoClient:
        """Fetch PyMongo Client."""
    
    def close(self) -> None:
        """Close the MongoDB connection."""
    
    def get_collection(self, mongo_collection: str, mongo_db: str | None = None) -> MongoCollection:
        """Fetch a mongo collection object for querying."""
    
    def create_collection(
        self,
        mongo_collection: str,
        mongo_db: str | None = None,
        return_if_exists: bool = True,
        **create_kwargs: Any
    ) -> MongoCollection:
        """Create the collection and return it."""

Query Operations

def find(
    self,
    mongo_collection: str,
    query: dict,
    find_one: bool = False,
    mongo_db: str | None = None,
    projection: list | dict | None = None,
    **kwargs
) -> pymongo.cursor.Cursor | Any | None:
    """Run a mongo find query and returns the results."""

def aggregate(
    self,
    mongo_collection: str,
    aggregate_query: list,
    mongo_db: str | None = None,
    **kwargs
) -> CommandCursor:
    """Run an aggregation pipeline and returns the results."""

def distinct(
    self,
    mongo_collection: str,
    distinct_key: str,
    filter_doc: dict | None = None,
    mongo_db: str | None = None,
    **kwargs
) -> list[Any]:
    """Return a list of distinct values for the given key across a collection."""

Document Operations

def insert_one(
    self,
    mongo_collection: str,
    doc: dict,
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.InsertOneResult:
    """Insert a single document into a mongo collection."""

def insert_many(
    self,
    mongo_collection: str,
    docs: Iterable[dict],
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.InsertManyResult:
    """Insert many docs into a mongo collection."""

def update_one(
    self,
    mongo_collection: str,
    filter_doc: dict,
    update_doc: dict,
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.UpdateResult:
    """Update a single document in a mongo collection."""

def update_many(
    self,
    mongo_collection: str,
    filter_doc: dict,
    update_doc: dict,
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.UpdateResult:
    """Update one or more documents in a mongo collection."""

def replace_one(
    self,
    mongo_collection: str,
    doc: dict,
    filter_doc: dict | None = None,
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.UpdateResult:
    """Replace a single document in a mongo collection."""

def replace_many(
    self,
    mongo_collection: str,
    docs: list[dict],
    filter_docs: list[dict] | None = None,
    mongo_db: str | None = None,
    upsert: bool = False,
    collation: pymongo.collation.Collation | None = None,
    **kwargs
) -> pymongo.results.BulkWriteResult:
    """Replace many documents in a mongo collection."""

def delete_one(
    self,
    mongo_collection: str,
    filter_doc: dict,
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.DeleteResult:
    """Delete a single document in a mongo collection."""

def delete_many(
    self,
    mongo_collection: str,
    filter_doc: dict,
    mongo_db: str | None = None,
    **kwargs
) -> pymongo.results.DeleteResult:
    """Delete one or more documents in a mongo collection."""

MongoDB Sensor

Document existence monitoring through MongoSensor for workflow coordination.

class MongoSensor(BaseSensorOperator):
    """Checks for the existence of a document which matches the given query in MongoDB."""
    
    template_fields: Sequence[str] = ("collection", "query")
    
    def __init__(
        self,
        *,
        collection: str,
        query: dict,
        mongo_conn_id: str = "mongo_default",
        mongo_db: str | None = None,
        **kwargs
    ) -> None:
        """Initialize MongoDB sensor.
        
        Args:
            collection: Target MongoDB collection
            query: The query to find the target document
            mongo_conn_id: The Mongo connection id to use when connecting to MongoDB
            mongo_db: Target MongoDB name
        """
    
    def poke(self, context: Context) -> bool:
        """Check if document matching query exists."""

Connection Configuration

MongoDB connections are configured in Airflow with the following parameters:

# Connection Type: mongo
# Default Connection ID: mongo_default

# Connection Extra Fields (JSON):
{
    "srv": bool,              # Use SRV/seed list connection
    "ssl": bool,              # Enable SSL/TLS
    "tls": bool,              # Alias for ssl
    "allow_insecure": bool,   # Allow invalid certificates during SSL connections
    # Additional MongoDB connection string options supported
}

Connection Examples

Standard MongoDB connection:

# Connection ID: mongo_default
# Host: localhost
# Port: 27017
# Schema: my_database (optional default database)
# Extra: {"ssl": false}

MongoDB Atlas connection:

# Connection ID: mongo_atlas
# Host: cluster0.mongodb.net
# Login: username
# Password: password
# Schema: production_db
# Extra: {"srv": true, "ssl": true}

SSL connection with custom options:

# Connection ID: mongo_ssl
# Host: mongo.example.com
# Port: 27017
# Login: admin
# Password: password
# Extra: {
#   "ssl": true,
#   "allow_insecure": false,
#   "connectTimeoutMS": 30000,
#   "serverSelectionTimeoutMS": 30000
# }
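
Connections can also be defined outside the UI, for example through an AIRFLOW_CONN_<CONN_ID> environment variable in URI form. A sketch of the URI layout (credentials, host, and database are placeholders; extras become query parameters):

# Usually exported in the scheduler/worker environment rather than set in code:
#   AIRFLOW_CONN_MONGO_ATLAS=mongo://username:password@cluster0.mongodb.net/production_db?srv=true&ssl=true
import os

os.environ["AIRFLOW_CONN_MONGO_ATLAS"] = (
    "mongo://username:password@cluster0.mongodb.net/production_db"
    "?srv=true&ssl=true"
)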

Types

from collections.abc import Iterable
from typing import Any, Sequence, overload, Literal
from pymongo import MongoClient
from pymongo.collection import Collection as MongoCollection
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.results import (
    InsertOneResult,
    InsertManyResult,
    UpdateResult,
    BulkWriteResult,
    DeleteResult
)
from pymongo.collation import Collation
from airflow.models import Connection
from airflow.utils.context import Context

Error Handling

The provider raises or propagates MongoDB- and configuration-specific exceptions:

from pymongo.errors import CollectionInvalid
from airflow.exceptions import AirflowConfigException

# Connection validation errors
# - Invalid connection type (must be 'mongo')
# - SRV connections with port specified
# - Configuration conflicts

# MongoDB operation errors
# - Collection creation failures
# - Network connectivity issues
# - Authentication failures
# - Query execution errors
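
A sketch of catching these exceptions inside a task; the collection name is illustrative:

from airflow.exceptions import AirflowConfigException
from airflow.providers.mongo.hooks.mongo import MongoHook
from pymongo.errors import CollectionInvalid, PyMongoError

def robust_mongo_task():
    try:
        hook = MongoHook(mongo_conn_id="mongo_default")
        # return_if_exists=False lets CollectionInvalid propagate if the collection exists
        hook.create_collection("events", return_if_exists=False)
    except AirflowConfigException as exc:
        # Misconfigured connection, e.g. wrong connection type or SRV with an explicit port
        print(f"Connection misconfigured: {exc}")
        raise
    except CollectionInvalid:
        print("Collection already exists or could not be created")
        raise
    except PyMongoError as exc:
        # Network, authentication, or query execution errors from PyMongo
        print(f"MongoDB operation failed: {exc}")
        raise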

Usage Examples

Basic CRUD Operations

from airflow.providers.mongo.hooks.mongo import MongoHook

def mongo_operations():
    hook = MongoHook(mongo_conn_id="mongo_default")
    
    # Create collection
    collection = hook.create_collection("users")
    
    # Insert documents
    user = {"name": "John Doe", "email": "john@example.com", "age": 30}
    result = hook.insert_one("users", user)
    print(f"Inserted user with ID: {result.inserted_id}")
    
    # Bulk insert
    users = [
        {"name": "Jane Smith", "email": "jane@example.com", "age": 25},
        {"name": "Bob Johnson", "email": "bob@example.com", "age": 35}
    ]
    hook.insert_many("users", users)
    
    # Query documents
    young_users = hook.find("users", {"age": {"$lt": 30}})
    for user in young_users:
        print(f"Young user: {user['name']}")
    
    # Update document
    hook.update_one("users", {"name": "John Doe"}, {"$set": {"age": 31}})
    
    # Delete document
    hook.delete_one("users", {"name": "Bob Johnson"})

Aggregation Pipeline

def aggregation_example():
    hook = MongoHook(mongo_conn_id="mongo_default")
    
    pipeline = [
        {"$match": {"age": {"$gte": 25}}},
        {"$group": {"_id": "$department", "avg_age": {"$avg": "$age"}}},
        {"$sort": {"avg_age": -1}}
    ]
    
    results = hook.aggregate("employees", pipeline)
    for result in results:
        print(f"Department: {result['_id']}, Average Age: {result['avg_age']}")

Sensor with Complex Query

from datetime import datetime

from airflow.providers.mongo.sensors.mongo import MongoSensor

wait_for_processed_data = MongoSensor(
    task_id='wait_for_processed_data',
    collection='data_processing',
    query={
        'status': 'completed',
        'processing_date': {'$gte': datetime.now().replace(hour=0, minute=0, second=0)},
        'errors': {'$exists': False}
    },
    mongo_conn_id='mongo_production',
    mongo_db='analytics',
    poke_interval=60,
    timeout=1800,
    dag=dag
)

Context Manager Usage

from datetime import datetime

from airflow.providers.mongo.hooks.mongo import MongoHook

def safe_mongo_operations():
    with MongoHook(mongo_conn_id="mongo_default") as hook:
        # Connection automatically closed when exiting context
        documents = hook.find("my_collection", {"status": "active"})
        processed_count = 0
        
        for doc in documents:
            # Process document
            hook.update_one(
                "my_collection",
                {"_id": doc["_id"]},
                {"$set": {"status": "processed", "processed_at": datetime.now()}}
            )
            processed_count += 1
        
        print(f"Processed {processed_count} documents")