
# App Decorators

Parsl's app decorators transform ordinary Python functions into parallel apps that can be executed across distributed computing resources. These decorators handle dependency management, data flow, and asynchronous execution automatically.

## Capabilities

### Python App Decorator

Converts Python functions into parallel apps that execute on remote workers with automatic dependency tracking and data management.

```python { .api }
def python_app(function=None, data_flow_kernel=None, cache=False,
               executors='all', ignore_for_cache=None):
    """
    Decorator for making Python functions into parallel apps.

    Parameters:
    - function: The function to decorate (automatically provided)
    - data_flow_kernel: DataFlowKernel instance (optional, uses current DFK)
    - cache: Enable caching of app results (default: False)
    - executors: List of executor labels or 'all' (default: 'all')
    - ignore_for_cache: Parameter names to ignore for caching

    Returns:
    Decorated function that returns AppFuture when called
    """
```

**Usage Example:**

```python
import parsl
from parsl import python_app

# A Parsl configuration must be loaded with parsl.load(...) before apps are called.
# The second app assumes that configuration defines an executor labeled 'compute_nodes'.

@python_app
def process_data(data, multiplier=2):
    """Process data by multiplying each element."""
    return [x * multiplier for x in data]

@python_app(executors=['compute_nodes'], cache=True)
def expensive_computation(n):
    """CPU-intensive computation with caching enabled."""
    result = 0
    for i in range(n):
        result += i ** 2
    return result

# Execute and get futures
future1 = process_data([1, 2, 3, 4])
future2 = expensive_computation(1000000)

# Get results (blocks until complete)
processed = future1.result()  # [2, 4, 6, 8]
computed = future2.result()   # computed value
```
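
To make the `cache` and `ignore_for_cache` parameters more concrete, the following plain-Python sketch memoizes a function on its arguments while skipping named keyword arguments. It is an analogy only, not Parsl's implementation: `toy_cached_app`, `square_sum`, and `log_label` are hypothetical names introduced here, and Parsl's own cache keys may be computed differently.

```python
import functools


def toy_cached_app(ignore_for_cache=None):
    """Hypothetical stand-in for cache=True: memoize calls, skipping ignored keyword names."""
    ignored = set(ignore_for_cache or [])

    def decorator(func):
        memo = {}

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key from positional args plus non-ignored keyword args.
            key_kwargs = tuple(sorted(
                (name, value) for name, value in kwargs.items() if name not in ignored
            ))
            key = (args, key_kwargs)
            if key not in memo:
                memo[key] = func(*args, **kwargs)
            return memo[key]

        return wrapper

    return decorator


call_log = []  # records which calls actually executed the function body


@toy_cached_app(ignore_for_cache=["log_label"])
def square_sum(n, log_label=None):
    call_log.append(log_label)
    return sum(i ** 2 for i in range(n))


first = square_sum(10, log_label="run-a")
second = square_sum(10, log_label="run-b")  # same key, so the body is not run again
```

Because `log_label` is excluded from the key, the second call is served from the memo even though its label differs. This is the kind of situation `ignore_for_cache` is meant for: arguments that do not affect the result.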

### Bash App Decorator

Converts functions that return bash command strings into parallel apps that execute shell commands on remote workers.

```python { .api }
def bash_app(function=None, data_flow_kernel=None, cache=False,
             executors='all', ignore_for_cache=None):
    """
    Decorator for making bash command functions into parallel apps.

    Parameters:
    - function: The function to decorate (returns bash command string)
    - data_flow_kernel: DataFlowKernel instance (optional, uses current DFK)
    - cache: Enable caching of app results (default: False)
    - executors: List of executor labels or 'all' (default: 'all')
    - ignore_for_cache: Parameter names to ignore for caching

    Returns:
    Decorated function that returns AppFuture when called
    """
```

**Usage Example:**

```python
from parsl import bash_app
from parsl.data_provider.files import File

@bash_app
def process_file(input_file, output_file, inputs=[], outputs=[]):
    """Process a file using bash commands."""
    return f'sort {inputs[0]} > {outputs[0]}'

@bash_app
def compile_code(source_file, executable, inputs=[], outputs=[]):
    """Compile source code."""
    return f'gcc {inputs[0]} -o {outputs[0]}'

# Execute with file dependencies
input_file = File('data.txt')
output_file = File('sorted_data.txt')

future = process_file(
    input_file, output_file,
    inputs=[input_file],
    outputs=[output_file]
)

# Wait for completion
future.result()  # Returns the exit code (0 on success); a non-zero exit raises an exception
```
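
The core idea of a bash app, a Python function whose only job is to build a command string that is then run in a shell, can be sketched with the standard library alone. This is a simplified local analogy rather than Parsl's implementation: `toy_bash_app` and `sort_file` are hypothetical names, the command runs synchronously on the local machine instead of on a remote worker, and a POSIX `sort` command is assumed to be available.

```python
import functools
import os
import shlex
import subprocess
import tempfile


def toy_bash_app(func):
    """Hypothetical stand-in for a bash app: run the returned command string in a shell."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        command = func(*args, **kwargs)  # the function body only builds a string
        completed = subprocess.run(command, shell=True)
        if completed.returncode != 0:
            raise RuntimeError(
                f"command failed with exit code {completed.returncode}: {command}"
            )
        return completed.returncode
    return wrapper


@toy_bash_app
def sort_file(src, dst):
    # Quote the paths so that unusual characters do not break the shell command.
    return f"sort {shlex.quote(src)} > {shlex.quote(dst)}"


with tempfile.TemporaryDirectory() as workdir:
    src = os.path.join(workdir, "data.txt")
    dst = os.path.join(workdir, "sorted_data.txt")
    with open(src, "w") as handle:
        handle.write("pear\napple\nmango\n")
    exit_code = sort_file(src, dst)
    with open(dst) as handle:
        sorted_lines = handle.read().splitlines()
```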

### Join App Decorator

A decorator for join apps, which run on the submit side rather than on remote workers and are typically used to aggregate results from multiple parallel tasks. A join app can return Future objects, which Parsl waits on without occupying a worker, preventing deadlocks.

```python { .api }
def join_app(function=None, data_flow_kernel=None, cache=False,
             ignore_for_cache=None):
    """
    Decorator for join apps that run on submit-side internal executor.

    Parameters:
    - function: The function to decorate (automatically provided)
    - data_flow_kernel: DataFlowKernel instance (optional, uses current DFK)
    - cache: Enable caching of app results (default: False)
    - ignore_for_cache: Parameter names to ignore for caching

    Returns:
    Decorated function that returns AppFuture when called

    Note: Always executes on "_parsl_internal" executor
    """
```

**Relationship to python_app:**

- `join_app` behaves like a Python app created with `join=True` and `executors=["_parsl_internal"]`
- The `python_app` decorator does not currently expose the `join` parameter, so this form cannot be written with `@python_app` directly
- Use `@join_app` for apps that need to wait for Futures without blocking workers

**Usage Example:**

```python
from parsl import join_app, python_app

@python_app
def generate_data(size):
    """Generate data chunk."""
    return list(range(size))

@python_app
def combine(*chunks):
    """Merge and sort chunks; starts once every chunk future has resolved."""
    results = []
    for chunk in chunks:
        results.extend(chunk)
    return sorted(results)

@join_app
def aggregate_results(n_chunks, size):
    """Aggregate results from multiple parallel tasks."""
    # Create multiple parallel tasks
    futures = [generate_data(size) for _ in range(n_chunks)]
    # A join app body returns a Future (or list of Futures) instead of
    # calling .result(), so nothing blocks while the tasks run
    return combine(*futures)

# Aggregate on submit-side
aggregated = aggregate_results(5, 100)
final_result = aggregated.result()
```
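
The statement that a join app returns a Future which is awaited without blocking workers can be illustrated with the standard library. In this sketch, which is an analogy and not Parsl's implementation, the decorated body runs on the calling side and returns an inner Future, and a callback relays the inner outcome to the Future handed to the caller. All names (`toy_join_app`, `fan_out_and_merge`, `chunk`, `merge`) are hypothetical.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import functools

pool = ThreadPoolExecutor(max_workers=4)  # stands in for remote workers


def toy_join_app(func):
    """Hypothetical stand-in for a join app: the body returns a Future, and the
    caller's Future completes when that inner Future does."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        outer = Future()
        inner = func(*args, **kwargs)  # runs on the calling (submit) side

        def relay(done):
            # Copy the inner outcome to the outer Future; no thread waits on it.
            error = done.exception()
            if error is not None:
                outer.set_exception(error)
            else:
                outer.set_result(done.result())

        inner.add_done_callback(relay)
        return outer
    return wrapper


def chunk(size):
    return list(range(size))


def merge(chunk_futures):
    merged = []
    for fut in chunk_futures:
        merged.extend(fut.result())
    return sorted(merged)


@toy_join_app
def fan_out_and_merge(n_chunks, size):
    # Launch the parallel pieces, then return the Future of the merging task.
    chunk_futures = [pool.submit(chunk, size) for _ in range(n_chunks)]
    return pool.submit(merge, chunk_futures)


merged_result = fan_out_and_merge(3, 4).result()
pool.shutdown()
```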

## App Function Parameters

App-decorated functions can accept special parameters that control execution behavior:

### Execution Control Parameters

```python { .api }
# These parameters are automatically handled by Parsl:
# - inputs: List of input File objects
# - outputs: List of output File objects
# - stdout: File object or string for stdout redirection
# - stderr: File object or string for stderr redirection
# - walltime: Maximum execution time in seconds
# - parsl_resource_specification: Resource requirements dict
```

**Example with execution control:**

```python
from parsl import bash_app, python_app
from parsl.data_provider.files import File

@bash_app
def long_running_task(input_data, outputs=[], stdout=None, stderr=None,
                      walltime=3600, parsl_resource_specification={}):
    """Long-running task with resource control."""
    # parsl_resource_specification is declared so it can be passed at call time
    return f'process_data {input_data} > {outputs[0]}'

@python_app
def cpu_intensive_task(data, parsl_resource_specification={}):
    """Task with specific resource requirements."""
    # parsl_resource_specification can specify cores, memory, etc.
    return sum(x**2 for x in data)

# Execute with resource control
future = long_running_task(
    'input.dat',
    outputs=[File('output.dat')],
    stdout='task.out',
    stderr='task.err',
    walltime=1800,  # 30 minutes
    # Accepted keys and units depend on the executor in use
    parsl_resource_specification={'cores': 4, 'memory': '8GB'}
)
```

## App Execution Flow

1. **Decoration**: Function is wrapped with app decorator
2. **Invocation**: Decorated function is called with arguments
3. **Future Creation**: Returns AppFuture immediately (non-blocking)
4. **Dependency Resolution**: Parsl analyzes input dependencies
5. **Task Submission**: Task is submitted to appropriate executor
6. **Execution**: Task runs on remote worker or local thread
7. **Result Retrieval**: `.result()` blocks until completion and returns value
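
The steps above can be mirrored with a small standard-library sketch. It is a simplified analogy using `concurrent.futures`, not Parsl's implementation: `toy_app` and `slow_double` are hypothetical names, a thread pool stands in for a Parsl executor, and dependency resolution (step 4) is left out.

```python
from concurrent.futures import ThreadPoolExecutor
import functools
import time

executor = ThreadPoolExecutor(max_workers=2)  # stands in for a Parsl executor


def toy_app(func):
    # Step 1 (decoration): wrap the function so calls are submitted, not run inline.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # Steps 2, 3 and 5: the call is handed to the executor and a Future
        # is returned to the caller right away.
        return executor.submit(func, *args, **kwargs)
    return wrapper


@toy_app
def slow_double(x):
    time.sleep(0.2)  # Step 6 (execution) happens on a pool thread
    return 2 * x


future = slow_double(21)     # returns without waiting for the sleep to finish
still_running = not future.done()  # usually True, since the task is still sleeping
value = future.result()      # Step 7: block until the result is available
executor.shutdown()
```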

## Dependency Management

Apps automatically handle dependencies through Future objects:

```python
from parsl import python_app

@python_app
def step1():
    return "step1_result"

@python_app
def step2(input_data):
    return f"step2_with_{input_data}"

@python_app
def step3(data1, data2):
    return f"final_{data1}_{data2}"

# Create dependency chain
future1 = step1()
future2 = step2(future1)           # Waits for future1
future3 = step3(future1, future2)  # Waits for both

result = future3.result()  # "final_step1_result_step2_with_step1_result"
```
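
The effect of passing a Future as an argument can be approximated in plain Python: before the wrapped function runs, any Future among its arguments is replaced by that Future's result. The sketch below is an analogy with hypothetical names (`toy_app`, `load`, `sort_values`, `summarize`). Unlike Parsl, which resolves dependencies before submitting a task, this toy version waits inside a pool thread, so it relies on the pool having enough threads for the whole chain.

```python
from concurrent.futures import Future, ThreadPoolExecutor
import functools

executor = ThreadPoolExecutor(max_workers=4)


def toy_app(func):
    """Hypothetical decorator: Future arguments are replaced by their results before the call."""
    @functools.wraps(func)
    def wrapper(*args):
        def run():
            # Resolve dependencies: wait on Future arguments, pass other values through.
            resolved = [a.result() if isinstance(a, Future) else a for a in args]
            return func(*resolved)
        return executor.submit(run)
    return wrapper


@toy_app
def load():
    return [3, 1, 2]


@toy_app
def sort_values(values):
    return sorted(values)


@toy_app
def summarize(raw, ordered):
    return f"{len(raw)} values, smallest {ordered[0]}"


raw = load()                                # no dependencies
ordered = sort_values(raw)                  # depends on raw
summary = summarize(raw, ordered).result()  # depends on both
executor.shutdown()
```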