Provider package for Apache Beam integration with Apache Airflow, supporting Python, Java, and Go pipeline execution
npx @tessl/cli install tessl/pypi-apache-airflow-providers-apache-beam@6.1.0

An Apache Airflow provider package that enables workflow orchestration and data processing by offering operators, hooks, and triggers for Apache Beam pipeline execution. It supports running Beam pipelines written in Python, Java, and Go across various runners, including DirectRunner, DataflowRunner, SparkRunner, and FlinkRunner.
pip install apache-airflow-providers-apache-beam

from airflow.providers.apache.beam import __version__

Operators:
from airflow.providers.apache.beam.operators.beam import (
BeamRunPythonPipelineOperator,
BeamRunJavaPipelineOperator,
BeamRunGoPipelineOperator,
BeamBasePipelineOperator
)

Hooks:
from airflow.providers.apache.beam.hooks.beam import (
BeamHook,
BeamAsyncHook,
BeamRunnerType,
beam_options_to_args,
run_beam_command
)

Triggers:
from airflow.providers.apache.beam.triggers.beam import (
BeamPythonPipelineTrigger,
BeamJavaPipelineTrigger,
BeamPipelineBaseTrigger
)

Version compatibility:
from airflow.providers.apache.beam.version_compat import (
AIRFLOW_V_3_1_PLUS,
BaseHook,
BaseOperator
)

Example usage:

from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator
# Define default arguments
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 1, 1),
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
# Create DAG
dag = DAG(
'beam_pipeline_example',
default_args=default_args,
description='Run Apache Beam pipeline with Airflow',
schedule=timedelta(days=1),
catchup=False,
)
# Define Beam pipeline task
run_beam_pipeline = BeamRunPythonPipelineOperator(
task_id='run_beam_pipeline',
py_file='gs://my-bucket/beam_pipeline.py',
runner='DataflowRunner',
pipeline_options={
'project': 'my-gcp-project',
'region': 'us-central1',
'temp_location': 'gs://my-bucket/temp',
'staging_location': 'gs://my-bucket/staging',
},
dataflow_config={
'job_name': 'my-beam-pipeline',
'project_id': 'my-gcp-project',
'location': 'us-central1',
},
dag=dag,
)

The provider follows Airflow's standard architecture pattern with three main component types: operators, hooks, and triggers.
The provider integrates with Google Cloud Dataflow when the google provider is available, enabling cloud-scale pipeline execution with monitoring and job management capabilities.
Execute Apache Beam pipelines written in Python with support for virtual environments, custom requirements, and various runners including local DirectRunner and cloud DataflowRunner.
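A minimal usage sketch, assuming it is placed inside a DAG definition; the file path, requirements, and output location are hypothetical:

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

# Run a local pipeline script on the DirectRunner. py_requirements causes the
# pipeline to execute inside a temporary virtualenv with those packages installed.
run_local_pipeline = BeamRunPythonPipelineOperator(
    task_id="run_local_python_pipeline",
    py_file="/opt/pipelines/wordcount.py",      # hypothetical pipeline script
    runner="DirectRunner",
    py_requirements=["apache-beam[gcp]"],
    py_system_site_packages=False,
    pipeline_options={"output": "/tmp/wordcount-output"},
)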
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
def __init__(
self,
*,
py_file: str,
runner: str = "DirectRunner",
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
py_interpreter: str = "python3",
py_options: list[str] | None = None,
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
gcp_conn_id: str = "google_cloud_default",
dataflow_config: DataflowConfiguration | dict | None = None,
deferrable: bool = False,
**kwargs,
) -> None: ...

Execute Apache Beam pipelines written in Java using self-executing JAR files with support for various runners and Dataflow integration.
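A minimal sketch of submitting a self-executing Beam JAR to Dataflow; the JAR location, main class, bucket, and project are hypothetical:

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

run_java_pipeline = BeamRunJavaPipelineOperator(
    task_id="run_java_pipeline",
    jar="gs://example-bucket/pipelines/wordcount-bundled.jar",  # hypothetical JAR location
    job_class="org.example.beam.WordCount",                     # hypothetical main class
    runner="DataflowRunner",
    pipeline_options={"tempLocation": "gs://example-bucket/temp"},
    dataflow_config={
        "job_name": "java-wordcount",
        "project_id": "example-project",
        "location": "us-central1",
    },
)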
class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
def __init__(
self,
*,
jar: str,
runner: str = "DirectRunner",
job_class: str | None = None,
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
dataflow_config: DataflowConfiguration | dict | None = None,
deferrable: bool = False,
**kwargs,
) -> None: ...

Execute Apache Beam pipelines written in Go from source files or pre-compiled binaries, with support for cross-platform execution.
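A minimal sketch running a Go pipeline from source on the DirectRunner; the source file and output path are hypothetical:

from airflow.providers.apache.beam.operators.beam import BeamRunGoPipelineOperator

# Alternatively, pre-compiled binaries can be supplied via launcher_binary and
# worker_binary instead of go_file.
run_go_pipeline = BeamRunGoPipelineOperator(
    task_id="run_go_pipeline",
    go_file="/opt/pipelines/wordcount.go",      # hypothetical source file
    runner="DirectRunner",
    pipeline_options={"output": "/tmp/go-wordcount-output"},
)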
class BeamRunGoPipelineOperator(BeamBasePipelineOperator):
def __init__(
self,
*,
go_file: str = "",
launcher_binary: str = "",
worker_binary: str = "",
runner: str = "DirectRunner",
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
dataflow_config: DataflowConfiguration | dict | None = None,
**kwargs,
) -> None: ...

Low-level interface for executing and monitoring Apache Beam pipelines, with both synchronous and asynchronous execution modes.
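A minimal sketch of driving a pipeline directly through the hook, which the operators delegate to internally; the script path and options are hypothetical:

from airflow.providers.apache.beam.hooks.beam import BeamHook, BeamRunnerType

hook = BeamHook(runner=BeamRunnerType.DirectRunner)
hook.start_python_pipeline(
    variables={"output": "/tmp/wordcount-output"},  # hypothetical pipeline options
    py_file="/opt/pipelines/wordcount.py",          # hypothetical pipeline script
    py_options=[],
    py_interpreter="python3",
)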
class BeamHook(BaseHook):
def __init__(self, runner: str) -> None: ...
def start_python_pipeline(
self,
variables: dict,
py_file: str,
py_options: list[str],
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None: ...
def start_java_pipeline(
self,
variables: dict,
jar: str,
job_class: str | None = None,
process_line_callback: Callable[[str], None] | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None: ...
def start_go_pipeline(
self,
variables: dict,
go_file: str,
process_line_callback: Callable[[str], None] | None = None,
should_init_module: bool = False,
) -> None: ...
class BeamAsyncHook(BeamHook):
def __init__(self, runner: str) -> None: ...
async def start_python_pipeline_async(
self,
variables: dict,
py_file: str,
py_options: list[str] | None = None,
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
process_line_callback: Callable[[str], None] | None = None,
) -> int: ...
async def start_java_pipeline_async(
self,
variables: dict,
jar: str,
job_class: str | None = None,
process_line_callback: Callable[[str], None] | None = None,
) -> int: ...

Triggers and Deferrable Execution

Deferrable execution triggers for long-running pipelines that enable efficient resource utilization by yielding control during pipeline execution.
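Triggers are normally not instantiated directly; setting deferrable=True on an operator hands execution over to the triggerer, which runs one of these triggers. A minimal sketch, with hypothetical paths, bucket, and project:

from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

deferred_pipeline = BeamRunPythonPipelineOperator(
    task_id="run_python_pipeline_deferrable",
    py_file="gs://example-bucket/pipelines/wordcount.py",
    runner="DataflowRunner",
    pipeline_options={"temp_location": "gs://example-bucket/temp"},
    dataflow_config={
        "job_name": "deferred-wordcount",
        "project_id": "example-project",
        "location": "us-central1",
    },
    deferrable=True,  # releases the worker slot while the trigger waits for completion
)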
class BeamPythonPipelineTrigger(BeamPipelineBaseTrigger):
def __init__(
self,
variables: dict,
py_file: str,
py_options: list[str] | None = None,
py_interpreter: str = "python3",
py_requirements: list[str] | None = None,
py_system_site_packages: bool = False,
runner: str = "DirectRunner",
gcp_conn_id: str = "google_cloud_default",
) -> None: ...
def serialize(self) -> tuple[str, dict[str, Any]]: ...
async def run(self) -> AsyncIterator[TriggerEvent]: ...
class BeamJavaPipelineTrigger(BeamPipelineBaseTrigger):
def __init__(
self,
variables: dict,
jar: str,
job_class: str | None = None,
runner: str = "DirectRunner",
gcp_conn_id: str = "google_cloud_default",
) -> None: ...
def serialize(self) -> tuple[str, dict[str, Any]]: ...
async def run(self) -> AsyncIterator[TriggerEvent]: ...
Cross-version compatibility components that provide stable imports and version detection for different Airflow releases.
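A minimal sketch of branching on the detected Airflow version; the operator class and log messages are illustrative only:

from airflow.providers.apache.beam.version_compat import AIRFLOW_V_3_1_PLUS, BaseOperator

class MyBeamAwareOperator(BaseOperator):
    # BaseOperator here resolves to the class appropriate for the installed Airflow.
    def execute(self, context):
        if AIRFLOW_V_3_1_PLUS:
            self.log.info("Running on Airflow 3.1 or newer")
        else:
            self.log.info("Running on an older Airflow release")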
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""
# Version-compatible base classes (imported from appropriate Airflow version)
BaseHook: type
"""Version-compatible BaseHook class."""
BaseOperator: type
"""Version-compatible BaseOperator class."""class BeamBasePipelineOperator(BaseOperator):
"""
Abstract base class for all Apache Beam pipeline operators.
Provides common functionality including pipeline option handling,
Dataflow integration, and error management.
"""
template_fields = ["runner", "pipeline_options", "default_pipeline_options", "dataflow_config"]
def __init__(
self,
*,
runner: str = "DirectRunner",
default_pipeline_options: dict | None = None,
pipeline_options: dict | None = None,
gcp_conn_id: str = "google_cloud_default",
dataflow_config: DataflowConfiguration | dict | None = None,
deferrable: bool = False,
**kwargs,
) -> None: ...
def execute(self, context: Context) -> dict: ...
def execute_on_dataflow(self, context: Context) -> dict: ...
def on_kill(self) -> None: ...
class BeamRunnerType:
"""Helper class for listing available runner types."""
DataflowRunner = "DataflowRunner"
DirectRunner = "DirectRunner"
SparkRunner = "SparkRunner"
FlinkRunner = "FlinkRunner"
SamzaRunner = "SamzaRunner"
NemoRunner = "NemoRunner"
JetRunner = "JetRunner"
Twister2Runner = "Twister2Runner"
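These constants can be used in place of raw strings when configuring operators or hooks; a short sketch with a hypothetical pipeline path:

from airflow.providers.apache.beam.hooks.beam import BeamRunnerType
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

run_on_spark = BeamRunPythonPipelineOperator(
    task_id="run_on_spark",
    py_file="/opt/pipelines/wordcount.py",   # hypothetical path
    runner=BeamRunnerType.SparkRunner,       # constant instead of the raw string "SparkRunner"
    pipeline_options={"output": "/tmp/spark-wordcount-output"},
)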
class DataflowConfiguration:
"""
Configuration object for Dataflow-specific options.
Used to configure Google Cloud Dataflow execution parameters
including job naming, project settings, and execution behavior.
"""
job_name: str
project_id: str
location: str
gcp_conn_id: str = "google_cloud_default"
wait_until_finished: bool = True
poll_sleep: int = 10
cancel_timeout: int = 300
drain_pipeline: bool = False
service_account: str | None = None
impersonation_chain: list[str] | None = None
check_if_running: str = "WaitForRun"
multiple_jobs: bool = False
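A minimal sketch of passing a typed configuration object instead of a plain dict. DataflowConfiguration is provided by the Google provider; the import path shown assumes apache-airflow-providers-google is installed, and the job name, project, and bucket are hypothetical:

from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

dataflow_config = DataflowConfiguration(
    job_name="example-beam-job",
    project_id="example-project",
    location="us-central1",
    wait_until_finished=True,
)

run_on_dataflow = BeamRunPythonPipelineOperator(
    task_id="run_on_dataflow",
    py_file="gs://example-bucket/pipelines/wordcount.py",
    runner="DataflowRunner",
    dataflow_config=dataflow_config,
)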
class Context:
"""Airflow execution context for task instances."""
def beam_options_to_args(options: dict) -> list[str]:
"""
Convert pipeline options dictionary to command line arguments.
Args:
options: Dictionary with pipeline options
Returns:
List of formatted command line arguments
"""
def run_beam_command(
cmd: list[str],
log: logging.Logger,
process_line_callback: Callable[[str], None] | None = None,
working_directory: str | None = None,
is_dataflow_job_id_exist_callback: Callable[[], bool] | None = None,
) -> None:
"""
Execute Beam pipeline command in subprocess with monitoring.
Args:
cmd: Command parts to execute
log: Logger for output
process_line_callback: Optional output processor
working_directory: Execution directory
is_dataflow_job_id_exist_callback: Optional job ID detector
"""
# Version compatibility support
AIRFLOW_V_3_1_PLUS: bool
"""Boolean flag indicating Airflow version 3.1+ compatibility."""
# Trigger types
class TriggerEvent:
"""Event yielded by triggers to indicate status changes."""
def __init__(self, payload: dict[str, Any]) -> None: ...
class AsyncIterator[T]:
"""Async iterator type for trigger events."""
class NamedTemporaryFile:
"""Temporary file with a visible name in the file system."""
name: str
# Type aliases
Any = typing.Any
Callable = typing.Callable