or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

configuration-options.mdcore-service-interface.mdendpoint-framework.mdindex.mdoperation-management.mdrest-implementation.mdresult-data-models.mdsession-management.mdworkflow-management.md

workflow-management.mddocs/

0

# Workflow Management

1

2

Workflow management provides a scheduling system for materialized tables. It integrates with Quartz for periodic refresh operations, manages the workflow lifecycle, and ships an embedded scheduler for automated data pipeline operations.

3

4

## Capabilities

5

6

### WorkflowScheduler Interface

7

8

Base interface for workflow schedulers managing materialized table refresh operations.

9

10

```java { .api }

11

/**

12

* Workflow scheduler interface for materialized table refresh operations

13

* @param <T> The type of RefreshHandler used by specific WorkflowScheduler to locate the refresh workflow in scheduler service

14

*/

15

public interface WorkflowScheduler<T extends RefreshHandler> {

16

/**

17

* Open this workflow scheduler instance for required preparation in initialization phase

18

* @throws WorkflowException if initializing workflow scheduler occurs exception

19

*/

20

void open() throws WorkflowException;

21

22

/**

23

* Close this workflow scheduler when it is no longer needed and release any resource that it might be holding

24

* @throws WorkflowException if closing the related resources of workflow scheduler failed

25

*/

26

void close() throws WorkflowException;

27

28

/**

29

* Return a RefreshHandlerSerializer instance to serialize and deserialize RefreshHandler created by specific workflow scheduler service

30

* @return RefreshHandlerSerializer instance for type T

31

*/

32

RefreshHandlerSerializer<T> getRefreshHandlerSerializer();

33

34

/**

35

* Create a refresh workflow in specific scheduler service for the materialized table

36

* This method supports creating workflow for periodic refresh, as well as workflow for a one-time refresh only

37

* @param createRefreshWorkflow The detail info for create refresh workflow of materialized table

38

* @return The meta info which points to the refresh workflow in scheduler service

39

* @throws WorkflowException if creating refresh workflow failed

40

*/

41

T createRefreshWorkflow(CreateRefreshWorkflow createRefreshWorkflow) throws WorkflowException;

42

43

/**

44

* Modify the refresh workflow status in scheduler service. This includes suspend, resume, modify schedule cron operation, and so on

45

* @param modifyRefreshWorkflow The detail info for modify refresh workflow of materialized table

46

* @throws WorkflowException if modify refresh workflow failed

47

*/

48

void modifyRefreshWorkflow(ModifyRefreshWorkflow<T> modifyRefreshWorkflow) throws WorkflowException;

49

50

/**

51

* Delete the refresh workflow in scheduler service

52

* @param deleteRefreshWorkflow The detail info for delete refresh workflow of materialized table

53

* @throws WorkflowException if delete refresh workflow failed

54

*/

55

void deleteRefreshWorkflow(DeleteRefreshWorkflow<T> deleteRefreshWorkflow) throws WorkflowException;

56

}

57

```

58

59

### EmbeddedWorkflowScheduler Implementation

60

61

Concrete implementation of `WorkflowScheduler` backed by the embedded Quartz scheduler.

62

63

```java { .api }

64

/**

65

* A workflow scheduler plugin implementation for EmbeddedQuartzScheduler

66

* It is used to create, modify refresh workflow for materialized table

67

*/

68

public class EmbeddedWorkflowScheduler implements WorkflowScheduler<EmbeddedRefreshHandler> {

69

/**

70

* Constructor with configuration

71

* @param configuration Configuration for the embedded scheduler

72

*/

73

public EmbeddedWorkflowScheduler(Configuration configuration);

74

}

75

```

76

77

### EmbeddedWorkflowSchedulerFactory

78

79

Factory for creating embedded workflow scheduler instances.

80

81

```java { .api }

82

/**

83

* The WorkflowSchedulerFactory to create the EmbeddedWorkflowScheduler

84

*/

85

public class EmbeddedWorkflowSchedulerFactory implements WorkflowSchedulerFactory {

86

/**

87

* Factory identifier for embedded scheduler

88

*/

89

public static final String IDENTIFIER = "embedded";

90

91

/**

92

* Get factory identifier

93

* @return Factory identifier string

94

*/

95

public String factoryIdentifier();

96

97

/**

98

* Get required configuration options

99

* @return Set of required ConfigOptions (empty for embedded scheduler)

100

*/

101

public Set<ConfigOption<?>> requiredOptions();

102

103

/**

104

* Get optional configuration options

105

* @return Set of optional ConfigOptions (empty for embedded scheduler)

106

*/

107

public Set<ConfigOption<?>> optionalOptions();

108

109

/**

110

* Create workflow scheduler instance

111

* @param context Factory context with configuration and other dependencies

112

* @return WorkflowScheduler instance

113

*/

114

public WorkflowScheduler<?> createWorkflowScheduler(Context context);

115

}

116

```

117

118

### EmbeddedRefreshHandler

119

120

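Handler for materialized table refresh operations. Handlers are serialized and deserialized with the `RefreshHandlerSerializer` returned by the scheduler's `getRefreshHandlerSerializer()`.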

121

122

```java { .api }

123

/**

124

* Handler for materialized table refresh operations

125

*/

126

public class EmbeddedRefreshHandler {

127

/**

128

* Execute materialized table refresh

129

* @param context Execution context with table and job details

130

* @throws Exception if refresh execution fails

131

*/

132

public void execute(RefreshContext context) throws Exception;

133

134

/**

135

* Get table identifier for this refresh handler

136

* @return String identifier of the materialized table

137

*/

138

public String getTableIdentifier();

139

140

/**

141

* Get refresh configuration

142

* @return Map of refresh configuration properties

143

*/

144

public Map<String, String> getRefreshConfig();

145

}

146

```

147

148

### WorkflowInfo

149

150

Information about workflow execution and status.

151

152

```java { .api }

153

/**

154

* Information about workflow execution

155

*/

156

public class WorkflowInfo {

157

/**

158

* Get workflow identifier

159

* @return Unique workflow identifier

160

*/

161

public String getWorkflowId();

162

163

/**

164

* Get workflow name

165

* @return Human-readable workflow name

166

*/

167

public String getWorkflowName();

168

169

/**

170

* Get workflow status

171

* @return Current workflow status (ACTIVE, PAUSED, COMPLETED, etc.)

172

*/

173

public WorkflowStatus getStatus();

174

175

/**

176

* Get next execution time

177

* @return Optional next scheduled execution time

178

*/

179

public Optional<Instant> getNextExecutionTime();

180

181

/**

182

* Get last execution time

183

* @return Optional last execution time

184

*/

185

public Optional<Instant> getLastExecutionTime();

186

187

/**

188

* Get workflow creation time

189

* @return Workflow creation timestamp

190

*/

191

public Instant getCreationTime();

192

}

193

```

194

195

### MaterializedTableManager

196

197

Manager for materialized table operations and refresh scheduling.

198

199

```java { .api }

200

/**

201

* Manager for materialized table operations

202

*/

203

public class MaterializedTableManager {

204

/**

205

* Refresh materialized table with specified options

206

* @param tableIdentifier Fully qualified table identifier

207

* @param isPeriodic Whether refresh is periodic or one-time

208

* @param scheduleTime Optional schedule time for execution

209

* @param dynamicOptions Dynamic configuration options

210

* @param staticPartitions Partition specifications for refresh

211

* @param executionConfig Flink job execution configuration

212

* @return OperationHandle for tracking the refresh operation

213

*/

214

public OperationHandle refreshMaterializedTable(

215

String tableIdentifier,

216

boolean isPeriodic,

217

@Nullable String scheduleTime,

218

Map<String, String> dynamicOptions,

219

Map<String, String> staticPartitions,

220

Map<String, String> executionConfig

221

);

222

223

/**

224

* Get refresh status for materialized table

225

* @param tableIdentifier Table identifier

226

* @return MaterializedTableRefreshStatus with current state

227

*/

228

public MaterializedTableRefreshStatus getRefreshStatus(String tableIdentifier);

229

230

/**

231

* Cancel ongoing refresh operation

232

* @param tableIdentifier Table identifier

233

* @param operationHandle Operation to cancel

234

*/

235

public void cancelRefresh(String tableIdentifier, OperationHandle operationHandle);

236

}

237

```

238

239

## Usage Examples

240

241

### Creating Workflow Scheduler

242

243

```java

244

import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowScheduler;

245

import org.apache.flink.table.gateway.workflow.EmbeddedWorkflowSchedulerFactory;

246

247

// Create workflow scheduler

248

Configuration schedulerConfig = new Configuration();

249

schedulerConfig.setString("workflow.scheduler.type", "quartz");

250

schedulerConfig.setString("workflow.scheduler.quartz.instanceName", "FlinkSQLGateway");

251

schedulerConfig.setInteger("workflow.scheduler.quartz.threadCount", 10);

252

253

ClassLoader classLoader = Thread.currentThread().getContextClassLoader();

254

255

EmbeddedWorkflowSchedulerFactory factory = new EmbeddedQuartzSchedulerFactory();

256

EmbeddedWorkflowScheduler scheduler = factory.createEmbeddedWorkflowScheduler(

257

schedulerConfig,

258

classLoader

259

);

260

261

// Initialize scheduler

262

scheduler.open();

263

264

// Scheduler is now ready for workflow management

265

```

266

267

### Managing Materialized Table Refresh

268

269

```java

270

import org.apache.flink.table.gateway.service.materializedtable.MaterializedTableManager;

271

272

// Configure materialized table refresh

273

MaterializedTableManager manager = new MaterializedTableManager(scheduler, service);

274

275

// One-time refresh

276

OperationHandle oneTimeRefresh = manager.refreshMaterializedTable(

277

"my_catalog.my_database.sales_summary", // Table identifier

278

false, // Not periodic

279

null, // No schedule time (immediate)

280

Map.of("execution.parallelism", "4"), // Dynamic options

281

Map.of("year", "2023", "month", "12"), // Static partitions

282

Map.of("execution.savepoint.path", "hdfs://cluster/savepoints") // Execution config

283

);

284

285

System.out.println("Started one-time refresh: " + oneTimeRefresh);

286

287

// Periodic refresh (daily at 2 AM)

288

OperationHandle periodicRefresh = manager.refreshMaterializedTable(

289

"my_catalog.my_database.daily_metrics",

290

true, // Periodic

291

"0 0 2 * * ?", // Cron expression for daily 2 AM

292

Map.of(

293

"execution.parallelism", "8",

294

"execution.max-parallelism", "128"

295

),

296

Collections.emptyMap(), // No partition restrictions

297

Map.of(

298

"execution.checkpointing.interval", "30s",

299

"execution.savepoint.path", "hdfs://cluster/savepoints"

300

)

301

);

302

303

System.out.println("Started periodic refresh: " + periodicRefresh);

304

```

305

306

### Creating Custom Refresh Workflows

307

308

```java

309

// Create custom refresh workflow

310

public class CustomRefreshWorkflow {

311

312

public void createHourlyRefreshWorkflow(

313

EmbeddedWorkflowScheduler scheduler,

314

String tableIdentifier,

315

Map<String, String> refreshConfig) throws SchedulerException {

316

317

CreateRefreshWorkflow request = CreateRefreshWorkflow.builder()

318

.workflowId("hourly_refresh_" + tableIdentifier.replace(".", "_"))

319

.workflowName("Hourly Refresh for " + tableIdentifier)

320

.tableIdentifier(tableIdentifier)

321

.cronExpression("0 0 * * * ?") // Every hour

322

.isPeriodic(true)

323

.refreshConfig(refreshConfig)

324

.executionConfig(Map.of(

325

"execution.parallelism", "4",

326

"execution.checkpointing.interval", "60s"

327

))

328

.build();

329

330

scheduler.createRefreshWorkflow(request);

331

System.out.println("Created hourly refresh workflow for: " + tableIdentifier);

332

}

333

334

public void createConditionalRefreshWorkflow(

335

EmbeddedWorkflowScheduler scheduler,

336

String tableIdentifier,

337

String condition) throws SchedulerException {

338

339

// Custom refresh with conditions

340

Map<String, String> refreshConfig = Map.of(

341

"refresh.condition", condition,

342

"refresh.partition.strategy", "incremental",

343

"refresh.max.records", "1000000"

344

);

345

346

CreateRefreshWorkflow request = CreateRefreshWorkflow.builder()

347

.workflowId("conditional_refresh_" + System.currentTimeMillis())

348

.workflowName("Conditional Refresh")

349

.tableIdentifier(tableIdentifier)

350

.cronExpression("0 */15 * * * ?") // Every 15 minutes

351

.isPeriodic(true)

352

.refreshConfig(refreshConfig)

353

.build();

354

355

scheduler.createRefreshWorkflow(request);

356

}

357

}

358

```

359

360

### Workflow Lifecycle Management

361

362

```java

363

// Comprehensive workflow lifecycle management

364

public class WorkflowLifecycleManager {

365

private final EmbeddedWorkflowScheduler scheduler;

366

private final Map<String, WorkflowInfo> activeWorkflows = new ConcurrentHashMap<>();

367

368

public WorkflowLifecycleManager(EmbeddedWorkflowScheduler scheduler) {

369

this.scheduler = scheduler;

370

}

371

372

public void createWorkflow(WorkflowDefinition definition) throws SchedulerException {

373

CreateRefreshWorkflow request = buildCreateRequest(definition);

374

scheduler.createRefreshWorkflow(request);

375

376

WorkflowInfo info = WorkflowInfo.builder()

377

.workflowId(definition.getId())

378

.workflowName(definition.getName())

379

.status(WorkflowStatus.ACTIVE)

380

.creationTime(Instant.now())

381

.build();

382

383

activeWorkflows.put(definition.getId(), info);

384

System.out.println("Created workflow: " + definition.getId());

385

}

386

387

public void pauseWorkflow(String workflowId) throws SchedulerException {

388

ModifyRefreshWorkflow request = ModifyRefreshWorkflow.builder()

389

.workflowId(workflowId)

390

.action(WorkflowAction.PAUSE)

391

.build();

392

393

scheduler.modifyRefreshWorkflow(request);

394

395

WorkflowInfo info = activeWorkflows.get(workflowId);

396

if (info != null) {

397

activeWorkflows.put(workflowId, info.withStatus(WorkflowStatus.PAUSED));

398

}

399

400

System.out.println("Paused workflow: " + workflowId);

401

}

402

403

public void resumeWorkflow(String workflowId) throws SchedulerException {

404

ModifyRefreshWorkflow request = ModifyRefreshWorkflow.builder()

405

.workflowId(workflowId)

406

.action(WorkflowAction.RESUME)

407

.build();

408

409

scheduler.modifyRefreshWorkflow(request);

410

411

WorkflowInfo info = activeWorkflows.get(workflowId);

412

if (info != null) {

413

activeWorkflows.put(workflowId, info.withStatus(WorkflowStatus.ACTIVE));

414

}

415

416

System.out.println("Resumed workflow: " + workflowId);

417

}

418

419

public void deleteWorkflow(String workflowId) throws SchedulerException {

420

DeleteRefreshWorkflow request = DeleteRefreshWorkflow.builder()

421

.workflowId(workflowId)

422

.build();

423

424

scheduler.deleteRefreshWorkflow(request);

425

activeWorkflows.remove(workflowId);

426

427

System.out.println("Deleted workflow: " + workflowId);

428

}

429

430

public List<WorkflowInfo> listActiveWorkflows() {

431

return new ArrayList<>(activeWorkflows.values());

432

}

433

434

private CreateRefreshWorkflow buildCreateRequest(WorkflowDefinition definition) {

435

return CreateRefreshWorkflow.builder()

436

.workflowId(definition.getId())

437

.workflowName(definition.getName())

438

.tableIdentifier(definition.getTableIdentifier())

439

.cronExpression(definition.getCronExpression())

440

.isPeriodic(definition.isPeriodic())

441

.refreshConfig(definition.getRefreshConfig())

442

.executionConfig(definition.getExecutionConfig())

443

.build();

444

}

445

}

446

```

447

448

### Monitoring Workflow Execution

449

450

```java

451

// Workflow execution monitoring

452

public class WorkflowMonitor {

453

private final MaterializedTableManager manager;

454

private final ScheduledExecutorService monitorExecutor;

455

456

public WorkflowMonitor(MaterializedTableManager manager) {

457

this.manager = manager;

458

this.monitorExecutor = Executors.newScheduledThreadPool(2);

459

}

460

461

public void startMonitoring() {

462

// Monitor refresh status every minute

463

monitorExecutor.scheduleAtFixedRate(

464

this::checkRefreshStatus,

465

0, 60, TimeUnit.SECONDS

466

);

467

468

// Generate reports every hour

469

monitorExecutor.scheduleAtFixedRate(

470

this::generateStatusReport,

471

0, 3600, TimeUnit.SECONDS

472

);

473

}

474

475

private void checkRefreshStatus() {

476

List<String> tables = getMonitoredTables();

477

478

for (String table : tables) {

479

try {

480

MaterializedTableRefreshStatus status = manager.getRefreshStatus(table);

481

482

switch (status.getStatus()) {

483

case RUNNING:

484

System.out.println("Refresh running for: " + table +

485

", Progress: " + status.getProgress() + "%");

486

break;

487

488

case FAILED:

489

System.err.println("Refresh failed for: " + table +

490

", Error: " + status.getErrorMessage());

491

handleRefreshFailure(table, status);

492

break;

493

494

case COMPLETED:

495

System.out.println("Refresh completed for: " + table +

496

", Duration: " + status.getDuration() + "ms");

497

break;

498

499

default:

500

// Handle other statuses

501

break;

502

}

503

} catch (Exception e) {

504

System.err.println("Failed to check status for: " + table + ", " + e.getMessage());

505

}

506

}

507

}

508

509

private void handleRefreshFailure(String table, MaterializedTableRefreshStatus status) {

510

// Implement retry logic, alerting, etc.

511

if (status.getRetryCount() < 3) {

512

System.out.println("Retrying refresh for: " + table);

513

// Trigger retry...

514

} else {

515

System.err.println("Max retries exceeded for: " + table);

516

// Send alert...

517

}

518

}

519

520

private void generateStatusReport() {

521

System.out.println("=== Workflow Status Report ===");

522

List<String> tables = getMonitoredTables();

523

524

int totalTables = tables.size();

525

int healthyTables = 0;

526

int failedTables = 0;

527

528

for (String table : tables) {

529

try {

530

MaterializedTableRefreshStatus status = manager.getRefreshStatus(table);

531

if (status.isHealthy()) {

532

healthyTables++;

533

} else {

534

failedTables++;

535

}

536

} catch (Exception e) {

537

failedTables++;

538

}

539

}

540

541

System.out.println("Total tables: " + totalTables);

542

System.out.println("Healthy tables: " + healthyTables);

543

System.out.println("Failed tables: " + failedTables);

544

System.out.println("Health ratio: " + (healthyTables * 100.0 / totalTables) + "%");

545

System.out.println("===============================");

546

}

547

548

private List<String> getMonitoredTables() {

549

// Return list of tables to monitor

550

return Arrays.asList(

551

"catalog.db.sales_summary",

552

"catalog.db.daily_metrics",

553

"catalog.db.user_analytics"

554

);

555

}

556

557

public void stop() {

558

monitorExecutor.shutdown();

559

try {

560

if (!monitorExecutor.awaitTermination(30, TimeUnit.SECONDS)) {

561

monitorExecutor.shutdownNow();

562

}

563

} catch (InterruptedException e) {

564

monitorExecutor.shutdownNow();

565

Thread.currentThread().interrupt();

566

}

567

}

568

}

569

```