# Workflow Management

Workflow management provides a materialized table scheduling system with Quartz integration for periodic refresh operations, workflow lifecycle management, and embedded scheduler support for automated data pipeline operations.

## Capabilities

### WorkflowScheduler Interface

Base interface for workflow schedulers that manage materialized table refresh operations.

```java { .api }
11
/**
12
* Workflow scheduler interface for materialized table refresh operations
13
* @param <T> The type of RefreshHandler used by specific WorkflowScheduler to locate the refresh workflow in scheduler service
14
*/
15
public interface WorkflowScheduler<T extends RefreshHandler> {
16
/**
17
* Open this workflow scheduler instance for required preparation in initialization phase
18
* @throws WorkflowException if initializing workflow scheduler occurs exception
19
*/
20
void open() throws WorkflowException;
21
22
/**
23
* Close this workflow scheduler when it is no longer needed and release any resource that it might be holding
24
* @throws WorkflowException if closing the related resources of workflow scheduler failed
25
*/
26
void close() throws WorkflowException;
27
28
/**
29
* Return a RefreshHandlerSerializer instance to serialize and deserialize RefreshHandler created by specific workflow scheduler service
30
* @return RefreshHandlerSerializer instance for type T
31
*/
32
RefreshHandlerSerializer<T> getRefreshHandlerSerializer();
33
34
/**
35
* Create a refresh workflow in specific scheduler service for the materialized table
36
* This method supports creating workflow for periodic refresh, as well as workflow for a one-time refresh only
37
* @param createRefreshWorkflow The detail info for create refresh workflow of materialized table
38
* @return The meta info which points to the refresh workflow in scheduler service
39
* @throws WorkflowException if creating refresh workflow failed
40
*/
41
T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;
42
43
/**
44
* Modify the refresh workflow status in scheduler service. This includes suspend, resume, modify schedule cron operation, and so on
45
* @param modifyRefreshWorkflow The detail info for modify refresh workflow of materialized table
46
* @throws WorkflowException if modify refresh workflow failed
47
*/
48
void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;
49
50
/**
51
* Delete the refresh workflow in scheduler service
52
* @param deleteRefreshWorkflow The detail info for delete refresh workflow of materialized table
53
* @throws WorkflowException if delete refresh workflow failed
54
*/
55
void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;
56
}
57
```

### EmbeddedWorkflowScheduler Implementation

Concrete implementation of `WorkflowScheduler` for the embedded Quartz scheduler.

```java { .api }
64
/**
65
* A workflow scheduler plugin implementation for EmbeddedQuartzScheduler
66
* It is used to create, modify refresh workflow for materialized table
67
*/
68
public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {
69
/**
70
* Constructor with configuration
71
* @param configuration Configuration for the embedded scheduler
72
*/
73
public EmbeddedWorkflowScheduler(Configuration configuration);
74
}
75
```

### EmbeddedWorkflowSchedulerFactory

Factory for creating embedded workflow scheduler instances; it is registered under the identifier `embedded`.

```java { .api }
82
/**
83
* The WorkflowSchedulerFactory to create the EmbeddedWorkflowScheduler
84
*/
85
public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {
86
/**
87
* Factory identifier for embedded scheduler
88
*/
89
public static final String IDENTIFIER = "embedded";
90
91
/**
92
* Get factory identifier
93
* @return Factory identifier string
94
*/
95
public String factoryIdentifier();
96
97
/**
98
* Get required configuration options
99
* @return Set of required ConfigOptions (empty for embedded scheduler)
100
*/
101
public Set<ConfigOption<?>> requiredOptions();
102
103
/**
104
* Get optional configuration options
105
* @return Set of optional ConfigOptions (empty for embedded scheduler)
106
*/
107
public Set<ConfigOption<?>> optionalOptions();
108
109
/**
110
* Create workflow scheduler instance
111
* @param context Factory context with configuration and other dependencies
112
* @return WorkflowScheduler instance
113
*/
114
public WorkflowScheduler<?> createWorkflowScheduler(Context context);
115
}
116
```

### EmbeddedRefreshHandler

Refresh handler created by the embedded workflow scheduler to locate a materialized table's refresh workflow; it is serialized through the scheduler's `RefreshHandlerSerializer`.

```java { .api }
123
/**
124
* Handler for materialized table refresh operations
125
*/
126
public class EmbeddedRefreshHandler {
127
/**
128
* Execute materialized table refresh
129
* @param context Execution context with table and job details
130
* @throws Exception if refresh execution fails
131
*/
132
public void execute(RefreshContext context) throws Exception;
133
134
/**
135
* Get table identifier for this refresh handler
136
* @return String identifier of the materialized table
137
*/
138
public String getTableIdentifier();
139
140
/**
141
* Get refresh configuration
142
* @return Map of refresh configuration properties
143
*/
144
public Map<String, String> getRefreshConfig();
145
}
146
```

