# Legacy Version Support

Backward-compatibility support for the v0 and v1 protocol versions, with upgrade mechanisms that automatically migrate configurations to the current v2 format. This enables smooth migration from older CDAP versions while preserving pipeline functionality and configuration integrity.

## Capabilities

### Upgrade Interface

Core upgrade interface enabling version-to-version configuration migration with contextual artifact resolution.
```java { .api }
/**
 * Interface for configurations that can be upgraded to newer versions
 * @param <T> the type of configuration this upgrades to
 */
public interface UpgradeableConfig<T extends UpgradeableConfig> {
  /**
   * Check if this configuration can be upgraded to a newer version
   * @return true if upgrade is possible, false if already at latest version
   */
  boolean canUpgrade();

  /**
   * Upgrade configuration to the next version.
   * This enables chain upgrading: v0 -> v1 -> v2
   * @param upgradeContext context providing artifact resolution and upgrade utilities
   * @return upgraded configuration of the next version
   */
  T upgrade(UpgradeContext upgradeContext);
}
```
**Usage Examples:**

```java
import co.cask.cdap.etl.proto.*;
import co.cask.cdap.etl.proto.v0.*;
import co.cask.cdap.etl.proto.v1.*;
import co.cask.cdap.etl.proto.v2.*;

// Upgrade chain example: v0 -> v1 -> v2
// loadLegacyConfig() and upgradeContext are placeholders: the config comes from wherever
// the legacy pipeline JSON is stored, and upgradeContext is any UpgradeContext implementation.
co.cask.cdap.etl.proto.v0.ETLBatchConfig v0Config = loadLegacyConfig();

// Check if upgrade is needed
if (v0Config.canUpgrade()) {
  // Upgrade v0 to v1
  co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = v0Config.upgrade(upgradeContext);

  // Continue upgrade to v2 if needed
  if (v1Config.canUpgrade()) {
    co.cask.cdap.etl.proto.v2.ETLBatchConfig v2Config = v1Config.upgrade(upgradeContext);

    // v2Config is now ready for use with the current CDAP version
    v2Config.validate();
  }
}

// Automatic chain upgrade utility
public static co.cask.cdap.etl.proto.v2.ETLBatchConfig upgradeToLatest(
    UpgradeableConfig<?> config, UpgradeContext context) {

  UpgradeableConfig<?> current = config;
  while (current.canUpgrade()) {
    current = current.upgrade(context);
  }
  return (co.cask.cdap.etl.proto.v2.ETLBatchConfig) current;
}
```
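The same contract can be implemented by custom configuration classes so that they participate in the upgrade chain. The sketch below is purely illustrative: `LegacyAlertConfig` and `CurrentAlertConfig` are hypothetical types, not part of the CDAP proto packages.

```java
import java.util.Collections;
import java.util.List;

import co.cask.cdap.etl.proto.UpgradeContext;
import co.cask.cdap.etl.proto.UpgradeableConfig;

// Hypothetical legacy config that knows how to upgrade itself one step forward.
class LegacyAlertConfig implements UpgradeableConfig<CurrentAlertConfig> {
  private final String recipient;

  LegacyAlertConfig(String recipient) {
    this.recipient = recipient;
  }

  @Override
  public boolean canUpgrade() {
    // A newer representation exists, so this config is always upgradeable.
    return true;
  }

  @Override
  public CurrentAlertConfig upgrade(UpgradeContext upgradeContext) {
    // Translate the old single-recipient field into the newer list-based structure.
    // upgradeContext would only be needed if this config referenced plugins.
    return new CurrentAlertConfig(Collections.singletonList(recipient));
  }
}

// Hypothetical "latest" config: canUpgrade() returns false, terminating the chain.
class CurrentAlertConfig implements UpgradeableConfig<CurrentAlertConfig> {
  private final List<String> recipients;

  CurrentAlertConfig(List<String> recipients) {
    this.recipients = recipients;
  }

  @Override
  public boolean canUpgrade() {
    return false;
  }

  @Override
  public CurrentAlertConfig upgrade(UpgradeContext upgradeContext) {
    throw new UnsupportedOperationException("Already at the latest version");
  }
}
```

Chain-style helpers such as `upgradeToLatest` above then work on these types the same way they do on the v0/v1 configurations.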
### Upgrade Context

Context interface providing artifact resolution and upgrade utilities during configuration migration.
```java { .api }
/**
 * Context for upgrading configurations, providing artifact resolution services
 */
public interface UpgradeContext {
  /**
   * Get artifact information for a plugin type and name.
   * Used during upgrade to resolve plugin artifacts from older versions.
   * @param pluginType the plugin type (e.g., "batchsource", "transform", "batchsink")
   * @param pluginName the plugin name
   * @return artifact selector configuration for the plugin, null if not found
   */
  ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName);
}
```
**Implementation Examples:**

```java
import java.util.Collections;
import java.util.Map;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
import co.cask.cdap.etl.proto.UpgradeContext;

// Custom upgrade context for development/testing
public class DevelopmentUpgradeContext implements UpgradeContext {
  private final Map<String, Map<String, ArtifactSelectorConfig>> pluginArtifacts;

  public DevelopmentUpgradeContext() {
    this.pluginArtifacts = loadPluginArtifactMappings();
  }

  @Override
  public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {
    return pluginArtifacts
      .getOrDefault(pluginType, Collections.emptyMap())
      .get(pluginName);
  }

  private Map<String, Map<String, ArtifactSelectorConfig>> loadPluginArtifactMappings() {
    // Load from a configuration file or database; hard-coded here for illustration
    return Map.of(
      "batchsource", Map.of(
        "Table", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"),
        "File", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0")
      ),
      "transform", Map.of(
        "JavaScript", new ArtifactSelectorConfig("SYSTEM", "core-plugins", "2.8.0"),
        "Python", new ArtifactSelectorConfig("SYSTEM", "hydrator-plugins", "2.8.0")
      )
    );
  }
}

// Production upgrade context with artifact service integration.
// ArtifactService and ArtifactNotFoundException stand in for whatever artifact lookup
// facility the deployment provides; they are not CDAP proto classes.
public class ProductionUpgradeContext implements UpgradeContext {
  private static final Logger logger = LoggerFactory.getLogger(ProductionUpgradeContext.class);

  private final ArtifactService artifactService;

  public ProductionUpgradeContext(ArtifactService artifactService) {
    this.artifactService = artifactService;
  }

  @Override
  public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {
    try {
      return artifactService.resolvePluginArtifact(pluginType, pluginName);
    } catch (ArtifactNotFoundException e) {
      logger.warn("Could not resolve artifact for plugin {}:{}", pluginType, pluginName);
      return null;
    }
  }
}
```
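When artifact resolution is not required, for example in unit tests or when upgraded stages should simply not pin an artifact version, a trivial context is enough. This is a minimal sketch, not a published CDAP class:

```java
import co.cask.cdap.etl.proto.ArtifactSelectorConfig;
import co.cask.cdap.etl.proto.UpgradeContext;

/**
 * Upgrade context that performs no artifact resolution: every lookup returns null,
 * so upgraded stages keep their plugin names without an artifact selector.
 */
public class NoOpUpgradeContext implements UpgradeContext {

  @Override
  public ArtifactSelectorConfig getPluginArtifact(String pluginType, String pluginName) {
    return null;
  }
}
```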
### Version 1 (v1) Legacy Support

Support for the v1 protocol format, which uses a separated source/sink/transform structure with basic connection support.
```java { .api }
/**
 * ETL Configuration version 1 - legacy format with separated stage types
 */
public class ETLConfig extends Config {
  /**
   * Get source stage configuration
   * @return ETL source stage
   */
  public ETLStage getSource();

  /**
   * Get sink stage configurations
   * @return list of ETL sink stages
   */
  public List<ETLStage> getSinks();

  /**
   * Get transform stage configurations
   * @return list of ETL transform stages
   */
  public List<ETLStage> getTransforms();

  /**
   * Get stage connections
   * @return list of connections between stages
   */
  public List<Connection> getConnections();

  /**
   * Get resource allocation
   * @return resource configuration
   */
  public Resources getResources();

  /**
   * Check if stage logging is enabled
   * @return true if stage logging enabled
   */
  public Boolean isStageLoggingEnabled();

  /**
   * Get compatible configuration (internal conversion method)
   * @return compatible configuration format
   */
  public ETLConfig getCompatibleConfig();
}

/**
 * ETL Batch Configuration version 1
 */
public final class ETLBatchConfig extends ETLConfig implements UpgradeableConfig<co.cask.cdap.etl.proto.v2.ETLBatchConfig> {
  public enum Engine { MAPREDUCE, SPARK }

  public Engine getEngine();
  public String getSchedule();
  public List<ETLStage> getActions();
  public Resources getDriverResources();

  @Override
  public boolean canUpgrade() { return true; }

  @Override
  public co.cask.cdap.etl.proto.v2.ETLBatchConfig upgrade(UpgradeContext upgradeContext);
}
```
**v1 Usage Examples:**

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

import co.cask.cdap.etl.proto.*;
import co.cask.cdap.etl.proto.v1.*;

// v1 configuration structure (loadV1Config() is a placeholder for reading the stored config)
co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = loadV1Config();

// Access v1-specific structure
ETLStage sourceStage = v1Config.getSource();
List<ETLStage> sinkStages = v1Config.getSinks();
List<ETLStage> transformStages = v1Config.getTransforms();

// v1 stage with plugin structure
Plugin sourcePlugin = sourceStage.getPlugin();
String pluginName = sourcePlugin.getName();
Map<String, String> properties = sourcePlugin.getProperties();

// Upgrade v1 to v2 (upgradeContext is any UpgradeContext implementation)
if (v1Config.canUpgrade()) {
  co.cask.cdap.etl.proto.v2.ETLBatchConfig v2Config = v1Config.upgrade(upgradeContext);

  // v2 uses a unified stage structure
  Set<co.cask.cdap.etl.proto.v2.ETLStage> v2Stages = v2Config.getStages();
  Set<Connection> v2Connections = v2Config.getConnections();
}
```
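Legacy pipelines are usually stored as JSON, so a common first step is deserializing the document into the v1 classes before upgrading. The sketch below assumes the v1 classes deserialize directly with Gson and that the JSON field names mirror the getters above; real deployments may need the same Gson type adapters CDAP registers internally, so treat the exact shape as illustrative.

```java
import com.google.gson.Gson;

// Illustrative v1 JSON: separated source/sinks/transforms plus explicit connections.
String v1Json = "{"
  + "\"source\": {\"name\": \"input\", \"plugin\": {\"name\": \"Table\", \"properties\": {\"name\": \"input_table\"}}},"
  + "\"transforms\": [],"
  + "\"sinks\": [{\"name\": \"output\", \"plugin\": {\"name\": \"Table\", \"properties\": {\"name\": \"output_table\"}}}],"
  + "\"connections\": [{\"from\": \"input\", \"to\": \"output\"}],"
  + "\"schedule\": \"0 0 2 * * ?\""
  + "}";

co.cask.cdap.etl.proto.v1.ETLBatchConfig parsedV1 =
  new Gson().fromJson(v1Json, co.cask.cdap.etl.proto.v1.ETLBatchConfig.class);

// From here the normal upgrade path applies (upgradeContext as above).
co.cask.cdap.etl.proto.v2.ETLBatchConfig upgradedV2 = parsedV1.upgrade(upgradeContext);
```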
### Version 0 (v0) Legacy Support

Support for the original v0 protocol format with basic stage structure and minimal connection support.
```java { .api }
/**
 * ETL Configuration version 0 - original legacy format
 */
public abstract class ETLConfig extends Config {
  /**
   * Get source stage
   * @return ETL source stage
   */
  public ETLStage getSource();

  /**
   * Get sink stages
   * @return immutable list of sink stages
   */
  public List<ETLStage> getSinks();

  /**
   * Get transform stages
   * @return immutable list of transform stages
   */
  public List<ETLStage> getTransforms();

  /**
   * Get resource allocation
   * @return resource configuration, defaults if null
   */
  public Resources getResources();
}

/**
 * ETL Batch Configuration version 0
 */
public final class ETLBatchConfig extends ETLConfig implements UpgradeableConfig<co.cask.cdap.etl.proto.v1.ETLBatchConfig> {
  public ETLBatchConfig(String schedule, ETLStage source, List<ETLStage> sinks,
                        List<ETLStage> transforms, Resources resources, List<ETLStage> actions);

  public List<ETLStage> getActions();

  @Override
  public boolean canUpgrade() { return true; }

  @Override
  public co.cask.cdap.etl.proto.v1.ETLBatchConfig upgrade(UpgradeContext upgradeContext);
}

/**
 * ETL Stage version 0 - simple property-based structure
 */
public class ETLStage {
  public ETLStage(String name, Map<String, String> properties, String errorDatasetName);

  public String getName();
  public String getErrorDatasetName();
  public Map<String, String> getProperties();

  co.cask.cdap.etl.proto.v1.ETLStage upgradeStage(String name, String pluginType, UpgradeContext upgradeContext);
}
```
**v0 Usage Examples:**

```java
import java.util.List;
import java.util.Map;

import co.cask.cdap.api.Resources;
import co.cask.cdap.etl.proto.v0.*;

// v0 configuration - simple property-based stages
ETLStage v0Source = new ETLStage(
  "Table",
  Map.of("name", "input_table", "schema.row.field", "id"),
  null // no error dataset
);

ETLStage v0Transform = new ETLStage(
  "JavaScript",
  Map.of("script", "function transform(input, emitter, context) { emitter.emit(input); }"),
  "error_dataset" // error dataset name
);

ETLStage v0Sink = new ETLStage(
  "Table",
  Map.of("name", "output_table"),
  null
);

// v0 batch configuration
ETLBatchConfig v0BatchConfig = new ETLBatchConfig(
  "0 0 2 * * ?", // schedule
  v0Source,
  List.of(v0Sink),
  List.of(v0Transform),
  new Resources(1024, 2),
  List.of() // no actions
);

// Upgrade v0 to v1 (upgradeContext is any UpgradeContext implementation)
// Note: v0 stages with error datasets will fail upgrade, because error datasets were
// replaced by error collectors in later versions. The v0Transform above declares an
// error dataset, so it would have to be removed before this call succeeds.
co.cask.cdap.etl.proto.v1.ETLBatchConfig v1Config = v0BatchConfig.upgrade(upgradeContext);
```
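Because stages that declare an error dataset block the automatic upgrade, it can help to scan a v0 configuration up front and report the offending stages before calling `upgrade()`. The helper below is a sketch rather than part of the CDAP proto packages; it relies only on the v0 getters shown above.

```java
import java.util.ArrayList;
import java.util.List;

import co.cask.cdap.etl.proto.v0.ETLBatchConfig;
import co.cask.cdap.etl.proto.v0.ETLStage;

// Collect the names of v0 stages that still declare an error dataset.
public static List<String> findStagesWithErrorDatasets(ETLBatchConfig config) {
  List<ETLStage> stages = new ArrayList<>();
  stages.add(config.getSource());
  stages.addAll(config.getTransforms());
  stages.addAll(config.getSinks());

  List<String> offenders = new ArrayList<>();
  for (ETLStage stage : stages) {
    if (stage.getErrorDatasetName() != null) {
      offenders.add(stage.getName());
    }
  }
  return offenders;
}

// Usage: surface the problem before attempting the upgrade.
List<String> offenders = findStagesWithErrorDatasets(v0BatchConfig);
if (!offenders.isEmpty()) {
  throw new IllegalStateException("Remove error datasets from stages before upgrading: " + offenders);
}
```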
### Migration Utilities and Best Practices

Utilities and patterns for safe configuration migration across versions.
```java { .api }
/**
 * Migration utilities for handling version upgrades
 */
public class ConfigMigrationUtils {
  /**
   * Safely upgrade configuration with error handling
   * @param config configuration to upgrade
   * @param context upgrade context
   * @return upgraded configuration or original if upgrade fails
   */
  public static UpgradeableConfig<?> safeUpgrade(UpgradeableConfig<?> config, UpgradeContext context);

  /**
   * Validate configuration before and after upgrade
   * @param original original configuration
   * @param upgraded upgraded configuration
   * @return validation results
   */
  public static ValidationResult validateUpgrade(UpgradeableConfig<?> original, UpgradeableConfig<?> upgraded);

  /**
   * Create backup of configuration before upgrade
   * @param config configuration to backup
   * @return serialized configuration backup
   */
  public static String backupConfiguration(UpgradeableConfig<?> config);

  /**
   * Restore configuration from backup if upgrade fails
   * @param backup serialized configuration backup
   * @return restored configuration
   */
  public static UpgradeableConfig<?> restoreFromBackup(String backup);
}
```
**Migration Best Practices:**

```java
// Safe upgrade pattern with backup and validation.
// logger, upgradeContext, and ConfigUpgradeException are assumed to be provided by the
// enclosing class; ConfigMigrationUtils is the helper outlined above.
public co.cask.cdap.etl.proto.v2.ETLBatchConfig migrateConfigSafely(
    UpgradeableConfig<?> legacyConfig, UpgradeContext context) {

  // 1. Create backup
  String backup = ConfigMigrationUtils.backupConfiguration(legacyConfig);

  try {
    // 2. Perform upgrade
    UpgradeableConfig<?> current = legacyConfig;
    while (current.canUpgrade()) {
      UpgradeableConfig<?> next = current.upgrade(context);

      // 3. Validate each upgrade step
      ValidationResult result = ConfigMigrationUtils.validateUpgrade(current, next);
      if (!result.isValid()) {
        throw new ConfigUpgradeException("Upgrade validation failed: " + result.getErrors());
      }

      current = next;
    }

    // 4. Final validation
    co.cask.cdap.etl.proto.v2.ETLBatchConfig finalConfig =
      (co.cask.cdap.etl.proto.v2.ETLBatchConfig) current;
    finalConfig.validate();

    return finalConfig;

  } catch (Exception e) {
    // 5. Restore from backup on failure; the restored legacy config can be re-persisted
    // by the caller so the pipeline keeps running on the old format.
    logger.error("Configuration upgrade failed, restoring from backup", e);
    UpgradeableConfig<?> restored = ConfigMigrationUtils.restoreFromBackup(backup);
    throw new ConfigUpgradeException("Upgrade failed, configuration restored", e);
  }
}

// Handle common upgrade issues
public void handleUpgradeIssues(UpgradeableConfig<?> config) {
  try {
    config.upgrade(upgradeContext);
  } catch (IllegalStateException e) {
    if (e.getMessage().contains("Error datasets")) {
      // Handle error dataset migration
      logger.warn("Configuration uses deprecated error datasets. Manual migration required.");
      // Provide migration guidance or automatic conversion
    } else {
      throw e;
    }
  }
}
```
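Putting it together, one way the pieces above might be wired end to end: parse the legacy JSON, choose an upgrade context, and run the safe migration. `readConfigJson` is a placeholder for however the legacy document is obtained, and the Gson call carries the same caveat as the v1 parsing sketch above.

```java
import com.google.gson.Gson;

import co.cask.cdap.etl.proto.UpgradeContext;

// Hypothetical end-to-end flow using the helpers defined in this section.
String legacyJson = readConfigJson("pipelines/legacy-pipeline.json"); // placeholder
co.cask.cdap.etl.proto.v0.ETLBatchConfig legacyConfig =
  new Gson().fromJson(legacyJson, co.cask.cdap.etl.proto.v0.ETLBatchConfig.class);

// Any UpgradeContext works; the development context from earlier is used here.
UpgradeContext context = new DevelopmentUpgradeContext();

// Chain-upgrade with backup and per-step validation, then deploy the v2 config.
co.cask.cdap.etl.proto.v2.ETLBatchConfig migrated = migrateConfigSafely(legacyConfig, context);
migrated.validate();
```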