Apache Airflow provider for executing Jupyter notebooks with Papermill
```
npx @tessl/cli install tessl/pypi-apache-airflow-providers-papermill@2021.3.0
```

Apache Airflow provider for executing Jupyter notebooks with Papermill. This provider enables data teams to integrate notebook-based analytics and machine learning workflows into their Airflow DAGs by executing parameterized Jupyter notebooks through the Papermill library.
Install the provider package:

```
pip install apache-airflow-providers-papermill
```

Import the operator:

```python
from airflow.providers.papermill.operators.papermill import PapermillOperator
```

For lineage tracking:
```python
from airflow.providers.papermill.operators.papermill import NoteBook
```

A complete example DAG:

```python
from airflow import DAG
from airflow.providers.papermill.operators.papermill import PapermillOperator
from airflow.utils.dates import days_ago
from datetime import timedelta

# Define DAG
dag = DAG(
    'example_papermill',
    default_args={'owner': 'airflow'},
    schedule_interval='0 0 * * *',
    start_date=days_ago(2),
    dagrun_timeout=timedelta(minutes=60),
)

# Execute a notebook with parameters
run_notebook = PapermillOperator(
    task_id="run_analysis_notebook",
    input_nb="/path/to/analysis.ipynb",
    output_nb="/path/to/output-{{ execution_date }}.ipynb",
    parameters={"date": "{{ execution_date }}", "source": "airflow"},
    dag=dag,
)
```

Execute Jupyter notebooks through Papermill with parameter injection and lineage tracking support.
```python
class PapermillOperator(BaseOperator):
    """
    Executes a jupyter notebook through papermill that is annotated with parameters

    :param input_nb: input notebook (can also be a NoteBook or a File inlet)
    :type input_nb: str
    :param output_nb: output notebook (can also be a NoteBook or File outlet)
    :type output_nb: str
    :param parameters: the notebook parameters to set
    :type parameters: dict
    """

    supports_lineage = True

    @apply_defaults
    def __init__(
        self,
        *,
        input_nb: Optional[str] = None,
        output_nb: Optional[str] = None,
        parameters: Optional[Dict] = None,
        **kwargs,
    ) -> None: ...

    def execute(self, context): ...
```

Represents Jupyter notebooks for Airflow lineage tracking.
```python
@attr.s(auto_attribs=True)
class NoteBook(File):
    """Jupyter notebook"""

    type_hint: Optional[str] = "jupyter_notebook"
    parameters: Optional[Dict] = {}
    meta_schema: str = __name__ + '.NoteBook'
```
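As the PapermillOperator docstring notes, notebooks can also be referenced as NoteBook (or File) lineage entities. A minimal sketch of constructing one; the path is illustrative, not part of the provider's documented examples:

```python
from airflow.providers.papermill.operators.papermill import NoteBook

# Illustrative path; the parameters dict holds the values Papermill will inject.
nb = NoteBook(url="/notebooks/analysis.ipynb", parameters={"source": "airflow"})
print(nb.url, nb.type_hint)  # -> /notebooks/analysis.ipynb jupyter_notebook
```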
The operator module relies on the following imports:

```python
from typing import Dict, Optional

import attr
import papermill as pm

from airflow.lineage.entities import File
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
```

Use Airflow's templating system for dynamic notebook paths and parameters:
```python
run_notebook = PapermillOperator(
    task_id="daily_report",
    input_nb="/notebooks/daily_report_template.ipynb",
    output_nb="/reports/daily_report_{{ ds }}.ipynb",
    parameters={
        "report_date": "{{ ds }}",
        "execution_time": "{{ execution_date }}",
        "run_id": "{{ run_id }}",
    },
)
```

The operator automatically creates lineage entities that can be used by downstream tasks:
```python
from airflow.lineage import AUTO
from airflow.operators.python import PythonOperator

def process_notebook_output(inlets, **context):
    # Access the output notebook through lineage
    notebook_path = inlets[0].url
    # Process the executed notebook...

process_task = PythonOperator(
    task_id='process_output',
    python_callable=process_notebook_output,
    inlets=AUTO,  # Automatically detects upstream notebook outputs
)

run_notebook >> process_task
```

Execute multiple notebooks in sequence by setting up multiple inlets and outlets:
```python
# Note: This pattern requires careful setup of inlets/outlets.
# The operator will execute notebooks in pairs (inlet[i] -> outlet[i]).
multi_notebook = PapermillOperator(
    task_id="run_multiple_notebooks",
    # Set up inlets and outlets manually for multiple notebooks
    dag=dag,
)
# Additional configuration needed for multiple notebook execution
```
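The provider does not document a dedicated parameter for multiple notebooks, so the following is only a sketch of one possible setup. It assumes the standard BaseOperator `inlets`/`outlets` arguments are populated with NoteBook entities, and the paths are illustrative:

```python
# Sketch only: populate inlets/outlets with NoteBook entities so that
# inlets[i] is executed into outlets[i]. Paths are illustrative.
from airflow.providers.papermill.operators.papermill import NoteBook, PapermillOperator

multi_notebook = PapermillOperator(
    task_id="run_multiple_notebooks",
    inlets=[
        NoteBook(url="/notebooks/extract.ipynb", parameters={"source": "airflow"}),
        NoteBook(url="/notebooks/report.ipynb"),
    ],
    outlets=[
        NoteBook(url="/output/extract-{{ ds }}.ipynb"),
        NoteBook(url="/output/report-{{ ds }}.ipynb"),
    ],
    dag=dag,
)
```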
The operator performs validation during execution: a ValueError is raised if inlets or outlets are not properly configured ("Input notebook or output notebook is not specified").
The operator calls papermill.execute_notebook() with these settings, as sketched below:

- progress_bar=False - Disables progress display for cleaner logs
- report_mode=True - Enables report generation mode
- Input and output notebooks are tracked as NoteBook entities
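A minimal sketch of what the execution step roughly amounts to, assuming the inlet/outlet pairing described above; the helper name is illustrative and this is not the provider's verbatim source:

```python
import papermill as pm

def _run_notebook_pairs(inlets, outlets):
    # Validate, then run each inlet notebook into the matching outlet notebook.
    if not inlets or not outlets:
        raise ValueError("Input notebook or output notebook is not specified")
    for i, item in enumerate(inlets):
        pm.execute_notebook(
            item.url,                  # input notebook path
            outlets[i].url,            # output notebook path
            parameters=item.parameters,
            progress_bar=False,        # cleaner task logs
            report_mode=True,          # report generation mode
        )
```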
The legacy import path is deprecated and issues a warning:

```python
# DEPRECATED - issues warning
from airflow.operators.papermill_operator import PapermillOperator
```

Use the current import path:
```python
# Current import path
from airflow.providers.papermill.operators.papermill import PapermillOperator
```