Apache Airflow provider for executing Jupyter notebooks with Papermill
—
Quality: Pending — Does it follow best practices?
Impact: Pending — No eval scenarios have been run.
Apache Airflow provider for executing Jupyter notebooks with Papermill. This provider enables data teams to integrate notebook-based analytics and machine learning workflows into their Airflow DAGs by executing parameterized Jupyter notebooks through the Papermill library.
pip install apache-airflow-providers-papermill

from airflow.providers.papermill.operators.papermill import PapermillOperator

For lineage tracking:
from airflow.providers.papermill.operators.papermill import NoteBook

from airflow import DAG
from airflow.providers.papermill.operators.papermill import PapermillOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
# Define DAG
dag = DAG(
'example_papermill',
default_args={'owner': 'airflow'},
schedule_interval='0 0 * * *',
start_date=days_ago(2),
dagrun_timeout=timedelta(minutes=60),
)
# Execute a notebook with parameters
run_notebook = PapermillOperator(
task_id="run_analysis_notebook",
input_nb="/path/to/analysis.ipynb",
output_nb="/path/to/output-{{ execution_date }}.ipynb",
parameters={"date": "{{ execution_date }}", "source": "airflow"},
dag=dag
)Execute Jupyter notebooks through Papermill with parameter injection and lineage tracking support.
class PapermillOperator(BaseOperator):
"""
Executes a jupyter notebook through papermill that is annotated with parameters
:param input_nb: input notebook (can also be a NoteBook or a File inlet)
:type input_nb: str
:param output_nb: output notebook (can also be a NoteBook or File outlet)
:type output_nb: str
:param parameters: the notebook parameters to set
:type parameters: dict
"""
supports_lineage = True
@apply_defaults
def __init__(
self,
*,
input_nb: Optional[str] = None,
output_nb: Optional[str] = None,
parameters: Optional[Dict] = None,
**kwargs,
) -> None: ...
def execute(self, context): ...Represents Jupyter notebooks for Airflow lineage tracking.
@attr.s(auto_attribs=True)
class NoteBook(File):
    """Jupyter notebook lineage entity."""

    type_hint: Optional[str] = "jupyter_notebook"
    # NOTE(review): class-level mutable default ({}) — attrs' auto_attribs
    # handles literal mutable defaults, but attr.Factory(dict) would be the
    # canonical spelling; confirm against the attrs documentation.
    parameters: Optional[Dict] = {}
    # Fully-qualified schema name used by the lineage backend.
    meta_schema: str = __name__ + '.NoteBook'

from typing import Dict, Optional
import attr
import papermill as pm
from airflow.lineage.entities import File
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

Use Airflow's templating system for dynamic notebook paths and parameters:
run_notebook = PapermillOperator(
task_id="daily_report",
input_nb="/notebooks/daily_report_template.ipynb",
output_nb="/reports/daily_report_{{ ds }}.ipynb",
parameters={
"report_date": "{{ ds }}",
"execution_time": "{{ execution_date }}",
"run_id": "{{ run_id }}"
}
)The operator automatically creates lineage entities that can be used by downstream tasks:
from airflow.lineage import AUTO
from airflow.operators.python import PythonOperator
def process_notebook_output(inlets, **context):
# Access the output notebook through lineage
notebook_path = inlets[0].url
# Process the executed notebook...
process_task = PythonOperator(
task_id='process_output',
python_callable=process_notebook_output,
inlets=AUTO # Automatically detects upstream notebook outputs
)
run_notebook >> process_taskExecute multiple notebooks in sequence by setting up multiple inlets and outlets:
# Note: This pattern requires careful setup of inlets/outlets
# The operator will execute notebooks in pairs (inlet[i] -> outlet[i])
multi_notebook = PapermillOperator(
task_id="run_multiple_notebooks",
# Set up inlets and outlets manually for multiple notebooks
dag=dag
)
# Additional configuration needed for multiple notebook executionThe operator performs validation during execution:
ValueError if inlets or outlets are not properly configured (i.e., "Input notebook or output notebook is not specified").

The operator calls papermill.execute_notebook() with these settings:
- progress_bar=False — disables progress display for cleaner logs
- report_mode=True — enables report generation mode

NoteBook entities

The legacy import path is deprecated and issues a warning:
# DEPRECATED - issues warning
from airflow.operators.papermill_operator import PapermillOperator

Use the current import path:
# Current import path
from airflow.providers.papermill.operators.papermill import PapermillOperator

Install with Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-papermill