A service that enables multiple remote clients to execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.
—
Workflow management provides a materialized table scheduling system with Quartz integration for periodic refresh operations, workflow lifecycle management, and embedded scheduler support for automated data pipelines.
Base interface for workflow schedulers managing materialized table refresh operations.
/**
 * Contract for a scheduler service that drives the refresh of materialized tables.
 *
 * <p>An implementation creates, modifies and deletes refresh workflows in a concrete scheduler
 * service. For every workflow it creates, it hands back a handler of type {@code T} that locates
 * that workflow in the scheduler service.
 *
 * @param <T> the concrete {@link RefreshHandler} type this scheduler uses to locate a refresh
 *     workflow in its scheduler service
 */
public interface WorkflowScheduler<T extends RefreshHandler> {

    /**
     * Prepares this scheduler instance for use; invoked during the initialization phase.
     *
     * @throws WorkflowException if the scheduler cannot be initialized
     */
    void open() throws WorkflowException;

    /**
     * Shuts this scheduler down once it is no longer needed, releasing any resources it holds.
     *
     * @throws WorkflowException if the scheduler's resources cannot be released
     */
    void close() throws WorkflowException;

    /**
     * Supplies the serializer that converts the {@code T} handlers produced by this scheduler
     * service to and from their serialized form.
     *
     * @return a {@link RefreshHandlerSerializer} for {@code T}
     */
    RefreshHandlerSerializer<T> getRefreshHandlerSerializer();

    /**
     * Registers a refresh workflow for a materialized table in the scheduler service.
     *
     * <p>Both periodic workflows and workflows that refresh only once are supported.
     *
     * @param createRefreshWorkflow details of the refresh workflow to create
     * @return the handler (meta info) pointing at the created workflow in the scheduler service
     * @throws WorkflowException if the refresh workflow cannot be created
     */
    T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;

    /**
     * Changes the state of an existing refresh workflow in the scheduler service, e.g. suspending
     * or resuming it or altering its cron schedule.
     *
     * @param modifyRefreshWorkflow details of the modification to apply
     * @throws WorkflowException if the refresh workflow cannot be modified
     */
    void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow)
            throws WorkflowException;

    /**
     * Removes a refresh workflow from the scheduler service.
     *
     * @param deleteRefreshWorkflow details identifying the refresh workflow to delete
     * @throws WorkflowException if the refresh workflow cannot be deleted
     */
    void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow)
            throws WorkflowException;
}
Concrete implementation of WorkflowScheduler for embedded Quartz scheduler.
/**
 * {@link WorkflowScheduler} plugin implementation for the embedded Quartz scheduler.
 *
 * <p>Used to create and modify the refresh workflows of materialized tables. Each workflow is
 * identified by an {@link EmbeddedRefreshHandler}.
 */
public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {

    /**
     * Builds a scheduler from the supplied configuration.
     *
     * @param configuration settings for the embedded scheduler
     */
    public EmbeddedWorkflowScheduler(Configuration configuration);
}
Factory for creating embedded workflow scheduler instances.
/**
 * {@link WorkflowSchedulerFactory} that creates {@link EmbeddedWorkflowScheduler} instances.
 */
public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {

    /** Factory identifier of the embedded scheduler. */
    public static final String IDENTIFIER = "embedded";

    /**
     * Returns the identifier of this factory.
     *
     * @return the factory identifier string (presumably {@value #IDENTIFIER})
     */
    @Override
    public String factoryIdentifier();

    /**
     * Returns the configuration options this factory requires.
     *
     * @return the required {@link ConfigOption}s (empty for the embedded scheduler)
     */
    @Override
    public Set<ConfigOption<?>> requiredOptions();

    /**
     * Returns the configuration options this factory optionally accepts.
     *
     * @return the optional {@link ConfigOption}s (empty for the embedded scheduler)
     */
    @Override
    public Set<ConfigOption<?>> optionalOptions();

    /**
     * Creates a workflow scheduler instance.
     *
     * @param context factory context carrying the configuration and other dependencies
     * @return the created {@link WorkflowScheduler}
     */
    @Override
    public WorkflowScheduler<?> createWorkflowScheduler(Context context);
}
Handler for materialized table refresh operations with serialization support.
/**
 * Handler for materialized table refresh operations.
 *
 * <p>Declared as a {@link RefreshHandler}: {@code EmbeddedWorkflowScheduler} implements
 * {@code WorkflowScheduler<EmbeddedRefreshHandler>}, and {@code WorkflowScheduler}'s type
 * parameter is bounded by {@code T extends RefreshHandler}. Without this supertype that
 * declaration does not type-check.
 */
