or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

data-transfer-objects.mdhistory-server.mdindex.mdjar-management.mdrest-api-specifications.mdutilities-extensions.mdweb-server-bootstrap.md

utilities-extensions.mddocs/

0

# Utilities and Extensions

1

2

Helper utilities and extension points for JAR processing and web submission functionality. These components provide reusable functionality for custom implementations and internal processing operations.

3

4

## Capabilities

5

6

### JAR Handler Utilities

7

8

Core utility class providing JAR processing functionality and context management.

9

10

```java { .api }

11

/**

12

* Utility class providing helper functions for JAR handlers.

13

* Contains static methods and context classes for JAR processing operations.

14

*/

15

public class JarHandlerUtils {

16

/**

17

* Tokenize program arguments string into individual arguments.

18

* Handles quoted arguments and proper escaping.

19

*

20

* @param args Program arguments as a single string

21

* @return List of individual argument strings

22

*/

23

public static List<String> tokenizeArguments(String args);

24

25

/**

26

* Context object for JAR processing operations.

27

* Provides methods for converting between different representations of JAR configurations.

28

*/

29

public static class JarHandlerContext {

30

/**

31

* Create a JAR handler context from an HTTP request.

32

* Extracts JAR configuration from request body and parameters.

33

*

34

* @param request HandlerRequest containing JAR configuration

35

* @param jarDir Directory containing uploaded JAR files

36

* @param log Logger instance for operation logging

37

* @return JarHandlerContext with extracted configuration

38

* @throws RestHandlerException if request parameters are invalid

39

*/

40

public static JarHandlerContext fromRequest(

41

HandlerRequest<JarRequestBody, ?> request,

42

Path jarDir,

43

Logger log

44

) throws RestHandlerException;

45

46

/**

47

* Apply JAR configuration to a Flink Configuration object.

48

* Sets parallelism, program arguments, job ID, and other job settings.

49

*

50

* @param configuration Target Flink configuration to modify

51

*/

52

public void applyToConfiguration(Configuration configuration);

53

54

/**

55

* Convert JAR configuration to a Flink JobGraph.

56

* Creates the execution graph for the job without running it.

57

*

58

* @param packagedProgram PackagedProgram containing the job

59

* @param configuration Flink configuration for job execution

60

* @param suppressOutput Whether to suppress program output during job graph creation

61

* @return JobGraph representing the job execution plan

62

* @throws ProgramInvocationException if job graph creation fails

63

*/

64

public JobGraph toJobGraph(

65

PackagedProgram packagedProgram,

66

Configuration configuration,

67

boolean suppressOutput

68

) throws ProgramInvocationException;

69

70

/**

71

* Convert JAR configuration to a PackagedProgram.

72

* Creates a packaged program that can be executed or analyzed.

73

*

74

* @param configuration Flink configuration for program creation

75

* @return PackagedProgram ready for execution

76

* @throws ProgramInvocationException if program creation fails

77

*/

78

public PackagedProgram toPackagedProgram(Configuration configuration) throws ProgramInvocationException;

79

80

/**

81

* Get the JAR file path for this context.

82

*

83

* @return Path to the JAR file

84

*/

85

public Path getJarFile();

86

87

/**

88

* Get the entry class name.

89

*

90

* @return Fully qualified entry class name, or null if not specified

91

*/

92

public String getEntryClassName();

93

94

/**

95

* Get the program arguments.

96

*

97

* @return List of program arguments

98

*/

99

public List<String> getProgramArguments();

100

101

/**

102

* Get the job parallelism.

103

*

104

* @return Parallelism value, or null for default

105

*/

106

public Integer getParallelism();

107

108

/**

109

* Get the job ID.

110

*

111

* @return Job ID, or null if not specified

112

*/

113

public JobID getJobId();

114

}

115

}

116

```

117

118

### Web Submission Extension

119

120

Extension point for adding JAR submission capabilities to the web interface.

121

122

```java { .api }

123

/**

124

* Extension that provides JAR submission capabilities to the Flink web interface.

125

* Implements WebMonitorExtension to integrate with the web server framework.

126

*/

127

public class WebSubmissionExtension implements WebMonitorExtension {

128

/**

129

* Create a web submission extension.

130

*

131

* @param configuration Flink configuration

132

* @param leaderRetriever Gateway retriever for accessing Flink cluster

133

* @param responseHeaders HTTP headers to include in responses

134

* @param leaderElectionService Service for leader election

135

* @param jarDir Directory for storing uploaded JAR files

136

* @param executor Executor for handling submission operations

137

* @param timeout Request timeout for operations

138

*/

139

public WebSubmissionExtension(

140

Configuration configuration,

141

GatewayRetriever<? extends RestfulGateway> leaderRetriever,

142

Map<String, String> responseHeaders,

143

CompletableFuture<String> leaderElectionService,

144

Path jarDir,

145

Executor executor,

146

Time timeout

147

);

148

149

/**

150

* Get the collection of REST handlers provided by this extension.

151

* Returns handlers for JAR upload, list, run, delete, and plan operations.

152

*

153

* @return Collection of handler specifications and implementations

154

*/

155

public Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> getHandlers();

156

157

/**

158

* Asynchronously close the extension and clean up resources.

159

*

160

* @return CompletableFuture that completes when cleanup is finished

161

*/

162

public CompletableFuture<Void> closeAsync();

163

}

164

```

