or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifact-resource-management.md, connection-data-flow.md, index.md, legacy-version-support.md, pipeline-configuration.md, pipeline-triggering.md, stage-plugin-management.md

docs/pipeline-configuration.md

# Pipeline Configuration

Current version (v2) ETL pipeline configuration providing comprehensive features for both batch and streaming data processing scenarios. Includes advanced resource management, stage logging, process timing, and extensive property support.

## Capabilities

### Base ETL Configuration

Core configuration class providing common functionality for all ETL pipeline types.

```java { .api }
/**
 * Base ETL Configuration class for all pipeline types
 */
public class ETLConfig extends Config implements UpgradeableConfig {
  /**
   * Get pipeline description
   * @return pipeline description, may be null
   */
  public String getDescription();

  /**
   * Get all pipeline stages
   * @return immutable set of ETL stages
   */
  public Set<ETLStage> getStages();

  /**
   * Get stage connections defining data flow
   * @return immutable set of connections between stages
   */
  public Set<Connection> getConnections();

  /**
   * Get resource allocation for pipeline execution
   * @return resource configuration, defaults to 1024MB/1 core if not specified
   */
  public Resources getResources();

  /**
   * Get driver resource allocation
   * @return driver resource configuration
   */
  public Resources getDriverResources();

  /**
   * Get client resource allocation
   * @return client resource configuration
   */
  public Resources getClientResources();

  /**
   * Get number of records for preview
   * @return preview record count, defaults to 100
   */
  public int getNumOfRecordsPreview();

  /**
   * Check if stage logging is enabled
   * @return true if stage logging enabled, defaults to true
   */
  public boolean isStageLoggingEnabled();

  /**
   * Check if process timing is enabled
   * @return true if process timing enabled, defaults to true
   */
  public boolean isProcessTimingEnabled();

  /**
   * Get pipeline properties
   * @return immutable map of pipeline properties
   */
  public Map<String, String> getProperties();

  /**
   * Validate configuration correctness
   * @throws IllegalArgumentException if configuration is invalid
   */
  public void validate();

  /**
   * Check if configuration can be upgraded
   * @return false for v2 configurations (latest version)
   */
  public boolean canUpgrade();

  /**
   * Upgrade configuration to next version
   * @param upgradeContext context for upgrading
   * @throws UnsupportedOperationException for v2 configurations
   */
  public UpgradeableConfig upgrade(UpgradeContext upgradeContext);
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.api.Resources;

// Using builder pattern to create base configuration
ETLConfig.Builder<?> builder = new ETLConfig.Builder<ETLConfig.Builder<?>>() {
  // Abstract builder implementation would be provided by concrete subclasses
};

// Configure resources and properties
Map<String, String> properties = new HashMap<>();
properties.put("custom.property", "value");

builder.setResources(new Resources(2048, 4))
  .setDriverResources(new Resources(1024, 2))
  .setClientResources(new Resources(512, 1))
  .setNumOfRecordsPreview(500)
  .setProperties(properties);

// Disable optional features
builder.disableStageLogging()
  .disableProcessTiming();
```

121

122

### Batch ETL Configuration

Configuration specific to batch ETL pipelines with scheduling, execution engine selection, and post-action support.

```java { .api }
/**
 * ETL Batch Configuration for scheduled batch processing pipelines
 */
public final class ETLBatchConfig extends ETLConfig {
  /**
   * Get post-execution actions
   * @return immutable list of post-actions to execute after pipeline completion
   */
  public List<ETLStage> getPostActions();

  /**
   * Get execution engine
   * @return execution engine (MapReduce or Spark), defaults to MapReduce
   */
  public Engine getEngine();

  /**
   * Get schedule configuration
   * @return schedule string, may be null
   */
  public String getSchedule();

  /**
   * Get maximum concurrent runs
   * @return max concurrent runs, may be null for unlimited
   */
  public Integer getMaxConcurrentRuns();

  /**
   * Convert old configuration format to current v2 format
   * @return v2 batch configuration
   */
  public ETLBatchConfig convertOldConfig();

  /**
   * Create builder for batch configuration
   * @return new builder instance
   */
  public static Builder builder();

  /**
   * Create builder with schedule (deprecated)
   * @param schedule time schedule
   * @return new builder instance
   * @deprecated use builder() and setTimeSchedule() instead
   */
  @Deprecated
  public static Builder builder(String schedule);
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.proto.v2.*;
import co.cask.cdap.etl.api.Engine;
import co.cask.cdap.api.Resources;

// Build batch ETL configuration
ETLBatchConfig batchConfig = ETLBatchConfig.builder()
  .setTimeSchedule("0 0 2 * * ?") // Daily at 2 AM
  .setEngine(Engine.SPARK)
  .setMaxConcurrentRuns(3)
  .addStage(sourceStage)
  .addStage(transformStage)
  .addStage(sinkStage)
  .addConnection("source", "transform")
  .addConnection("transform", "sink")
  .addPostAction(cleanupAction)
  .setResources(new Resources(4096, 8))
  .setDriverResources(new Resources(2048, 4))
  .build();

// Validate and use
batchConfig.validate();
```

