or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

artifact-resource-management.mdconnection-data-flow.mdindex.mdlegacy-version-support.mdpipeline-configuration.mdpipeline-triggering.mdstage-plugin-management.md

legacy-version-support.mddocs/

0

# Legacy Version Support

1

2

Backward compatibility support for v0 and v1 protocol versions with automatic upgrade mechanisms to current v2 format. This enables smooth migration from older CDAP versions while maintaining pipeline functionality and configuration integrity.

3

4

## Capabilities

5

6

### Upgrade Interface

7

8

Core upgrade interface enabling version-to-version configuration migration with contextual artifact resolution.

9

10

```java { .api }

11

/**

12

* Interface for configurations that can be upgraded to newer versions

13

* @param <T> the type of configuration this upgrades to

14

*/

15

public interface UpgradeableConfig<T extends UpgradeableConfig> {

16

/**

17

* Check if this configuration can be upgraded to a newer version

18

* @return true if upgrade is possible, false if already at latest version

19

*/

20

boolean canUpgrade();

21

22

/**

23

* Upgrade configuration to the next version

24

* This enables chain upgrading: v0 -> v1 -> v2

25

* @param upgradeContext context providing artifact resolution and upgrade utilities

26

* @return upgraded configuration of the next version

27

*/

28

T upgrade(UpgradeContext upgradeContext);

29

}

30

```

31

32

**Usage Examples:**

33

34

```java

35

import co.cask.cdap.etl.proto.*;

36

import co.cask.cdap.etl.proto.v0.*;

37

import co.cask.cdap.etl.proto.v1.*;

38

import co.cask.cdap.etl.proto.v2.*;

39

40

// Upgrade chain example: v0 -> v1 -> v2

41

co.cask.cdap.etl.proto.v0.ETLBatchConfig v0Config = loadLegacyConfig();

42

43

// Check if upgrade is needed

44

if (v0Config.canUpgrade()) {

45

// Upgrade v0 to v1

46

co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = v0Config.upgrade(upgradeContext);

47

48

// Continue upgrade to v2 if needed

49

if (v1Config.canUpgrade()) {

50

co.cask.cdap.etl.proto.v2.ETLBatchConfig v2Config = v1Config.upgrade(upgradeContext);

51

52

// v2Config is now ready for use with current CDAP version

53

v2Config.validate();

54

}

55

}

56

57

// Automatic chain upgrade utility

58

public static co.cask.cdap.etl.proto.v2.ETLBatchConfig upgradeToLatest(

59

UpgradeableConfig<?> config, UpgradeContext context) {

60

61

UpgradeableConfig<?> current = config;

62

while (current.canUpgrade()) {

63

current = current.upgrade(context);

64

}

65

return (co.cask.cdap.etl.proto.v2.ETLBatchConfig) current;

66

}

67

```

68

69

### Upgrade Context

70

71

Context interface providing artifact resolution and upgrade utilities during configuration migration.

72

73

```java { .api }

74

/**

75

* Context for upgrading configurations, providing artifact resolution services

76

*/

77

public interface UpgradeContext {

78

/**

79

* Get artifact information for a plugin type and name

80

* Used during upgrade to resolve plugin artifacts from older versions

81

* @param pluginType the plugin type (e.g., "batchsource", "transform", "batchsink")

82

* @param pluginName the plugin name

83

* @return artifact selector configuration for the plugin, null if not found

84

*/

85

ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName);

86

}

87

```

88

89

**Implementation Examples:**

90

91

```java

92

import co.cask.cdap.etl.proto.UpgradeContext;

93

import co.cask.cdap.etl.proto.ArtifactSelectorConfig;

94

95

// Custom upgrade context for development/testing

96

public class DevelopmentUpgradeContext implements UpgradeContext {

97

private final Map<String, Map<String, ArtifactSelectorConfig>> pluginArtifacts;

98

99

public DevelopmentUpgradeContext() {

100

this.pluginArtifacts = loadPluginArtifactMappings();

101

}

102

103

@Override

104

public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {

105

return pluginArtifacts

106

.getOrDefault(pluginType, Collections.emptyMap())

107

.get(pluginName);

108

}

109

110

private Map<String, Map<String, ArtifactSelectorConfig>> loadPluginArtifactMappings() {

111

// Load from configuration file or database

112

return Map.of(

113

"batchsource", Map.of(

114

"Table", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"),

115

"File", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0")

116

),

117

"transform", Map.of(

118

"JavaScript", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"),

119

"Python", new ArtifactSelectorConfig("SYSTEM", "hydrator-plugins", "2.8.0")

120

)

121

);

122

}

123

}

124

125

// Production upgrade context with artifact service integration

126

public class ProductionUpgradeContext implements UpgradeContext {

127

private final ArtifactService artifactService;

128

129

public ProductionUpgradeContext(ArtifactService artifactService) {

130

this.artifactService = artifactService;

131

}

132

133

@Override

134

public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {

135

try {

136

return artifactService.resolvePluginArtifact(pluginType, pluginName);

137

} catch (ArtifactNotFoundException e) {

138

logger.warn("Could not resolve artifact for plugin {}:{}", pluginType, pluginName);

139

return null;

140

}

141

}

142

}

143

```

