or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifact-resource-management.md · connection-data-flow.md · index.md · legacy-version-support.md · pipeline-configuration.md · pipeline-triggering.md · stage-plugin-management.md

docs/pipeline-triggering.md

0

# Pipeline Triggering and Property Mapping

1

2

This page describes advanced pipeline triggering capabilities that enable property mapping between a triggering pipeline and the pipeline it triggers. Both argument mapping and plugin property mapping are supported, allowing sophisticated pipeline orchestration and parameter passing.

3

4

## Capabilities

5

6

### Triggering Property Mapping

7

8

Container for managing property mappings between triggering and triggered pipelines, enabling complex parameter passing scenarios.

9

10

```java { .api }

11

/**

12

* Container for property mappings between triggering and triggered pipelines

13

*/

14

public class TriggeringPropertyMapping {

15

/**

16

* Create empty property mapping (no mappings)

17

*/

18

public TriggeringPropertyMapping();

19

20

/**

21

* Create property mapping with argument and plugin property mappings

22

* @param arguments list of argument mappings between pipelines

23

* @param pluginProperties list of plugin property mappings

24

*/

25

public TriggeringPropertyMapping(List<ArgumentMapping> arguments, List<PluginPropertyMapping> pluginProperties);

26

27

/**

28

* Get argument mappings between triggering and triggered pipeline arguments

29

* @return immutable list of argument mappings

30

*/

31

public List<ArgumentMapping> getArguments();

32

33

/**

34

* Get plugin property mappings from triggering pipeline to triggered pipeline arguments

35

* @return immutable list of plugin property mappings

36

*/

37

public List<PluginPropertyMapping> getPluginProperties();

38

}

39

```

40

41

**Usage Examples:**

42

43

```java

44

import co.cask.cdap.etl.proto.v2.*;

45

46

// Simple argument mapping between pipelines

47

List<ArgumentMapping> argumentMappings = List.of(

48

new ArgumentMapping("source_table", "input_table"),

49

new ArgumentMapping("processing_date", "batch_date"),

50

new ArgumentMapping("output_format", "target_format")

51

);

52

53

// Plugin property mapping from triggering pipeline to triggered arguments

54

List<PluginPropertyMapping> pluginPropertyMappings = List.of(

55

new PluginPropertyMapping("file_source", "path", "input_path"),

56

new PluginPropertyMapping("database_sink", "tableName", "target_table"),

57

new PluginPropertyMapping("validator", "threshold", "validation_threshold")

58

);

59

60

// Create comprehensive property mapping

61

TriggeringPropertyMapping propertyMapping = new TriggeringPropertyMapping(

62

argumentMappings,

63

pluginPropertyMappings

64

);

65

66

// Use in pipeline triggering configuration

67

System.out.println("Argument mappings: " + propertyMapping.getArguments().size());

68

System.out.println("Plugin property mappings: " + propertyMapping.getPluginProperties().size());

69

```

70

71

### Argument Mapping

72

73

Direct mapping between triggering pipeline arguments and triggered pipeline arguments.

74

75

```java { .api }

76

/**

77

* Mapping between triggering pipeline argument and triggered pipeline argument

78

*/

79

public class ArgumentMapping {

80

/**

81

* Create argument mapping between source and target arguments

82

* @param source name of triggering pipeline argument, may be null

83

* @param target name of triggered pipeline argument, may be null

84

*/

85

public ArgumentMapping(String source, String target);

86

87

/**

88

* Get source argument name from triggering pipeline

89

* @return triggering pipeline argument name, may be null

90

*/

91

public String getSource();

92

93

/**

94

* Get target argument name for triggered pipeline

95

* @return triggered pipeline argument name, may be null

96

*/

97

public String getTarget();

98

}

99

```

100

101

**Usage Examples:**

102

103