### WorkflowInfo

Information about a workflow's identity, status, and execution times.

```java { .api }
153
/**
154
* Information about workflow execution
155
*/
156
public class WorkflowInfo {
157
/**
158
* Get workflow identifier
159
* @return Unique workflow identifier
160
*/
161
public String getWorkflowId();
162
163
/**
164
* Get workflow name
165
* @return Human-readable workflow name
166
*/
167
public String getWorkflowName();
168
169
/**
170
* Get workflow status
171
* @return Current workflow status (ACTIVE, PAUSED, COMPLETED, etc.)
172
*/
173
public WorkflowStatus getStatus();
174
175
/**
176
* Get next execution time
177
* @return Optional next scheduled execution time
178
*/
179
public Optional<Instant> getNextExecutionTime();
180
181
/**
182
* Get last execution time
183
* @return Optional last execution time
184
*/
185
public Optional<Instant> getLastExecutionTime();
186
187
/**
188
* Get workflow creation time
189
* @return Workflow creation timestamp
190
*/
191
public Instant getCreationTime();
192
}
193
```

### MaterializedTableManager

Manager for materialized table operations and refresh scheduling.

```java { .api }
200
/**
201
* Manager for materialized table operations
202
*/
203
public class MaterializedTableManager {
204
/**
205
* Refresh materialized table with specified options
206
* @param tableIdentifier Fully qualified table identifier
207
* @param isPeriodic Whether refresh is periodic or one-time
208
* @param scheduleTime Optional schedule time for execution
209
* @param dynamicOptions Dynamic configuration options
210
* @param staticPartitions Partition specifications for refresh
211
* @param executionConfig Flink job execution configuration
212
* @return OperationHandle for tracking the refresh operation
213
*/
214
public OperationHandle refreshMaterializedTable(
215
String tableIdentifier,
216
boolean isPeriodic,
217
@Nullable String scheduleTime,
218
Map<String, String> dynamicOptions,
219
Map<String, String> staticPartitions,
220
Map<String, String> executionConfig
221
);
222
223
/**
224
* Get refresh status for materialized table
225
* @param tableIdentifier Table identifier
226
* @return MaterializedTableRefreshStatus with current state
227
*/
228
public MaterializedTableRefreshStatus getRefreshStatus(String tableIdentifier);
229
230
/**
231
* Cancel ongoing refresh operation
232
* @param tableIdentifier Table identifier
233
* @param operationHandle Operation to cancel
234
*/
235
public void cancelRefresh(String tableIdentifier, OperationHandle operationHandle);
236
}
237
```

## Usage Examples

### Creating a Workflow Scheduler

```java
244
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;
245
import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;
246
247
// Create workflow scheduler
248
Configuration schedulerConfig = new Configuration();
249
schedulerConfig.setString("workflow.scheduler.type", "quartz");
250
schedulerConfig.setString("workflow.scheduler.quartz.instanceName", "FlinkSQLGateway");
251
schedulerConfig.setInteger("workflow.scheduler.quartz.threadCount", 10);
252
253
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
254
255
EmbeddedWorkflowSchedulerFactory factory = new EmbeddedQuartzSchedulerFactory();
256
EmbeddedWorkflowScheduler scheduler = factory.createEmbeddedWorkflowScheduler(
257
schedulerConfig,
258
classLoader
259
);
260
261
// Initialize scheduler
262
scheduler.open();
263
264
// Scheduler is now ready for workflow management
265
```

### Managing Materialized Table Refresh

```java
270
import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager;
271
272
// Configure materialized table refresh
273
MaterializedTableManager manager = new MaterializedTableManager(scheduler, service);
274
275
// One-time refresh
276
OperationHandle oneTimeRefresh = manager.refreshMaterializedTable(
277
"my_catalog.my_database.sales_summary", // Table identifier
278
false, // Not periodic
279
null, // No schedule time (immediate)
280
Map.of("execution.parallelism", "4"), // Dynamic options
281
Map.of("year", "2023", "month", "12"), // Static partitions
282
Map.of("execution.savepoint.path", "hdfs://cluster/savepoints") // Execution config
283
);
284
285
System.out.println("Started one-time refresh: " + oneTimeRefresh);
286
287
// Periodic refresh (daily at 2 AM)
288
OperationHandle periodicRefresh = manager.refreshMaterializedTable(
289
"my_catalog.my_database.daily_metrics",
290
true, // Periodic
291
"0 0 2 * * ?", // Cron expression for daily 2 AM
292
Map.of(
293
"execution.parallelism", "8",
294
"execution.max-parallelism", "128"
295
),
296
Collections.emptyMap(), // No partition restrictions
297
Map.of(
298
"execution.checkpointing.interval", "30s",
299
"execution.savepoint.path", "hdfs://cluster/savepoints"
300
)
301
);
302
303
System.out.println("Started periodic refresh: " + periodicRefresh);
304
```

