# CDAP Spark Core

CDAP Spark Core is a Java/Scala library that provides Apache Spark 1.x integration capabilities for the Cask Data Application Platform (CDAP). It serves as a runtime provider and execution context for Spark-based applications within the CDAP framework, offering abstractions for data processing, service contexts, and distributed execution while maintaining integration with Hadoop ecosystem components.

## Package Information

- **Package Name**: cdap-spark-core
- **Package Type**: maven
- **Group ID**: co.cask.cdap
- **Language**: Java/Scala
- **Installation**: Add to your Maven `pom.xml`:

```xml
<dependency>
  <groupId>co.cask.cdap</groupId>
  <artifactId>cdap-spark-core</artifactId>
  <version>5.1.2</version>
</dependency>
```

## Core Imports

```java
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext;
import co.cask.cdap.app.runtime.spark.Spark1ProgramRuntimeProvider;
import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer;
```


For Scala:

```scala
import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext
import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServiceContext
import co.cask.cdap.app.runtime.spark.dynamic.{DefaultSparkCompiler, DefaultSparkInterpreter}
```

## Basic Usage

```java
// Create Spark runtime provider
Spark1ProgramRuntimeProvider provider = new Spark1ProgramRuntimeProvider();

// SparkRuntimeContext is created internally by CDAP with all required dependencies.
// Access CDAP services within Spark programs through the execution context:
SparkRuntimeContext runtimeContext = sparkClassLoader.getRuntimeContext();
TransactionSystemClient txClient = runtimeContext.getTransactionSystemClient();
DatasetFramework datasetFramework = runtimeContext.getDatasetFramework();
AuthorizationEnforcer authEnforcer = runtimeContext.getAuthorizationEnforcer();
```


For the Scala execution context:

```scala
// DefaultSparkExecutionContext is created with a SparkClassLoader and localized resources
val sparkContext = new DefaultSparkExecutionContext(sparkClassLoader, localizeResources)

// Access runtime information
val spec = sparkContext.getSpecification
val startTime = sparkContext.getLogicalStartTime
val args = sparkContext.getRuntimeArguments
val admin = sparkContext.getAdmin
val messaging = sparkContext.getMessagingContext
```

## Architecture

CDAP Spark Core is built around several key components:

- **Runtime Providers**: Service providers that integrate Spark 1.x execution with the CDAP program lifecycle
- **Execution Contexts**: Abstractions providing access to CDAP services and metadata within Spark applications
- **Data Integration**: SQL data sources and scanning utilities for accessing CDAP datasets and streams
- **Service Framework**: HTTP service contexts for running web services alongside Spark applications
- **Dynamic Compilation**: Scala compiler and interpreter integration for runtime code generation
- **Distributed Execution**: Twill-based distributed application support with proper resource management
- **Class Loading**: Custom class loaders providing isolation and dependency management

## Capabilities

### Runtime Providers and Program Execution

Core runtime integration that enables Spark 1.x programs to run within the CDAP platform with full lifecycle management and resource allocation.

```java { .api }
@ProgramRuntimeProvider.SupportedProgramType(ProgramType.SPARK)
public class Spark1ProgramRuntimeProvider extends SparkProgramRuntimeProvider {
  public Spark1ProgramRuntimeProvider();
}

public abstract class SparkProgramRuntimeProvider implements ProgramRuntimeProvider {
  public ProgramRunner createProgramRunner(ProgramType type, Mode mode, Injector injector);
  public boolean isSupported(ProgramType programType, CConfiguration cConf);
}
```

[Runtime Providers](./runtime-providers.md)

### Spark Execution Contexts

Execution contexts that provide access to CDAP services, metadata, and configuration within Spark applications, enabling seamless integration with the broader CDAP ecosystem.

```java { .api }
public final class SparkRuntimeContext extends AbstractContext implements Metrics {
  SparkRuntimeContext(Configuration hConf, Program program, ProgramOptions programOptions,
                      CConfiguration cConf, String hostname, TransactionSystemClient txClient,
                      DatasetFramework datasetFramework, DiscoveryServiceClient discoveryServiceClient,
                      MetricsCollectionService metricsCollectionService, StreamAdmin streamAdmin,
                      WorkflowProgramInfo workflowProgramInfo, PluginInstantiator pluginInstantiator,
                      SecureStore secureStore, SecureStoreManager secureStoreManager,
                      AuthorizationEnforcer authorizationEnforcer, AuthenticationContext authenticationContext,
                      MessagingService messagingService, ServiceAnnouncer serviceAnnouncer,
                      PluginFinder pluginFinder, LocationFactory locationFactory,
                      MetadataReader metadataReader, MetadataPublisher metadataPublisher);

  public SparkSpecification getSpecification();
  public long getLogicalStartTime();
  public Map<String, String> getRuntimeArguments();
  public DiscoveryServiceClient getDiscoveryServiceClient();
  public LocationFactory getLocationFactory();
  public TransactionSystemClient getTransactionSystemClient();
  public AuthorizationEnforcer getAuthorizationEnforcer();
  public AuthenticationContext getAuthenticationContext();
  public String getHostname();
  public Configuration getHConf();
  public CConfiguration getCConf();
}
```

```scala { .api }
class DefaultSparkExecutionContext(sparkClassLoader: SparkClassLoader, localizeResources: util.Map[String, File])
  extends AbstractSparkExecutionContext(sparkClassLoader, localizeResources) {

  protected def saveAsNewAPIHadoopDataset[K: ClassManifest, V: ClassManifest](sc: SparkContext,
                                                                              conf: Configuration,
                                                                              rdd: RDD[(K, V)]): Unit

  protected def createInterpreter(settings: Settings, classDir: File,
                                  urlAdder: URLAdder, onClose: () => Unit): SparkInterpreter

  protected def createSparkMetricsWriterFactory(): (TaskContext) => SparkMetricsWriter
}

abstract class AbstractSparkExecutionContext(sparkClassLoader: SparkClassLoader,
                                             localizeResources: util.Map[String, File])
  extends SparkExecutionContext with AutoCloseable {

  def getSpecification: SparkSpecification
  def getLogicalStartTime: Long
  def getRuntimeArguments: Map[String, String]
  def getAdmin: Admin
  def getDatasetFramework: DatasetFramework
  def getMessagingContext: MessagingContext
  def getTransactionSystemClient: TransactionSystemClient
}
```

[Execution Contexts](./execution-contexts.md)

### Data Processing and SQL Integration

Data processing utilities and Spark SQL data sources that provide typed, efficient access to CDAP datasets and streams.

```scala { .api }
class DatumScannerIterator[T](scanner: Scanner[T]) extends Iterator[T] {
  def hasNext: Boolean
  def next(): T
  def close(): Unit
}

object DatasetRelationProvider extends RelationProvider {
  def shortName(): String
  def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
}
```
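
`DatumScannerIterator` is an adapter: it wraps a pull-style scanner in a standard iterator. As a stand-alone illustration of that pattern (the simplified `Scanner` interface and all names below are stand-ins, not CDAP's actual API), a Java sketch:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Adapter pattern: wrap a pull-style scanner (next-or-null) in a
// java.util.Iterator, buffering one element to answer hasNext().
public class ScannerIteratorDemo {
    interface Scanner<T> { T poll(); } // returns null when exhausted

    static <T> Iterator<T> asIterator(Scanner<T> scanner) {
        return new Iterator<T>() {
            private T buffered;
            public boolean hasNext() {
                if (buffered == null) buffered = scanner.poll();
                return buffered != null;
            }
            public T next() {
                if (!hasNext()) throw new NoSuchElementException();
                T out = buffered;
                buffered = null;
                return out;
            }
        };
    }

    public static void main(String[] args) {
        Iterator<Integer> source = List.of(1, 2, 3).iterator();
        Iterator<Integer> it = asIterator(() -> source.hasNext() ? source.next() : null);
        List<Integer> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        System.out.println(out); // prints [1, 2, 3]
    }
}
```

The one-element buffer is what lets a next-or-null protocol support the `hasNext()`/`next()` contract without consuming data twice.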

[Data Processing](./data-processing.md)

### HTTP Service Framework

HTTP service framework that allows Spark applications to expose web endpoints and REST APIs while maintaining full integration with CDAP's service discovery and security model.

```java { .api }
public class SparkHttpServiceServer {
  public void startAndWait() throws Exception;
  public void stopAndWait() throws Exception;
  public InetSocketAddress getBindAddress();
}
```

```scala { .api }
class DefaultSparkHttpServiceContext extends SparkHttpServiceContext {
  def getSpecification: SparkSpecification
  def getInstanceId: Int
  def getInstanceCount: Int
  def getLogicalStartTime: Long
}
```
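
The start/serve/stop lifecycle of `SparkHttpServiceServer` can be illustrated with the JDK's built-in `HttpServer`. This sketch parallels `startAndWait()`, `getBindAddress()`, and `stopAndWait()` using only standard-library classes; it is not CDAP's implementation:

```java
import com.sun.net.httpserver.HttpServer;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Start an HTTP server on an ephemeral port, discover the bound address,
// serve one request, then stop -- the same lifecycle shape as
// SparkHttpServiceServer's startAndWait / getBindAddress / stopAndWait.
public class HttpLifecycleDemo {
    static String pingOnce() throws Exception {
        HttpServer server = HttpServer.create(new InetSocketAddress(0), 0);
        server.createContext("/ping", exchange -> {
            byte[] body = "pong".getBytes();
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream os = exchange.getResponseBody()) { os.write(body); }
        });
        server.start();                               // ~ startAndWait()
        try {
            int port = server.getAddress().getPort(); // ~ getBindAddress()
            HttpResponse<String> resp = HttpClient.newHttpClient().send(
                HttpRequest.newBuilder(URI.create("http://localhost:" + port + "/ping")).build(),
                HttpResponse.BodyHandlers.ofString());
            return resp.body();
        } finally {
            server.stop(0);                           // ~ stopAndWait()
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(pingOnce()); // prints pong
    }
}
```

Binding to port 0 and reading the actual address back is the same pattern CDAP needs for service discovery: the server, not the caller, decides the port.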

[HTTP Services](./http-services.md)

### Dynamic Compilation and Interpretation

Dynamic Scala compilation and interpretation capabilities that enable runtime code generation, interactive development, and flexible application behavior modification.

```scala { .api }
class DefaultSparkCompiler extends SparkCompiler {
  def compile(code: String): Option[Class[_]]
  def compileClass(className: String, code: String): Option[Class[_]]
}

class DefaultSparkInterpreter extends SparkInterpreter {
  def interpret(code: String): Unit
  def bind(name: String, value: Any): Unit
  def reset(): Unit
}
```
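
The compile-then-load pattern behind `DefaultSparkCompiler` can be sketched with the JDK's own compiler API. CDAP compiles Scala through the Scala toolchain; this Java-compiler version is an illustration of the technique only, and requires running on a JDK (not a bare JRE):

```java
import javax.tools.JavaCompiler;
import javax.tools.ToolProvider;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Files;
import java.nio.file.Path;

// Runtime code generation: write source, compile it with the in-process
// compiler, then load the resulting class through a fresh class loader.
public class RuntimeCompileDemo {
    static String compileAndGreet() throws Exception {
        Path dir = Files.createTempDirectory("gen");
        Path src = dir.resolve("Hello.java");
        Files.writeString(src,
            "public class Hello { public static String greet() { return \"hi\"; } }");

        JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // null on a JRE
        if (compiler.run(null, null, null, src.toString()) != 0) {
            throw new IllegalStateException("compilation failed");
        }

        try (URLClassLoader cl = new URLClassLoader(new URL[]{dir.toUri().toURL()})) {
            Class<?> hello = cl.loadClass("Hello");
            return (String) hello.getMethod("greet").invoke(null);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(compileAndGreet()); // prints hi
    }
}
```

Loading generated classes through a dedicated, closeable class loader is also why compilers like this pair naturally with the class-loading infrastructure described below: generated code can be isolated and unloaded.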

[Dynamic Compilation](./dynamic-compilation.md)

### Transaction Management

Transaction handling capabilities that enable ACID properties for Spark operations within the CDAP platform, providing consistent data access across distributed Spark executors.

```java { .api }
public class SparkTransactionHandler {
  public void jobStarted(Integer jobId, Set<Integer> stageIds);
  public void jobStarted(Integer jobId, Set<Integer> stageIds, SparkTransactional.TransactionInfo txInfo);
  public void jobEnded(Integer jobId, boolean succeeded);
  public void stageSubmitted(Integer stageId);
  public void stageCompleted(Integer stageId);
}

public class SparkTransactional {
  public TransactionInfo getTransactionInfo(String key);
  public void execute(TransactionType type, TxRunnable runnable) throws TransactionFailureException;
  public <T> T execute(TransactionType type, TxCallable<T> callable) throws TransactionFailureException;
}
```
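
Conceptually, `SparkTransactionHandler` brackets each Spark job in a transaction: one is opened when the job starts and committed or aborted when the job ends, depending on whether it succeeded. A toy model of that lifecycle (the class, enum, and method bodies below are illustrative stand-ins, not CDAP's API):

```java
import java.util.HashMap;
import java.util.Map;

// Toy per-job transaction lifecycle: jobStarted opens a transaction,
// jobEnded resolves it to COMMITTED or ABORTED based on job outcome.
public class TxLifecycleDemo {
    enum State { IN_PROGRESS, COMMITTED, ABORTED }

    private final Map<Integer, State> jobTx = new HashMap<>();

    void jobStarted(int jobId) {
        jobTx.put(jobId, State.IN_PROGRESS);
    }

    void jobEnded(int jobId, boolean succeeded) {
        jobTx.put(jobId, succeeded ? State.COMMITTED : State.ABORTED);
    }

    State stateOf(int jobId) {
        return jobTx.get(jobId);
    }

    public static void main(String[] args) {
        TxLifecycleDemo handler = new TxLifecycleDemo();
        handler.jobStarted(1);
        handler.jobEnded(1, true);   // all stages succeeded -> commit
        handler.jobStarted(2);
        handler.jobEnded(2, false);  // a stage failed -> abort
        System.out.println(handler.stateOf(1) + " " + handler.stateOf(2)); // prints COMMITTED ABORTED
    }
}
```

Tying the transaction boundary to the job boundary is what gives all stages of one job a consistent view of the data, even across distributed executors.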

[Transaction Management](./transaction-management.md)

### Class Loading and Runtime Support

Custom class loading infrastructure that provides proper isolation and dependency management for Spark applications within the CDAP runtime environment.

```java { .api }
public class SparkClassLoader extends URLClassLoader {
  public SparkRuntimeContext getRuntimeContext();
  public void addURL(URL url);
  public Class<?> loadClass(String name) throws ClassNotFoundException;
}

public class SparkRunnerClassLoader extends FilterClassLoader {
  public static SparkRunnerClassLoader create();
  protected boolean includePackage(String packageName);
}
```
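
The package-filtering idea behind `SparkRunnerClassLoader.includePackage` can be shown with a minimal stand-alone class loader: only classes from approved packages are delegated to the parent, everything else is invisible. This is a sketch of the isolation technique, not CDAP's implementation:

```java
// A class loader that hides everything except whitelisted packages,
// illustrating the isolation that filtering class loaders provide.
public class FilteringClassLoaderDemo {
    static class PackageFilterClassLoader extends ClassLoader {
        private final String[] allowedPrefixes;

        PackageFilterClassLoader(ClassLoader parent, String... allowedPrefixes) {
            super(parent);
            this.allowedPrefixes = allowedPrefixes;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            for (String prefix : allowedPrefixes) {
                if (name.startsWith(prefix)) {
                    return super.loadClass(name, resolve); // delegate to parent
                }
            }
            throw new ClassNotFoundException("Package not visible: " + name);
        }
    }

    public static void main(String[] args) throws Exception {
        ClassLoader cl = new PackageFilterClassLoader(
            FilteringClassLoaderDemo.class.getClassLoader(), "java.lang.", "java.util.");
        System.out.println(cl.loadClass("java.util.ArrayList").getName()); // visible
        try {
            cl.loadClass("javax.swing.JFrame"); // not whitelisted
            System.out.println("unexpected");
        } catch (ClassNotFoundException e) {
            System.out.println("filtered"); // prints filtered
        }
    }
}
```

A runtime uses this kind of loader to keep platform internals out of user code's classpath while still exposing the APIs user programs are allowed to see.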

### Distributed Execution

Distributed execution framework built on Apache Twill that provides scalable, fault-tolerant Spark application deployment across YARN clusters with proper resource management.

```java { .api }
public class SparkExecutionService {
  public ListenableFuture<ProgramController> submit(ProgramRunId programRunId, ProgramOptions programOptions);
  public void stop();
}

public class SparkTwillRunnable implements TwillRunnable {
  public void run();
  public void stop();
  public void handleCommand(Command command) throws Exception;
}
```

[Distributed Execution](./distributed-execution.md)

## Types

```java { .api }
// Core runtime types
interface ProgramRuntimeProvider {
  ProgramController createProgramController(ProgramRunId programRunId, ProgramOptions programOptions);
  Runnable createRunnable(ProgramRunId programRunId, ProgramOptions programOptions, InetAddress hostname);
}

interface ProgramController {
  ListenableFuture<ProgramController> command(String command, Object... args);
  ListenableFuture<ProgramController> stop();
  State getState();
}

// Spark specification and context types
class SparkSpecification {
  String getName();
  String getDescription();
  String getMainClassName();
  Resources getDriverResources();
  Resources getExecutorResources();
}

class Resources {
  int getMemoryMB();
  int getVirtualCores();
}

// Program execution metadata
class ProgramRunId {
  String getNamespace();
  String getApplication();
  ProgramType getType();
  String getProgram();
  String getRun();
}

class ProgramOptions {
  Map<String, String> getArguments();
  Map<String, String> getUserArguments();
  boolean isDebug();
}
```

```scala { .api }
// Scala-specific context types
trait SparkExecutionContext {
  def getSpecification: SparkSpecification
  def getLogicalStartTime: Long
  def getRuntimeArguments: Map[String, String]
  def getAdmin: Admin
  def getDatasetFramework: DatasetFramework
  def getMessagingContext: MessagingContext
}

trait SparkHttpServiceContext {
  def getSpecification: SparkSpecification
  def getInstanceId: Int
  def getInstanceCount: Int
  def getLogicalStartTime: Long
}

// Dynamic compilation types
trait SparkCompiler {
  def compile(code: String): Option[Class[_]]
  def compileClass(className: String, code: String): Option[Class[_]]
}

trait SparkInterpreter {
  def interpret(code: String): Unit
  def bind(name: String, value: Any): Unit
  def reset(): Unit
}
```