144

145

### Version 1 (v1) Legacy Support

146

147

Support for v1 protocol format featuring separated source/sink/transform structure with basic connection support.

148

149

```java { .api }

150

/**

151

* ETL Configuration version 1 - legacy format with separated stage types

152

*/

153

public class ETLConfig extends Config {

154

/**

155

* Get source stage configuration

156

* @return ETL source stage

157

*/

158

public ETLStage getSource();

159

160

/**

161

* Get sink stage configurations

162

* @return list of ETL sink stages

163

*/

164

public List<ETLStage> getSinks();

165

166

/**

167

* Get transform stage configurations

168

* @return list of ETL transform stages

169

*/

170

public List<ETLStage> getTransforms();

171

172

/**

173

* Get stage connections

174

* @return list of connections between stages

175

*/

176

public List<Connection> getConnections();

177

178

/**

179

* Get resource allocation

180

* @return resource configuration

181

*/

182

public Resources getResources();

183

184

/**

185

* Check if stage logging is enabled

186

* @return true if stage logging enabled

187

*/

188

public Boolean isStageLoggingEnabled();

189

190

/**

191

* Get compatible configuration (internal conversion method)

192

* @return compatible configuration format

193

*/

194

public ETLConfig getCompatibleConfig();

195

}

196

197

/**

198

* ETL Batch Configuration version 1

199

*/

200

public final class ETLBatchConfig extends ETLConfig implements UpgradeableConfig<co.cask.cdap.etl.proto.v2.ETLBatchConfig> {

201

public enum Engine { MAPREDUCE, SPARK }

202

203

public Engine getEngine();

204

public String getSchedule();

205

public List<ETLStage> getActions();

206

public Resources getDriverResources();

207

208

@Override

209

public boolean canUpgrade() { return true; }

210

211

@Override

212

public co.cask.cdap.etl.proto.v2.ETLBatchConfig upgrade(UpgradeContext upgradeContext);

213

}

214

```

215

216

**v1 Usage Examples:**

217

218

```java

219

import co.cask.cdap.etl.proto.v1.*;

220

221

// v1 configuration structure

222

co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = loadV1Config();

223

224

// Access v1-specific structure

225

ETLStage sourceStage = v1Config.getSource();

226

List<ETLStage> sinkStages = v1Config.getSinks();

227

List<ETLStage> transformStages = v1Config.getTransforms();

228

229

// v1 stage with plugin structure

230

Plugin sourcePlugin = sourceStage.getPlugin();

231

String pluginName = sourcePlugin.getName();

232

Map<String, String> properties = sourcePlugin.getProperties();

233

234

// Upgrade v1 to v2

235

if (v1Config.canUpgrade()) {

236

co.cask.cdap.etl.proto.v2.ETLBatchConfig v2Config = v1Config.upgrade(upgradeContext);

237

238

// v2 uses unified stage structure

239

Set<co.cask.cdap.etl.proto.v2.ETLStage> v2Stages = v2Config.getStages();

240

Set<Connection> v2Connections = v2Config.getConnections();

241

}

242

```

243

244

### Version 0 (v0) Legacy Support

245

246

Support for the original v0 protocol format with basic stage structure and minimal connection support.

247

248

```java { .api }

249

/**

250

* ETL Configuration version 0 - original legacy format

251

*/

252

public abstract class ETLConfig extends Config {

253

/**

254

* Get source stage

255

* @return ETL source stage

256

*/

257

public ETLStage getSource();

258

259

/**

260

* Get sink stages

261

* @return immutable list of sink stages

262

*/

263

public List<ETLStage> getSinks();

264

265

/**

266

* Get transform stages

267

* @return immutable list of transform stages

268

*/

269

public List<ETLStage> getTransforms();

270

271

/**

272

* Get resource allocation

273

* @return resource configuration, defaults if null

274

*/

275

public Resources getResources();

276

}

277

278

/**

279

* ETL Batch Configuration version 0

280

*/

281

public final class ETLBatchConfig extends ETLConfig implements UpgradeableConfig<co.cask.cdap.etl.proto.v1.ETLBatchConfig> {

282

public ETLBatchConfig(String schedule, ETLStage source, List<ETLStage> sinks,

283

List<ETLStage> transforms, Resources resources, List<ETLStage> actions);

284

285

public List<ETLStage> getActions();

286

287

@Override

288

public boolean canUpgrade() { return true; }

289

290

@Override

291

public co.cask.cdap.etl.proto.v1.ETLBatchConfig upgrade(UpgradeContext upgradeContext);

292

}

293

294

/**

295

* ETL Stage version 0 - simple property-based structure

296

*/

297

public class ETLStage {

298

public ETLStage(String name, Map<String, String> properties, String errorDatasetName);

299

300

public String getName();

301

public String getErrorDatasetName();

302

public Map<String, String> getProperties();

303

304

co.cask.cdap.etl.proto.v1.ETLStage upgradeStage(String name, String pluginType, UpgradeContext upgradeContext);

305

}

306

```

