or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-processing.md, distributed-execution.md, dynamic-compilation.md, execution-contexts.md, http-services.md, index.md, runtime-providers.md, transaction-management.md

docs/execution-contexts.md

0

# Execution Contexts

1

2

Execution contexts provide access to CDAP services, metadata, and configuration from within Spark applications. They integrate Spark programs with the broader CDAP ecosystem, including datasets, messaging, and service discovery.

3

4

## Capabilities

5

6

### Spark Runtime Context

7

8

`SparkRuntimeContext` is the Java-based runtime context. It provides access to core CDAP services and configuration during Spark program execution.

9

10

```java { .api }

11

/**

12

* Runtime context for Spark program execution within CDAP

13

* Provides access to CDAP services, configuration, and program metadata

14

*/

15

public class SparkRuntimeContext extends AbstractContext {

16

/**

17

* Gets the Spark program specification

18

* @return SparkSpecification containing program metadata

19

*/

20

public SparkSpecification getSpecification();

21

22

/**

23

* Gets the logical start time of the program

24

* @return Start time in milliseconds since epoch

25

*/

26

public long getLogicalStartTime();

27

28

/**

29

* Gets runtime arguments provided to the program

30

* @return Map of argument key-value pairs

31

*/

32

public Map<String, String> getRuntimeArguments();

33

34

/**

35

* Gets the service discovery client for finding services

36

* @return DiscoveryServiceClient for service discovery

37

*/

38

public DiscoveryServiceClient getDiscoveryServiceClient();

39

40

/**

41

* Gets the location factory for accessing file systems

42

* @return LocationFactory for creating file system locations

43

*/

44

public LocationFactory getLocationFactory();

45

46

/**

47

* Gets the dataset framework for dataset operations

48

* @return DatasetFramework for dataset access

49

*/

50

public DatasetFramework getDatasetFramework();

51

52

/**

53

* Gets the admin interface for CDAP operations

54

* @return Admin interface for administrative operations

55

*/

56

public Admin getAdmin();

57

58

/**

59

* Gets the messaging context for pub-sub operations

60

* @return MessagingContext for messaging operations

61

*/

62

public MessagingContext getMessagingContext();

63

}

64

```

65

66

### Default Spark Execution Context (Scala)

67

68

`DefaultSparkExecutionContext` is the Scala implementation of the `SparkExecutionContext` trait. It is constructed from a `SparkRuntimeContext` and exposes CDAP services through Scala-friendly accessors.

69

70

```scala { .api }

71

/**

72

* Default implementation of Spark execution context for CDAP applications

73

* Provides access to CDAP services with Scala-friendly interfaces

74

*/

75

class DefaultSparkExecutionContext(runtimeContext: SparkRuntimeContext) extends SparkExecutionContext {

76

/**

77

* Gets the Spark program specification

78

* @return SparkSpecification containing program metadata

79

*/

80

def getSpecification: SparkSpecification

81

82

/**

83

* Gets the logical start time of the program

84

* @return Start time in milliseconds since epoch

85

*/

86

def getLogicalStartTime: Long

87

88

/**

89

* Gets runtime arguments as a Scala map

90

* @return Map of argument key-value pairs

91

*/

92

def getRuntimeArguments: Map[String, String]

93

94

/**

95

* Gets the admin interface for CDAP operations

96

* @return Admin interface for administrative operations

97

*/

98

def getAdmin: Admin

99

100

/**

101

* Gets the dataset framework for dataset operations

102

* @return DatasetFramework for dataset access

103

*/

104

def getDatasetFramework: DatasetFramework

105

106

/**

107

* Gets the messaging context for pub-sub operations

108

* @return MessagingContext for messaging operations

109

*/

110

def getMessagingContext: MessagingContext

111

112

/**

113

* Gets the location factory for accessing file systems

114

* @return LocationFactory for creating file system locations

115

*/

116

def getLocationFactory: LocationFactory

117

118

/**

119

* Gets the service discovery client

120

* @return DiscoveryServiceClient for service discovery

121

*/

122

def getDiscoveryServiceClient: DiscoveryServiceClient

123

}

124

```

125

126

### Abstract Context Base

127

128