public class EmbeddedRefreshHandler implements RefreshHandler {

    /**
     * Executes the materialized table refresh.
     *
     * @param context execution context with table and job details
     * @throws Exception if the refresh execution fails
     */
    public void execute(RefreshContext context) throws Exception;

    /**
     * Returns the table identifier handled by this refresh handler.
     *
     * @return identifier of the materialized table
     */
    public String getTableIdentifier();

    /**
     * Returns the refresh configuration.
     *
     * @return map of refresh configuration properties
     */
    public Map<String, String> getRefreshConfig();
}
Information about workflow execution and status.
/**
 * Information about a workflow: its identity, status and execution timing.
 *
 * <p>Besides the getters, the workflow lifecycle example builds instances with {@link #builder()}
 * and updates them with {@link #withStatus(WorkflowStatus)}, so those members are listed too.
 */
public class WorkflowInfo {

    /**
     * Returns the workflow identifier.
     *
     * @return unique workflow identifier
     */
    public String getWorkflowId();

    /**
     * Returns the workflow name.
     *
     * @return human-readable workflow name
     */
    public String getWorkflowName();

    /**
     * Returns the workflow status.
     *
     * @return current workflow status (ACTIVE, PAUSED, COMPLETED, etc.)
     */
    public WorkflowStatus getStatus();

    /**
     * Returns the next execution time.
     *
     * @return the next scheduled execution time, if any
     */
    public Optional<Instant> getNextExecutionTime();

    /**
     * Returns the last execution time.
     *
     * @return the last execution time, if any
     */
    public Optional<Instant> getLastExecutionTime();

    /**
     * Returns the workflow creation time.
     *
     * @return workflow creation timestamp
     */
    public Instant getCreationTime();

    /**
     * Creates a builder for assembling a {@code WorkflowInfo}.
     *
     * @return a new {@link Builder}
     */
    public static Builder builder();

    /**
     * Returns a {@code WorkflowInfo} carrying the given status.
     *
     * <p>NOTE(review): presumably a copy with all other fields unchanged, since callers store the
     * returned value. Confirm whether this instance is mutated instead.
     *
     * @param status the new workflow status
     * @return a {@code WorkflowInfo} with {@code status}
     */
    public WorkflowInfo withStatus(WorkflowStatus status);

    /** Builder for {@link WorkflowInfo}. */
    public static class Builder {
        public Builder workflowId(String workflowId);

        public Builder workflowName(String workflowName);

        public Builder status(WorkflowStatus status);

        public Builder creationTime(Instant creationTime);

        public WorkflowInfo build();
    }
}
Manager for materialized table operations and refresh scheduling.
/**
 * Entry point for materialized table operations: starting, inspecting and cancelling refreshes.
 */
public class MaterializedTableManager {

    /**
     * Starts a refresh of a materialized table, either once or on a recurring basis.
     *
     * @param tableIdentifier fully qualified identifier of the table to refresh
     * @param isPeriodic {@code true} for a periodic refresh, {@code false} for a one-time refresh
     * @param scheduleTime optional schedule time for the refresh; may be {@code null}
     * @param dynamicOptions dynamic configuration options applied to the refresh
     * @param staticPartitions partition specification for the refresh
     * @param executionConfig configuration for the Flink job that performs the refresh
     * @return an {@link OperationHandle} for tracking the refresh operation
     */
    public OperationHandle refreshMaterializedTable(
            String tableIdentifier,
            boolean isPeriodic,
            @Nullable String scheduleTime,
            Map<String, String> dynamicOptions,
            Map<String, String> staticPartitions,
            Map<String, String> executionConfig);

