Web-based monitoring and management interface for Apache Flink with REST APIs and Angular dashboard.
—
Complete JAR lifecycle management for Flink job submission including upload, execution, execution planning, and deletion capabilities.
Upload JAR files to the Flink cluster for later execution. Supports multipart form data uploads with automatic validation.
/**
* Handles JAR file uploads via multipart form data
* REST Endpoint: POST /jars/upload
*/
public class JarUploadHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
public JarUploadHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> messageHeaders,
Path jarDir,
Executor executor
);
public CompletableFuture<JarUploadResponseBody> handleRequest(
HandlerRequest<EmptyRequestBody> request,
RestfulGateway gateway
) throws RestHandlerException;
}
/**
* Message headers for JAR upload endpoint
*/
public class JarUploadHeaders implements RuntimeMessageHeaders<EmptyRequestBody, JarUploadResponseBody, EmptyMessageParameters> {
public static JarUploadHeaders getInstance();
public boolean acceptsFileUploads();
public String getDescription();
}Usage Example:
# Upload a JAR file via REST API
curl -X POST -F "jarfile=@/path/to/job.jar" http://localhost:8081/jars/uploadList all uploaded JAR files with metadata including entry points, upload timestamps, and file information.
/**
* Lists all uploaded JAR files with metadata and entry points
* REST Endpoint: GET /jars
*/
public class JarListHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, JarListInfo, EmptyMessageParameters> {
public JarListHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
MessageHeaders<EmptyRequestBody, JarListInfo, EmptyMessageParameters> messageHeaders,
CompletableFuture<String> localAddressFuture,
File jarDir,
Configuration configuration,
Executor executor
);
public CompletableFuture<JarListInfo> handleRequest(
HandlerRequest<EmptyRequestBody> request,
RestfulGateway gateway
) throws RestHandlerException;
}Usage Example:
# List all uploaded JARs
curl http://localhost:8081/jarsExecute previously uploaded JAR files with configurable parameters including parallelism, program arguments, entry class, and savepoint restoration.
/**
* Submits and runs jobs from uploaded JAR files
* REST Endpoint: POST /jars/:jarid/run
*/
public class JarRunHandler extends AbstractRestHandler<DispatcherGateway, JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> {
public JarRunHandler(
GatewayRetriever<? extends DispatcherGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
MessageHeaders<JarRunRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
Path jarDir,
Configuration configuration,
Executor executor,
java.util.function.Supplier<ApplicationRunner> applicationRunnerSupplier
);
public CompletableFuture<JarRunResponseBody> handleRequest(
HandlerRequest<JarRunRequestBody> request,
DispatcherGateway gateway
) throws RestHandlerException;
}
/**
* Request body for JAR execution with savepoint and configuration options
*/
public class JarRunRequestBody extends JarRequestBody implements RequestBody {
public JarRunRequestBody();
public JarRunRequestBody(
String entryClassName,
List<String> programArgumentsList,
Integer parallelism,
JobID jobId,
Boolean allowNonRestoredState,
String savepointPath,
RecoveryClaimMode recoveryClaimMode,
Map<String, String> flinkConfiguration
);
public Boolean getAllowNonRestoredState();
public String getSavepointPath();
public RecoveryClaimMode getRecoveryClaimMode();
public boolean isDeprecatedRestoreModeHasValue();
}Usage Example:
# Execute a JAR with parameters
curl -X POST http://localhost:8081/jars/your-jar-id/run \
-H "Content-Type: application/json" \
-d '{
"entryClass": "com.example.FlinkJob",
"programArgs": ["--input", "hdfs://input", "--output", "hdfs://output"],
"parallelism": 4,
"savepointPath": "hdfs://savepoints/savepoint-123",
"allowNonRestoredState": false
}'Generate execution plans for JAR files without actually running them. Useful for validation and visualization of job graphs.
/**
* Generates execution plans for JAR files without running them
* REST Endpoints: GET /jars/:jarid/plan, POST /jars/:jarid/plan
*/
public class JarPlanHandler extends AbstractRestHandler<RestfulGateway, JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
public JarPlanHandler(
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
Duration timeout,
Map<String, String> responseHeaders,
MessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> messageHeaders,
Path jarDir,
Configuration configuration,
Executor executor
);
public CompletableFuture<JobPlanInfo> handleRequest(
HandlerRequest<JarPlanRequestBody> request,
RestfulGateway gateway
) throws RestHandlerException;
}
/**
* Abstract base class for JAR plan message headers
*/
public abstract class AbstractJarPlanHeaders implements RuntimeMessageHeaders<JarPlanRequestBody, JobPlanInfo, JarPlanMessageParameters> {
public Class<JobPlanInfo> getResponseClass();
public HttpResponseStatus getResponseStatusCode();
public Class<JarPlanRequestBody> getRequestClass();
public JarPlanMessageParameters getUnresolvedMessageParameters();
public String getTargetRestEndpointURL();
public String operationId();
public String getDescription();
}
/**
* Message headers for GET /jars/:jarid/plan endpoint
*/
public class JarPlanGetHeaders extends AbstractJarPlanHeaders {
public static JarPlanGetHeaders getInstance();
public HttpMethodWrapper getHttpMethod();
}
/**
* Message headers for POST /jars/:jarid/plan endpoint
*/
public class JarPlanPostHeaders extends AbstractJarPlanHeaders {
public static JarPlanPostHeaders getInstance();
public HttpMethodWrapper getHttpMethod();
}
/**
* Request body for JAR execution plan generation
*/
public class JarPlanRequestBody extends JarRequestBody implements RequestBody {
// Inherits entryClass, programArgs, parallelism from JarRequestBody
}Usage Example:
# Get execution plan for a JAR
curl "http://localhost:8081/jars/your-jar-id/plan?entryClass=com.example.FlinkJob¶llelism=4"
# Or with POST body
curl -X POST http://localhost:8081/jars/your-jar-id/plan \
-H "Content-Type: application/json" \
-d '{
"entryClass": "com.example.FlinkJob",
"programArgs": ["--input", "test"],
"parallelism": 2
}'Delete uploaded JAR files from the cluster to free up storage space.
/**
* Deletes uploaded JAR files
* REST Endpoint: DELETE /jars/:jarid
*/
public class JarDeleteHandler extends AbstractRestHandler<RestfulGateway, EmptyRequestBody, EmptyResponseBody, JarDeleteMessageParameters> {
public CompletableFuture<EmptyResponseBody> handleRequest(
HandlerRequest<EmptyRequestBody> request,
RestfulGateway gateway
) throws RestHandlerException;
}Usage Example:
# Delete an uploaded JAR
curl -X DELETE http://localhost:8081/jars/your-jar-id/**
* Base message parameters for JAR operations
*/
public class JarMessageParameters extends MessageParameters {
public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
}
/**
* Message parameters for JAR run operations
*/
public class JarRunMessageParameters extends JarMessageParameters {
public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
public final ProgramArgQueryParameter programArgQueryParameter = new ProgramArgQueryParameter();
public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
}
/**
* Message parameters for JAR plan operations
*/
public class JarPlanMessageParameters extends JarMessageParameters {
public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
public final ProgramArgQueryParameter programArgQueryParameter = new ProgramArgQueryParameter();
}
/**
* Message parameters for JAR delete operations
*/
public class JarDeleteMessageParameters extends JarMessageParameters {
// Only inherits jarIdPathParameter
}
/**
* Path parameter for identifying JAR files by ID
*/
public class JarIdPathParameter extends MessagePathParameter<String> {
public static final String KEY = "jarid";
protected String convertFromString(String value) throws ConversionException;
protected String convertToString(String value);
public String getDescription();
}
/**
* Base class for string query parameters
*/
public abstract class StringQueryParameter extends MessageQueryParameter<String> {
public StringQueryParameter(String key, MessageParameterRequisiteness requisiteness);
public final String convertStringToValue(String value);
public final String convertValueToString(String value);
}
/**
* Query parameters for JAR operations
*/
public class ParallelismQueryParameter extends MessageQueryParameter<Integer> {
public static final String KEY = "parallelism";
}
public class EntryClassQueryParameter extends StringQueryParameter {
public static final String KEY = "entryClass";
}
public class ProgramArgQueryParameter extends MessageQueryParameter<List<String>> {
public static final String KEY = "programArgs";
}
public class SavepointPathQueryParameter extends StringQueryParameter {
public static final String KEY = "savepointPath";
}
public class AllowNonRestoredStateQueryParameter extends MessageQueryParameter<Boolean> {
public static final String KEY = "allowNonRestoredState";
}/**
* Base request body for JAR operations
*/
public abstract class JarRequestBody implements RequestBody {
/**
* Get the entry class name for JAR execution
*/
public String getEntryClassName();
/**
* Get the program arguments as a list
*/
public List<String> getProgramArgumentsList();
/**
* Get the parallelism setting for job execution
*/
public Integer getParallelism();
/**
* Get the job ID for the execution
*/
public JobID getJobId();
/**
* Get Flink configuration overrides
*/
public Configuration getFlinkConfiguration();
}/**
* Response for JAR upload operations
*/
public class JarUploadResponseBody implements ResponseBody {
public JarUploadResponseBody(String filename);
public String getFilename();
public UploadStatus getStatus();
public enum UploadStatus {
success
}
}
/**
* Response for JAR execution operations
*/
public class JarRunResponseBody implements ResponseBody {
public JarRunResponseBody(JobID jobId);
public JobID getJobId();
}
/**
* Response containing list of uploaded JARs with metadata
*/
public class JarListInfo implements ResponseBody {
public JarListInfo(String address, List<JarFileInfo> jarFileList);
public String getAddress();
public List<JarFileInfo> getFiles();
/**
* Individual JAR file metadata
*/
public static class JarFileInfo {
public JarFileInfo(String id, String name, long uploaded, List<JarEntryInfo> jarEntryList);
public String getId();
public String getName();
public long getUploaded();
public List<JarEntryInfo> getEntry();
// Public fields for JSON serialization
public String id;
public String name;
public long uploaded;
public List<JarEntryInfo> entry;
}
/**
* Entry point information for JAR files
*/
public static class JarEntryInfo {
public JarEntryInfo(String name, String description);
public String getName();
public String getDescription();
}
}/**
* JAR handling utilities for upload and execution
*/
public class JarHandlerUtils {
/**
* Tokenizes program arguments string into list of arguments
*/
static List<String> tokenizeArguments(@Nullable String args);
/**
* Context for JAR handler operations containing standard parameters
*/
public static class JarHandlerContext {
public static <R extends JarRequestBody> JarHandlerContext fromRequest(
HandlerRequest<R> request,
Path jarDir,
Logger log
) throws RestHandlerException;
/**
* Apply JAR request configuration to Flink configuration
*/
public void applyToConfiguration(
Configuration configuration,
HandlerRequest<? extends JarRequestBody> request
);
/**
* Create job graph from packaged program
*/
public JobGraph toJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
boolean suppressOutput
) throws Exception;
/**
* Create packaged program from configuration
*/
public PackagedProgram toPackagedProgram(Configuration configuration)
throws Exception;
}
}Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-runtime-web