`AbstractContext` is the base class that provides common functionality for CDAP program contexts; `SparkRuntimeContext` extends it.

129

130

```java { .api }

131

/**

132

* Abstract base class for CDAP program contexts

133

* Provides common functionality and service access patterns

134

*/

135

public abstract class AbstractContext {

136

/**

137

* Gets the program run ID

138

* @return ProgramRunId identifying this program run

139

*/

140

public ProgramRunId getProgramRunId();

141

142

/**

143

* Gets the namespace of the program

144

* @return Namespace name

145

*/

146

public String getNamespace();

147

148

/**

149

* Gets the application name

150

* @return Application name

151

*/

152

public String getApplicationName();

153

154

/**

155

* Gets the program name

156

* @return Program name

157

*/

158

public String getProgramName();

159

160

/**

161

* Gets the run ID

162

* @return Run ID string

163

*/

164

public String getRunId();

165

166

/**

167

* Gets the CDAP configuration

168

* @return CConfiguration containing CDAP settings

169

*/

170

protected CConfiguration getCConfiguration();

171

172

/**

173

* Gets the Hadoop configuration

174

* @return Configuration containing Hadoop settings

175

*/

176

protected Configuration getConfiguration();

177

}

178

```

179

180

## Usage Examples

181

182

**Java Runtime Context Usage:**

183

184

```java

185

import co.cask.cdap.app.runtime.spark.SparkRuntimeContext;

186

import co.cask.cdap.data2.dataset2.DatasetFramework;

187

import co.cask.cdap.proto.id.DatasetId;

188

189

// Access dataset framework

190

SparkRuntimeContext context = // ... obtained from runtime

191

DatasetFramework datasetFramework = context.getDatasetFramework();

192

193

// Access a dataset

194

DatasetId datasetId = new DatasetId("namespace", "my-dataset");

195

Dataset dataset = datasetFramework.getDataset(datasetId, Collections.emptyMap(), null);

196

197

// Get runtime arguments

198

Map<String, String> args = context.getRuntimeArguments();

199

String configValue = args.get("config.key");

200

201

// Access location factory for file operations

202

LocationFactory locationFactory = context.getLocationFactory();

203

Location fileLocation = locationFactory.create("/path/to/file");

204

```

205

206

**Scala Execution Context Usage:**

207

208

```scala

209

import co.cask.cdap.app.runtime.spark.DefaultSparkExecutionContext

210

import co.cask.cdap.api.data.DatasetContext

211

212

// Create execution context

213

val sparkContext = new DefaultSparkExecutionContext(runtimeContext)

214

215

// Access program metadata

216

val spec = sparkContext.getSpecification

217

val startTime = sparkContext.getLogicalStartTime

218

val args = sparkContext.getRuntimeArguments

219

220

// Access CDAP services

221

val admin = sparkContext.getAdmin

222

val datasetFramework = sparkContext.getDatasetFramework

223

val messagingContext = sparkContext.getMessagingContext

224

225

// Use functional patterns with runtime arguments

226

val configPrefix = "spark.config."

227

val sparkConfigs = args.filter(_._1.startsWith(configPrefix))

228

.map { case (key, value) => key.substring(configPrefix.length) -> value }

229

```

230

231

**Service Discovery Usage:**

232

233

```java

234

import org.apache.twill.discovery.DiscoveryServiceClient;

235

import org.apache.twill.discovery.Discoverable;

236

import org.apache.twill.discovery.ServiceDiscovered;

237

238

// Get service discovery client

239

DiscoveryServiceClient discoveryClient = context.getDiscoveryServiceClient();

240

241

// Discover a service

242

ServiceDiscovered serviceDiscovered = discoveryClient.discover("my-service");

243

Iterable<Discoverable> discoverables = serviceDiscovered; // ServiceDiscovered is itself an Iterable<Discoverable>

244

245

// Use the discovered service

246

for (Discoverable discoverable : discoverables) {

247

InetSocketAddress address = discoverable.getSocketAddress();

248

// Connect to service at address

249

}

250

```

251

252

## Types

253

254

