# Java Pipeline Execution
Execute Apache Beam pipelines written in Java from self-executing JAR files, with support for multiple runners, Google Cloud Dataflow integration, and job lifecycle management (submission, monitoring, and cancellation).
## Capabilities
### BeamRunJavaPipelineOperator
Launch Apache Beam pipelines written in Java using self-executing JAR files with support for job classes, multiple runners, and cloud integration.
```python { .api }
class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize the Java pipeline operator.

        Parameters:
        - jar (str): Path to the self-executing JAR file; supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - job_class (str): Name of the Java class that defines the pipeline; often not the JAR's main class
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration | dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Java pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam pipeline on the Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when the task is killed."""
```
### Usage Examples
#### Basic Local Execution
```python
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

run_local_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_local_java_pipeline',
    jar='/path/to/pipeline.jar',
    runner='DirectRunner',
    pipeline_options={
        # Keys are given without the leading '--'; the operator adds it
        'output': '/tmp/output',
        'tempLocation': '/tmp/beam_temp',
    },
)
```
#### Dataflow Execution with Job Class
```python
from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning

run_dataflow_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_dataflow_java_pipeline',
    jar='gs://my-bucket/pipeline.jar',
    job_class='com.company.pipelines.DataProcessingPipeline',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'stagingLocation': 'gs://my-bucket/staging',
        'numWorkers': '4',
        'maxNumWorkers': '10',
        'machineType': 'n1-standard-4',
        'inputTable': 'project:dataset.input_table',
        'outputTable': 'project:dataset.output_table',
    },
    dataflow_config={
        'job_name': 'java-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
        'check_if_running': CheckJobRunning.WaitForRun,
    },
)
```
#### Self-Executing JAR (No Job Class)
```python
run_self_executing_jar = BeamRunJavaPipelineOperator(
    task_id='run_self_executing_jar',
    jar='gs://my-bucket/self-executing-pipeline.jar',
    # job_class not specified - the JAR's manifest main class is used
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'tempLocation': 'gs://my-bucket/temp',
        'input': 'gs://input-bucket/data/*',
        'output': 'gs://output-bucket/results/',
    },
)
```
#### Deferrable Execution with Monitoring
```python
run_deferrable_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_deferrable_java_pipeline',
    jar='gs://my-bucket/long-running-pipeline.jar',
    job_class='com.company.LongRunningPipeline',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'streaming': 'true',  # Long-running streaming job
    },
    dataflow_config={
        'job_name': 'streaming-java-pipeline',
        'wait_until_finished': True,
        'multiple_jobs': True,  # Allow multiple job instances
    },
)
```
### Configuration Options
#### Java-Specific Pipeline Options
Java pipelines use camelCase option naming conventions, as shown in the sketch after this list:

- **Basic Options**: `--runner`, `--project`, `--region`, `--tempLocation`, `--stagingLocation`
- **Scaling Options**: `--numWorkers`, `--maxNumWorkers`, `--machineType`, `--diskSizeGb`
- **Processing Options**: `--streaming`, `--windowSize`, `--maxBundleSize`
- **I/O Options**: `--inputTable`, `--outputTable`, `--inputTopic`, `--outputTopic`
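
A minimal sketch of how the camelCase convention plays out in practice: each dict key maps one-to-one to a Java-style flag, with the operator prepending `--`. Option names are drawn from the list above; the values are illustrative.

```python
# Each dict key becomes a camelCase Java flag:
pipeline_options = {
    'tempLocation': 'gs://my-bucket/temp',  # -> --tempLocation=gs://my-bucket/temp
    'maxNumWorkers': '10',                  # -> --maxNumWorkers=10
    'streaming': 'true',                    # -> --streaming=true
}
```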
#### JAR File Requirements
Java pipelines require self-executing JAR files that include:

- All dependencies bundled (fat JAR or shaded JAR)
- A proper manifest with a main class (if no `job_class` is specified)
- Apache Beam SDK dependencies
- Any custom transforms and I/O connectors
Example Maven configuration for creating a self-executing JAR:
```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.company.pipelines.MyPipeline</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```
#### Dataflow-Specific Options
When using DataflowRunner with Java pipelines:
```python
from airflow.providers.google.cloud.operators.dataflow import CheckJobRunning

dataflow_config = {
    'job_name': 'java-pipeline-job',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'check_if_running': CheckJobRunning.WaitForRun,  # Wait for existing jobs to complete
    'multiple_jobs': False,  # Allow only one job instance
    'wait_until_finished': True,
    'cancel_timeout': 600,  # Allow 10 minutes for job cancellation
}
```
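
The same settings can also be passed as a `DataflowConfiguration` object rather than a dict; a sketch using the same illustrative values as above:

```python
from airflow.providers.google.cloud.operators.dataflow import (
    CheckJobRunning,
    DataflowConfiguration,
)

# Typed equivalent of the dict above.
dataflow_config = DataflowConfiguration(
    job_name='java-pipeline-job',
    project_id='gcp-project-id',
    location='us-central1',
    check_if_running=CheckJobRunning.WaitForRun,
    multiple_jobs=False,
    wait_until_finished=True,
    cancel_timeout=600,
)
```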
### Error Handling
The operator handles Java-specific error conditions (see the sketch after this list for how these interact with standard Airflow task settings):

- **JAR File Access**: Validates JAR file accessibility and downloads the JAR from GCS when needed
- **Class Loading**: Handles job class loading and main class resolution
- **JVM Configuration**: Manages the Java runtime environment and memory settings
- **Dataflow Integration**: Handles Dataflow job submission and monitoring
- **Job Lifecycle**: Manages job cancellation and cleanup
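
A sketch of how these failure modes combine with standard Airflow task settings: retries can cover transient JAR-download or submission failures, while `on_kill()` together with `cancel_timeout` governs cleanup when the task is stopped. All parameter values below are illustrative.

```python
from datetime import timedelta

from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

resilient_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='resilient_java_pipeline',
    jar='gs://my-bucket/pipeline.jar',
    runner='DataflowRunner',
    retries=2,                             # Retry transient submission failures
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),  # Triggers on_kill(), which cancels the job
    pipeline_options={
        'project': 'my-gcp-project',
        'tempLocation': 'gs://my-bucket/temp',
    },
    dataflow_config={'cancel_timeout': 600},  # Time allowed for cancellation
)
```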
### Boolean Options Handling
Java pipelines handle boolean options specially: pass explicit `'true'`/`'false'` string values so the flag is rendered with an explicit assignment:
```python
pipeline_options = {
    'usePublicIps': 'false',  # Explicit false value for Java
    'enableStreamingEngine': 'true',  # Explicit true value
    'enableDataflowServiceOptions': 'true',
}
```
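
For intuition, a simplified sketch of the dict-to-flag conversion (not the provider's exact implementation): a Python `True` renders as a bare flag, which Java reads as true, while string values render as explicit assignments.

```python
def options_to_args(options: dict) -> list[str]:
    """Simplified sketch of how option dicts become Java-style flags."""
    args = []
    for name, value in options.items():
        if isinstance(value, bool) and value:
            args.append(f"--{name}")  # Bare flag; Java reads it as true
        else:
            args.append(f"--{name}={value}")  # Explicit value
    return args

print(options_to_args({'usePublicIps': 'false', 'streaming': True}))
# ['--usePublicIps=false', '--streaming']
```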
### Templating Support
The operator supports Airflow templating for dynamic Java pipeline configuration:
```python
run_templated_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_templated_java_pipeline',
    jar='gs://{{ var.value.jar_bucket }}/pipeline-{{ ds_nodash }}.jar',
    job_class='{{ var.value.pipeline_class }}',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'inputTable': '{{ var.value.project }}:{{ var.value.dataset }}.input_{{ ds_nodash }}',
        'outputTable': '{{ var.value.project }}:{{ var.value.dataset }}.output_{{ ds_nodash }}',
        'jobName': 'java-pipeline-{{ ds_nodash }}-{{ ts_nodash }}',
    },
)
```