# Connection and Data Flow

Connection management system for defining data flow between pipeline stages. Connections specify how data moves through the ETL pipeline, supporting various routing patterns including conditional connections, port-based routing, and complex data flow topologies.

## Capabilities

### Basic Connection Management

Core connection functionality for linking pipeline stages in simple linear or branching data flows.

```java { .api }
/**
 * Connection between two ETL stages defining data flow
 */
public class Connection {
    /**
     * Create basic connection between stages
     * @param from source stage name
     * @param to target stage name
     */
    public Connection(String from, String to);

    /**
     * Create connection with output port specification
     * @param from source stage name
     * @param to target stage name
     * @param port output port name for multi-output stages
     */
    public Connection(String from, String to, String port);

    /**
     * Create conditional connection
     * @param from source stage name
     * @param to target stage name
     * @param condition boolean condition determining if connection is active
     */
    public Connection(String from, String to, Boolean condition);

    /**
     * Get source stage name
     * @return name of the source stage
     */
    public String getFrom();

    /**
     * Get target stage name
     * @return name of the target stage
     */
    public String getTo();

    /**
     * Get output port name
     * @return port name for multi-output stages, may be null
     */
    public String getPort();

    /**
     * Get connection condition
     * @return boolean condition for conditional connections, may be null
     */
    public Boolean getCondition();
}
```

**Usage Examples:**

```java
import co.cask.cdap.etl.proto.Connection;

// Basic linear pipeline connections
Connection sourceToTransform = new Connection("data_source", "clean_data");
Connection transformToSink = new Connection("clean_data", "output_table");

// Multi-output stage with port-based routing
Connection validOutput = new Connection("validator", "valid_sink", "valid");
Connection invalidOutput = new Connection("validator", "error_sink", "invalid");

// Conditional connections for dynamic routing
Connection conditionalConnection = new Connection("decision_stage", "special_processing", true);
Connection defaultConnection = new Connection("decision_stage", "normal_processing", false);

// Complex branching example
Set<Connection> branchingFlow = Set.of(
    new Connection("source", "splitter"),
    new Connection("splitter", "branch_a", "output_a"),
    new Connection("splitter", "branch_b", "output_b"),
    new Connection("branch_a", "joiner", "input_a"),
    new Connection("branch_b", "joiner", "input_b"),
    new Connection("joiner", "final_sink")
);
```

### Linear Pipeline Patterns

Simple sequential data processing patterns where data flows through stages in a straight line.

```java
// Simple ETL pipeline: Extract -> Transform -> Load
List<Connection> linearPipeline = List.of(
    new Connection("file_source", "data_cleaner"),
    new Connection("data_cleaner", "format_converter"),
    new Connection("format_converter", "database_sink")
);

// Multi-stage transformation pipeline
List<Connection> transformationChain = List.of(
    new Connection("api_source", "json_parser"),
    new Connection("json_parser", "field_validator"),
    new Connection("field_validator", "data_enricher"),
    new Connection("data_enricher", "aggregator"),
    new Connection("aggregator", "table_sink")
);
```

### Branching and Joining Patterns

Complex data flow patterns supporting parallel processing, conditional routing, and data aggregation.

```java
// Fan-out pattern - one source to multiple processing paths
Set<Connection> fanOutPattern = Set.of(
    new Connection("main_source", "customer_processor"),
    new Connection("main_source", "order_processor"),
    new Connection("main_source", "product_processor"),
    new Connection("customer_processor", "customer_sink"),
    new Connection("order_processor", "order_sink"),
    new Connection("product_processor", "product_sink")
);

// Fan-in pattern - multiple sources to single processor
Set<Connection> fanInPattern = Set.of(
    new Connection("sales_data", "data_merger"),
    new Connection("inventory_data", "data_merger"),
    new Connection("customer_data", "data_merger"),
    new Connection("data_merger", "unified_sink")
);

// Diamond pattern - split and rejoin
Set<Connection> diamondPattern = Set.of(
    new Connection("input", "splitter"),
    new Connection("splitter", "fast_path", "priority"),
    new Connection("splitter", "slow_path", "standard"),
    new Connection("fast_path", "merger", "fast_input"),
    new Connection("slow_path", "merger", "slow_input"),
    new Connection("merger", "output")
);
```

### Error Handling and Alternative Paths

Connection patterns for handling errors, validation failures, and alternative processing paths.

```java
// Error handling with multiple outputs
Set<Connection> errorHandlingFlow = Set.of(
    new Connection("source", "validator"),
    new Connection("validator", "main_processor", "valid"),
    new Connection("validator", "error_handler", "invalid"),
    new Connection("main_processor", "success_sink"),
    new Connection("error_handler", "error_sink")
);

// Multi-stage error handling
Set<Connection> complexErrorHandling = Set.of(
    new Connection("input", "stage1"),
    new Connection("stage1", "stage2", "success"),
    new Connection("stage1", "error_processor", "error"),
    new Connection("stage2", "stage3", "success"),
    new Connection("stage2", "error_processor", "error"),
    new Connection("stage3", "final_sink", "success"),
    new Connection("stage3", "error_processor", "error"),
    new Connection("error_processor", "error_sink")
);

// Conditional processing based on data content
Set<Connection> conditionalProcessing = Set.of(
    new Connection("source", "classifier"),
    new Connection("classifier", "premium_processor", "premium"),
    new Connection("classifier", "standard_processor", "standard"),
    new Connection("classifier", "basic_processor", "basic"),
    new Connection("premium_processor", "premium_sink"),
    new Connection("standard_processor", "standard_sink"),
    new Connection("basic_processor", "basic_sink")
);
```

