# Connection and Data Flow

Connection management system for defining data flow between pipeline stages. Connections specify how data moves through the ETL pipeline, supporting various routing patterns, including conditional connections, port-based routing, and complex data flow topologies.

## Capabilities

### Basic Connection Management

Core connection functionality for linking pipeline stages in simple linear or branching data flows.

```java { .api }
/**
 * Connection between two ETL stages defining data flow
 */
public class Connection {
  /**
   * Create basic connection between stages
   * @param from source stage name
   * @param to target stage name
   */
  public Connection(String from, String to);

  /**
   * Create connection with output port specification
   * @param from source stage name
   * @param to target stage name
   * @param port output port name for multi-output stages
   */
  public Connection(String from, String to, String port);

  /**
   * Create conditional connection
   * @param from source stage name
   * @param to target stage name
   * @param condition boolean condition determining if connection is active
   */
  public Connection(String from, String to, Boolean condition);

  /**
   * Get source stage name
   * @return name of the source stage
   */
  public String getFrom();

  /**
   * Get target stage name
   * @return name of the target stage
   */
  public String getTo();

  /**
   * Get output port name
   * @return port name for multi-output stages, may be null
   */
  public String getPort();

  /**
   * Get connection condition
   * @return boolean condition for conditional connections, may be null
   */
  public Boolean getCondition();
}
```

**Usage Examples:**

```java
import java.util.Set;

import co.cask.cdap.etl.proto.Connection;

// Basic linear pipeline connections
Connection sourceToTransform = new Connection("data_source", "clean_data");
Connection transformToSink = new Connection("clean_data", "output_table");

// Multi-output stage with port-based routing
Connection validOutput = new Connection("validator", "valid_sink", "valid");
Connection invalidOutput = new Connection("validator", "error_sink", "invalid");

// Conditional connections for dynamic routing; the boolean literals autobox,
// so these calls resolve to the (String, String, Boolean) constructor
Connection conditionalConnection = new Connection("decision_stage", "special_processing", true);
Connection defaultConnection = new Connection("decision_stage", "normal_processing", false);

// Complex branching example
Set<Connection> branchingFlow = Set.of(
    new Connection("source", "splitter"),
    new Connection("splitter", "branch_a", "output_a"),
    new Connection("splitter", "branch_b", "output_b"),
    new Connection("branch_a", "joiner", "input_a"),
    new Connection("branch_b", "joiner", "input_b"),
    new Connection("joiner", "final_sink")
);
```

### Linear Pipeline Patterns

Simple sequential data processing patterns where data flows through stages in a straight line.

```java
// Simple ETL pipeline: Extract -> Transform -> Load
List<Connection> linearPipeline = List.of(
    new Connection("file_source", "data_cleaner"),
    new Connection("data_cleaner", "format_converter"),
    new Connection("format_converter", "database_sink")
);

// Multi-stage transformation pipeline
List<Connection> transformationChain = List.of(
    new Connection("api_source", "json_parser"),
    new Connection("json_parser", "field_validator"),
    new Connection("field_validator", "data_enricher"),
    new Connection("data_enricher", "aggregator"),
    new Connection("aggregator", "table_sink")
);
```
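
A quick structural check can confirm that a connection list really is linear. The helper below is a minimal sketch, not part of the documented API; it assumes only the `Connection` getters shown earlier and flags any stage with more than one inbound or outbound connection.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import co.cask.cdap.etl.proto.Connection;

// Hypothetical helper class; not part of the co.cask.cdap.etl.proto API.
final class LinearChainCheck {
  // True when no stage fans out or fans in. This degree check alone does not
  // rule out disjoint chains or cycles; see the validation utilities later in
  // this document for full topology checks.
  static boolean isLinearChain(List<Connection> connections) {
    Map<String, Integer> outDegree = new HashMap<>();
    Map<String, Integer> inDegree = new HashMap<>();
    for (Connection c : connections) {
      outDegree.merge(c.getFrom(), 1, Integer::sum);
      inDegree.merge(c.getTo(), 1, Integer::sum);
    }
    return outDegree.values().stream().allMatch(d -> d <= 1)
        && inDegree.values().stream().allMatch(d -> d <= 1);
  }
}
```

For the examples above, `isLinearChain(linearPipeline)` and `isLinearChain(transformationChain)` would both return true.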

### Branching and Joining Patterns

Complex data flow patterns supporting parallel processing, conditional routing, and data aggregation.

```java
// Fan-out pattern - one source to multiple processing paths
Set<Connection> fanOutPattern = Set.of(
    new Connection("main_source", "customer_processor"),
    new Connection("main_source", "order_processor"),
    new Connection("main_source", "product_processor"),
    new Connection("customer_processor", "customer_sink"),
    new Connection("order_processor", "order_sink"),
    new Connection("product_processor", "product_sink")
);

// Fan-in pattern - multiple sources to single processor
Set<Connection> fanInPattern = Set.of(
    new Connection("sales_data", "data_merger"),
    new Connection("inventory_data", "data_merger"),
    new Connection("customer_data", "data_merger"),
    new Connection("data_merger", "unified_sink")
);

// Diamond pattern - split and rejoin
Set<Connection> diamondPattern = Set.of(
    new Connection("input", "splitter"),
    new Connection("splitter", "fast_path", "priority"),
    new Connection("splitter", "slow_path", "standard"),
    new Connection("fast_path", "merger", "fast_input"),
    new Connection("slow_path", "merger", "slow_input"),
    new Connection("merger", "output")
);
```
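
For branched flows it is often convenient to view the connection set as an adjacency map from each stage to its downstream stages. The helper below is illustrative only, not part of the documented API, and assumes just the `Connection` getters from the API block above.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import co.cask.cdap.etl.proto.Connection;

// Hypothetical helper class; not part of the co.cask.cdap.etl.proto API.
final class FlowInspector {
  // Group connections by source stage so fan-out and fan-in become visible.
  static Map<String, Set<String>> toAdjacencyMap(Set<Connection> connections) {
    Map<String, Set<String>> adjacency = new HashMap<>();
    for (Connection c : connections) {
      adjacency.computeIfAbsent(c.getFrom(), k -> new HashSet<>()).add(c.getTo());
    }
    return adjacency;
  }
}
```

Applied to `fanOutPattern` above, the entry for `main_source` would contain `customer_processor`, `order_processor`, and `product_processor`.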

### Error Handling and Alternative Paths

Connection patterns for handling errors, validation failures, and alternative processing paths.

```java
// Error handling with multiple outputs
Set<Connection> errorHandlingFlow = Set.of(
    new Connection("source", "validator"),
    new Connection("validator", "main_processor", "valid"),
    new Connection("validator", "error_handler", "invalid"),
    new Connection("main_processor", "success_sink"),
    new Connection("error_handler", "error_sink")
);

// Multi-stage error handling
Set<Connection> complexErrorHandling = Set.of(
    new Connection("input", "stage1"),
    new Connection("stage1", "stage2", "success"),
    new Connection("stage1", "error_processor", "error"),
    new Connection("stage2", "stage3", "success"),
    new Connection("stage2", "error_processor", "error"),
    new Connection("stage3", "final_sink", "success"),
    new Connection("stage3", "error_processor", "error"),
    new Connection("error_processor", "error_sink")
);

// Conditional processing based on data content
Set<Connection> conditionalProcessing = Set.of(
    new Connection("source", "classifier"),
    new Connection("classifier", "premium_processor", "premium"),
    new Connection("classifier", "standard_processor", "standard"),
    new Connection("classifier", "basic_processor", "basic"),
    new Connection("premium_processor", "premium_sink"),
    new Connection("standard_processor", "standard_sink"),
    new Connection("basic_processor", "basic_sink")
);
```
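
Because the error routes in these examples are distinguished purely by port name, they can be audited straight from the connection set. Below is a minimal sketch, not a documented utility, that assumes the "error"/"invalid" port-naming convention used above.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import co.cask.cdap.etl.proto.Connection;

// Hypothetical helper class; not part of the co.cask.cdap.etl.proto API.
final class ErrorRouteAudit {
  // Collect every connection routed through an error-style port, relying on
  // the "error"/"invalid" port names from the examples above.
  static List<Connection> findErrorRoutes(Set<Connection> connections) {
    return connections.stream()
        .filter(c -> "error".equals(c.getPort()) || "invalid".equals(c.getPort()))
        .collect(Collectors.toList());
  }
}
```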

### Connection Validation and Best Practices

Validation rules and best practices for ensuring correct connection topology and preventing common errors.

```java { .api }
/**
 * Connection validation utilities
 */
public class ConnectionValidator {
  /**
   * Validate connection topology for cycles and orphaned stages
   * @param stages all pipeline stages
   * @param connections all pipeline connections
   * @throws IllegalArgumentException if topology is invalid
   */
  public static void validateTopology(Set<ETLStage> stages, Set<Connection> connections);

  /**
   * Check for unreachable stages (no input connections)
   * @param stages all pipeline stages
   * @param connections all pipeline connections
   * @return list of unreachable stage names
   */
  public static List<String> findUnreachableStages(Set<ETLStage> stages, Set<Connection> connections);

  /**
   * Check for dead-end stages (no output connections, not sinks)
   * @param stages all pipeline stages
   * @param connections all pipeline connections
   * @return list of dead-end stage names
   */
  public static List<String> findDeadEndStages(Set<ETLStage> stages, Set<Connection> connections);

  /**
   * Detect circular dependencies in pipeline
   * @param connections all pipeline connections
   * @return true if circular dependencies exist
   */
  public static boolean hasCycles(Set<Connection> connections);
}
```
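
The validator above is shown as declarations only. As one illustration, `hasCycles` could be implemented with Kahn's algorithm: stages are released in topological order, and any stage whose in-degree never reaches zero is trapped in a cycle. The sketch below is a hypothetical implementation, not the library's actual code.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import co.cask.cdap.etl.proto.Connection;

// Hypothetical implementation sketch of hasCycles using Kahn's algorithm.
final class CycleCheck {
  static boolean hasCycles(Set<Connection> connections) {
    // Collapse parallel port connections into unique from -> to edges.
    Map<String, Set<String>> edges = new HashMap<>();
    Map<String, Integer> inDegree = new HashMap<>();
    for (Connection c : connections) {
      if (edges.computeIfAbsent(c.getFrom(), k -> new HashSet<>()).add(c.getTo())) {
        inDegree.merge(c.getTo(), 1, Integer::sum);
      }
      inDegree.putIfAbsent(c.getFrom(), 0);
    }
    // Repeatedly release stages that have no remaining inbound edges.
    Deque<String> ready = new ArrayDeque<>();
    inDegree.forEach((stage, degree) -> { if (degree == 0) ready.add(stage); });
    int released = 0;
    while (!ready.isEmpty()) {
      String stage = ready.poll();
      released++;
      for (String next : edges.getOrDefault(stage, Set.of())) {
        if (inDegree.merge(next, -1, Integer::sum) == 0) {
          ready.add(next);
        }
      }
    }
    // Any stage never released was caught in a cycle.
    return released < inDegree.size();
  }
}
```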

**Best Practices Examples:**

```java
import java.util.List;
import java.util.Set;

import co.cask.cdap.etl.proto.Connection;
import co.cask.cdap.etl.proto.v2.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Logger logger = LoggerFactory.getLogger("pipeline-validation");

// Good: Well-defined pipeline with clear data flow
// (sourcePlugin, transformPlugin, and sinkPlugin are plugin instances defined elsewhere)
Set<ETLStage> stages = Set.of(
    new ETLStage("source", sourcePlugin),
    new ETLStage("transform", transformPlugin),
    new ETLStage("sink", sinkPlugin)
);

Set<Connection> connections = Set.of(
    new Connection("source", "transform"),
    new Connection("transform", "sink")
);

// Validate before using
try {
  // Check for basic topology issues
  ConnectionValidator.validateTopology(stages, connections);

  // Check for specific issues
  List<String> unreachable = ConnectionValidator.findUnreachableStages(stages, connections);
  if (!unreachable.isEmpty()) {
    throw new IllegalArgumentException("Unreachable stages: " + unreachable);
  }

  List<String> deadEnds = ConnectionValidator.findDeadEndStages(stages, connections);
  if (!deadEnds.isEmpty()) {
    throw new IllegalArgumentException("Dead-end stages: " + deadEnds);
  }

  if (ConnectionValidator.hasCycles(connections)) {
    throw new IllegalArgumentException("Circular dependencies detected");
  }
} catch (IllegalArgumentException e) {
  logger.error("Pipeline topology validation failed: {}", e.getMessage());
  // Handle validation failure
}

// Good: Descriptive stage names that clearly indicate data flow
Set<Connection> descriptiveConnections = Set.of(
    new Connection("raw_customer_data", "validate_customer_fields"),
    new Connection("validate_customer_fields", "enrich_customer_data"),
    new Connection("enrich_customer_data", "customer_warehouse_table")
);

// Good: Port names that clearly indicate data type/purpose
Set<Connection> clearPortConnections = Set.of(
    new Connection("data_splitter", "valid_records_processor", "validated_output"),
    new Connection("data_splitter", "invalid_records_handler", "rejected_output"),
    new Connection("data_splitter", "audit_logger", "audit_output")
);
```

### Advanced Connection Patterns

Sophisticated connection patterns for complex ETL scenarios including parallel processing, data replication, and conditional routing.

```java
// Parallel processing with synchronization
Set<Connection> parallelSync = Set.of(
    new Connection("source", "parallel_split"),
    new Connection("parallel_split", "worker_1", "batch_1"),
    new Connection("parallel_split", "worker_2", "batch_2"),
    new Connection("parallel_split", "worker_3", "batch_3"),
    new Connection("worker_1", "sync_barrier", "result_1"),
    new Connection("worker_2", "sync_barrier", "result_2"),
    new Connection("worker_3", "sync_barrier", "result_3"),
    new Connection("sync_barrier", "final_aggregator"),
    new Connection("final_aggregator", "output")
);

// Data replication for multiple destinations
Set<Connection> replicationPattern = Set.of(
    new Connection("master_source", "data_replicator"),
    new Connection("data_replicator", "operational_sink", "live_copy"),
    new Connection("data_replicator", "warehouse_sink", "analytical_copy"),
    new Connection("data_replicator", "backup_sink", "backup_copy"),
    new Connection("data_replicator", "audit_sink", "audit_trail")
);

// Hierarchical processing with multiple levels
Set<Connection> hierarchicalFlow = Set.of(
    new Connection("raw_input", "level1_processor"),
    new Connection("level1_processor", "level2_processor_a", "category_a"),
    new Connection("level1_processor", "level2_processor_b", "category_b"),
    new Connection("level2_processor_a", "level3_detail_processor", "detailed_a"),
    new Connection("level2_processor_a", "level3_summary_processor", "summary_a"),
    new Connection("level2_processor_b", "level3_detail_processor", "detailed_b"),
    new Connection("level2_processor_b", "level3_summary_processor", "summary_b"),
    new Connection("level3_detail_processor", "detail_sink"),
    new Connection("level3_summary_processor", "summary_sink")
);
```