165

166

### HTTP Request Handler

167

168

Low-level HTTP request handler for file uploads and general request processing.

169

170

```java { .api }

171

/**

172

* Netty channel handler for processing HTTP requests and file uploads.

173

* Handles multipart/form-data uploads and basic HTTP request routing.

174

*/

175

public class HttpRequestHandler extends SimpleChannelInboundHandler<HttpObject> {

176

/**

177

* Create an HTTP request handler.

178

*

179

* @param uploadDir Directory for storing uploaded files

180

*/

181

public HttpRequestHandler(File uploadDir);

182

183

/**

184

* Check and create upload directory if it doesn't exist.

185

* Validates directory permissions and creates necessary parent directories.

186

*

187

* @param uploadDir Directory to check and create

188

* @return The validated upload directory

189

* @throws IOException if directory cannot be created or accessed

190

*/

191

public static File checkAndCreateUploadDir(File uploadDir) throws IOException;

192

193

/**

194

* Log external upload directory deletion for cleanup tracking.

195

* Used for auditing and debugging upload directory management.

196

*

197

* @param uploadDir Directory that was deleted

198

*/

199

public static void logExternalUploadDirDeletion(File uploadDir);

200

201

/**

202

* Handle incoming HTTP request objects.

203

* Processes both simple requests and multipart file uploads.

204

*

205

* @param ctx Netty channel handler context

206

* @param httpObject HTTP request object (request, content, etc.)

207

*/

208

protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject);

209

}

210

```

211

212

### Pipeline Error Handler

213

214

Final error handler in the Netty pipeline for comprehensive error handling.

215

216

```java { .api }

217

/**

218

* Final error handler in the Netty pipeline for unhandled exceptions.

219

* Provides centralized error logging and response generation.

220

*/

221

public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpObject> {

222

/**

223

* Create a pipeline error handler.

224

*

225

* @param logger Logger for error reporting

226

*/

227

public PipelineErrorHandler(Logger logger);

228

229

/**

230

* Handle exceptions that occurred during request processing.

231

* Logs errors and sends appropriate HTTP error responses to clients.

232

*

233

* @param ctx Netty channel handler context

234

* @param cause Exception that occurred

235

*/

236

public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause);

237

238

/**

239

* Handle HTTP objects that reached the end of the pipeline.

240

* Typically handles unprocessed requests with 404 responses.

241

*

242

* @param ctx Netty channel handler context

243

* @param httpObject Unprocessed HTTP object

244

*/

245

protected void channelRead0(ChannelHandlerContext ctx, HttpObject httpObject);

246

}

247

```

248

249

## Usage Examples

250

251

### JAR Processing with Utilities

252

253

```java

254

import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils;

255

import org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils.JarHandlerContext;

256

import java.nio.file.Paths;

257

258

// Tokenize program arguments

259

String argsString = "--input /data/input --output /data/output --parallelism 4";

260

List<String> args = JarHandlerUtils.tokenizeArguments(argsString);

261

// Result: ["--input", "/data/input", "--output", "/data/output", "--parallelism", "4"]

262

263

// Create JAR context from request

264

HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request = createRequest();

265

Path jarDir = Paths.get("/tmp/flink-jars");

266

JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir);

267

268

// Apply configuration

269

Configuration flinkConfig = new Configuration();

270

context.applyToConfiguration(flinkConfig);

271

272

// Create job graph for execution plan

273

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

274

JobGraph jobGraph = context.toJobGraph(classLoader);

275

276

// Create packaged program for execution

277

PackagedProgram program = context.toPackagedProgram(classLoader);

278

```

279

280

### Web Submission Extension Setup

281

282

```java

283

import org.apache.flink.runtime.webmonitor.WebSubmissionExtension;

284

import org.apache.flink.configuration.Configuration;

285

import java.nio.file.Paths;

286

import java.util.concurrent.Executors;

287

288

// Setup extension configuration

289

Configuration config = new Configuration();

290

Path jarDir = Paths.get("/tmp/flink-web-jars");

291

Executor executor = Executors.newCachedThreadPool();

292

Time timeout = Time.seconds(60);

293

Map<String, String> responseHeaders = new HashMap<>();

294

responseHeaders.put("Access-Control-Allow-Origin", "*");

295

296

// Create web submission extension

297

WebSubmissionExtension extension = new WebSubmissionExtension(

298

config,

299

leaderRetriever,

300

responseHeaders,

301

leaderElectionService,

302

jarDir,

303

executor,

304

timeout

305

);

306

307

// Get handlers for router registration

308

Collection<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =

309

extension.getHandlers();

310

311

// Register handlers with router

312

for (Tuple2<RestHandlerSpecification, ChannelInboundHandler> handler : handlers) {

313

RestHandlerSpecification spec = handler.f0;

314

ChannelInboundHandler implementation = handler.f1;

315

router.addHandler(spec, implementation);

316

}

317

318

// Cleanup when shutting down

319

extension.closeAsync().get();

320

```

