MongoDB provider for Apache Airflow enabling database connections, queries, and workflow monitoring
npx @tessl/cli install tessl/pypi-apache-airflow-providers-mongo@5.2.0

Apache Airflow MongoDB Provider enables data engineers to build workflows that interact with MongoDB databases. It provides MongoDB connectivity, database operations, and monitoring capabilities within Airflow DAGs through hooks and sensors.
pip install apache-airflow-providers-mongo

from airflow.providers.mongo.hooks.mongo import MongoHook
from airflow.providers.mongo.sensors.mongo import MongoSensor

from airflow import DAG
from airflow.providers.mongo.hooks.mongo import MongoHook
from airflow.providers.mongo.sensors.mongo import MongoSensor
from airflow.operators.python import PythonOperator
from datetime import datetime
def process_mongo_data():
    """Insert a sample document into MongoDB, then query and print matches."""
    hook = MongoHook(mongo_conn_id="mongo_default")

    # Insert a document
    inserted = hook.insert_one(
        "my_collection",
        {"key": "value", "timestamp": datetime.now()},
    )
    print(f"Inserted document with ID: {inserted.inserted_id}")

    # Query documents matching the same key
    for doc in hook.find("my_collection", {"key": "value"}):
        print(doc)
# Example DAG: block on a sensor until a matching document exists, then process it.
dag = DAG(
    'mongo_example_dag',
    default_args={'start_date': datetime(2024, 1, 1)},
    schedule_interval='@daily',
)

# Wait for a document to exist: poke every 30s, time out after 5 minutes.
wait_for_document = MongoSensor(
    task_id='wait_for_data',
    collection='my_collection',
    query={'status': 'ready'},
    mongo_conn_id='mongo_default',
    poke_interval=30,
    timeout=300,
    dag=dag,
)

# Process data when available (runs after the sensor succeeds).
process_data = PythonOperator(
    task_id='process_data',
    python_callable=process_mongo_data,
    dag=dag,
)
wait_for_document >> process_data