    /**
     * Looks up the refresh status of a materialized table.
     *
     * @param tableIdentifier identifier of the table
     * @return a {@link MaterializedTableRefreshStatus} describing the current state
     */
    public MaterializedTableRefreshStatus getRefreshStatus(String tableIdentifier);

    /**
     * Cancels an ongoing refresh operation.
     *
     * @param tableIdentifier identifier of the table
     * @param operationHandle handle of the operation to cancel
     */
    public void cancelRefresh(String tableIdentifier, OperationHandle operationHandle);
}
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;
// Create workflow scheduler via the public EmbeddedWorkflowScheduler(Configuration) constructor.
// NOTE(review): EmbeddedWorkflowSchedulerFactory declares no required or optional options, so the
// "workflow.scheduler.*" keys below are illustrative -- verify them against your Flink version.
Configuration schedulerConfig = new Configuration();
// The scheduler type must match a factory identifier; the embedded factory's is "embedded".
schedulerConfig.setString("workflow.scheduler.type", EmbeddedWorkflowSchedulerFactory.IDENTIFIER);
schedulerConfig.setString("workflow.scheduler.quartz.instanceName", "FlinkSQLGateway");
schedulerConfig.setInteger("workflow.scheduler.quartz.threadCount", 10);
EmbeddedWorkflowScheduler scheduler = new EmbeddedWorkflowScheduler(schedulerConfig);
// Initialize scheduler (open() throws WorkflowException on failure)
scheduler.open();
// Scheduler is now ready for workflow management; call scheduler.close() when it is no longer needed.
import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager;
// Configure materialized table refresh
MaterializedTableManager manager = new MaterializedTableManager(scheduler, service);

// One-time refresh: not periodic, and no schedule time (runs immediately)
Map<String, String> oneTimeDynamicOptions = Map.of("execution.parallelism", "4");
Map<String, String> oneTimePartitions = Map.of("year", "2023", "month", "12");
Map<String, String> oneTimeExecutionConfig =
        Map.of("execution.savepoint.path", "hdfs://cluster/savepoints");
OperationHandle oneTimeRefresh = manager.refreshMaterializedTable(
        "my_catalog.my_database.sales_summary",
        false,
        null,
        oneTimeDynamicOptions,
        oneTimePartitions,
        oneTimeExecutionConfig);
System.out.println("Started one-time refresh: " + oneTimeRefresh);

// Periodic refresh, daily at 2 AM
String dailyAtTwoAm = "0 0 2 * * ?";
Map<String, String> periodicDynamicOptions = Map.of(
        "execution.parallelism", "8",
        "execution.max-parallelism", "128");
Map<String, String> noPartitionRestrictions = Collections.emptyMap();
Map<String, String> periodicExecutionConfig = Map.of(
        "execution.checkpointing.interval", "30s",
        "execution.savepoint.path", "hdfs://cluster/savepoints");
OperationHandle periodicRefresh = manager.refreshMaterializedTable(
        "my_catalog.my_database.daily_metrics",
        true,
        dailyAtTwoAm,
        periodicDynamicOptions,
        noPartitionRestrictions,
        periodicExecutionConfig);
System.out.println("Started periodic refresh: " + periodicRefresh);
// Create custom refresh workflow
/**
 * Examples of registering custom refresh workflows on an {@link EmbeddedWorkflowScheduler}.
 *
 * <p>Both methods return the {@link EmbeddedRefreshHandler} produced by the scheduler. That handler
 * is the meta info pointing at the workflow in the scheduler service, so callers should keep it
 * rather than discard it.
 */
public class CustomRefreshWorkflow {