### Creating Custom Refresh Workflows

```java
309
// Create custom refresh workflow
310
public class CustomRefreshWorkflow {
311
312
public void createHourlyRefreshWorkflow(
313
EmbeddedWorkflowScheduler scheduler,
314
String tableIdentifier,
315
Map<String, String> refreshConfig) throws SchedulerException {
316
317
CreateRefreshWorkflow request = CreateRefreshWorkflow.builder()
318
.workflowId("hourly_refresh_" + tableIdentifier.replace(".", "_"))
319
.workflowName("Hourly Refresh for " + tableIdentifier)
320
.tableIdentifier(tableIdentifier)
321
.cronExpression("0 0 * * * ?") // Every hour
322
.isPeriodic(true)
323
.refreshConfig(refreshConfig)
324
.executionConfig(Map.of(
325
"execution.parallelism", "4",
326
"execution.checkpointing.interval", "60s"
327
))
328
.build();
329
330
scheduler.createRefreshWorkflow(request);
331
System.out.println("Created hourly refresh workflow for: " + tableIdentifier);
332
}
333
334
public void createConditionalRefreshWorkflow(
335
EmbeddedWorkflowScheduler scheduler,
336
String tableIdentifier,
337
String condition) throws SchedulerException {
338
339
// Custom refresh with conditions
340
Map<String, String> refreshConfig = Map.of(
341
"refresh.condition", condition,
342
"refresh.partition.strategy", "incremental",
343
"refresh.max.records", "1000000"
344
);
345
346
CreateRefreshWorkflow request = CreateRefreshWorkflow.builder()
347
.workflowId("conditional_refresh_" + System.currentTimeMillis())
348
.workflowName("Conditional Refresh")
349
.tableIdentifier(tableIdentifier)
350
.cronExpression("0 */15 * * * ?") // Every 15 minutes
351
.isPeriodic(true)
352
.refreshConfig(refreshConfig)
353
.build();
354
355
scheduler.createRefreshWorkflow(request);
356
}
357
}
358
```

### Workflow Lifecycle Management

```java
363
// Comprehensive workflow lifecycle management
364
public class WorkflowLifecycleManager {
365
private final EmbeddedWorkflowScheduler scheduler;
366
private final Map<String, WorkflowInfo> activeWorkflows = new ConcurrentHashMap<>();
367
368
public WorkflowLifecycleManager(EmbeddedWorkflowScheduler scheduler) {
369
this.scheduler = scheduler;
370
}
371
372
public void createWorkflow(WorkflowDefinition definition) throws SchedulerException {
373
CreateRefreshWorkflow request = buildCreateRequest(definition);
374
scheduler.createRefreshWorkflow(request);
375
376
WorkflowInfo info = WorkflowInfo.builder()
377
.workflowId(definition.getId())
378
.workflowName(definition.getName())
379
.status(WorkflowStatus.ACTIVE)
380
.creationTime(Instant.now())
381
.build();
382
383
activeWorkflows.put(definition.getId(), info);
384
System.out.println("Created workflow: " + definition.getId());
385
}
386
387
public void pauseWorkflow(String workflowId) throws SchedulerException {
388
ModifyRefreshWorkflow request = ModifyRefreshWorkflow.builder()
389
.workflowId(workflowId)
390
.action(WorkflowAction.PAUSE)
391
.build();
392
393
scheduler.modifyRefreshWorkflow(request);
394
395
WorkflowInfo info = activeWorkflows.get(workflowId);
396
if (info != null) {
397
activeWorkflows.put(workflowId, info.withStatus(WorkflowStatus.PAUSED));
398
}
399
400
System.out.println("Paused workflow: " + workflowId);
401
}
402
403
public void resumeWorkflow(String workflowId) throws SchedulerException {
404
ModifyRefreshWorkflow request = ModifyRefreshWorkflow.builder()
405
.workflowId(workflowId)
406
.action(WorkflowAction.RESUME)
407
.build();
408
409
scheduler.modifyRefreshWorkflow(request);
410
411
WorkflowInfo info = activeWorkflows.get(workflowId);
412
if (info != null) {
413
activeWorkflows.put(workflowId, info.withStatus(WorkflowStatus.ACTIVE));
414
}
415
416
System.out.println("Resumed workflow: " + workflowId);
417
}
418
419
public void deleteWorkflow(String workflowId) throws SchedulerException {
420
DeleteRefreshWorkflow request = DeleteRefreshWorkflow.builder()
421
.workflowId(workflowId)
422
.build();
423
424
scheduler.deleteRefreshWorkflow(request);
425
activeWorkflows.remove(workflowId);
426
427
System.out.println("Deleted workflow: " + workflowId);
428
}
429
430
public List<WorkflowInfo> listActiveWorkflows() {
431
return new ArrayList<>(activeWorkflows.values());
432
}
433
434
private CreateRefreshWorkflow buildCreateRequest(WorkflowDefinition definition) {
435
return CreateRefreshWorkflow.builder()
436
.workflowId(definition.getId())
437
.workflowName(definition.getName())
438
.tableIdentifier(definition.getTableIdentifier())
439
.cronExpression(definition.getCronExpression())
440
.isPeriodic(definition.isPeriodic())
441
.refreshConfig(definition.getRefreshConfig())
442
.executionConfig(definition.getExecutionConfig())
443
.build();
444
}
445
}
446
```