Database connectivity and operations through MongoHook for CRUD operations, aggregations, and collection management.
class MongoHook(BaseHook):
"""PyMongo wrapper to interact with MongoDB."""
def __init__(self, mongo_conn_id: str = "mongo_default", *args, **kwargs) -> None:
"""Initialize MongoDB hook with connection ID."""
def get_conn(self) -> MongoClient:
"""Fetch PyMongo Client."""
def close(self) -> None:
"""Close the MongoDB connection."""
def get_collection(self, mongo_collection: str, mongo_db: str | None = None) -> MongoCollection:
"""Fetch a mongo collection object for querying."""
def create_collection(
self,
mongo_collection: str,
mongo_db: str | None = None,
return_if_exists: bool = True,
**create_kwargs: Any
) -> MongoCollection:
"""Create the collection and return it."""def find(
self,
mongo_collection: str,
query: dict,
find_one: bool = False,
mongo_db: str | None = None,
projection: list | dict | None = None,
**kwargs
) -> pymongo.cursor.Cursor | Any | None:
"""Run a mongo find query and returns the results."""
def aggregate(
self,
mongo_collection: str,
aggregate_query: list,
mongo_db: str | None = None,
**kwargs
) -> CommandCursor:
"""Run an aggregation pipeline and returns the results."""
def distinct(
self,
mongo_collection: str,
distinct_key: str,
filter_doc: dict | None = None,
mongo_db: str | None = None,
**kwargs
) -> list[Any]:
"""Return a list of distinct values for the given key across a collection."""def insert_one(
self,
mongo_collection: str,
doc: dict,
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.InsertOneResult:
"""Insert a single document into a mongo collection."""
def insert_many(
self,
mongo_collection: str,
docs: Iterable[dict],
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.InsertManyResult:
"""Insert many docs into a mongo collection."""
def update_one(
self,
mongo_collection: str,
filter_doc: dict,
update_doc: dict,
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.UpdateResult:
"""Update a single document in a mongo collection."""
def update_many(
self,
mongo_collection: str,
filter_doc: dict,
update_doc: dict,
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.UpdateResult:
"""Update one or more documents in a mongo collection."""
def replace_one(
self,
mongo_collection: str,
doc: dict,
filter_doc: dict | None = None,
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.UpdateResult:
"""Replace a single document in a mongo collection."""
def replace_many(
self,
mongo_collection: str,
docs: list[dict],
filter_docs: list[dict] | None = None,
mongo_db: str | None = None,
upsert: bool = False,
collation: pymongo.collation.Collation | None = None,
**kwargs
) -> pymongo.results.BulkWriteResult:
"""Replace many documents in a mongo collection."""
def delete_one(
self,
mongo_collection: str,
filter_doc: dict,
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.DeleteResult:
"""Delete a single document in a mongo collection."""
def delete_many(
self,
mongo_collection: str,
filter_doc: dict,
mongo_db: str | None = None,
**kwargs
) -> pymongo.results.DeleteResult:
"""Delete one or more documents in a mongo collection."""Document existence monitoring through MongoSensor for workflow coordination.
class MongoSensor(BaseSensorOperator):
"""Checks for the existence of a document which matches the given query in MongoDB."""
template_fields: Sequence[str] = ("collection", "query")
def __init__(
self,
*,
collection: str,
query: dict,
mongo_conn_id: str = "mongo_default",
mongo_db: str | None = None,
**kwargs
) -> None:
"""Initialize MongoDB sensor.
Args:
collection: Target MongoDB collection
query: The query to find the target document
mongo_conn_id: The Mongo connection id to use when connecting to MongoDB
mongo_db: Target MongoDB name
"""
def poke(self, context: Context) -> bool:
"""Check if document matching query exists."""MongoDB connections are configured in Airflow with the following parameters:
# Connection Type: mongo
# Default Connection ID: mongo_default
# Connection Extra Fields (JSON):
{
"srv": bool, # Use SRV/seed list connection
"ssl": bool, # Enable SSL/TLS
"tls": bool, # Alias for ssl
"allow_insecure": bool, # Allow invalid certificates during SSL connections
# Additional MongoDB connection string options supported
}

Standard MongoDB connection:
# Connection ID: mongo_default
# Host: localhost
# Port: 27017
# Schema: my_database (optional default database)
# Extra: {"ssl": false}

MongoDB Atlas connection:
# Connection ID: mongo_atlas
# Host: cluster0.mongodb.net
# Login: username
# Password: password
# Schema: production_db
# Extra: {"srv": true, "ssl": true}

SSL connection with custom options:
# Connection ID: mongo_ssl
# Host: mongo.example.com
# Port: 27017
# Login: admin
# Password: password
# Extra: {
# "ssl": true,
# "allow_insecure": false,
# "connectTimeoutMS": 30000,
# "serverSelectionTimeoutMS": 30000
# }

from collections.abc import Iterable
from typing import Any, Sequence, overload, Literal
from pymongo import MongoClient
from pymongo.collection import Collection as MongoCollection
from pymongo.command_cursor import CommandCursor
from pymongo.cursor import Cursor
from pymongo.results import (
InsertOneResult,
InsertManyResult,
UpdateResult,
BulkWriteResult,
DeleteResult
)
from pymongo.collation import Collation
from airflow.models import Connection
from airflow.utils.context import Context

The provider handles MongoDB-specific exceptions:
from pymongo.errors import CollectionInvalid
from airflow.exceptions import AirflowConfigException
# Connection validation errors
# - Invalid connection type (must be 'mongo')
# - SRV connections with port specified
# - Configuration conflicts
# MongoDB operation errors
# - Collection creation failures
# - Network connectivity issues
# - Authentication failures
# - Query execution errors

from airflow.providers.mongo.hooks.mongo import MongoHook
def mongo_operations():
    """Demonstrate CRUD operations against MongoDB via MongoHook."""
    hook = MongoHook(mongo_conn_id="mongo_default")

    # Create collection
    collection = hook.create_collection("users")

    # Insert a single document
    user = {"name": "John Doe", "email": "john@example.com", "age": 30}
    result = hook.insert_one("users", user)
    print(f"Inserted user with ID: {result.inserted_id}")

    # Bulk insert
    users = [
        {"name": "Jane Smith", "email": "jane@example.com", "age": 25},
        {"name": "Bob Johnson", "email": "bob@example.com", "age": 35},
    ]
    hook.insert_many("users", users)

    # Query documents
    young_users = hook.find("users", {"age": {"$lt": 30}})
    for user in young_users:
        print(f"Young user: {user['name']}")

    # Update document
    hook.update_one("users", {"name": "John Doe"}, {"$set": {"age": 31}})

    # Delete document
    hook.delete_one("users", {"name": "Bob Johnson"})


def aggregation_example():
    """Group employees (age >= 25) by department and print average age, descending."""
    hook = MongoHook(mongo_conn_id="mongo_default")
    pipeline = [
        {"$match": {"age": {"$gte": 25}}},
        {"$group": {"_id": "$department", "avg_age": {"$avg": "$age"}}},
        {"$sort": {"avg_age": -1}},
    ]
    results = hook.aggregate("employees", pipeline)
    for result in results:
        print(f"Department: {result['_id']}, Average Age: {result['avg_age']}")

from airflow.providers.mongo.sensors.mongo import MongoSensor
# Sensor with a compound query: today's completed, error-free processing records.
wait_for_processed_data = MongoSensor(
    task_id='wait_for_processed_data',
    collection='data_processing',
    query={
        'status': 'completed',
        'processing_date': {'$gte': datetime.now().replace(hour=0, minute=0, second=0)},
        'errors': {'$exists': False}
    },
    mongo_conn_id='mongo_production',
    mongo_db='analytics',
    poke_interval=60,
    timeout=1800,
    dag=dag,
)


def safe_mongo_operations():
    """Mark all active documents as processed, using the hook as a context manager."""
    with MongoHook(mongo_conn_id="mongo_default") as hook:
        # Connection automatically closed when exiting context
        documents = hook.find("my_collection", {"status": "active"})
        processed_count = 0
        for doc in documents:
            # Stamp each document as processed
            hook.update_one(
                "my_collection",
                {"_id": doc["_id"]},
                {"$set": {"status": "processed", "processed_at": datetime.now()}},
            )
            processed_count += 1
        print(f"Processed {processed_count} documents")