Core Spark 1.x integration component for the Cask Data Application Platform providing runtime services and execution context for CDAP applications
npx @tessl/cli install tessl/maven-co-cask-cdap--cdap-spark-core@5.1.00
# CDAP Spark Core

CDAP Spark Core is a Java/Scala library that provides Apache Spark 1.x integration for the Cask Data Application Platform (CDAP). It serves as a runtime provider and execution context for Spark-based applications within the CDAP framework, offering abstractions for data processing, service contexts, and distributed execution while integrating with Hadoop ecosystem components.

## Package Information

- **Package Name**: cdap-spark-core
- **Package Type**: maven
- **Group ID**: co.cask.cdap
- **Language**: Java/Scala
- **Installation**: Add to your Maven pom.xml:

```xml
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-spark-core</artifactId>
  <version>5.1.2</version>
</dependency>
```

## Core Imports

```java
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext;
import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer;
```

For Scala:

```scala
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServiceContext
import co.cask.cdap.app.runtime.spark.dynamic.{DefaultSparkCompiler, DefaultSparkInterpreter}
```

## Basic Usage

```java
// Create the Spark 1.x runtime provider
Spark1ProgramRuntimeProvider provider = new Spark1ProgramRuntimeProvider();

// SparkRuntimeContext is created internally by CDAP with all required dependencies.
// Access CDAP services within Spark programs through the runtime context.
SparkRuntimeContext runtimeContext = sparkClassLoader.getRuntimeContext();
TransactionSystemClient txClient = runtimeContext.getTransactionSystemClient();
DatasetFramework datasetFramework = runtimeContext.getDatasetFramework();
AuthorizationEnforcer authEnforcer = runtimeContext.getAuthorizationEnforcer();
```

For the Scala execution context:

```scala
// DefaultSparkExecutionContext is created with a SparkClassLoader and localized resources
val sparkContext = new DefaultSparkExecutionContext(sparkClassLoader, localizeResources)

// Access runtime information
val spec = sparkContext.getSpecification
val startTime = sparkContext.getLogicalStartTime
val args = sparkContext.getRuntimeArguments
val admin = sparkContext.getAdmin
val messaging = sparkContext.getMessagingContext
```

## Architecture

CDAP Spark Core is built around several key components:

- **Runtime Providers**: Service providers that integrate Spark 1.x execution with the CDAP program lifecycle
- **Execution Contexts**: Abstractions providing access to CDAP services and metadata within Spark applications
- **Data Integration**: SQL data sources and scanning utilities for accessing CDAP datasets and streams
- **Service Framework**: HTTP service contexts for running web services alongside Spark applications
- **Dynamic Compilation**: Scala compiler and interpreter integration for runtime code generation
- **Distributed Execution**: Twill-based distributed application support with proper resource management
- **Class Loading**: Custom class loaders providing proper isolation and dependency management

## Capabilities

### Runtime Providers and Program Execution

Core runtime integration that enables Spark 1.x programs to run within the CDAP platform with full lifecycle management and resource allocation.

```java { .api }
@ProgramRuntimeProvider.SupportedProgramType(ProgramType.SPARK)
public class Spark1ProgramRuntimeProvider extends SparkProgramRuntimeProvider {
  public Spark1ProgramRuntimeProvider();
}

public abstract class SparkProgramRuntimeProvider implements ProgramRuntimeProvider {
  public ProgramRunner createProgramRunner(ProgramType type, Mode mode, Injector injector);
  public boolean isSupported(ProgramType programType, CConfiguration cConf);
}
```

[Runtime Providers](./runtime-providers.md)

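The provider lookup described above can be sketched without CDAP on the classpath. The enum, interface, and runner name below are hypothetical stand-ins for CDAP's `ProgramType` and `ProgramRuntimeProvider`; this is a minimal sketch of selecting a runner by program type, not the platform's actual dispatch code.

```java
import java.util.EnumMap;
import java.util.Map;

public class ProviderRegistryDemo {
  // Hypothetical stand-ins for CDAP's ProgramType and ProgramRuntimeProvider.
  enum ProgramType { SPARK, MAPREDUCE }

  interface RuntimeProvider {
    String createRunnerName();
  }

  // A registry keyed by program type, mirroring how the platform would pick
  // Spark1ProgramRuntimeProvider for ProgramType.SPARK.
  static final Map<ProgramType, RuntimeProvider> REGISTRY = new EnumMap<>(ProgramType.class);
  static {
    REGISTRY.put(ProgramType.SPARK, () -> "Spark1ProgramRunner");
  }

  static String runnerFor(ProgramType type) {
    RuntimeProvider provider = REGISTRY.get(type);
    if (provider == null) {
      throw new IllegalArgumentException("No runtime provider for " + type);
    }
    return provider.createRunnerName();
  }

  public static void main(String[] args) {
    System.out.println(runnerFor(ProgramType.SPARK)); // prints Spark1ProgramRunner
  }
}
```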
### Spark Execution Contexts

Execution contexts that provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem.

```java { .api }
public final class SparkRuntimeContext extends AbstractContext implements Metrics {
  SparkRuntimeContext(Configuration hConf, Program program, ProgramOptions programOptions,
                      CConfiguration cConf, String hostname, TransactionSystemClient txClient,
                      DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient,
                      MetricsCollectionService metricsCollectionService, StreamAdmin streamAdmin,
                      WorkflowProgramInfo workflowProgramInfo, PluginInstantiator pluginInstantiator,
                      SecureStore secureStore, SecureStoreManager secureStoreManager,
                      AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext,
                      MessagingService messagingService, ServiceAnnouncer serviceAnnouncer,
                      PluginFinder pluginFinder, LocationFactory locationFactory,
                      MetadataReader metadataReader, MetadataPublisher metadataPublisher);

  public SparkSpecification getSpecification();
  public long getLogicalStartTime();
  public Map<String, String> getRuntimeArguments();
  public DiscoveryServiceClient getDiscoveryServiceClient();
  public LocationFactory getLocationFactory();
  public TransactionSystemClient getTransactionSystemClient();
  public AuthorizationEnforcer getAuthorizationEnforcer();
  public AuthenticationContext getAuthenticationContext();
  public String getHostname();
  public Configuration getHConf();
  public CConfiguration getCConf();
}
```

```scala { .api }
class DefaultSparkExecutionContext(sparkClassLoader: SparkClassLoader, localizeResources: util.Map[String, File])
  extends AbstractSparkExecutionContext(sparkClassLoader, localizeResources) {

  protected def saveAsNewAPIHadoopDataset[K: ClassManifest, V: ClassManifest](sc: SparkContext,
                                                                              conf: Configuration,
                                                                              rdd: RDD[(K, V)]): Unit

  protected def createInterpreter(settings: Settings, classDir: File,
                                  urlAdder: URLAdder, onClose: () => Unit): SparkInterpreter

  protected def createSparkMetricsWriterFactory(): (TaskContext) => SparkMetricsWriter
}

abstract class AbstractSparkExecutionContext(sparkClassLoader: SparkClassLoader,
                                             localizeResources: util.Map[String, File])
  extends SparkExecutionContext with AutoCloseable {

  def getSpecification: SparkSpecification
  def getLogicalStartTime: Long
  def getRuntimeArguments: Map[String, String]
  def getAdmin: Admin
  def getDatasetFramework: DatasetFramework
  def getMessagingContext: MessagingContext
  def getTransactionSystemClient: TransactionSystemClient
}
```

[Execution Contexts](./execution-contexts.md)

### Data Processing and SQL Integration

Data processing utilities and Spark SQL data sources that provide efficient, type-safe access to CDAP datasets and streams.

```scala { .api }
class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] {
  def hasNext: Boolean
  def next(): T
  def close(): Unit
}

object DatasetRelationProvider extends RelationProvider {
  def shortName(): String
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
```

[Data Processing](./data-processing.md)

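The `DatumScannerIterator` pattern — adapting a pull-based scanner into a standard iterator — can be sketched in plain Java. The `Scanner` interface and its `poll` method below are hypothetical stand-ins, not CDAP's actual scanner API; the sketch only shows the one-record-ahead buffering that makes `hasNext` work.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class ScannerIteratorDemo {
  // Hypothetical one-record-at-a-time scanner, standing in for CDAP's Scanner[T].
  interface Scanner<T> {
    T poll(); // next record, or null when exhausted
  }

  // Adapts a Scanner into a java.util.Iterator by buffering one record ahead,
  // in the spirit of DatumScannerIterator.
  static class ScannerIterator<T> implements Iterator<T> {
    private final Scanner<T> scanner;
    private T buffered;

    ScannerIterator(Scanner<T> scanner) {
      this.scanner = scanner;
    }

    @Override
    public boolean hasNext() {
      if (buffered == null) {
        buffered = scanner.poll(); // pull the next record lazily
      }
      return buffered != null;
    }

    @Override
    public T next() {
      if (!hasNext()) {
        throw new NoSuchElementException();
      }
      T result = buffered;
      buffered = null;
      return result;
    }
  }

  public static void main(String[] args) {
    Iterator<String> records = Arrays.asList("a", "b", "c").iterator();
    ScannerIterator<String> it =
        new ScannerIterator<>(() -> records.hasNext() ? records.next() : null);
    while (it.hasNext()) {
      System.out.println(it.next()); // prints a, b, c on separate lines
    }
  }
}
```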
### HTTP Service Framework

HTTP service framework that allows Spark applications to expose web endpoints and REST APIs while maintaining full integration with CDAP's service discovery and security model.

```java { .api }
public class SparkHttpServiceServer {
  public void startAndWait() throws Exception;
  public void stopAndWait() throws Exception;
  public InetSocketAddress getBindAddress();
}
```

```scala { .api }
class DefaultSparkHttpServiceContext extends SparkHttpServiceContext {
  def getSpecification: SparkSpecification
  def getInstanceId: Int
  def getInstanceCount: Int
  def getLogicalStartTime: Long
}
```

[HTTP Services](./http-services.md)

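The start/bind-address lifecycle that `SparkHttpServiceServer` exposes can be illustrated with the JDK's built-in `com.sun.net.httpserver` — this is not CDAP's service framework, and the `/status` endpoint and handler are invented for the example; it is only a sketch of starting a server on an ephemeral port and reading back its bind address.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class HttpServiceDemo {
  // Starts a tiny HTTP server on an ephemeral port, loosely mirroring the
  // startAndWait()/getBindAddress() lifecycle of SparkHttpServiceServer.
  static HttpServer start() throws IOException {
    HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
    server.createContext("/status", exchange -> {
      byte[] body = "OK".getBytes(StandardCharsets.UTF_8);
      exchange.sendResponseHeaders(200, body.length);
      try (OutputStream os = exchange.getResponseBody()) {
        os.write(body);
      }
    });
    server.start();
    return server;
  }

  // Fetches a path from the server using the address reported by the server itself.
  static String get(InetSocketAddress addr, String path) throws IOException {
    URL url = new URL("http://localhost:" + addr.getPort() + path);
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    try (InputStream in = conn.getInputStream()) {
      return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }
  }

  public static void main(String[] args) throws IOException {
    HttpServer server = start();
    System.out.println(get(server.getAddress(), "/status")); // prints OK
    server.stop(0);
  }
}
```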
### Dynamic Compilation and Interpretation

Dynamic Scala compilation and interpretation capabilities that enable runtime code generation, interactive development, and flexible application behavior modification.

```scala { .api }
class DefaultSparkCompiler extends SparkCompiler {
  def compile(code: String): Option[Class[_]]
  def compileClass(className: String, code: String): Option[Class[_]]
}

class DefaultSparkInterpreter extends SparkInterpreter {
  def interpret(code: String): Unit
  def bind(name: String, value: Any): Unit
  def reset(): Unit
}
```

[Dynamic Compilation](./dynamic-compilation.md)

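The compile-then-load flow behind `SparkCompiler` can be illustrated with the JDK's own compiler API, which targets Java rather than Scala. This is a sketch, assuming a full JDK (not a bare JRE) is available at runtime; the `Hello` class and `greet` method are invented for the example.

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.lang.reflect.Method;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class CompileDemo {
  // Compiles a class from source at runtime with the JDK compiler API, then
  // loads it through a URLClassLoader - analogous in spirit to
  // SparkCompiler.compile, which does the same for Scala source.
  static Class<?> compile(String className, String source) throws Exception {
    Path dir = Files.createTempDirectory("compile-demo");
    Path file = dir.resolve(className + ".java");
    Files.write(file, source.getBytes(StandardCharsets.UTF_8));

    // Requires a JDK: ToolProvider returns null when only a JRE is present.
    JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
    int result = compiler.run(null, null, null, "-d", dir.toString(), file.toString());
    if (result != 0) {
      return null; // compilation failed, mirroring the Option[Class[_]] contract
    }
    try (URLClassLoader loader = new URLClassLoader(new URL[]{dir.toUri().toURL()})) {
      return loader.loadClass(className);
    }
  }

  public static void main(String[] args) throws Exception {
    Class<?> cls = compile("Hello",
        "public class Hello { public static String greet() { return \"hi\"; } }");
    Method greet = cls.getMethod("greet");
    System.out.println(greet.invoke(null)); // prints hi
  }
}
```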
### Transaction Management

Transaction handling capabilities that enable ACID properties for Spark operations within the CDAP platform, providing consistent data access across distributed Spark executors.

```java { .api }
public class SparkTransactionHandler {
  public void jobStarted(Integer jobId, Set<Integer> stageIds);
  public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);
  public void jobEnded(Integer jobId, boolean succeeded);
  public void stageSubmitted(Integer stageId);
  public void stageCompleted(Integer stageId);
}

public class SparkTransactional {
  public TransactionInfo getTransactionInfo(String key);
  public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;
  public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;
}
```

[Transaction Management](./transaction-management.md)

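The job/stage bookkeeping implied by `jobStarted`/`jobEnded` can be sketched as a small tracker. This is a hypothetical simplification, not `SparkTransactionHandler`'s implementation; it only demonstrates the core invariant that every stage of one Spark job resolves to the same transaction.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class TxTrackerDemo {
  // Minimal stage-to-job bookkeeping, sketching what SparkTransactionHandler
  // maintains so that all stages of a job share one transaction.
  static class TxTracker {
    private final Map<Integer, Integer> stageToJob = new HashMap<>();
    private final Map<Integer, String> jobToTx = new HashMap<>();
    private int nextTx = 1;

    void jobStarted(int jobId, Set<Integer> stageIds) {
      jobToTx.put(jobId, "tx-" + nextTx++); // one transaction per job
      for (int stageId : stageIds) {
        stageToJob.put(stageId, jobId);
      }
    }

    // All stages of the same job resolve to the same transaction id.
    String txForStage(int stageId) {
      return jobToTx.get(stageToJob.get(stageId));
    }

    void jobEnded(int jobId, boolean succeeded) {
      // On success the transaction would be committed; on failure, aborted.
      jobToTx.remove(jobId);
      stageToJob.values().removeIf(id -> id == jobId);
    }
  }

  public static void main(String[] args) {
    TxTracker tracker = new TxTracker();
    tracker.jobStarted(1, new HashSet<>(Arrays.asList(10, 11)));
    System.out.println(tracker.txForStage(10).equals(tracker.txForStage(11))); // prints true
  }
}
```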
### Class Loading and Runtime Support

Custom class loading infrastructure that provides proper isolation and dependency management for Spark applications within the CDAP runtime environment.

```java { .api }
public class SparkClassLoader extends URLClassLoader {
  public SparkRuntimeContext getRuntimeContext();
  public void addURL(URL url);
  public Class<?> loadClass(String name) throws ClassNotFoundException;
}

public class SparkRunnerClassLoader extends FilterClassLoader {
  public static SparkRunnerClassLoader create();
  protected boolean includePackage(String packageName);
}
```

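The package-filtering behavior suggested by `includePackage` can be sketched with a plain `ClassLoader`. `FilteringClassLoader` below is a hypothetical simplification, not `SparkRunnerClassLoader`; it refuses classes outside an allowed package prefix while still delegating core `java.` classes to the parent.

```java
public class FilterClassLoaderDemo {
  // A minimal filtering class loader: classes outside the allowed prefix are
  // rejected before delegation, sketching the isolation idea behind
  // SparkRunnerClassLoader.includePackage.
  static class FilteringClassLoader extends ClassLoader {
    private final String allowedPrefix;

    FilteringClassLoader(ClassLoader parent, String allowedPrefix) {
      super(parent);
      this.allowedPrefix = allowedPrefix;
    }

    @Override
    public Class<?> loadClass(String name) throws ClassNotFoundException {
      // Always let core java.* classes through; filter everything else.
      if (!name.startsWith(allowedPrefix) && !name.startsWith("java.")) {
        throw new ClassNotFoundException("Filtered: " + name);
      }
      return super.loadClass(name);
    }
  }

  public static void main(String[] args) throws Exception {
    ClassLoader loader = new FilteringClassLoader(
        FilterClassLoaderDemo.class.getClassLoader(), "java.util.");
    System.out.println(loader.loadClass("java.util.ArrayList").getName()); // prints java.util.ArrayList
    try {
      loader.loadClass("javax.swing.JFrame");
    } catch (ClassNotFoundException e) {
      System.out.println("filtered"); // prints filtered
    }
  }
}
```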
### Distributed Execution

Distributed execution framework built on Apache Twill that provides scalable, fault-tolerant Spark application deployment across YARN clusters with proper resource management.

```java { .api }
public class SparkExecutionService {
  public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);
  public void stop();
}

public class SparkTwillRunnable implements TwillRunnable {
  public void run();
  public void stop();
  public void handleCommand(Command command) throws Exception;
}
```

[Distributed Execution](./distributed-execution.md)

## Types

```java { .api }
// Core runtime types
interface ProgramRuntimeProvider {
  ProgramController createProgramController(ProgramRunId programRunId, ProgramOptions programOptions);
  Runnable createRunnable(ProgramRunId programRunId, ProgramOptions programOptions, InetAddress hostname);
}

interface ProgramController {
  ListenableFuture<ProgramController> command(String command, Object... args);
  ListenableFuture<ProgramController> stop();
  State getState();
}

// Spark specification and context types
class SparkSpecification {
  String getName();
  String getDescription();
  String getMainClassName();
  Resources getDriverResources();
  Resources getExecutorResources();
}

class Resources {
  int getMemoryMB();
  int getVirtualCores();
}

// Program execution metadata
class ProgramRunId {
  String getNamespace();
  String getApplication();
  ProgramType getType();
  String getProgram();
  String getRun();
}

class ProgramOptions {
  Map<String, String> getArguments();
  Map<String, String> getUserArguments();
  boolean isDebug();
}
```

```scala { .api }
// Scala-specific context types
trait SparkExecutionContext {
  def getSpecification: SparkSpecification
  def getLogicalStartTime: Long
  def getRuntimeArguments: Map[String, String]
  def getAdmin: Admin
  def getDatasetFramework: DatasetFramework
  def getMessagingContext: MessagingContext
}

trait SparkHttpServiceContext {
  def getSpecification: SparkSpecification
  def getInstanceId: Int
  def getInstanceCount: Int
  def getLogicalStartTime: Long
}

// Dynamic compilation types
trait SparkCompiler {
  def compile(code: String): Option[Class[_]]
  def compileClass(className: String, code: String): Option[Class[_]]
}

trait SparkInterpreter {
  def interpret(code: String): Unit
  def bind(name: String, value: Any): Unit
  def reset(): Unit
}
```