### Monitoring Workflow Execution

```java
451
// Workflow execution monitoring
452
public class WorkflowMonitor {
453
private final MaterializedTableManager manager;
454
private final ScheduledExecutorService monitorExecutor;
455
456
public WorkflowMonitor(MaterializedTableManager manager) {
457
this.manager = manager;
458
this.monitorExecutor = Executors.newScheduledThreadPool(2);
459
}
460
461
public void startMonitoring() {
462
// Monitor refresh status every minute
463
monitorExecutor.scheduleAtFixedRate(
464
this::checkRefreshStatus,
465
0, 60, TimeUnit.SECONDS
466
);
467
468
// Generate reports every hour
469
monitorExecutor.scheduleAtFixedRate(
470
this::generateStatusReport,
471
0, 3600, TimeUnit.SECONDS
472
);
473
}
474
475
private void checkRefreshStatus() {
476
List<String> tables = getMonitoredTables();
477
478
for (String table : tables) {
479
try {
480
MaterializedTableRefreshStatus status = manager.getRefreshStatus(table);
481
482
switch (status.getStatus()) {
483
case RUNNING:
484
System.out.println("Refresh running for: " + table +
485
", Progress: " + status.getProgress() + "%");
486
break;
487
488
case FAILED:
489
System.err.println("Refresh failed for: " + table +
490
", Error: " + status.getErrorMessage());
491
handleRefreshFailure(table, status);
492
break;
493
494
case COMPLETED:
495
System.out.println("Refresh completed for: " + table +
496
", Duration: " + status.getDuration() + "ms");
497
break;
498
499
default:
500
// Handle other statuses
501
break;
502
}
503
} catch (Exception e) {
504
System.err.println("Failed to check status for: " + table + ", " + e.getMessage());
505
}
506
}
507
}
508
509
private void handleRefreshFailure(String table, MaterializedTableRefreshStatus status) {
510
// Implement retry logic, alerting, etc.
511
if (status.getRetryCount() < 3) {
512
System.out.println("Retrying refresh for: " + table);
513
// Trigger retry...
514
} else {
515
System.err.println("Max retries exceeded for: " + table);
516
// Send alert...
517
}
518
}
519
520
private void generateStatusReport() {
521
System.out.println("=== Workflow Status Report ===");
522
List<String> tables = getMonitoredTables();
523
524
int totalTables = tables.size();
525
int healthyTables = 0;
526
int failedTables = 0;
527
528
for (String table : tables) {
529
try {
530
MaterializedTableRefreshStatus status = manager.getRefreshStatus(table);
531
if (status.isHealthy()) {
532
healthyTables++;
533
} else {
534
failedTables++;
535
}
536
} catch (Exception e) {
537
failedTables++;
538
}
539
}
540
541
System.out.println("Total tables: " + totalTables);
542
System.out.println("Healthy tables: " + healthyTables);
543
System.out.println("Failed tables: " + failedTables);
544
System.out.println("Health ratio: " + (healthyTables * 100.0 / totalTables) + "%");
545
System.out.println("===============================");
546
}
547
548
private List<String> getMonitoredTables() {
549
// Return list of tables to monitor
550
return Arrays.asList(
551
"catalog.db.sales_summary",
552
"catalog.db.daily_metrics",
553
"catalog.db.user_analytics"
554
);
555
}
556
557
public void stop() {
558
monitorExecutor.shutdown();
559
try {
560
if (!monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {
561
monitorExecutor.shutdownNow();
562
}
563
} catch (InterruptedException e) {
564
monitorExecutor.shutdownNow();
565
Thread.currentThread().interrupt();
566
}
567
}
568
}
569
```