# Execution Contexts

Execution contexts that provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem including datasets, messaging, and service discovery.

## Capabilities
### Spark Runtime Context

Java-based runtime context that provides access to core CDAP services and configuration for Spark program execution.
```java { .api }
/**
 * Runtime context for Spark program execution within CDAP
 * Provides access to CDAP services, configuration, and program metadata
 */
public class SparkRuntimeContext extends AbstractContext {
    /**
     * Gets the Spark program specification
     * @return SparkSpecification containing program metadata
     */
    public SparkSpecification getSpecification();

    /**
     * Gets the logical start time of the program
     * @return Start time in milliseconds since epoch
     */
    public long getLogicalStartTime();

    /**
     * Gets runtime arguments provided to the program
     * @return Map of argument key-value pairs
     */
    public Map<String, String> getRuntimeArguments();

    /**
     * Gets the service discovery client for finding services
     * @return DiscoveryServiceClient for service discovery
     */
    public DiscoveryServiceClient getDiscoveryServiceClient();

    /**
     * Gets the location factory for accessing file systems
     * @return LocationFactory for creating file system locations
     */
    public LocationFactory getLocationFactory();

    /**
     * Gets the dataset framework for dataset operations
     * @return DatasetFramework for dataset access
     */
    public DatasetFramework getDatasetFramework();

    /**
     * Gets the admin interface for CDAP operations
     * @return Admin interface for administrative operations
     */
    public Admin getAdmin();

    /**
     * Gets the messaging context for pub-sub operations
     * @return MessagingContext for messaging operations
     */
    public MessagingContext getMessagingContext();
}
```
### Default Spark Execution Context (Scala)

Scala-based execution context implementation that provides typed access to CDAP services with functional programming patterns.
```scala { .api }
/**
 * Default implementation of Spark execution context for CDAP applications
 * Provides access to CDAP services with Scala-friendly interfaces
 */
class DefaultSparkExecutionContext(runtimeContext: SparkRuntimeContext) extends SparkExecutionContext {
  /**
   * Gets the Spark program specification
   * @return SparkSpecification containing program metadata
   */
  def getSpecification: SparkSpecification

  /**
   * Gets the logical start time of the program
   * @return Start time in milliseconds since epoch
   */
  def getLogicalStartTime: Long

  /**
   * Gets runtime arguments as a Scala map
   * @return Map of argument key-value pairs
   */
  def getRuntimeArguments: Map[String, String]

  /**
   * Gets the admin interface for CDAP operations
   * @return Admin interface for administrative operations
   */
  def getAdmin: Admin

  /**
   * Gets the dataset framework for dataset operations
   * @return DatasetFramework for dataset access
   */
  def getDatasetFramework: DatasetFramework

  /**
   * Gets the messaging context for pub-sub operations
   * @return MessagingContext for messaging operations
   */
  def getMessagingContext: MessagingContext

  /**
   * Gets the location factory for accessing file systems
   * @return LocationFactory for creating file system locations
   */
  def getLocationFactory: LocationFactory

  /**
   * Gets the service discovery client
   * @return DiscoveryServiceClient for service discovery
   */
  def getDiscoveryServiceClient: DiscoveryServiceClient
}
```
### Abstract Context Base

Base class providing common functionality for all CDAP program contexts.
```java { .api }
/**
 * Abstract base class for CDAP program contexts
 * Provides common functionality and service access patterns
 */
public abstract class AbstractContext {
    /**
     * Gets the program run ID
     * @return ProgramRunId identifying this program run
     */
    public ProgramRunId getProgramRunId();

    /**
     * Gets the namespace of the program
     * @return Namespace name
     */
    public String getNamespace();

    /**
     * Gets the application name
     * @return Application name
     */
    public String getApplicationName();

    /**
     * Gets the program name
     * @return Program name
     */
    public String getProgramName();

    /**
     * Gets the run ID
     * @return Run ID string
     */
    public String getRunId();

    /**
     * Gets the CDAP configuration
     * @return CConfiguration containing CDAP settings
     */
    protected CConfiguration getCConfiguration();

    /**
     * Gets the Hadoop configuration
     * @return Configuration containing Hadoop settings
     */
    protected Configuration getConfiguration();
}
```
## Usage Examples

