MongoDB provider for Apache Airflow enabling database connections, queries, and workflow monitoring
npx @tessl/cli install tessl/pypi-apache-airflow-providers-mongo@5.2.0
# Apache Airflow MongoDB Provider

The Apache Airflow MongoDB Provider enables data engineers to build workflows that interact with MongoDB databases. It provides MongoDB connectivity, database operations, and monitoring capabilities within Airflow DAGs through hooks and sensors.

## Package Information

- **Package Name**: apache-airflow-providers-mongo
- **Package Type**: pypi
- **Language**: Python
- **Installation**: `pip install apache-airflow-providers-mongo`
- **Requirements**: Apache Airflow 2.10.0+, pymongo>=4.13.2, dnspython>=1.13.0

## Core Imports
13
14
```python
15
from airflow.providers.mongo.hooks.mongo import MongoHook
16
from airflow.providers.mongo.sensors.mongo import MongoSensor
17
```
18
19
## Basic Usage
20
21
```python
22
from airflow import DAG
23
from airflow.providers.mongo.hooks.mongo import MongoHook
24
from airflow.providers.mongo.sensors.mongo import MongoSensor
25
from airflow.operators.python import PythonOperator
26
from datetime import datetime
27
28
def process_mongo_data():
29
hook = MongoHook(mongo_conn_id="mongo_default")
30
31
# Insert a document
32
result = hook.insert_one("my_collection", {"key": "value", "timestamp": datetime.now()})
33
print(f"Inserted document with ID: {result.inserted_id}")
34
35
# Query documents
36
documents = hook.find("my_collection", {"key": "value"})
37
for doc in documents:
38
print(doc)
39
40
dag = DAG(
41
'mongo_example_dag',
42
default_args={'start_date': datetime(2024, 1, 1)},
43
schedule_interval='@daily'
44
)
45
46
# Wait for a document to exist
47
wait_for_document = MongoSensor(
48
task_id='wait_for_data',
49
collection='my_collection',
50
query={'status': 'ready'},
51
mongo_conn_id='mongo_default',
52
poke_interval=30,
53
timeout=300,
54
dag=dag
55
)
56
57
# Process data when available
58
process_data = PythonOperator(
59
task_id='process_data',
60
python_callable=process_mongo_data,
61
dag=dag
62
)
63
64
wait_for_document >> process_data
65
```
66
67
## Capabilities
68
69
### MongoDB Hook
70
71
MongoHook provides database connectivity along with CRUD operations, aggregations, and collection management.
72
73
```python { .api }
74
class MongoHook(BaseHook):
75
"""PyMongo wrapper to interact with MongoDB."""
76
77
def __init__(self, mongo_conn_id: str = "mongo_default", *args, **kwargs) -> None:
78
"""Initialize MongoDB hook with connection ID."""
79
80
def get_conn(self) -> MongoClient:
81
"""Fetch PyMongo Client."""
82
83
def close(self) -> None:
84
"""Close the MongoDB connection."""
85
86
def get_collection(self, mongo_collection: str, mongo_db: str | None = None) -> MongoCollection:
87
"""Fetch a mongo collection object for querying."""
88
89
def create_collection(
90
self,
91
mongo_collection: str,
92
mongo_db: str | None = None,
93
return_if_exists: bool = True,
94
**create_kwargs: Any
95
) -> MongoCollection:
96
"""Create the collection and return it."""
97
```
98
99
#### Query Operations
100
101
```python { .api }
102
def find(
103
self,
104
mongo_collection: str,
105
query: dict,
106
find_one: bool = False,
107
mongo_db: str | None = None,
108
projection: list | dict | None = None,
109
**kwargs
110
) -> pymongo.cursor.Cursor | Any | None:
111
"""Run a mongo find query and return the results."""
112
113
def aggregate(
114
self,
115
mongo_collection: str,
116
aggregate_query: list,
117
mongo_db: str | None = None,
118
**kwargs
119
) -> CommandCursor:
120
"""Run an aggregation pipeline and return the results."""
121
122
def distinct(
123
self,
124
mongo_collection: str,
125
distinct_key: str,
126
filter_doc: dict | None = None,
127
mongo_db: str | None = None,
128
**kwargs
129
) -> list[Any]:
130
"""Return a list of distinct values for the given key across a collection."""
131
```
132
133
#### Document Operations
134
135
```python { .api }
136
def insert_one(
137
self,
138
mongo_collection: str,
139
doc: dict,
140
mongo_db: str | None = None,
141
**kwargs
142
) -> pymongo.results.InsertOneResult:
143
"""Insert a single document into a mongo collection."""
144
145
def insert_many(
146
self,
147
mongo_collection: str,
148
docs: Iterable[dict],
149
mongo_db: str | None = None,
150
**kwargs
151
) -> pymongo.results.InsertManyResult:
152
"""Insert many docs into a mongo collection."""
153
154
def update_one(
155
self,
156
mongo_collection: str,
157
filter_doc: dict,
158
update_doc: dict,
159
mongo_db: str | None = None,
160
**kwargs
161
) -> pymongo.results.UpdateResult:
162
"""Update a single document in a mongo collection."""
163
164
def update_many(
165
self,
166
mongo_collection: str,
167
filter_doc: dict,
168
update_doc: dict,
169
mongo_db: str | None = None,
170
**kwargs
171
) -> pymongo.results.UpdateResult:
172
"""Update one or more documents in a mongo collection."""
173
174
def replace_one(
175
self,
176
mongo_collection: str,
177
doc: dict,
178
filter_doc: dict | None = None,
179
mongo_db: str | None = None,
180
**kwargs
181
) -> pymongo.results.UpdateResult:
182
"""Replace a single document in a mongo collection."""
183
184
def replace_many(
185
self,
186
mongo_collection: str,
187
docs: list[dict],
188
filter_docs: list[dict] | None = None,
189
mongo_db: str | None = None,
190
upsert: bool = False,
191
collation: pymongo.collation.Collation | None = None,
192
**kwargs
193
) -> pymongo.results.BulkWriteResult:
194
"""Replace many documents in a mongo collection."""
195
196
def delete_one(
197
self,
198
mongo_collection: str,
199
filter_doc: dict,
200
mongo_db: str | None = None,
201
**kwargs
202
) -> pymongo.results.DeleteResult:
203
"""Delete a single document in a mongo collection."""
204
205
def delete_many(
206
self,
207
mongo_collection: str,
208
filter_doc: dict,
209
mongo_db: str | None = None,
210
**kwargs
211
) -> pymongo.results.DeleteResult:
212
"""Delete one or more documents in a mongo collection."""
213
```
214
215
### MongoDB Sensor
216
217
MongoSensor checks for the existence of a document matching a query, letting a workflow wait until that data is available.
218
219
```python { .api }
220
class MongoSensor(BaseSensorOperator):
221
"""Checks for the existence of a document which matches the given query in MongoDB."""
222
223
template_fields: Sequence[str] = ("collection", "query")
224
225
def __init__(
226
self,
227
*,
228
collection: str,
229
query: dict,
230
mongo_conn_id: str = "mongo_default",
231
mongo_db: str | None = None,
232
**kwargs
233
) -> None:
234
"""Initialize MongoDB sensor.
235
236
Args:
237
collection: Target MongoDB collection
238
query: The query to find the target document
239
mongo_conn_id: The Mongo connection id to use when connecting to MongoDB
240
mongo_db: Target MongoDB database name
241
"""
242
243
def poke(self, context: Context) -> bool:
244
"""Check if document matching query exists."""
245
```
246
247
## Connection Configuration
248
249
MongoDB connections are configured in Airflow with the following parameters:
250
251
```python { .api }
252
# Connection Type: mongo
253
# Default Connection ID: mongo_default
254
255
# Connection Extra Fields (JSON):
256
{
257
"srv": bool, # Use SRV/seed list connection
258
"ssl": bool, # Enable SSL/TLS
259
"tls": bool, # Alias for ssl
260
"allow_insecure": bool, # Allow invalid certificates during SSL connections
261
# Additional MongoDB connection string options are also supported
262
}
263
```
264
265
### Connection Examples
266
267
Standard MongoDB connection:
268
```python
269
# Connection ID: mongo_default
270
# Host: localhost
271
# Port: 27017
272
# Schema: my_database (optional default database)
273
# Extra: {"ssl": false}
274
```
275
276
MongoDB Atlas connection:
277
```python
278
# Connection ID: mongo_atlas
279
# Host: cluster0.mongodb.net
280
# Login: username
281
# Password: password
282
# Schema: production_db
283
# Extra: {"srv": true, "ssl": true}
284
```
285
286
SSL connection with custom options:
287
```python
288
# Connection ID: mongo_ssl
289
# Host: mongo.example.com
290
# Port: 27017
291
# Login: admin
292
# Password: password
293
# Extra: {
294
# "ssl": true,
295
# "allow_insecure": false,
296
# "connectTimeoutMS": 30000,
297
# "serverSelectionTimeoutMS": 30000
298
# }
299
```
300
301
## Types
302
303
```python { .api }
304
from collections.abc import Iterable
305
from typing import Any, Sequence, overload, Literal
306
from pymongo import MongoClient
307
from pymongo.collection import Collection as MongoCollection
308
from pymongo.command_cursor import CommandCursor
309
from pymongo.cursor import Cursor
310
from pymongo.results import (
311
InsertOneResult,
312
InsertManyResult,
313
UpdateResult,
314
BulkWriteResult,
315
DeleteResult
316
)
317
from pymongo.collation import Collation
318
from airflow.models import Connection
319
from airflow.utils.context import Context
320
```
321
322
## Error Handling
323
324
The provider handles MongoDB-specific exceptions:
325
326
```python
327
from pymongo.errors import CollectionInvalid
328
from airflow.exceptions import AirflowConfigException
329
330
# Connection validation errors
331
# - Invalid connection type (must be 'mongo')
332
# - SRV connections with port specified
333
# - Configuration conflicts
334
335
# MongoDB operation errors
336
# - Collection creation failures
337
# - Network connectivity issues
338
# - Authentication failures
339
# - Query execution errors
340
```
341
342
## Usage Examples
343
344
### Basic CRUD Operations
345
346
```python
347
from airflow.providers.mongo.hooks.mongo import MongoHook
348
349
def mongo_operations():
350
hook = MongoHook(mongo_conn_id="mongo_default")
351
352
# Create collection
353
collection = hook.create_collection("users")
354
355
# Insert documents
356
user = {"name": "John Doe", "email": "john@example.com", "age": 30}
357
result = hook.insert_one("users", user)
358
print(f"Inserted user with ID: {result.inserted_id}")
359
360
# Bulk insert
361
users = [
362
{"name": "Jane Smith", "email": "jane@example.com", "age": 25},
363
{"name": "Bob Johnson", "email": "bob@example.com", "age": 35}
364
]
365
hook.insert_many("users", users)
366
367
# Query documents
368
young_users = hook.find("users", {"age": {"$lt": 30}})
369
for user in young_users:
370
print(f"Young user: {user['name']}")
371
372
# Update document
373
hook.update_one("users", {"name": "John Doe"}, {"$set": {"age": 31}})
374
375
# Delete document
376
hook.delete_one("users", {"name": "Bob Johnson"})
377
```
378
379
### Aggregation Pipeline
380
381
```python
382
def aggregation_example():
383
hook = MongoHook(mongo_conn_id="mongo_default")
384
385
pipeline = [
386
{"$match": {"age": {"$gte": 25}}},
387
{"$group": {"_id": "$department", "avg_age": {"$avg": "$age"}}},
388
{"$sort": {"avg_age": -1}}
389
]
390
391
results = hook.aggregate("employees", pipeline)
392
for result in results:
393
print(f"Department: {result['_id']}, Average Age: {result['avg_age']}")
394
```
395
396
### Sensor with Complex Query
397
398
```python
399
from airflow.providers.mongo.sensors.mongo import MongoSensor
400
401
wait_for_processed_data = MongoSensor(
402
task_id='wait_for_processed_data',
403
collection='data_processing',
404
query={
405
'status': 'completed',
406
'processing_date': {'$gte': datetime.now().replace(hour=0, minute=0, second=0)},
407
'errors': {'$exists': False}
408
},
409
mongo_conn_id='mongo_production',
410
mongo_db='analytics',
411
poke_interval=60,
412
timeout=1800,
413
dag=dag
414
)
415
```
416
417
### Context Manager Usage
418
419
```python
420
def safe_mongo_operations():
421
with MongoHook(mongo_conn_id="mongo_default") as hook:
422
# Connection automatically closed when exiting context
423
documents = hook.find("my_collection", {"status": "active"})
424
processed_count = 0
425
426
for doc in documents:
427
# Process document
428
hook.update_one(
429
"my_collection",
430
{"_id": doc["_id"]},
431
{"$set": {"status": "processed", "processed_at": datetime.now()}}
432
)
433
processed_count += 1
434
435
print(f"Processed {processed_count} documents")
436
```