```java { .api }

255

/**

256

* Specification for Spark programs containing metadata and resource requirements

257

*/

258

public class SparkSpecification {

259

/**

260

* Gets the program name

261

* @return Name of the Spark program

262

*/

263

public String getName();

264

265

/**

266

* Gets the program description

267

* @return Description of the Spark program

268

*/

269

public String getDescription();

270

271

/**

272

* Gets the main class name

273

* @return Fully qualified main class name

274

*/

275

public String getMainClassName();

276

277

/**

278

* Gets the driver resource requirements

279

* @return Resources required for the Spark driver

280

*/

281

public Resources getDriverResources();

282

283

/**

284

* Gets the executor resource requirements

285

* @return Resources required for Spark executors

286

*/

287

public Resources getExecutorResources();

288

289

/**

290

* Gets the client resource requirements

291

* @return Resources required for the Spark client

292

*/

293

public Resources getClientResources();

294

295

/**

296

* Gets additional properties

297

* @return Map of additional program properties

298

*/

299

public Map<String, String> getProperties();

300

}

301

302

/**

303

* Resource specification for program components

304

*/

305

public class Resources {

306

/**

307

* Gets the memory requirement in MB

308

* @return Memory in megabytes

309

*/

310

public int getMemoryMB();

311

312

/**

313

* Gets the virtual core requirement

314

* @return Number of virtual cores

315

*/

316

public int getVirtualCores();

317

}

318

319

/**

320

* Interface for accessing CDAP administrative operations

321

*/

322

public interface Admin {

323

/**

324

* Checks if a dataset exists

325

* @param datasetInstanceId Dataset identifier

326

* @return true if dataset exists

327

*/

328

boolean datasetExists(DatasetId datasetInstanceId) throws DatasetManagementException;

329

330

/**

331

* Gets dataset properties

332

* @param datasetInstanceId Dataset identifier

333

* @return Dataset properties

334

*/

335

DatasetProperties getDatasetProperties(DatasetId datasetInstanceId) throws DatasetManagementException;

336

}

337

338

/**

339

* Interface for dataset framework operations

340

*/

341

public interface DatasetFramework {

342

/**

343

* Gets a dataset instance

344

* @param datasetInstanceId Dataset identifier

345

* @param arguments Dataset arguments

346

* @param classLoader Class loader for dataset classes

347

* @return Dataset instance

348

*/

349

<T extends Dataset> T getDataset(DatasetId datasetInstanceId,

350

Map<String, String> arguments,

351

ClassLoader classLoader) throws DatasetInstantiationException;

352

}

353

354

/**

355

* Interface for messaging operations

356

*/

357

public interface MessagingContext {

358

/**

359

* Gets a message publisher for a topic

360

* @param topicId Topic identifier

361

* @return MessagePublisher for publishing messages

362

*/

363

MessagePublisher getMessagePublisher(TopicId topicId);

364

365

/**

366

* Gets a message fetcher for a topic

367

* @param topicId Topic identifier

368

* @return MessageFetcher for fetching messages

369

*/

370

MessageFetcher getMessageFetcher(TopicId topicId);

371

}

372

```

373

374

```scala { .api }

375

/**

376

* Trait defining the Spark execution context interface

377

*/

378

trait SparkExecutionContext {

379

/**

380

* Gets the Spark program specification

381

* @return SparkSpecification containing program metadata

382

*/

383

def getSpecification: SparkSpecification

384

385

/**

386

* Gets the logical start time of the program

387

* @return Start time in milliseconds since epoch

388

*/

389

def getLogicalStartTime: Long

390

391

/**

392

* Gets runtime arguments as a Scala map

393

* @return Map of argument key-value pairs

394

*/

395

def getRuntimeArguments: Map[String, String]

396

397

/**

398

* Gets the admin interface for CDAP operations

399

* @return Admin interface for administrative operations

400

*/

401

def getAdmin: Admin

402

403

/**

404

* Gets the dataset framework for dataset operations

405

* @return DatasetFramework for dataset access

406

*/

407

def getDatasetFramework: DatasetFramework

408

409

/**

410

* Gets the messaging context for pub-sub operations

411

* @return MessagingContext for messaging operations

412

*/

413

def getMessagingContext: MessagingContext

414

}

415

```