    /**
     * Registers a periodic refresh workflow that runs every hour.
     *
     * @param scheduler the scheduler that will own the workflow
     * @param tableIdentifier fully qualified materialized table identifier
     * @param refreshConfig refresh configuration passed to the workflow
     * @return the handler locating the created workflow in the scheduler service
     * @throws WorkflowException if the scheduler fails to create the workflow
     */
    public EmbeddedRefreshHandler createHourlyRefreshWorkflow(
            EmbeddedWorkflowScheduler scheduler,
            String tableIdentifier,
            Map<String, String> refreshConfig) throws WorkflowException {
        CreateRefreshWorkflow request = CreateRefreshWorkflow.builder()
                .workflowId("hourly_refresh_" + tableIdentifier.replace(".", "_"))
                .workflowName("Hourly Refresh for " + tableIdentifier)
                .tableIdentifier(tableIdentifier)
                .cronExpression("0 0 * * * ?") // Every hour
                .isPeriodic(true)
                .refreshConfig(refreshConfig)
                .executionConfig(Map.of(
                        "execution.parallelism", "4",
                        "execution.checkpointing.interval", "60s"))
                .build();
        EmbeddedRefreshHandler handler = scheduler.createRefreshWorkflow(request);
        System.out.println("Created hourly refresh workflow for: " + tableIdentifier);
        return handler;
    }

    /**
     * Registers a periodic refresh workflow that runs every 15 minutes and carries a custom
     * refresh condition.
     *
     * <p>NOTE(review): the "refresh.*" keys are example values; confirm they are understood by
     * the refresh job before relying on them.
     *
     * @param scheduler the scheduler that will own the workflow
     * @param tableIdentifier fully qualified materialized table identifier
     * @param condition value stored under the "refresh.condition" key
     * @return the handler locating the created workflow in the scheduler service
     * @throws WorkflowException if the scheduler fails to create the workflow
     */
    public EmbeddedRefreshHandler createConditionalRefreshWorkflow(
            EmbeddedWorkflowScheduler scheduler,
            String tableIdentifier,
            String condition) throws WorkflowException {
        // Custom refresh with conditions
        Map<String, String> refreshConfig = Map.of(
                "refresh.condition", condition,
                "refresh.partition.strategy", "incremental",
                "refresh.max.records", "1000000");
        CreateRefreshWorkflow request = CreateRefreshWorkflow.builder()
                .workflowId("conditional_refresh_" + System.currentTimeMillis())
                .workflowName("Conditional Refresh")
                .tableIdentifier(tableIdentifier)
                .cronExpression("0 */15 * * * ?") // Every 15 minutes
                .isPeriodic(true)
                .refreshConfig(refreshConfig)
                .build();
        return scheduler.createRefreshWorkflow(request);
    }
}
// Comprehensive workflow lifecycle management
/**
 * Creates, pauses, resumes and deletes refresh workflows through an
 * {@link EmbeddedWorkflowScheduler}, and keeps a local view of each workflow's status.
 */
public class WorkflowLifecycleManager {
    private final EmbeddedWorkflowScheduler scheduler;
    // Workflows created by this manager and not yet deleted, keyed by workflow id.
    // Despite the name, paused workflows remain here with status PAUSED.
    private final Map<String, WorkflowInfo> activeWorkflows = new ConcurrentHashMap<>();