```java

104

import co.cask.cdap.etl.proto.v2.ArgumentMapping;

105

106

// Direct argument mappings for common parameters

107

ArgumentMapping tableMapping = new ArgumentMapping("input_table", "source_table");

108

ArgumentMapping dateMapping = new ArgumentMapping("process_date", "batch_date");

109

ArgumentMapping formatMapping = new ArgumentMapping("output_format", "sink_format");

110

111

// Complex mapping scenarios

112

List<ArgumentMapping> complexMappings = List.of(

113

// Map single source to multiple targets (handled at pipeline level)

114

new ArgumentMapping("master_config", "worker_config_1"),

115

new ArgumentMapping("master_config", "worker_config_2"),

116

117

// Environment-specific mappings

118

new ArgumentMapping("prod_database_url", "db_connection_string"),

119

new ArgumentMapping("prod_api_key", "external_service_key"),

120

121

// Data lineage tracking

122

new ArgumentMapping("upstream_batch_id", "source_batch_id"),

123

new ArgumentMapping("pipeline_run_id", "parent_run_id")

124

);

125

126

// Conditional argument mapping based on triggering context

127

String sourceArg = "dynamic_source";

128

String targetArg = System.getenv("ENVIRONMENT").equals("prod") ? "prod_source" : "dev_source";

129

ArgumentMapping conditionalMapping = new ArgumentMapping(sourceArg, targetArg);

130

131

// Validation example

132

for (ArgumentMapping mapping : complexMappings) {

133

if (mapping.getSource() == null || mapping.getTarget() == null) {

134

System.out.println("Warning: Incomplete mapping - " + mapping);

135

}

136

}

137

```

138

139

### Plugin Property Mapping

140

141

Mapping between triggering pipeline plugin properties and triggered pipeline arguments, enabling complex parameter extraction from stage configurations.

142

143

```java { .api }

144

/**

145

* Mapping between triggering pipeline plugin property and triggered pipeline argument

146

*/

147

public class PluginPropertyMapping extends ArgumentMapping {

148

/**

149

* Create plugin property mapping

150

* @param stageName name of the stage in triggering pipeline containing the plugin property

151

* @param source name of the plugin property in the specified stage

152

* @param target name of the triggered pipeline argument to receive the property value

153

*/

154

public PluginPropertyMapping(String stageName, String source, String target);

155

156

/**

157

* Get stage name containing the plugin property

158

* @return stage name in triggering pipeline, may be null

159

*/

160

public String getStageName();

161

162

// Inherits getSource() and getTarget() from ArgumentMapping

163

}

164

```

165

166

**Usage Examples:**

167

168

```java

169

import co.cask.cdap.etl.proto.v2.PluginPropertyMapping;

170

171

// Extract file paths from source stages

172

PluginPropertyMapping filePathMapping = new PluginPropertyMapping(

173

"file_reader", // stage name

174

"path", // plugin property

175

"input_file_path" // target argument

176

);

177

178

// Extract database connection details

179

PluginPropertyMapping dbConnectionMapping = new PluginPropertyMapping(

180

"database_source",

181

"connectionString",

182

"downstream_db_url"

183

);

184

185

// Extract processing parameters

186

PluginPropertyMapping thresholdMapping = new PluginPropertyMapping(

187

"data_validator",

188

"validation_threshold",

189

"quality_threshold"

190

);

191

192

// Complex plugin property extraction

193

List<PluginPropertyMapping> extractionMappings = List.of(

194

// Extract source configuration

195

new PluginPropertyMapping("kafka_source", "brokers", "kafka_brokers"),

196

new PluginPropertyMapping("kafka_source", "topic", "source_topic"),

197

198

// Extract transformation parameters

199

new PluginPropertyMapping("aggregator", "window_size", "agg_window"),

200

new PluginPropertyMapping("aggregator", "grouping_fields", "group_by_fields"),

201

202

// Extract sink configuration

203

new PluginPropertyMapping("table_sink", "name", "output_table"),

204

new PluginPropertyMapping("table_sink", "schema.row.field", "key_field")

205

);

206

207

// Validation and usage

208

for (PluginPropertyMapping mapping : extractionMappings) {

209

System.out.printf("Stage: %s, Property: %s -> Argument: %s%n",

210

mapping.getStageName(), mapping.getSource(), mapping.getTarget());

211

}

212

```

213

214

### Pipeline Orchestration Patterns

215

216

Common patterns for orchestrating multiple pipelines with property mapping and triggering.

217

218

**Sequential Pipeline Chain:**

219

220