203

204

### Data Streams Configuration

Configuration for streaming ETL pipelines with batch interval settings, checkpoint management, and graceful shutdown options.

```java { .api }
/**
 * Data Streams Configuration for real-time streaming pipelines
 */
public final class DataStreamsConfig extends ETLConfig {
  /**
   * Get batch processing interval
   * @return batch interval string (e.g., "1m", "30s")
   */
  public String getBatchInterval();

  /**
   * Check if running in unit test mode
   * @return true if in unit test mode
   */
  public boolean isUnitTest();

  /**
   * Check if checkpoints are disabled
   * @return true if checkpoints disabled, defaults to false
   */
  public boolean checkpointsDisabled();

  /**
   * Get additional JVM options
   * @return extra Java options string, defaults to empty
   */
  public String getExtraJavaOpts();

  /**
   * Get graceful stop setting
   * @return true for graceful stop, defaults to true
   */
  public Boolean getStopGracefully();

  /**
   * Get checkpoint directory
   * @return checkpoint directory path, may be null
   */
  public String getCheckpointDir();

  /**
   * Create builder for data streams configuration
   * @return new builder instance
   */
  public static Builder builder();
}
```

**Usage Example:**

```java
import co.cask.cdap.etl.proto.v2.DataStreamsConfig;

// Build streaming ETL configuration
DataStreamsConfig streamConfig = DataStreamsConfig.builder()
  .setBatchInterval("30s")
  .setCheckpointDir("/tmp/streaming-checkpoints")
  .setStopGracefully(true)
  .addStage(kafkaSource)
  .addStage(realtimeTransform)
  .addStage(tablesSink)
  .addConnection("kafka", "transform")
  .addConnection("transform", "sink")
  .setResources(new Resources(8192, 16))
  .build();

// Configure for production use
if (!streamConfig.checkpointsDisabled()) {
  // Checkpoints enabled - configure persistent storage
  System.out.println("Checkpoints enabled at: " + streamConfig.getCheckpointDir());
}
```

281

282

### Configuration Builder Pattern

Abstract builder pattern providing fluent API for constructing ETL configurations with validation and type safety.

```java { .api }
/**
 * Abstract builder for ETL configurations
 * @param <T> The concrete builder type for method chaining
 */
public abstract static class Builder<T extends Builder> {
  /**
   * Add a stage to the pipeline
   * @param stage ETL stage to add
   * @return builder instance for chaining
   */
  public T addStage(ETLStage stage);

  /**
   * Add connection between stages
   * @param from source stage name
   * @param to target stage name
   * @return builder instance for chaining
   */
  public T addConnection(String from, String to);

  /**
   * Add connection with port specification
   * @param from source stage name
   * @param to target stage name
   * @param port output port name
   * @return builder instance for chaining
   */
  public T addConnection(String from, String to, String port);

  /**
   * Add conditional connection
   * @param from source stage name
   * @param to target stage name
   * @param condition connection condition
   * @return builder instance for chaining
   */
  public T addConnection(String from, String to, Boolean condition);

  /**
   * Add connection object
   * @param connection connection to add
   * @return builder instance for chaining
   */
  public T addConnection(Connection connection);

  /**
   * Add multiple connections
   * @param connections collection of connections
   * @return builder instance for chaining
   */
  public T addConnections(Collection<Connection> connections);

  /**
   * Set resource allocation
   * @param resources resource configuration
   * @return builder instance for chaining
   */
  public T setResources(Resources resources);

  /**
   * Set driver resource allocation
   * @param resources driver resource configuration
   * @return builder instance for chaining
   */
  public T setDriverResources(Resources resources);

  /**
   * Set client resource allocation
   * @param resources client resource configuration
   * @return builder instance for chaining
   */
  public T setClientResources(Resources resources);

  /**
   * Set number of preview records
   * @param numOfRecordsPreview preview record count
   * @return builder instance for chaining
   */
  public T setNumOfRecordsPreview(int numOfRecordsPreview);

  /**
   * Disable stage logging
   * @return builder instance for chaining
   */
  public T disableStageLogging();

  /**
   * Disable process timing
   * @return builder instance for chaining
   */
  public T disableProcessTiming();

  /**
   * Set pipeline properties
   * @param properties pipeline properties map
   * @return builder instance for chaining
   */
  public T setProperties(Map<String, String> properties);
}
```