Provider package for Apache Beam integration with Apache Airflow, supporting Python, Java, and Go pipeline execution
Execute Apache Beam pipelines written in Java using self-executing JAR files, with support for custom job classes, multiple runners (DirectRunner, DataflowRunner, SparkRunner, and others), Google Cloud Dataflow integration, and job lifecycle management.
class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize the Java pipeline operator.

        Parameters:
        - jar (str): Path to the self-executing JAR file; supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - job_class (str): Name of the Java pipeline class to execute; often not the JAR's main class
        - default_pipeline_options (dict): Pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override the defaults
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration | dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Java pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam pipeline on the Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when the task is killed."""

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator
run_local_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_local_java_pipeline',
    jar='/path/to/pipeline.jar',
    runner='DirectRunner',
    pipeline_options={
        '--output': '/tmp/output',
        '--tempLocation': '/tmp/beam_temp',
    },
)

run_dataflow_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_dataflow_java_pipeline',
    jar='gs://my-bucket/pipeline.jar',
    job_class='com.company.pipelines.DataProcessingPipeline',
    runner='DataflowRunner',
    pipeline_options={
        '--project': 'my-gcp-project',
        '--region': 'us-central1',
        '--tempLocation': 'gs://my-bucket/temp',
        '--stagingLocation': 'gs://my-bucket/staging',
        '--numWorkers': '4',
        '--maxNumWorkers': '10',
        '--machineType': 'n1-standard-4',
        '--inputTable': 'project:dataset.input_table',
        '--outputTable': 'project:dataset.output_table',
    },
    dataflow_config={
        'job_name': 'java-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
        'check_if_running': 'WaitForRun',
    },
)

run_self_executing_jar = BeamRunJavaPipelineOperator(
    task_id='run_self_executing_jar',
    jar='gs://my-bucket/self-executing-pipeline.jar',
    # job_class not specified - uses the JAR's main class
    runner='DataflowRunner',
    pipeline_options={
        '--project': 'my-gcp-project',
        '--tempLocation': 'gs://my-bucket/temp',
        '--input': 'gs://input-bucket/data/*',
        '--output': 'gs://output-bucket/results/',
    },
)

run_deferrable_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_deferrable_java_pipeline',
    jar='gs://my-bucket/long-running-pipeline.jar',
    job_class='com.company.LongRunningPipeline',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        '--project': 'my-gcp-project',
        '--region': 'us-central1',
        '--tempLocation': 'gs://my-bucket/temp',
        '--streaming': 'true',  # Long-running streaming job
    },
    dataflow_config={
        'job_name': 'streaming-java-pipeline',
        'wait_until_finished': True,
        'multiple_jobs': True,  # Allow multiple instances
    },
)

Java pipelines use camelCase option naming conventions:
- --runner, --project, --region, --tempLocation, --stagingLocation
- --numWorkers, --maxNumWorkers, --machineType, --diskSizeGb
- --streaming, --windowSize, --maxBundleSize
- --inputTable, --outputTable, --inputTopic, --outputTopic

Java pipelines require self-executing JAR files that include all pipeline dependencies bundled into a single JAR and a Main-Class entry in the manifest.
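The option dicts shown above are ultimately rendered as command-line flags for the JAR. A simplified sketch of that flattening (a hypothetical helper for illustration; the provider's real conversion logic may differ):

```python
# Hypothetical sketch: flatten a pipeline_options dict into the
# --key=value flags a Beam Java pipeline expects on its command line.
def options_to_args(options: dict) -> list[str]:
    args = []
    for key, value in options.items():
        # Accept keys with or without a leading '--'
        flag = key if key.startswith('--') else f'--{key}'
        args.append(f'{flag}={value}')
    return args

args = options_to_args({'--tempLocation': 'gs://my-bucket/temp', 'numWorkers': '4'})
print(args)  # ['--tempLocation=gs://my-bucket/temp', '--numWorkers=4']
```

Note that values stay strings end to end, which is why worker counts like `'4'` are quoted in the examples above.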
Example Maven Shade plugin configuration for creating a self-executing JAR:
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.company.pipelines.MyPipeline</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>

When using DataflowRunner with Java pipelines:
dataflow_config = {
    'job_name': 'java-pipeline-job',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'check_if_running': 'WaitForRun',  # Wait for existing jobs to complete
    'multiple_jobs': False,  # Allow only one job instance
    'wait_until_finished': True,
    'cancel_timeout': 600,  # 10 minutes to cancel
}

The operator also handles Java-specific error conditions.
Java pipelines have specific boolean option handling:
pipeline_options = {
    '--usePublicIps': 'false',  # Explicit lowercase string value for Java
    '--enableStreamingEngine': 'true',
    '--enableDataflowServiceOptions': 'true',
}

The operator supports Airflow templating for dynamic Java pipeline configuration:
run_templated_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_templated_java_pipeline',
    jar='gs://{{ var.value.jar_bucket }}/pipeline-{{ ds_nodash }}.jar',
    job_class='{{ var.value.pipeline_class }}',
    runner='{{ var.value.runner }}',
    pipeline_options={
        '--project': '{{ var.value.gcp_project }}',
        '--inputTable': '{{ var.value.project }}:{{ var.value.dataset }}.input_{{ ds_nodash }}',
        '--outputTable': '{{ var.value.project }}:{{ var.value.dataset }}.output_{{ ds_nodash }}',
        '--jobName': 'java-pipeline-{{ ds_nodash }}-{{ ts_nodash }}',
    },
)

Install with the Tessl CLI:
npx tessl i tessl/pypi-apache-airflow-providers-apache-beam