or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

tessl/pypi-apache-airflow-providers-mongo

MongoDB provider for Apache Airflow enabling database connections, queries, and workflow monitoring

Workspace
tessl
Visibility
Public
Created
Last updated
Describes
pypi: pkg:pypi/apache-airflow-providers-mongo@5.2.x

To install, run

npx @tessl/cli install tessl/pypi-apache-airflow-providers-mongo@5.2.0

0

# Apache Airflow MongoDB Provider

1

2

The Apache Airflow MongoDB provider lets data engineers build workflows that interact with MongoDB databases. It provides MongoDB connectivity, database operations, and monitoring capabilities within Airflow DAGs through a hook (`MongoHook`) and a sensor (`MongoSensor`).

3

4

## Package Information

5

6

- **Package Name**: apache-airflow-providers-mongo

7

- **Package Type**: pypi

8

- **Language**: Python

9

- **Installation**: `pip install apache-airflow-providers-mongo`

10

- **Requirements**: Apache Airflow 2.10.0+, pymongo>=4.13.2, dnspython>=1.13.0

11

12

## Core Imports

13

14

```python

15

from airflow.providers.mongo.hooks.mongo import MongoHook

16

from airflow.providers.mongo.sensors.mongo import MongoSensor

17

```

18

19

## Basic Usage

20

21

```python

22

from airflow import DAG

23

from airflow.providers.mongo.hooks.mongo import MongoHook

24

from airflow.providers.mongo.sensors.mongo import MongoSensor

25

from airflow.operators.python import PythonOperator

26

from datetime import datetime

27

28

def process_mongo_data():

29

hook = MongoHook(mongo_conn_id="mongo_default")

30

31

# Insert a document

32

result = hook.insert_one("my_collection", {"key": "value", "timestamp": datetime.now()})

33

print(f"Inserted document with ID: {result.inserted_id}")

34

35

# Query documents

36

documents = hook.find("my_collection", {"key": "value"})

37

for doc in documents:

38

print(doc)

39

40

dag = DAG(

41

'mongo_example_dag',

42

default_args={'start_date': datetime(2024, 1, 1)},

43

schedule='@daily'

44

)

45

46

# Wait for a document to exist

47

wait_for_document = MongoSensor(

48

task_id='wait_for_data',

49

collection='my_collection',

50

query={'status': 'ready'},

51

mongo_conn_id='mongo_default',

52

poke_interval=30,

53

timeout=300,

54

dag=dag

55

)

56

57

# Process data when available

58

process_data = PythonOperator(

59

task_id='process_data',

60

python_callable=process_mongo_data,

61

dag=dag

62

)

63

64

wait_for_document >> process_data

65

```

66

67

## Capabilities

68

69

### MongoDB Hook

70

71

`MongoHook` provides database connectivity along with CRUD operations, aggregations, and collection management.

72

73

```python { .api }

74

class MongoHook(BaseHook):

75

"""PyMongo wrapper to interact with MongoDB."""

76

77

def __init__(self, mongo_conn_id: str = "mongo_default", *args, **kwargs) -> None:

78

"""Initialize MongoDB hook with connection ID."""

79

80

def get_conn(self) -> MongoClient:

81

"""Fetch PyMongo Client."""

82

83

def close(self) -> None:

84

"""Close the MongoDB connection."""

85

86

def get_collection(self, mongo_collection: str, mongo_db: str | None = None) -> MongoCollection:

87

"""Fetch a mongo collection object for querying."""

88

89

def create_collection(

90

self,

91

mongo_collection: str,

92

mongo_db: str | None = None,

93

return_if_exists: bool = True,

94

**create_kwargs: Any

95

) -> MongoCollection:

96

"""Create the collection and return it."""

97

```

98

99

#### Query Operations

100

101

```python { .api }

102

def find(

103

self,

104

mongo_collection: str,

105

query: dict,

106

find_one: bool = False,

107

mongo_db: str | None = None,

108

projection: list | dict | None = None,

109

**kwargs

110

) -> pymongo.cursor.Cursor | Any | None:

111

"""Run a mongo find query and return the results."""

112

113

def aggregate(

114

self,

115

mongo_collection: str,

116

aggregate_query: list,

117

mongo_db: str | None = None,

118

**kwargs

119

) -> CommandCursor:

120

"""Run an aggregation pipeline and return the results."""

121

122

def distinct(

123

self,

124

mongo_collection: str,

125

distinct_key: str,

126

filter_doc: dict | None = None,

127

mongo_db: str | None = None,

128

**kwargs

129

) -> list[Any]:

130

"""Return a list of distinct values for the given key across a collection."""

131

```

132

133

#### Document Operations

134

135

```python { .api }

136

def insert_one(

137

self,

138

mongo_collection: str,

139

doc: dict,

140

mongo_db: str | None = None,

141

**kwargs

142

) -> pymongo.results.InsertOneResult:

143

"""Insert a single document into a mongo collection."""

144

145

def insert_many(

146

self,

147

mongo_collection: str,

148

docs: Iterable[dict],

149

mongo_db: str | None = None,

150

**kwargs

151

) -> pymongo.results.InsertManyResult:

152

"""Insert many docs into a mongo collection."""

153

154

def update_one(

155

self,

156

mongo_collection: str,

157

filter_doc: dict,

158

update_doc: dict,

159

mongo_db: str | None = None,

160

**kwargs

161

) -> pymongo.results.UpdateResult:

162

"""Update a single document in a mongo collection."""

163

164

def update_many(

165

self,

166

mongo_collection: str,

167

filter_doc: dict,

168

update_doc: dict,

169

mongo_db: str | None = None,

170

**kwargs

171

) -> pymongo.results.UpdateResult:

172

"""Update one or more documents in a mongo collection."""

173

174

def replace_one(

175

self,

176

mongo_collection: str,

177

doc: dict,

178

filter_doc: dict | None = None,

179

mongo_db: str | None = None,

180

**kwargs

181

) -> pymongo.results.UpdateResult:

182

"""Replace a single document in a mongo collection."""

183

184

def replace_many(

185

self,

186

mongo_collection: str,

187

docs: list[dict],

188

filter_docs: list[dict] | None = None,

189

mongo_db: str | None = None,

190

upsert: bool = False,

191

collation: pymongo.collation.Collation | None = None,

192

**kwargs

193

) -> pymongo.results.BulkWriteResult:

194

"""Replace many documents in a mongo collection."""

195

196

def delete_one(

197

self,

198

mongo_collection: str,

199

filter_doc: dict,

200

mongo_db: str | None = None,

201

**kwargs

202

) -> pymongo.results.DeleteResult:

203

"""Delete a single document in a mongo collection."""

204

205

def delete_many(

206

self,

207

mongo_collection: str,

208

filter_doc: dict,

209

mongo_db: str | None = None,

210

**kwargs

211

) -> pymongo.results.DeleteResult:

212

"""Delete one or more documents in a mongo collection."""

213

```

214

215

### MongoDB Sensor

216

217

`MongoSensor` checks for the existence of a document matching a given query, so that downstream tasks can wait until the data they depend on is present in MongoDB.

218

219

```python { .api }

220

class MongoSensor(BaseSensorOperator):

221

"""Checks for the existence of a document which matches the given query in MongoDB."""

222

223

template_fields: Sequence[str] = ("collection", "query")

224

225

def __init__(

226

self,

227

*,

228

collection: str,

229

query: dict,

230

mongo_conn_id: str = "mongo_default",

231

mongo_db: str | None = None,

232

**kwargs

233

) -> None:

234

"""Initialize MongoDB sensor.

235

236

Args:

237

collection: Target MongoDB collection

238

query: The query to find the target document

239

mongo_conn_id: The Mongo connection id to use when connecting to MongoDB

240

mongo_db: Target MongoDB database name

241

"""

242

243

def poke(self, context: Context) -> bool:

244

"""Check if document matching query exists."""

245

```

246

247

## Connection Configuration

248

249

MongoDB connections are configured in Airflow with the following parameters:

250

251

```python { .api }

252

# Connection Type: mongo

253

# Default Connection ID: mongo_default

254

255

# Connection Extra Fields (JSON):

256

{

257

"srv": bool, # Use SRV/seed list connection

258

"ssl": bool, # Enable SSL/TLS

259

"tls": bool, # Alias for ssl

260

"allow_insecure": bool, # Allow invalid certificates during SSL connections

261

# Additional MongoDB connection string options supported

262

}

263

```

264

265

### Connection Examples

266

267

Standard MongoDB connection:

268

```python

269

# Connection ID: mongo_default

270

# Host: localhost

271

# Port: 27017

272

# Schema: my_database (optional default database)

273

# Extra: {"ssl": false}

274

```

275

276

MongoDB Atlas connection:

277

```python

278

# Connection ID: mongo_atlas

279

# Host: cluster0.mongodb.net

280

# Login: username

281

# Password: password

282

# Schema: production_db

283

# Extra: {"srv": true, "ssl": true}

284

```

285

286

SSL connection with custom options:

287

```python

288

# Connection ID: mongo_ssl

289

# Host: mongo.example.com

290

# Port: 27017

291

# Login: admin

292

# Password: password

293

# Extra: {

294

# "ssl": true,

295

# "allow_insecure": false,

296

# "connectTimeoutMS": 30000,

297

# "serverSelectionTimeoutMS": 30000

298

# }

299

```

300

301

## Types

302

303

```python { .api }

304

from collections.abc import Iterable

305

from typing import Any, Sequence, overload, Literal

306

from pymongo import MongoClient

307

from pymongo.collection import Collection as MongoCollection

308

from pymongo.command_cursor import CommandCursor

309

from pymongo.cursor import Cursor

310

from pymongo.results import (

311

InsertOneResult,

312

InsertManyResult,

313

UpdateResult,

314

BulkWriteResult,

315

DeleteResult

316

)

317

from pymongo.collation import Collation

318

from airflow.models import Connection

319

from airflow.utils.context import Context

320

```

321

322

## Error Handling

323

324

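The provider raises `AirflowConfigException` when the connection configuration is invalid, and errors from MongoDB operations surface as PyMongo exceptions such as `CollectionInvalid`: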

325

326

```python

327

from pymongo.errors import CollectionInvalid

328

from airflow.exceptions import AirflowConfigException

329

330

# Connection validation errors

331

# - Invalid connection type (must be 'mongo')

332

# - SRV connections with port specified

333

# - Configuration conflicts

334

335

# MongoDB operation errors

336

# - Collection creation failures

337

# - Network connectivity issues

338

# - Authentication failures

339

# - Query execution errors

340

```

341

342

## Usage Examples

343

344

### Basic CRUD Operations

345

346

```python

347

from airflow.providers.mongo.hooks.mongo import MongoHook

348

349

def mongo_operations():

350

hook = MongoHook(mongo_conn_id="mongo_default")

351

352

# Create collection

353

collection = hook.create_collection("users")

354

355

# Insert documents

356

user = {"name": "John Doe", "email": "john@example.com", "age": 30}

357

result = hook.insert_one("users", user)

358

print(f"Inserted user with ID: {result.inserted_id}")

359

360

# Bulk insert

361

users = [

362

{"name": "Jane Smith", "email": "jane@example.com", "age": 25},

363

{"name": "Bob Johnson", "email": "bob@example.com", "age": 35}

364

]

365

hook.insert_many("users", users)

366

367

# Query documents

368

young_users = hook.find("users", {"age": {"$lt": 30}})

369

for user in young_users:

370

print(f"Young user: {user['name']}")

371

372

# Update document

373

hook.update_one("users", {"name": "John Doe"}, {"$set": {"age": 31}})

374

375

# Delete document

376

hook.delete_one("users", {"name": "Bob Johnson"})

377

```

378

379

### Aggregation Pipeline

380

381

```python

382

def aggregation_example():

383

hook = MongoHook(mongo_conn_id="mongo_default")

384

385

pipeline = [

386

{"$match": {"age": {"$gte": 25}}},

387

{"$group": {"_id": "$department", "avg_age": {"$avg": "$age"}}},

388

{"$sort": {"avg_age": -1}}

389

]

390

391

results = hook.aggregate("employees", pipeline)

392

for result in results:

393

print(f"Department: {result['_id']}, Average Age: {result['avg_age']}")

394

```

395

396

### Sensor with Complex Query

397

398

```python

399

from airflow.providers.mongo.sensors.mongo import MongoSensor

400

401

wait_for_processed_data = MongoSensor(

402

task_id='wait_for_processed_data',

403

collection='data_processing',

404

query={

405

'status': 'completed',

406

'processing_date': {'$gte': datetime.now().replace(hour=0, minute=0, second=0)},

407

'errors': {'$exists': False}

408

},

409

mongo_conn_id='mongo_production',

410

mongo_db='analytics',

411

poke_interval=60,

412

timeout=1800,

413

dag=dag

414

)

415

```

416

417

### Context Manager Usage

418

419

```python

420

def safe_mongo_operations():

421

with MongoHook(mongo_conn_id="mongo_default") as hook:

422

# Connection automatically closed when exiting context

423

documents = hook.find("my_collection", {"status": "active"})

424

processed_count = 0

425

426

for doc in documents:

427

# Process document

428

hook.update_one(

429

"my_collection",

430

{"_id": doc["_id"]},

431

{"$set": {"status": "processed", "processed_at": datetime.now()}}

432

)

433

processed_count += 1

434

435

print(f"Processed {processed_count} documents")

436

```