# Python Pipeline Execution

Execute Apache Beam pipelines written in Python with comprehensive support for virtual environments, custom package requirements, multiple runners, and cloud integration. The Python pipeline operator provides the most flexible execution environment with automatic dependency management.

## Capabilities

### BeamRunPythonPipelineOperator

Launch Apache Beam pipelines written in Python with support for custom environments, package requirements, and various execution runners.

```python { .api }
class BeamRunPythonPipelineOperator(BeamBasePipelineOperator):
    def __init__(
        self,
        *,
        py_file: str,
        runner: str = "DirectRunner",
        default_pipeline_options: dict | None = None,
        pipeline_options: dict | None = None,
        py_interpreter: str = "python3",
        py_options: list[str] | None = None,
        py_requirements: list[str] | None = None,
        py_system_site_packages: bool = False,
        gcp_conn_id: str = "google_cloud_default",
        dataflow_config: DataflowConfiguration | dict | None = None,
        deferrable: bool = False,
        **kwargs,
    ) -> None:
        """
        Initialize Python pipeline operator.

        Parameters:
        - py_file (str): Path to Python pipeline file, supports local paths and gs:// URLs
        - runner (str): Runner type (DirectRunner, DataflowRunner, SparkRunner, etc.)
        - default_pipeline_options (dict): High-level pipeline options applied to all tasks
        - pipeline_options (dict): Task-specific pipeline options that override defaults
        - py_interpreter (str): Python interpreter version, defaults to "python3"
        - py_options (list[str]): Additional Python command-line options
        - py_requirements (list[str]): Python packages to install in virtual environment
        - py_system_site_packages (bool): Include system packages in virtual environment
        - gcp_conn_id (str): Google Cloud connection ID for GCS and Dataflow access
        - dataflow_config (DataflowConfiguration | dict): Dataflow-specific configuration
        - deferrable (bool): Enable deferrable execution mode
        """

    def execute(self, context: Context):
        """Execute the Apache Beam Python Pipeline."""

    def execute_on_dataflow(self, context: Context):
        """Execute the Apache Beam Pipeline on Dataflow runner."""

    def on_kill(self) -> None:
        """Cancel the pipeline when task is killed."""
```

### Usage Examples

#### Basic Local Execution

```python
from airflow.providers.apache.beam.operators.beam import BeamRunPythonPipelineOperator

run_local_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_local_beam_pipeline',
    py_file='/path/to/pipeline.py',
    runner='DirectRunner',
    pipeline_options={
        'output': '/tmp/output',
        'temp_location': '/tmp/beam_temp',
    },
)
```

#### Cloud Dataflow Execution

```python
run_dataflow_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_dataflow_pipeline',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
        'staging_location': 'gs://my-bucket/staging',
        'num_workers': 4,
        'max_num_workers': 10,
        'machine_type': 'n1-standard-4',
    },
    dataflow_config={
        'job_name': 'my-dataflow-job',
        'project_id': 'my-gcp-project',
        'location': 'us-central1',
        'wait_until_finished': True,
    },
)
```

#### Custom Python Environment

```python
run_pipeline_custom_env = BeamRunPythonPipelineOperator(
    task_id='run_pipeline_custom_env',
    py_file='/path/to/pipeline.py',
    py_requirements=[
        'apache-beam[gcp]==2.50.0',
        'pandas==2.0.0',
        'numpy==1.24.0',
    ],
    py_system_site_packages=True,
    py_interpreter='python3.10',
    py_options=['-u', '-W', 'ignore'],
    runner='DirectRunner',
)
```

#### Deferrable Execution

```python
run_deferrable_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_deferrable_pipeline',
    py_file='gs://my-bucket/long_pipeline.py',
    runner='DataflowRunner',
    deferrable=True,  # Enable deferrable mode
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
    },
    dataflow_config={
        'job_name': 'long-running-job',
        'wait_until_finished': True,
    },
)
```

### Configuration Options

#### Pipeline Options

Pipeline options control the behavior and configuration of the Beam pipeline execution:

- **Basic Options**: `runner`, `project`, `region`, `temp_location`, `staging_location`
- **Scaling Options**: `num_workers`, `max_num_workers`, `machine_type`, `disk_size_gb`
- **Processing Options**: `streaming`, `windowing`, `max_bundle_size`
- **Output Options**: Various output sinks and formatting options

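A short sketch of how options from these categories can be combined in a single `pipeline_options` dictionary for a Dataflow run; the project, bucket, and sizing values below are illustrative placeholders, not defaults:

```python
# Illustrative values only; substitute your own project, bucket, and sizing.
combined_options = {
    # Basic options
    'project': 'my-gcp-project',
    'region': 'us-central1',
    'temp_location': 'gs://my-bucket/temp',
    'staging_location': 'gs://my-bucket/staging',
    # Scaling options
    'num_workers': 2,
    'max_num_workers': 8,
    'machine_type': 'n1-standard-2',
    'disk_size_gb': 50,
    # Processing options
    'streaming': False,
}

run_configured_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_configured_pipeline',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner',
    pipeline_options=combined_options,
)
```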

#### Dataflow Configuration

When using DataflowRunner, additional configuration options are available:

```python
dataflow_config = {
    'job_name': 'unique-job-name',
    'project_id': 'gcp-project-id',
    'location': 'us-central1',
    'gcp_conn_id': 'google_cloud_default',
    'wait_until_finished': True,
    'poll_sleep': 10,
    'cancel_timeout': 300,
    'drain_pipeline': False,
    'service_account': 'pipeline-service-account@project.iam.gserviceaccount.com',
    'impersonation_chain': ['service-account@project.iam.gserviceaccount.com'],
}
```
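As the operator signature above shows, `dataflow_config` also accepts a `DataflowConfiguration` object instead of a plain dict. A minimal sketch, assuming the class is imported from the Google provider's Dataflow operators module:

```python
# Assumes the Google provider package is installed; this is the usual
# location of DataflowConfiguration in that provider.
from airflow.providers.google.cloud.operators.dataflow import DataflowConfiguration

dataflow_config = DataflowConfiguration(
    job_name='unique-job-name',
    project_id='gcp-project-id',
    location='us-central1',
    wait_until_finished=True,
)
```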

### Error Handling

The operator handles various error conditions:

- **File Not Found**: Validates pipeline file existence before execution
- **Environment Setup**: Manages virtual environment creation and package installation
- **Pipeline Failures**: Captures and reports Beam pipeline execution errors
- **Dataflow Errors**: Handles Dataflow-specific errors and job cancellation
- **Resource Cleanup**: Ensures temporary files and resources are cleaned up

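These behaviors are handled inside the operator. At the task level they can be combined with standard Airflow retry and timeout settings (regular `BaseOperator` arguments) and the cancellation-related `dataflow_config` keys shown earlier; the values below are illustrative only:

```python
from datetime import timedelta

resilient_pipeline = BeamRunPythonPipelineOperator(
    task_id='resilient_pipeline',
    py_file='gs://my-bucket/pipeline.py',
    runner='DataflowRunner',
    pipeline_options={
        'project': 'my-gcp-project',
        'region': 'us-central1',
        'temp_location': 'gs://my-bucket/temp',
    },
    dataflow_config={
        'job_name': 'resilient-job',
        'cancel_timeout': 300,    # seconds to wait for cancellation when the task is killed
        'drain_pipeline': False,  # cancel immediately rather than draining
    },
    # Standard Airflow task-level resilience settings
    retries=2,
    retry_delay=timedelta(minutes=5),
    execution_timeout=timedelta(hours=2),
)
```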

### Templating Support

The operator supports Airflow templating for dynamic configuration:

```python
run_templated_pipeline = BeamRunPythonPipelineOperator(
    task_id='run_templated_pipeline',
    py_file='gs://{{ var.value.bucket }}/pipeline.py',
    runner='{{ var.value.runner }}',
    pipeline_options={
        'project': '{{ var.value.gcp_project }}',
        'input': 'gs://{{ var.value.input_bucket }}/{{ ds }}/*',
        'output': 'gs://{{ var.value.output_bucket }}/{{ ds }}/',
    },
)
```