    /**
     * @param scheduler the scheduler that executes create/modify/delete requests
     */
    public WorkflowLifecycleManager(EmbeddedWorkflowScheduler scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Creates the refresh workflow described by {@code definition} and tracks it as ACTIVE.
     *
     * @param definition the workflow to create
     * @return the handler locating the created workflow in the scheduler service
     * @throws WorkflowException if the scheduler fails to create the workflow (nothing is tracked)
     */
    public EmbeddedRefreshHandler createWorkflow(WorkflowDefinition definition)
            throws WorkflowException {
        CreateRefreshWorkflow request = buildCreateRequest(definition);
        EmbeddedRefreshHandler handler = scheduler.createRefreshWorkflow(request);
        WorkflowInfo info = WorkflowInfo.builder()
                .workflowId(definition.getId())
                .workflowName(definition.getName())
                .status(WorkflowStatus.ACTIVE)
                .creationTime(Instant.now())
                .build();
        activeWorkflows.put(definition.getId(), info);
        System.out.println("Created workflow: " + definition.getId());
        return handler;
    }

    /**
     * Pauses a workflow and marks it PAUSED if it is tracked locally.
     *
     * @throws WorkflowException if the scheduler fails to modify the workflow
     */
    public void pauseWorkflow(String workflowId) throws WorkflowException {
        applyAction(workflowId, WorkflowAction.PAUSE, WorkflowStatus.PAUSED);
        System.out.println("Paused workflow: " + workflowId);
    }

    /**
     * Resumes a workflow and marks it ACTIVE if it is tracked locally.
     *
     * @throws WorkflowException if the scheduler fails to modify the workflow
     */
    public void resumeWorkflow(String workflowId) throws WorkflowException {
        applyAction(workflowId, WorkflowAction.RESUME, WorkflowStatus.ACTIVE);
        System.out.println("Resumed workflow: " + workflowId);
    }

    /**
     * Deletes a workflow from the scheduler and stops tracking it.
     *
     * @throws WorkflowException if the scheduler fails to delete the workflow
     */
    public void deleteWorkflow(String workflowId) throws WorkflowException {
        // NOTE(review): raw DeleteRefreshWorkflow kept as in the original; confirm the builder's
        // generic signature so this can be typed as DeleteRefreshWorkflow<EmbeddedRefreshHandler>.
        DeleteRefreshWorkflow request = DeleteRefreshWorkflow.builder()
                .workflowId(workflowId)
                .build();
        scheduler.deleteRefreshWorkflow(request);
        activeWorkflows.remove(workflowId);
        System.out.println("Deleted workflow: " + workflowId);
    }

    /**
     * Returns a mutable snapshot of all tracked workflows, including paused ones.
     */
    public List<WorkflowInfo> listActiveWorkflows() {
        return new ArrayList<>(activeWorkflows.values());
    }

    /** Sends {@code action} to the scheduler, then records {@code newStatus} for a tracked workflow. */
    private void applyAction(String workflowId, WorkflowAction action, WorkflowStatus newStatus)
            throws WorkflowException {
        // NOTE(review): raw ModifyRefreshWorkflow kept as in the original; confirm builder generics.
        ModifyRefreshWorkflow request = ModifyRefreshWorkflow.builder()
                .workflowId(workflowId)
                .action(action)
                .build();
        scheduler.modifyRefreshWorkflow(request);
        // computeIfPresent updates atomically: a separate get + put could re-insert an entry
        // that deleteWorkflow removed concurrently.
        activeWorkflows.computeIfPresent(workflowId, (id, info) -> info.withStatus(newStatus));
    }

    /** Translates a {@link WorkflowDefinition} into a scheduler create request. */
    private CreateRefreshWorkflow buildCreateRequest(WorkflowDefinition definition) {
        return CreateRefreshWorkflow.builder()
                .workflowId(definition.getId())
                .workflowName(definition.getName())
                .tableIdentifier(definition.getTableIdentifier())
                .cronExpression(definition.getCronExpression())
                .isPeriodic(definition.isPeriodic())
                .refreshConfig(definition.getRefreshConfig())
                .executionConfig(definition.getExecutionConfig())
                .build();
    }
}
// Workflow execution monitoring
/**
 * Periodically polls the refresh status of a set of materialized tables and prints per-table
 * updates (every minute) and a summary report (every hour).
 *
 * <p>Call {@link #stop()} when done so the monitoring executor is shut down.
 */
public class WorkflowMonitor {
    /** Tables monitored when the caller does not supply its own list. */
    private static final List<String> DEFAULT_MONITORED_TABLES = List.of(
            "catalog.db.sales_summary",
            "catalog.db.daily_metrics",
            "catalog.db.user_analytics");
    /** A failed refresh is treated as retryable while its retry count is below this limit. */
    private static final int MAX_RETRIES = 3;

    private final MaterializedTableManager manager;
    private final List<String> monitoredTables;
    private final ScheduledExecutorService monitorExecutor;

    /** Monitors the default set of example tables. */
    public WorkflowMonitor(MaterializedTableManager manager) {
        this(manager, DEFAULT_MONITORED_TABLES);
    }