307

308

**v0 Usage Examples:**

309

310

```java

311

import co.cask.cdap.etl.proto.v0.*;

312

313

// v0 configuration - simple property-based stages

314

ETLStage v0Source = new ETLStage(

315

"Table",

316

Map.of("name", "input_table", "schema.row.field", "id"),

317

null // no error dataset

318

);

319

320

ETLStage v0Transform = new ETLStage(

321

"JavaScript",

322

Map.of("script", "function transform(input, emitter, context) { emitter.emit(input); }"),

323

"error_dataset" // error dataset name

324

);

325

326

ETLStage v0Sink = new ETLStage(

327

"Table",

328

Map.of("name", "output_table"),

329

null

330

);

331

332

// v0 batch configuration

333

ETLBatchConfig v0BatchConfig = new ETLBatchConfig(

334

"0 0 2 * * ?", // schedule

335

v0Source,

336

List.of(v0Sink),

337

List.of(v0Transform),

338

new Resources(1024, 2),

339

List.of() // no actions

340

);

341

342

// Upgrade v0 to v1

343

co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = v0BatchConfig.upgrade(upgradeContext);

344

345

// Note: v0 stages with error datasets will fail upgrade

346

// Error datasets were replaced by error collectors in later versions

347

```

348

349

### Migration Utilities and Best Practices

350

351

Utilities and patterns for safe configuration migration across versions.

352

353

```java { .api }

354

/**

355

* Migration utilities for handling version upgrades

356

*/

357

public class ConfigMigrationUtils {

358

/**

359

* Safely upgrade configuration with error handling

360

* @param config configuration to upgrade

361

* @param context upgrade context

362

* @return upgraded configuration or original if upgrade fails

363

*/

364

public static UpgradeableConfig<?> safeUpgrade(UpgradeableConfig<?> config, UpgradeContext context);

365

366

/**

367

* Validate configuration before and after upgrade

368

* @param original original configuration

369

* @param upgraded upgraded configuration

370

* @return validation results

371

*/

372

public static ValidationResult validateUpgrade(UpgradeableConfig<?> original, UpgradeableConfig<?> upgraded);

373

374

/**

375

* Create backup of configuration before upgrade

376

* @param config configuration to backup

377

* @return serialized configuration backup

378

*/

379

public static String backupConfiguration(UpgradeableConfig<?> config);

380

381

/**

382

* Restore configuration from backup if upgrade fails

383

* @param backup serialized configuration backup

384

* @return restored configuration

385

*/

386

public static UpgradeableConfig<?> restoreFromBackup(String backup);

387

}

388

```

389

390

**Migration Best Practices:**

391

392

```java

393

// Safe upgrade pattern with backup and validation

394

public co.cask.cdap.etl.proto.v2.ETLBatchConfig migrateConfigSafely(

395

UpgradeableConfig<?> legacyConfig, UpgradeContext context) {

396

397

// 1. Create backup

398

String backup = ConfigMigrationUtils.backupConfiguration(legacyConfig);

399

400

try {

401

// 2. Perform upgrade

402

UpgradeableConfig<?> current = legacyConfig;

403

while (current.canUpgrade()) {

404

UpgradeableConfig<?> next = current.upgrade(context);

405

406

// 3. Validate each upgrade step

407

ValidationResult result = ConfigMigrationUtils.validateUpgrade(current, next);

408

if (!result.isValid()) {

409

throw new ConfigUpgradeException("Upgrade validation failed: " + result.getErrors());

410

}

411

412

current = next;

413

}

414

415

// 4. Final validation

416

co.cask.cdap.etl.proto.v2.ETLBatchConfig finalConfig =

417

(co.cask.cdap.etl.proto.v2.ETLBatchConfig) current;

418

finalConfig.validate();

419

420

return finalConfig;

421

422

} catch (Exception e) {

423

// 5. Restore from backup on failure

424

logger.error("Configuration upgrade failed, restoring from backup", e);

425

UpgradeableConfig<?> restored = ConfigMigrationUtils.restoreFromBackup(backup);

426

throw new ConfigUpgradeException("Upgrade failed, configuration restored", e);

427

}

428

}

429

430

// Handle common upgrade issues

431

public void handleUpgradeIssues(UpgradeableConfig<?> config) {

432

try {

433

config.upgrade(upgradeContext);

434

} catch (IllegalStateException e) {

435

if (e.getMessage().contains("Error datasets")) {

436

// Handle error dataset migration

437

logger.warn("Configuration uses deprecated error datasets. Manual migration required.");

438

// Provide migration guidance or automatic conversion

439

} else {

440

throw e;

441

}

442

}

443

}

444

```