**Java Runtime Context Usage:**
```java
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.proto.id.DatasetId;

// Access dataset framework
SparkRuntimeContext context = // ... obtained from runtime
DatasetFramework datasetFramework = context.getDatasetFramework();

// Access a dataset
DatasetId datasetId = new DatasetId("namespace", "my-dataset");
Dataset dataset = datasetFramework.getDataset(datasetId, Collections.emptyMap(), null);

// Get runtime arguments
Map<String, String> args = context.getRuntimeArguments();
String configValue = args.get("config.key");

// Access location factory for file operations
LocationFactory locationFactory = context.getLocationFactory();
Location fileLocation = locationFactory.create("/path/to/file");
```
**Scala Execution Context Usage:**
```scala
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.api.data.DatasetContext

// Create execution context
val sparkContext = new DefaultSparkExecutionContext(runtimeContext)

// Access program metadata
val spec = sparkContext.getSpecification
val startTime = sparkContext.getLogicalStartTime
val args = sparkContext.getRuntimeArguments

// Access CDAP services
val admin = sparkContext.getAdmin
val datasetFramework = sparkContext.getDatasetFramework
val messagingContext = sparkContext.getMessagingContext

// Use functional patterns with runtime arguments
val configPrefix = "spark.config."
val sparkConfigs = args.filter(_._1.startsWith(configPrefix))
  .map { case (key, value) => key.substring(configPrefix.length) -> value }
```
**Service Discovery Usage:**
```java
import co.cask.cdap.common.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.ServiceDiscovered;

// Get service discovery client
DiscoveryServiceClient discoveryClient = context.getDiscoveryServiceClient();

// Discover a service
ServiceDiscovered serviceDiscovered = discoveryClient.discover("my-service");
Iterable<Discoverable> discoverables = serviceDiscovered.discover();

// Use the discovered service
for (Discoverable discoverable : discoverables) {
    InetSocketAddress address = discoverable.getSocketAddress();
    // Connect to service at address
}
```
## Types
```java { .api }
/**
 * Specification for Spark programs containing metadata and resource requirements
 */
public class SparkSpecification {
    /**
     * Gets the program name
     * @return Name of the Spark program
     */
    public String getName();

    /**
     * Gets the program description
     * @return Description of the Spark program
     */
    public String getDescription();

    /**
     * Gets the main class name
     * @return Fully qualified main class name
     */
    public String getMainClassName();

    /**
     * Gets the driver resource requirements
     * @return Resources required for the Spark driver
     */
    public Resources getDriverResources();

    /**
     * Gets the executor resource requirements
     * @return Resources required for Spark executors
     */
    public Resources getExecutorResources();

    /**
     * Gets the client resource requirements
     * @return Resources required for the Spark client
     */
    public Resources getClientResources();

    /**
     * Gets additional properties
     * @return Map of additional program properties
     */
    public Map<String, String> getProperties();
}

/**
 * Resource specification for program components
 */
public class Resources {
    /**
     * Gets the memory requirement in MB
     * @return Memory in megabytes
     */
    public int getMemoryMB();

    /**
     * Gets the virtual core requirement
     * @return Number of virtual cores
     */
    public int getVirtualCores();
}

/**
 * Interface for accessing CDAP administrative operations
 */
public interface Admin {
    /**
     * Checks if a dataset exists
     * @param datasetInstanceId Dataset identifier
     * @return true if dataset exists
     */
    boolean datasetExists(DatasetId datasetInstanceId) throws DatasetManagementException;

    /**
     * Gets dataset properties
     * @param datasetInstanceId Dataset identifier
     * @return Dataset properties
     */
    DatasetProperties getDatasetProperties(DatasetId datasetInstanceId) throws DatasetManagementException;
}

/**
 * Interface for dataset framework operations
 */
public interface DatasetFramework {
    /**
     * Gets a dataset instance
     * @param datasetInstanceId Dataset identifier
     * @param arguments Dataset arguments
     * @param classLoader Class loader for dataset classes
     * @return Dataset instance
     */
    <T extends Dataset> T getDataset(DatasetId datasetInstanceId,
                                     Map<String, String> arguments,
                                     ClassLoader classLoader) throws DatasetInstantiationException;
}

/**
 * Interface for messaging operations
 */
public interface MessagingContext {
    /**
     * Gets a message publisher for a topic
     * @param topicId Topic identifier
     * @return MessagePublisher for publishing messages
     */
    MessagePublisher getMessagePublisher(TopicId topicId);

    /**
     * Gets a message fetcher for a topic
     * @param topicId Topic identifier
     * @return MessageFetcher for fetching messages
     */
    MessageFetcher getMessageFetcher(TopicId topicId);
}
```
```scala { .api }
/**
 * Trait defining the Spark execution context interface
 */
trait SparkExecutionContext {
  /**
   * Gets the Spark program specification
   * @return SparkSpecification containing program metadata
   */
  def getSpecification: SparkSpecification

  /**
   * Gets the logical start time of the program
   * @return Start time in milliseconds since epoch
   */
  def getLogicalStartTime: Long

  /**
   * Gets runtime arguments as a Scala map
   * @return Map of argument key-value pairs
   */
  def getRuntimeArguments: Map[String, String]

  /**
   * Gets the admin interface for CDAP operations
   * @return Admin interface for administrative operations
   */
  def getAdmin: Admin

  /**
   * Gets the dataset framework for dataset operations
   * @return DatasetFramework for dataset access
   */
  def getDatasetFramework: DatasetFramework

  /**
   * Gets the messaging context for pub-sub operations
   * @return MessagingContext for messaging operations
   */
  def getMessagingContext: MessagingContext
}
```