321

322

### HTTP Request Handler Integration

323

324

```java

325

import org.apache.flink.runtime.webmonitor.HttpRequestHandler;

326

import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;

327

import java.io.File;

328

329

// Setup upload directory

330

File uploadDir = new File("/tmp/flink-uploads");

331

File validatedDir = HttpRequestHandler.checkAndCreateUploadDir(uploadDir);

332

333

// Create HTTP request handler

334

HttpRequestHandler httpHandler = new HttpRequestHandler(validatedDir);

335

336

// Add to Netty pipeline

337

public void initChannel(SocketChannel ch) {

338

ChannelPipeline pipeline = ch.pipeline();

339

pipeline.addLast("httpRequestHandler", httpHandler);

340

341

// Add error handler as final stage

342

PipelineErrorHandler errorHandler = new PipelineErrorHandler(logger);

343

pipeline.addLast("errorHandler", errorHandler);

344

}

345

346

// Cleanup upload directory when needed

347

HttpRequestHandler.logExternalUploadDirDeletion(uploadDir);

348

```

349

350

### Advanced JAR Context Usage

351

352

```java

353

// Custom JAR context processing

354

public class CustomJarProcessor {

355

public void processJar(JarHandlerContext context) throws Exception {

356

// Extract JAR information

357

Path jarFile = context.getJarFile();

358

String entryClass = context.getEntryClassName();

359

List<String> args = context.getProgramArguments();

360

Integer parallelism = context.getParallelism();

361

362

// Create custom configuration

363

Configuration config = new Configuration();

364

config.setInteger(CoreOptions.DEFAULT_PARALLELISM,

365

parallelism != null ? parallelism : 1);

366

367

// Apply JAR-specific settings

368

context.applyToConfiguration(config);

369

370

// Create and validate job graph

371

ClassLoader jarClassLoader = createJarClassLoader(jarFile);

372

JobGraph jobGraph = context.toJobGraph(jarClassLoader);

373

374

// Validate job graph

375

validateJobGraph(jobGraph);

376

377

// Create packaged program for execution

378

PackagedProgram program = context.toPackagedProgram(jarClassLoader);

379

380

// Execute or further process

381

executeProgram(program, config);

382

}

383

384

private ClassLoader createJarClassLoader(Path jarFile) {

385

// Custom class loader creation logic

386

return URLClassLoader.newInstance(

387

new URL[]{jarFile.toUri().toURL()},

388

Thread.currentThread().getContextClassLoader()

389

);

390

}

391

392

private void validateJobGraph(JobGraph jobGraph) {

393

// Custom validation logic

394

if (jobGraph.getNumberOfVertices() == 0) {

395

throw new IllegalArgumentException("Job graph is empty");

396

}

397

}

398

399

private void executeProgram(PackagedProgram program, Configuration config) {

400

// Custom execution logic

401

}

402

}

403

```

404

405

### Error Handling Integration

406

407

```java

408

// Comprehensive error handling setup

409

public class WebServerSetup {

410

public void setupPipeline(SocketChannel ch) {

411

ChannelPipeline pipeline = ch.pipeline();

412

413

// Add request processing handlers

414

pipeline.addLast("httpRequestHandler", new HttpRequestHandler(uploadDir));

415

416

// Add business logic handlers

417

pipeline.addLast("jarUploadHandler", jarUploadHandler);

418

pipeline.addLast("jarRunHandler", jarRunHandler);

419

420

// Add final error handler

421

PipelineErrorHandler errorHandler = new PipelineErrorHandler(logger);

422

pipeline.addLast("pipelineErrorHandler", errorHandler);

423

}

424

}

425

426

// Custom error handling in JAR operations

427

public void handleJarOperation() {

428

try {

429

JarHandlerContext context = JarHandlerContext.fromRequest(request, jarDir);

430

JobGraph jobGraph = context.toJobGraph(classLoader);

431

// Process job graph

432

} catch (ClassNotFoundException e) {

433

logger.error("Entry class not found in JAR", e);

434

throw new BadRequestException("Invalid entry class: " + e.getMessage());

435

} catch (Exception e) {

436

logger.error("Failed to process JAR", e);

437

throw new InternalServerErrorException("JAR processing failed");

438

}

439

}

440

```

441

442

These utilities and extensions provide the foundation for building custom web interfaces and extending Flink's web capabilities while maintaining consistency with the core framework.