### Connection Validation and Best Practices

Validation rules and best practices for ensuring correct connection topology and preventing common errors.

```java { .api }
/**
 * Connection validation utilities
 */
public class ConnectionValidator {
    /**
     * Validate connection topology for cycles and orphaned stages
     * @param stages all pipeline stages
     * @param connections all pipeline connections
     * @throws IllegalArgumentException if topology is invalid
     */
    public static void validateTopology(Set<ETLStage> stages, Set<Connection> connections);

    /**
     * Check for unreachable stages (no input connections)
     * @param stages all pipeline stages
     * @param connections all pipeline connections
     * @return list of unreachable stage names
     */
    public static List<String> findUnreachableStages(Set<ETLStage> stages, Set<Connection> connections);

    /**
     * Check for dead-end stages (no output connections, not sinks)
     * @param stages all pipeline stages
     * @param connections all pipeline connections
     * @return list of dead-end stage names
     */
    public static List<String> findDeadEndStages(Set<ETLStage> stages, Set<Connection> connections);

    /**
     * Detect circular dependencies in pipeline
     * @param connections all pipeline connections
     * @return true if circular dependencies exist
     */
    public static boolean hasCycles(Set<Connection> connections);
}
```

**Best Practices Examples:**

```java
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.proto.Connection;

// Good: Well-defined pipeline with clear data flow
Set<ETLStage> stages = Set.of(
    new ETLStage("source", sourcePlugin),
    new ETLStage("transform", transformPlugin),
    new ETLStage("sink", sinkPlugin)
);

Set<Connection> connections = Set.of(
    new Connection("source", "transform"),
    new Connection("transform", "sink")
);

// Validate before using
try {
    // Check for basic topology issues
    ConnectionValidator.validateTopology(stages, connections);

    // Check for specific issues
    List<String> unreachable = ConnectionValidator.findUnreachableStages(stages, connections);
    if (!unreachable.isEmpty()) {
        throw new IllegalArgumentException("Unreachable stages: " + unreachable);
    }

    List<String> deadEnds = ConnectionValidator.findDeadEndStages(stages, connections);
    if (!deadEnds.isEmpty()) {
        throw new IllegalArgumentException("Dead-end stages: " + deadEnds);
    }

    if (ConnectionValidator.hasCycles(connections)) {
        throw new IllegalArgumentException("Circular dependencies detected");
    }

} catch (IllegalArgumentException e) {
    logger.error("Pipeline topology validation failed: {}", e.getMessage());
    // Handle validation failure
}

// Good: Descriptive stage names that clearly indicate data flow
Set<Connection> descriptiveConnections = Set.of(
    new Connection("raw_customer_data", "validate_customer_fields"),
    new Connection("validate_customer_fields", "enrich_customer_data"),
    new Connection("enrich_customer_data", "customer_warehouse_table")
);

// Good: Port names that clearly indicate data type/purpose
Set<Connection> clearPortConnections = Set.of(
    new Connection("data_splitter", "valid_records_processor", "validated_output"),
    new Connection("data_splitter", "invalid_records_handler", "rejected_output"),
    new Connection("data_splitter", "audit_logger", "audit_output")
);
```

### Advanced Connection Patterns

Sophisticated connection patterns for complex ETL scenarios including parallel processing, data replication, and conditional routing.

```java
// Parallel processing with synchronization
Set<Connection> parallelSync = Set.of(
    new Connection("source", "parallel_split"),
    new Connection("parallel_split", "worker_1", "batch_1"),
    new Connection("parallel_split", "worker_2", "batch_2"),
    new Connection("parallel_split", "worker_3", "batch_3"),
    new Connection("worker_1", "sync_barrier", "result_1"),
    new Connection("worker_2", "sync_barrier", "result_2"),
    new Connection("worker_3", "sync_barrier", "result_3"),
    new Connection("sync_barrier", "final_aggregator"),
    new Connection("final_aggregator", "output")
);

// Data replication for multiple destinations
Set<Connection> replicationPattern = Set.of(
    new Connection("master_source", "data_replicator"),
    new Connection("data_replicator", "operational_sink", "live_copy"),
    new Connection("data_replicator", "warehouse_sink", "analytical_copy"),
    new Connection("data_replicator", "backup_sink", "backup_copy"),
    new Connection("data_replicator", "audit_sink", "audit_trail")
);

// Hierarchical processing with multiple levels
Set<Connection> hierarchicalFlow = Set.of(
    new Connection("raw_input", "level1_processor"),
    new Connection("level1_processor", "level2_processor_a", "category_a"),
    new Connection("level1_processor", "level2_processor_b", "category_b"),
    new Connection("level2_processor_a", "level3_detail_processor", "detailed_a"),
    new Connection("level2_processor_a", "level3_summary_processor", "summary_a"),
    new Connection("level2_processor_b", "level3_detail_processor", "detailed_b"),
    new Connection("level2_processor_b", "level3_summary_processor", "summary_b"),
    new Connection("level3_detail_processor", "detail_sink"),
    new Connection("level3_summary_processor", "summary_sink")
);
```