```java

221

// Master pipeline triggers data processing pipeline

222

TriggeringPropertyMapping masterToProcessor = new TriggeringPropertyMapping(

223

List.of(

224

new ArgumentMapping("data_date", "process_date"),

225

new ArgumentMapping("source_system", "input_system")

226

),

227

List.of(

228

new PluginPropertyMapping("file_source", "directory", "input_directory"),

229

new PluginPropertyMapping("file_source", "format", "file_format")

230

)

231

);

232

233

// Processing pipeline triggers aggregation pipeline

234

TriggeringPropertyMapping processorToAggregator = new TriggeringPropertyMapping(

235

List.of(

236

new ArgumentMapping("process_date", "aggregation_date"),

237

new ArgumentMapping("processed_records", "input_count")

238

),

239

List.of(

240

new PluginPropertyMapping("output_table", "name", "source_table"),

241

new PluginPropertyMapping("quality_checker", "pass_rate", "quality_threshold")

242

)

243

);

244

```

245

246

**Fan-out Pipeline Pattern:**

247

248

```java

249

// Single trigger pipeline spawning multiple specialized processors

250

TriggeringPropertyMapping fanOutMapping = new TriggeringPropertyMapping(

251

List.of(

252

// Common arguments for all triggered pipelines

253

new ArgumentMapping("master_batch_id", "parent_batch_id"),

254

new ArgumentMapping("processing_timestamp", "start_time")

255

),

256

List.of(

257

// Extract different aspects for different pipelines

258

new PluginPropertyMapping("data_splitter", "customer_output_path", "customer_data_path"),

259

new PluginPropertyMapping("data_splitter", "order_output_path", "order_data_path"),

260

new PluginPropertyMapping("data_splitter", "product_output_path", "product_data_path")

261

)

262

);

263

```

264

265

**Dynamic Configuration Pattern:**

266

267

```java

268

// Environment-aware property mapping

269

TriggeringPropertyMapping dynamicMapping = new TriggeringPropertyMapping(

270

List.of(

271

new ArgumentMapping("environment", "target_env"),

272

new ArgumentMapping("scaling_factor", "parallelism_level")

273

),

274

List.of(

275

// Extract environment-specific configurations

276

new PluginPropertyMapping("env_config", "database_url", "target_database"),

277

new PluginPropertyMapping("env_config", "api_endpoint", "service_url"),

278

new PluginPropertyMapping("resource_manager", "memory_allocation", "worker_memory"),

279

new PluginPropertyMapping("resource_manager", "cpu_cores", "worker_cores")

280

)

281

);

282

```

283

284

### Advanced Triggering Scenarios

285

286

Complex triggering scenarios with conditional mappings and error handling.

287

288

```java

289

// Conditional triggering based on data quality

290

TriggeringPropertyMapping qualityBasedTriggering = new TriggeringPropertyMapping(

291

List.of(

292

new ArgumentMapping("data_quality_score", "input_quality"),

293

new ArgumentMapping("record_count", "input_volume")

294

),

295

List.of(

296

new PluginPropertyMapping("quality_validator", "pass_threshold", "minimum_quality"),

297

new PluginPropertyMapping("quality_validator", "error_rate", "max_error_rate"),

298

new PluginPropertyMapping("record_counter", "total_records", "expected_count")

299

)

300

);

301

302

// Error recovery triggering

303

TriggeringPropertyMapping errorRecoveryMapping = new TriggeringPropertyMapping(

304

List.of(

305

new ArgumentMapping("failed_batch_id", "recovery_batch_id"),

306

new ArgumentMapping("error_timestamp", "failure_time"),

307

new ArgumentMapping("retry_attempt", "attempt_number")

308

),

309

List.of(

310

new PluginPropertyMapping("error_analyzer", "failure_reason", "error_category"),

311

new PluginPropertyMapping("error_analyzer", "affected_records", "recovery_scope"),

312

new PluginPropertyMapping("checkpoint_manager", "last_success_point", "resume_from")

313

)

314

);

315

316

// Multi-tenant triggering

317

TriggeringPropertyMapping multiTenantMapping = new TriggeringPropertyMapping(

318

List.of(

319

new ArgumentMapping("tenant_id", "target_tenant"),

320

new ArgumentMapping("tenant_config", "processing_rules")

321

),

322

List.of(

323

new PluginPropertyMapping("tenant_resolver", "database_schema", "tenant_schema"),

324

new PluginPropertyMapping("tenant_resolver", "resource_limits", "tenant_quotas"),

325

new PluginPropertyMapping("security_manager", "access_token", "tenant_credentials")

326

)

327

);

328

```