# HTTP Service Framework

The HTTP service framework lets Spark applications expose web endpoints and REST APIs while staying integrated with CDAP's service discovery, security model, and resource management.

## Capabilities

### Spark HTTP Service Server

HTTP service server that lets a Spark application expose web endpoints and handle HTTP requests, with explicit start and stop lifecycle management.

```java { .api }
/**
 * HTTP service server for Spark applications
 * Provides web endpoint capabilities with proper lifecycle management
 */
public class SparkHttpServiceServer {
    /**
     * Starts the HTTP service server and waits for it to be ready
     * Blocks until the server is fully started and ready to accept requests
     * @throws Exception if server startup fails
     */
    public void startAndWait() throws Exception;

    /**
     * Stops the HTTP service server and waits for shutdown to complete
     * Blocks until the server is fully stopped and all resources are released
     * @throws Exception if server shutdown fails
     */
    public void stopAndWait() throws Exception;

    /**
     * Gets the bind address of the running server
     * @return InetSocketAddress containing the host and port the server is bound to
     * @throws IllegalStateException if server is not running
     */
    public InetSocketAddress getBindAddress();

    /**
     * Checks if the server is running
     * @return true if the server is currently running
     */
    public boolean isRunning();

    /**
     * Gets the server state
     * @return Current state of the server (STARTING, RUNNING, STOPPING, STOPPED)
     */
    public State getState();
}
```
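
The lifecycle contract above can be illustrated with a small stand-in class. This is only a sketch under stated assumptions: `LifecycleSketch` and `LifecycleDemo` are hypothetical names, not part of CDAP, no socket is actually bound, and the choice of `STOPPED` as the initial state is an assumption made for illustration. It shows the state transitions and why `getBindAddress()` should only be called between `startAndWait()` and `stopAndWait()`.

```java
import java.net.InetSocketAddress;

/** Hypothetical stand-in that mirrors the documented lifecycle contract; not the CDAP class. */
class LifecycleSketch {
    enum State { STARTING, RUNNING, STOPPING, STOPPED }

    private final InetSocketAddress address;
    // Assumption: a server that has never been started reports STOPPED.
    private State state = State.STOPPED;

    LifecycleSketch(InetSocketAddress address) {
        this.address = address;
    }

    synchronized void startAndWait() {
        state = State.STARTING;
        // A real server would bind its socket here and block until it is ready.
        state = State.RUNNING;
    }

    synchronized void stopAndWait() {
        state = State.STOPPING;
        // A real server would finish in-flight requests and release resources here.
        state = State.STOPPED;
    }

    synchronized InetSocketAddress getBindAddress() {
        if (state != State.RUNNING) {
            throw new IllegalStateException("Server is not running, state=" + state);
        }
        return address;
    }

    synchronized boolean isRunning() {
        return state == State.RUNNING;
    }

    synchronized State getState() {
        return state;
    }
}

class LifecycleDemo {
    public static void main(String[] args) {
        LifecycleSketch server = new LifecycleSketch(new InetSocketAddress("127.0.0.1", 8080));
        try {
            server.startAndWait();
            System.out.println("Bound to port " + server.getBindAddress().getPort());
        } finally {
            // Always stop in a finally block so resources are released on failure too.
            server.stopAndWait();
        }
        System.out.println("Final state: " + server.getState());
    }
}
```

Callers that need the address after shutdown would have to capture it while the server is still running.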

### Default Spark HTTP Service Context (Scala)

Scala-based HTTP service context that provides access to CDAP services and program metadata within HTTP service handlers.

```scala { .api }
/**
 * Default implementation of HTTP service context for Spark applications
 * Provides access to CDAP services with Scala-friendly interfaces
 */
class DefaultSparkHttpServiceContext(runtimeContext: SparkRuntimeContext) extends SparkHttpServiceContext {
  /**
   * Gets the Spark program specification
   * @return SparkSpecification containing program metadata
   */
  def getSpecification: SparkSpecification

  /**
   * Gets the instance ID of this service
   * @return Instance ID (0-based) of this service instance
   */
  def getInstanceId: Int

  /**
   * Gets the total number of service instances
   * @return Total count of service instances
   */
  def getInstanceCount: Int

  /**
   * Gets the logical start time of the program
   * @return Start time in milliseconds since epoch
   */
  def getLogicalStartTime: Long

  /**
   * Gets runtime arguments as a Scala map
   * @return Map of argument key-value pairs
   */
  def getRuntimeArguments: Map[String, String]

  /**
   * Gets the dataset framework for dataset operations
   * @return DatasetFramework for dataset access
   */
  def getDatasetFramework: DatasetFramework

  /**
   * Gets the messaging context for pub-sub operations
   * @return MessagingContext for messaging operations
   */
  def getMessagingContext: MessagingContext

  /**
   * Gets the admin interface for CDAP operations
   * @return Admin interface for administrative operations
   */
  def getAdmin: Admin
}
```
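
Because the instance ID is 0-based and the instance count is known, a handler could use the pair to split work between instances. The following Java sketch is one possible pattern, not something the framework prescribes: `InstanceSharding` and `ownsKey` are hypothetical names, and the values passed in stand for what `getInstanceId` and `getInstanceCount` would return.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: decides whether a given service instance is responsible for a key. */
class InstanceSharding {
    static boolean ownsKey(String key, int instanceId, int instanceCount) {
        if (instanceCount <= 0 || instanceId < 0 || instanceId >= instanceCount) {
            throw new IllegalArgumentException(
                "Invalid instance " + instanceId + " of " + instanceCount);
        }
        // floorMod keeps the bucket non-negative even when hashCode() is negative.
        return Math.floorMod(key.hashCode(), instanceCount) == instanceId;
    }
}

class InstanceShardingDemo {
    public static void main(String[] args) {
        List<String> keys = Arrays.asList("orders", "users", "events", "metrics");
        int instanceCount = 3; // stands in for getInstanceCount
        for (int instanceId = 0; instanceId < instanceCount; instanceId++) {
            for (String key : keys) {
                if (InstanceSharding.ownsKey(key, instanceId, instanceCount)) {
                    System.out.println("instance " + instanceId + " owns " + key);
                }
            }
        }
    }
}
```

Each key maps to exactly one instance for a fixed instance count. If the count changes, ownership is reshuffled, so this simple scheme suits stateless work such as periodic background tasks.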

### Spark HTTP Service Plugin Context

Plugin context for HTTP services that provides access to plugins and their configurations within service handlers.

```java { .api }
/**
 * Plugin context for HTTP services in Spark applications
 * Provides access to plugins and their configurations
 */
public class DefaultSparkHttpServicePluginContext implements PluginContext {
    /**
     * Gets runtime arguments for the service
     * @return Map of runtime argument key-value pairs
     */
    public Map<String, String> getRuntimeArguments();

    /**
     * Gets properties for a specific plugin
     * @param pluginId Identifier of the plugin
     * @return PluginProperties containing plugin configuration
     * @throws IllegalArgumentException if plugin not found
     */
    public PluginProperties getPluginProperties(String pluginId);

    /**
     * Loads a plugin class by ID
     * @param pluginId Identifier of the plugin
     * @param <T> Expected type of the plugin class
     * @return Class object for the plugin
     * @throws IllegalArgumentException if plugin not found
     * @throws ClassNotFoundException if plugin class cannot be loaded
     */
    public <T> Class<T> loadPluginClass(String pluginId);

    /**
     * Creates a new instance of a plugin
     * @param pluginId Identifier of the plugin
     * @param <T> Expected type of the plugin instance
     * @return New instance of the plugin
     * @throws IllegalArgumentException if plugin not found
     * @throws InstantiationException if plugin cannot be instantiated
     */
    public <T> T newPluginInstance(String pluginId) throws InstantiationException;

    /**
     * Checks if a plugin exists
     * @param pluginId Identifier of the plugin
     * @return true if the plugin exists
     */
    public boolean hasPlugin(String pluginId);
}
```

### HTTP Service Handler Base

Base class for implementing HTTP service handlers with common functionality and lifecycle management.

```java { .api }
/**
 * Base class for HTTP service handlers
 * Provides common functionality and lifecycle management
 */
public abstract class AbstractHttpServiceHandler implements HttpServiceHandler {
    /**
     * Initializes the service handler
     * Called once when the service starts
     * @param context HTTP service context
     * @throws Exception if initialization fails
     */
    public void initialize(HttpServiceContext context) throws Exception;

    /**
     * Destroys the service handler
     * Called once when the service stops
     */
    public void destroy();

    /**
     * Gets the HTTP service context
     * @return HttpServiceContext for accessing CDAP services
     */
    protected HttpServiceContext getContext();

    /**
     * Gets a dataset instance
     * @param datasetName Name of the dataset
     * @param <T> Expected dataset type
     * @return Dataset instance
     * @throws DatasetInstantiationException if dataset cannot be accessed
     */
    protected <T extends Dataset> T getDataset(String datasetName) throws DatasetInstantiationException;

    /**
     * Gets runtime arguments
     * @return Map of runtime arguments
     */
    protected Map<String, String> getRuntimeArguments();
}
```

## Usage Examples

**Basic HTTP Service Server:**

```java
import co.cask.cdap.app.runtime.spark.service.SparkHttpServiceServer;
import java.net.InetSocketAddress;

// Create and start HTTP service server
SparkHttpServiceServer server = new SparkHttpServiceServer(
    sparkHttpServiceContext,
    httpServiceHandlers,
    httpServiceSpec
);

try {
    // Start server (blocks until ready)
    server.startAndWait();

    // Get server address
    InetSocketAddress address = server.getBindAddress();
    System.out.println("Server running on " + address.getHostName() + ":" + address.getPort());

    // Server is now accepting requests
    // ... application logic

} finally {
    // Stop server (blocks until stopped)
    server.stopAndWait();
}
```

**Scala HTTP Service Context:**

```scala
import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServiceContext

// Create HTTP service context
val serviceContext = new DefaultSparkHttpServiceContext(runtimeContext)

// Access service metadata
val spec = serviceContext.getSpecification
val instanceId = serviceContext.getInstanceId
val instanceCount = serviceContext.getInstanceCount
val startTime = serviceContext.getLogicalStartTime

// Access CDAP services
val datasetFramework = serviceContext.getDatasetFramework
val messagingContext = serviceContext.getMessagingContext
val admin = serviceContext.getAdmin

// Use runtime arguments
val args = serviceContext.getRuntimeArguments
val port = args.getOrElse("service.port", "8080").toInt
val enableSsl = args.getOrElse("service.ssl.enabled", "false").toBoolean
```
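
Java handlers receive runtime arguments as a `Map<String, String>`, so the same defaulting pattern needs explicit parsing. The helper below is a minimal sketch: `RuntimeArgs` is a hypothetical class, and the keys `service.port` and `service.ssl.enabled` are simply the ones used in the Scala example above.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper for reading typed values out of a runtime-argument map. */
class RuntimeArgs {
    static int getInt(Map<String, String> args, String key, int defaultValue) {
        String raw = args.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Integer.parseInt(raw.trim());
        } catch (NumberFormatException e) {
            // Name the offending key so a misconfigured argument is easy to find.
            throw new IllegalArgumentException(
                "Runtime argument '" + key + "' is not an integer: " + raw, e);
        }
    }

    static boolean getBoolean(Map<String, String> args, String key, boolean defaultValue) {
        String raw = args.get(key);
        // Boolean.parseBoolean returns false for anything other than "true" (case-insensitive).
        return raw == null ? defaultValue : Boolean.parseBoolean(raw.trim());
    }
}

class RuntimeArgsDemo {
    public static void main(String[] argv) {
        Map<String, String> args = new HashMap<>();
        args.put("service.port", "9090");

        int port = RuntimeArgs.getInt(args, "service.port", 8080);
        boolean enableSsl = RuntimeArgs.getBoolean(args, "service.ssl.enabled", false);
        System.out.println("port=" + port + ", ssl=" + enableSsl);
    }
}
```

Unlike Scala's `toBoolean`, which throws on values other than `true` or `false`, `Boolean.parseBoolean` silently treats unrecognized values as `false`; stricter validation may be preferable for security-related flags.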

**HTTP Service Handler Implementation:**

```java
import co.cask.cdap.api.dataset.Dataset;
import co.cask.cdap.api.service.http.AbstractHttpServiceHandler;
import co.cask.cdap.api.service.http.HttpServiceContext;
import co.cask.cdap.api.service.http.HttpServiceRequest;
import co.cask.cdap.api.service.http.HttpServiceResponder;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;

@Path("/api/v1")
public class MySparkHttpHandler extends AbstractHttpServiceHandler {

    @Override
    public void initialize(HttpServiceContext context) throws Exception {
        super.initialize(context);
        // Custom initialization logic
    }

    @GET
    @Path("/status")
    public void getStatus(HttpServiceRequest request, HttpServiceResponder responder) {
        // Report which instance served the request
        Map<String, Object> status = new HashMap<>();
        status.put("instanceId", getContext().getInstanceId());
        status.put("instanceCount", getContext().getInstanceCount());

        responder.sendJson(200, status);
    }

    @POST
    @Path("/data")
    public void processData(HttpServiceRequest request, HttpServiceResponder responder) throws Exception {
        // Access dataset
        Dataset dataset = getDataset("my-dataset");

        // Decode the request body (a ByteBuffer) as UTF-8 text
        String data = StandardCharsets.UTF_8.decode(request.getContent()).toString();

        // Write to dataset
        // ... dataset operations

        responder.sendString(200, "Data processed successfully", StandardCharsets.UTF_8);
    }
}
```

**Plugin Context Usage:**

```java
import co.cask.cdap.api.plugin.PluginProperties;
import co.cask.cdap.app.runtime.spark.service.DefaultSparkHttpServicePluginContext;

// Access plugin context
DefaultSparkHttpServicePluginContext pluginContext = // ... obtain from service context

// Check if plugin exists
if (pluginContext.hasPlugin("my-plugin")) {
    // Get plugin properties
    PluginProperties properties = pluginContext.getPluginProperties("my-plugin");
    String configValue = properties.getProperties().get("config.key");

    // Load plugin class (MyPlugin is a placeholder for your own plugin type)
    Class<MyPlugin> pluginClass = pluginContext.loadPluginClass("my-plugin");

    // Create plugin instance (throws InstantiationException on failure)
    MyPlugin plugin = pluginContext.newPluginInstance("my-plugin");
    plugin.configure(configValue);
}
```

## Types

```java { .api }
/**
 * Context interface for HTTP services
 */
public interface HttpServiceContext {
    /**
     * Gets the service specification
     * @return HttpServiceSpecification containing service metadata
     */
    HttpServiceSpecification getSpecification();

    /**
     * Gets the instance ID of this service
     * @return Instance ID (0-based) of this service instance
     */
    int getInstanceId();

    /**
     * Gets the total number of service instances
     * @return Total count of service instances
     */
    int getInstanceCount();

    /**
     * Gets runtime arguments
     * @return Map of runtime arguments
     */
    Map<String, String> getRuntimeArguments();

    /**
     * Gets the dataset framework
     * @return DatasetFramework for dataset operations
     */
    DatasetFramework getDatasetFramework();
}

/**
 * Interface for HTTP service handlers
 */
public interface HttpServiceHandler {
    /**
     * Initializes the handler
     * @param context HTTP service context
     * @throws Exception if initialization fails
     */
    void initialize(HttpServiceContext context) throws Exception;

    /**
     * Destroys the handler
     */
    void destroy();
}

/**
 * Specification for HTTP services
 */
public class HttpServiceSpecification {
    /**
     * Gets the service name
     * @return Name of the HTTP service
     */
    public String getName();

    /**
     * Gets the service description
     * @return Description of the HTTP service
     */
    public String getDescription();

    /**
     * Gets the service handlers
     * @return Map of handler names to handler specifications
     */
    public Map<String, HttpServiceHandlerSpecification> getHandlers();

    /**
     * Gets service resources
     * @return Resources allocated to the service
     */
    public Resources getResources();
}

/**
 * Server state enumeration
 */
public enum State {
    STARTING,  // Server is starting up
    RUNNING,   // Server is running and accepting requests
    STOPPING,  // Server is shutting down
    STOPPED    // Server has stopped
}

/**
 * Plugin properties container
 */
public class PluginProperties {
    /**
     * Gets the plugin properties
     * @return Map of property key-value pairs
     */
    public Map<String, String> getProperties();

    /**
     * Gets a property value
     * @param key Property key
     * @return Property value or null if not found
     */
    public String getProperty(String key);

    /**
     * Gets a property value with default
     * @param key Property key
     * @param defaultValue Default value if property not found
     * @return Property value or default value
     */
    public String getProperty(String key, String defaultValue);
}
```
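
The two `getProperty` overloads documented for `PluginProperties` differ only in how a missing key is reported. The stand-in below sketches those semantics so the difference is concrete. `PluginPropertiesSketch` is a hypothetical class written for illustration, not the CDAP implementation, and the read-only map is an assumption.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in illustrating the documented lookup semantics. */
class PluginPropertiesSketch {
    private final Map<String, String> properties;

    PluginPropertiesSketch(Map<String, String> properties) {
        // Defensive copy; exposing a read-only view is an assumption of this sketch.
        this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
    }

    Map<String, String> getProperties() {
        return properties;
    }

    /** Returns the value, or null when the key is absent. */
    String getProperty(String key) {
        return properties.get(key);
    }

    /** Returns the value, or defaultValue when the key is absent. */
    String getProperty(String key, String defaultValue) {
        String value = properties.get(key);
        return value == null ? defaultValue : value;
    }
}

class PluginPropertiesDemo {
    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("config.key", "fast");
        PluginPropertiesSketch props = new PluginPropertiesSketch(raw);

        // Prefer the defaulting overload when a missing key is acceptable.
        String mode = props.getProperty("config.key", "safe");
        String timeout = props.getProperty("timeout.ms", "5000");
        System.out.println("mode=" + mode + ", timeout=" + timeout);
    }
}
```

Using the defaulting overload avoids null checks at each call site, while the single-argument form remains useful when the caller must distinguish a missing property from a configured one.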

```scala { .api }
/**
 * Trait defining the Spark HTTP service context interface
 */
trait SparkHttpServiceContext {
  /**
   * Gets the Spark program specification
   * @return SparkSpecification containing program metadata
   */
  def getSpecification: SparkSpecification

  /**
   * Gets the instance ID of this service
   * @return Instance ID (0-based) of this service instance
   */
  def getInstanceId: Int

  /**
   * Gets the total number of service instances
   * @return Total count of service instances
   */
  def getInstanceCount: Int

  /**
   * Gets the logical start time of the program
   * @return Start time in milliseconds since epoch
   */
  def getLogicalStartTime: Long

  /**
   * Gets runtime arguments as a Scala map
   * @return Map of argument key-value pairs
   */
  def getRuntimeArguments: Map[String, String]
}
```