
# Java Pipeline Execution


Execute Apache Beam pipelines written in Java using self-executing JAR files, with support for multiple runners, Dataflow integration, and job lifecycle management. The Java pipeline operator is built for production-grade data processing workflows.


## Capabilities


### BeamRunJavaPipelineOperator


Launch Apache Beam pipelines written in Java using self-executing JAR files with support for job classes, multiple runners, and cloud integration.


```python { .api }
class BeamRunJavaPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        jar: str,
        runner: str = "DirectRunner",
        job_class: str | None = None,
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize the Java pipeline operator.

        Parameters:
        - jar (str): Path to the self-executing JAR file; supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - job_class (str): Name of the Apache Beam pipeline class to execute; often not
          the main class configured in the JAR manifest
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration | dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Java pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam pipeline on the Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when the task is killed."""
```


### Usage Examples


#### Basic Local Execution


```python
from airflow.providers.apache.beam.operators.beam import BeamRunJavaPipelineOperator

run_local_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_local_java_pipeline',
    jar='/path/to/pipeline.jar',
    runner='DirectRunner',
    # option keys omit the leading "--"; the operator adds it when building the command line
    pipeline_options={
        'output': '/tmp/output',
        'tempLocation': '/tmp/beam_temp',
    },
)
```


#### Dataflow Execution with Job Class


```python
run_dataflow_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_dataflow_java_pipeline',
    jar='gs://my-bucket/pipeline.jar',
    job_class='com.company.pipelines.DataProcessingPipeline',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'stagingLocation': 'gs://my-bucket/staging',
        'numWorkers': '4',
        'maxNumWorkers': '10',
        'machineType': 'n1-standard-4',
        'inputTable': 'project:dataset.input_table',
        'outputTable': 'project:dataset.output_table',
    },
    dataflow_config={
        'job_name': 'java-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
        'check_if_running': 'WaitForRun',
    },
)
```


#### Self-Executing JAR (No Job Class)


```python
run_self_executing_jar = BeamRunJavaPipelineOperator(
    task_id='run_self_executing_jar',
    jar='gs://my-bucket/self-executing-pipeline.jar',
    # job_class not specified - the main class from the JAR manifest is used
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'tempLocation': 'gs://my-bucket/temp',
        'input': 'gs://input-bucket/data/*',
        'output': 'gs://output-bucket/results/',
    },
)
```


#### Deferrable Execution with Monitoring


```python
run_deferrable_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_deferrable_java_pipeline',
    jar='gs://my-bucket/long-running-pipeline.jar',
    job_class='com.company.LongRunningPipeline',
    runner='DataflowRunner',
    deferrable=True,  # enable deferrable mode; frees the worker slot while the job runs
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'tempLocation': 'gs://my-bucket/temp',
        'streaming': 'true',  # long-running streaming job
    },
    dataflow_config={
        'job_name': 'streaming-java-pipeline',
        'wait_until_finished': True,
        'multiple_jobs': True,  # allow multiple job instances
    },
)
```


### Configuration Options


#### Java-Specific Pipeline Options


Java pipelines use camelCase option naming conventions:


- **Basic Options**: `--runner`, `--project`, `--region`, `--tempLocation`, `--stagingLocation`
- **Scaling Options**: `--numWorkers`, `--maxNumWorkers`, `--machineType`, `--diskSizeGb`
- **Processing Options**: `--streaming`, `--windowSize`, `--maxBundleSize`
- **I/O Options**: `--inputTable`, `--outputTable`, `--inputTopic`, `--outputTopic`
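
Keys supplied through `pipeline_options` omit the leading `--`; the operator renders them into command-line flags for the JAR. A simplified sketch of that convention (illustrative only, not the provider's actual helper):

```python
def options_to_args(options: dict) -> list[str]:
    """Render a pipeline_options dict into --camelCase flags for the JAR."""
    args = []
    for key, value in options.items():
        if isinstance(value, bool):
            # Java pipelines expect explicit values, e.g. --streaming=true
            args.append(f"--{key}={str(value).lower()}")
        elif isinstance(value, list):
            # repeated options become repeated flags
            args.extend(f"--{key}={v}" for v in value)
        else:
            args.append(f"--{key}={value}")
    return args

# {'tempLocation': 'gs://my-bucket/temp', 'numWorkers': '4'}
# -> ['--tempLocation=gs://my-bucket/temp', '--numWorkers=4']
```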


#### JAR File Requirements


Java pipelines require self-executing JAR files that include:


- All dependencies bundled (fat JAR or shaded JAR)
- A proper manifest with a main class (if no `job_class` is specified)
- Apache Beam SDK dependencies
- Any custom transforms and I/O connectors


Example Maven configuration for creating a self-executing JAR:


```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <executions>
    <execution>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <transformers>
          <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
            <mainClass>com.company.pipelines.MyPipeline</mainClass>
          </transformer>
        </transformers>
      </configuration>
    </execution>
  </executions>
</plugin>
```

178

179

#### Dataflow-Specific Options


When using DataflowRunner with Java pipelines:


```python
dataflow_config = {
    'job_name': 'java-pipeline-job',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'check_if_running': 'WaitForRun',  # wait for existing jobs to complete first
    'multiple_jobs': False,            # allow only one job instance
    'wait_until_finished': True,
    'cancel_timeout': 600,             # allow 10 minutes for cancellation
}
```
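
The same settings can also be passed as a typed object instead of a plain dict. A sketch, assuming `DataflowConfiguration` is importable from the Google provider's Dataflow operators module, as in recent provider versions:

```python
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

# Typed equivalent of the dict above; misspelled keyword arguments fail at
# construction time instead of being silently ignored.
dataflow_config = DataflowConfiguration(
    job_name='java-pipeline-job',
    project_id='gcp-project-id',
    location='us-central1',
    wait_until_finished=True,
    cancel_timeout=600,  # 10 minutes to cancel
)
```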

194

195

### Error Handling


The operator handles Java-specific error conditions:


- **JAR File Access**: Validates JAR file accessibility and downloads it from GCS when needed
- **Class Loading**: Handles job class loading and main class resolution
- **JVM Configuration**: Manages the Java runtime environment and memory settings
- **Dataflow Integration**: Handles Dataflow job submission and monitoring
- **Job Lifecycle**: Manages job cancellation and cleanup
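
At the DAG level these conditions surface as ordinary task failures, so standard Airflow controls apply on top of the operator's own handling. A minimal sketch using standard `BaseOperator` arguments (task and bucket names are illustrative):

```python
from datetime import timedelta

guarded_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='guarded_java_pipeline',
    jar='gs://my-bucket/pipeline.jar',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'tempLocation': 'gs://my-bucket/temp',
    },
    retries=2,                             # retry transient JAR-download or submission failures
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),  # a timeout kills the task; on_kill() then cancels the job
)
```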


### Boolean Options Handling


Boolean options for Java pipelines are passed as explicit string values:


```python
pipeline_options = {
    'usePublicIps': 'false',  # explicit false value for Java
    'enableStreamingEngine': 'true',  # explicit true value
    'enableDataflowServiceOptions': 'true',
}
```

216

217

### Templating Support


The operator supports Airflow templating for dynamic Java pipeline configuration:


```python
run_templated_java_pipeline = BeamRunJavaPipelineOperator(
    task_id='run_templated_java_pipeline',
    jar='gs://{{ var.value.jar_bucket }}/pipeline-{{ ds_nodash }}.jar',
    job_class='{{ var.value.pipeline_class }}',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'inputTable': '{{ var.value.project }}:{{ var.value.dataset }}.input_{{ ds_nodash }}',
        'outputTable': '{{ var.value.project }}:{{ var.value.dataset }}.output_{{ ds_nodash }}',
        'jobName': 'java-pipeline-{{ ds_nodash }}-{{ ts_nodash }}',
    },
)
```
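
Templates render at task runtime, so the resolved JAR path, job class, and table names follow each run's logical date; the fields used here (`jar`, `job_class`, `runner`, and `pipeline_options`) are among the operator's template fields in recent provider versions.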