    /**
     * Monitors the given tables.
     *
     * @param manager source of refresh status
     * @param monitoredTables fully qualified identifiers of the tables to watch (copied)
     */
    public WorkflowMonitor(MaterializedTableManager manager, List<String> monitoredTables) {
        this.manager = manager;
        this.monitoredTables = List.copyOf(monitoredTables);
        this.monitorExecutor = Executors.newScheduledThreadPool(2);
    }

    /** Schedules the status check (every 60 s) and the status report (every 3600 s). */
    public void startMonitoring() {
        // Monitor refresh status every minute
        monitorExecutor.scheduleAtFixedRate(
                this::checkRefreshStatus,
                0, 60, TimeUnit.SECONDS);
        // Generate reports every hour
        monitorExecutor.scheduleAtFixedRate(
                this::generateStatusReport,
                0, 3600, TimeUnit.SECONDS);
    }

    /** Prints the current refresh state of each monitored table; failures are handled per table. */
    private void checkRefreshStatus() {
        for (String table : getMonitoredTables()) {
            // Catch per table so that one bad lookup neither skips the remaining tables nor
            // escapes the scheduled task.
            try {
                MaterializedTableRefreshStatus status = manager.getRefreshStatus(table);
                switch (status.getStatus()) {
                    case RUNNING:
                        System.out.println("Refresh running for: " + table +
                                ", Progress: " + status.getProgress() + "%");
                        break;
                    case FAILED:
                        System.err.println("Refresh failed for: " + table +
                                ", Error: " + status.getErrorMessage());
                        handleRefreshFailure(table, status);
                        break;
                    case COMPLETED:
                        System.out.println("Refresh completed for: " + table +
                                ", Duration: " + status.getDuration() + "ms");
                        break;
                    default:
                        // Handle other statuses
                        break;
                }
            } catch (Exception e) {
                System.err.println("Failed to check status for: " + table + ", " + e.getMessage());
            }
        }
    }

    /** Decides between retrying and alerting for a failed refresh (both left as placeholders). */
    private void handleRefreshFailure(String table, MaterializedTableRefreshStatus status) {
        // Implement retry logic, alerting, etc.
        if (status.getRetryCount() < MAX_RETRIES) {
            System.out.println("Retrying refresh for: " + table);
            // Trigger retry...
        } else {
            System.err.println("Max retries exceeded for: " + table);
            // Send alert...
        }
    }

    /**
     * Prints healthy/failed counts for the monitored tables. A table whose status lookup throws
     * counts as failed.
     */
    private void generateStatusReport() {
        System.out.println("=== Workflow Status Report ===");
        List<String> tables = getMonitoredTables();
        int totalTables = tables.size();
        int healthyTables = 0;
        int failedTables = 0;
        for (String table : tables) {
            try {
                MaterializedTableRefreshStatus status = manager.getRefreshStatus(table);
                if (status.isHealthy()) {
                    healthyTables++;
                } else {
                    failedTables++;
                }
            } catch (Exception e) {
                failedTables++;
            }
        }
        System.out.println("Total tables: " + totalTables);
        System.out.println("Healthy tables: " + healthyTables);
        System.out.println("Failed tables: " + failedTables);
        // Guard the division: with no tables the ratio would otherwise print as "NaN%".
        String healthRatio = totalTables == 0
                ? "n/a"
                : (healthyTables * 100.0 / totalTables) + "%";
        System.out.println("Health ratio: " + healthRatio);
        System.out.println("===============================");
    }

    /** Returns the (immutable) list of tables to monitor. */
    private List<String> getMonitoredTables() {
        return monitoredTables;
    }

    /**
     * Shuts the executor down, waiting up to 30 seconds before forcing termination.
     * Restores the interrupt flag if interrupted while waiting.
     */
    public void stop() {
        monitorExecutor.shutdown();
        try {
            if (!monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
                monitorExecutor.shutdownNow();
            }
        } catch (InterruptedException e) {
            monitorExecutor.shutdownNow();
            Thread.currentThread().interrupt();
        }
    }
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway