0
# Pipeline Triggering and Property Mapping
1
2
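This section describes how a triggering pipeline passes values to the pipeline it triggers through property mappings. Two kinds of mapping are supported: argument mappings, which map an argument of the triggering pipeline to an argument of the triggered pipeline, and plugin property mappings, which map a plugin property from a stage of the triggering pipeline to an argument of the triggered pipeline.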
3
4
## Capabilities
5
6
### Triggering Property Mapping
7
8
`TriggeringPropertyMapping` is the container for both kinds of mapping between a triggering pipeline and the pipeline it triggers: a list of argument mappings and a list of plugin property mappings.
9
10
```java { .api }
11
/**
12
* Container for property mappings between triggering and triggered pipelines
13
*/
14
public class TriggeringPropertyMapping {
15
/**
16
* Create empty property mapping (no mappings)
17
*/
18
public TriggeringPropertyMapping();
19
20
/**
21
* Create property mapping with argument and plugin property mappings
22
* @param arguments list of argument mappings between pipelines
23
* @param pluginProperties list of plugin property mappings
24
*/
25
public TriggeringPropertyMapping(List<ArgumentMapping> arguments, List<PluginPropertyMapping> pluginProperties);
26
27
/**
28
* Get argument mappings between triggering and triggered pipeline arguments
29
* @return immutable list of argument mappings
30
*/
31
public List<ArgumentMapping> getArguments();
32
33
/**
34
* Get plugin property mappings from triggering pipeline to triggered pipeline arguments
35
* @return immutable list of plugin property mappings
36
*/
37
public List<PluginPropertyMapping> getPluginProperties();
38
}
39
```
40
41
**Usage Examples:**
42
43
```java
44
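import co.cask.cdap.etl.proto.v2.ArgumentMapping;
import co.cask.cdap.etl.proto.v2.PluginPropertyMapping;
import co.cask.cdap.etl.proto.v2.TriggeringPropertyMapping;
import java.util.List;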
45
46
// Simple argument mapping between pipelines
47
List<ArgumentMapping> argumentMappings = List.of(
48
new ArgumentMapping("source_table", "input_table"),
49
new ArgumentMapping("processing_date", "batch_date"),
50
new ArgumentMapping("output_format", "target_format")
51
);
52
53
// Plugin property mapping from triggering pipeline to triggered arguments
54
List<PluginPropertyMapping> pluginPropertyMappings = List.of(
55
new PluginPropertyMapping("file_source", "path", "input_path"),
56
new PluginPropertyMapping("database_sink", "tableName", "target_table"),
57
new PluginPropertyMapping("validator", "threshold", "validation_threshold")
58
);
59
60
// Create comprehensive property mapping
61
TriggeringPropertyMapping propertyMapping = new TriggeringPropertyMapping(
62
argumentMappings,
63
pluginPropertyMappings
64
);
65
66
// Inspect the mappings held by the container
67
System.out.println("Argument mappings: " + propertyMapping.getArguments().size());
68
System.out.println("Plugin property mappings: " + propertyMapping.getPluginProperties().size());
69
```
70
71
### Argument Mapping
72
73
`ArgumentMapping` maps one argument of the triggering pipeline directly to one argument of the triggered pipeline.
74
75
```java { .api }
76
/**
77
* Mapping between triggering pipeline argument and triggered pipeline argument
78
*/
79
public class ArgumentMapping {
80
/**
81
* Create argument mapping between source and target arguments
82
* @param source name of triggering pipeline argument, may be null
83
* @param target name of triggered pipeline argument, may be null
84
*/
85
public ArgumentMapping(String source, String target);
86
87
/**
88
* Get source argument name from triggering pipeline
89
* @return triggering pipeline argument name, may be null
90
*/
91
public String getSource();
92
93
/**
94
* Get target argument name for triggered pipeline
95
* @return triggered pipeline argument name, may be null
96
*/
97
public String getTarget();
98
}
99
```
100
101
**Usage Examples:**
102
103
```java
104
import co.cask.cdap.etl.proto.v2.ArgumentMapping;
105
106
// Direct argument mappings for common parameters
107
ArgumentMapping tableMapping = new ArgumentMapping("input_table", "source_table");
108
ArgumentMapping dateMapping = new ArgumentMapping("process_date", "batch_date");
109
ArgumentMapping formatMapping = new ArgumentMapping("output_format", "sink_format");
110
111
// Complex mapping scenarios
112
List<ArgumentMapping> complexMappings = List.of(
113
// Map a single source to multiple targets with one mapping per target (handled at pipeline level)
114
new ArgumentMapping("master_config", "worker_config_1"),
115
new ArgumentMapping("master_config", "worker_config_2"),
116
117
// Environment-specific mappings
118
new ArgumentMapping("prod_database_url", "db_connection_string"),
119
new ArgumentMapping("prod_api_key", "external_service_key"),
120
121
// Data lineage tracking
122
new ArgumentMapping("upstream_batch_id", "source_batch_id"),
123
new ArgumentMapping("pipeline_run_id", "parent_run_id")
124
);
125
126
// Conditional argument mapping based on the ENVIRONMENT environment variable
127
String sourceArg = "dynamic_source";
128
String targetArg = "prod".equals(System.getenv("ENVIRONMENT")) ? "prod_source" : "dev_source";
129
ArgumentMapping conditionalMapping = new ArgumentMapping(sourceArg, targetArg);
130
131
// Validation example
132
for (ArgumentMapping mapping : complexMappings) {
133
if (mapping.getSource() == null || mapping.getTarget() == null) {
134
System.out.println("Warning: Incomplete mapping - " + mapping);
135
}
136
}
137
```
138
139
### Plugin Property Mapping
140
141
`PluginPropertyMapping` maps a plugin property of a named stage in the triggering pipeline to an argument of the triggered pipeline, so values can be taken directly from stage configurations.
142
143
```java { .api }
144
/**
145
* Mapping between triggering pipeline plugin property and triggered pipeline argument
146
*/
147
public class PluginPropertyMapping extends ArgumentMapping {
148
/**
149
* Create plugin property mapping
150
* @param stageName name of the stage in triggering pipeline containing the plugin property, may be null
151
* @param source name of the plugin property in the specified stage, may be null
152
* @param target name of the triggered pipeline argument to receive the property value, may be null
153
*/
154
public PluginPropertyMapping(String stageName, String source, String target);
155
156
/**
157
* Get stage name containing the plugin property
158
* @return stage name in triggering pipeline, may be null
159
*/
160
public String getStageName();
161
162
// Inherits getSource() and getTarget() from ArgumentMapping
163
}
164
```
165
166
**Usage Examples:**
167
168
```java
169
import co.cask.cdap.etl.proto.v2.PluginPropertyMapping;
170
171
// Extract file paths from source stages
172
PluginPropertyMapping filePathMapping = new PluginPropertyMapping(
173
"file_reader", // stage name
174
"path", // plugin property
175
"input_file_path" // target argument
176
);
177
178
// Extract database connection details
179
PluginPropertyMapping dbConnectionMapping = new PluginPropertyMapping(
180
"database_source",
181
"connectionString",
182
"downstream_db_url"
183
);
184
185
// Extract processing parameters
186
PluginPropertyMapping thresholdMapping = new PluginPropertyMapping(
187
"data_validator",
188
"validation_threshold",
189
"quality_threshold"
190
);
191
192
// Complex plugin property extraction
193
List<PluginPropertyMapping> extractionMappings = List.of(
194
// Extract source configuration
195
new PluginPropertyMapping("kafka_source", "brokers", "kafka_brokers"),
196
new PluginPropertyMapping("kafka_source", "topic", "source_topic"),
197
198
// Extract transformation parameters
199
new PluginPropertyMapping("aggregator", "window_size", "agg_window"),
200
new PluginPropertyMapping("aggregator", "grouping_fields", "group_by_fields"),
201
202
// Extract sink configuration
203
new PluginPropertyMapping("table_sink", "name", "output_table"),
204
new PluginPropertyMapping("table_sink", "schema.row.field", "key_field")
205
);
206
207
// Print each mapping for inspection
208
for (PluginPropertyMapping mapping : extractionMappings) {
209
System.out.printf("Stage: %s, Property: %s -> Argument: %s%n",
210
mapping.getStageName(), mapping.getSource(), mapping.getTarget());
211
}
212
```
213
214
### Pipeline Orchestration Patterns
215
216
Common patterns for orchestrating multiple pipelines with property mapping and triggering.
217
218
**Sequential Pipeline Chain:**
219
220
```java
221
// Master pipeline triggers data processing pipeline
222
TriggeringPropertyMapping masterToProcessor = new TriggeringPropertyMapping(
223
List.of(
224
new ArgumentMapping("data_date", "process_date"),
225
new ArgumentMapping("source_system", "input_system")
226
),
227
List.of(
228
new PluginPropertyMapping("file_source", "directory", "input_directory"),
229
new PluginPropertyMapping("file_source", "format", "file_format")
230
)
231
);
232
233
// Processing pipeline triggers aggregation pipeline
234
TriggeringPropertyMapping processorToAggregator = new TriggeringPropertyMapping(
235
List.of(
236
new ArgumentMapping("process_date", "aggregation_date"),
237
new ArgumentMapping("processed_records", "input_count")
238
),
239
List.of(
240
new PluginPropertyMapping("output_table", "name", "source_table"),
241
new PluginPropertyMapping("quality_checker", "pass_rate", "quality_threshold")
242
)
243
);
244
```
245
246
**Fan-out Pipeline Pattern:**
247
248
```java
249
// Single trigger pipeline spawning multiple specialized processors
250
TriggeringPropertyMapping fanOutMapping = new TriggeringPropertyMapping(
251
List.of(
252
// Common arguments for all triggered pipelines
253
new ArgumentMapping("master_batch_id", "parent_batch_id"),
254
new ArgumentMapping("processing_timestamp", "start_time")
255
),
256
List.of(
257
// Extract different aspects for different pipelines
258
new PluginPropertyMapping("data_splitter", "customer_output_path", "customer_data_path"),
259
new PluginPropertyMapping("data_splitter", "order_output_path", "order_data_path"),
260
new PluginPropertyMapping("data_splitter", "product_output_path", "product_data_path")
261
)
262
);
263
```
264
265
**Dynamic Configuration Pattern:**
266
267
```java
268
// Environment-aware property mapping
269
TriggeringPropertyMapping dynamicMapping = new TriggeringPropertyMapping(
270
List.of(
271
new ArgumentMapping("environment", "target_env"),
272
new ArgumentMapping("scaling_factor", "parallelism_level")
273
),
274
List.of(
275
// Extract environment-specific configurations
276
new PluginPropertyMapping("env_config", "database_url", "target_database"),
277
new PluginPropertyMapping("env_config", "api_endpoint", "service_url"),
278
new PluginPropertyMapping("resource_manager", "memory_allocation", "worker_memory"),
279
new PluginPropertyMapping("resource_manager", "cpu_cores", "worker_cores")
280
)
281
);
282
```
283
284
### Advanced Triggering Scenarios
285
286
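Example mappings for more complex scenarios: data-quality-based triggering, error recovery, and multi-tenant processing. The snippets show only the property mappings; the conditional and error-handling logic itself is not shown.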
287
288
```java
289
// Conditional triggering based on data quality
290
TriggeringPropertyMapping qualityBasedTriggering = new TriggeringPropertyMapping(
291
List.of(
292
new ArgumentMapping("data_quality_score", "input_quality"),
293
new ArgumentMapping("record_count", "input_volume")
294
),
295
List.of(
296
new PluginPropertyMapping("quality_validator", "pass_threshold", "minimum_quality"),
297
new PluginPropertyMapping("quality_validator", "error_rate", "max_error_rate"),
298
new PluginPropertyMapping("record_counter", "total_records", "expected_count")
299
)
300
);
301
302
// Error recovery triggering
303
TriggeringPropertyMapping errorRecoveryMapping = new TriggeringPropertyMapping(
304
List.of(
305
new ArgumentMapping("failed_batch_id", "recovery_batch_id"),
306
new ArgumentMapping("error_timestamp", "failure_time"),
307
new ArgumentMapping("retry_attempt", "attempt_number")
308
),
309
List.of(
310
new PluginPropertyMapping("error_analyzer", "failure_reason", "error_category"),
311
new PluginPropertyMapping("error_analyzer", "affected_records", "recovery_scope"),
312
new PluginPropertyMapping("checkpoint_manager", "last_success_point", "resume_from")
313
)
314
);
315
316
// Multi-tenant triggering
317
TriggeringPropertyMapping multiTenantMapping = new TriggeringPropertyMapping(
318
List.of(
319
new ArgumentMapping("tenant_id", "target_tenant"),
320
new ArgumentMapping("tenant_config", "processing_rules")
321
),
322
List.of(
323
new PluginPropertyMapping("tenant_resolver", "database_schema", "tenant_schema"),
324
new PluginPropertyMapping("tenant_resolver", "resource_limits", "tenant_quotas"),
325
new PluginPropertyMapping("security_manager", "access_token", "tenant_credentials")
326
)
327
);
328
```