A service that lets multiple remote clients execute SQL concurrently, providing an easy way to submit Flink jobs, look up metadata, and analyze data online.
—
Operation management provides asynchronous execution of SQL statements and operations with comprehensive status tracking, cancellation support, and resource management. Operations are tracked throughout their lifecycle from submission to completion.
Unique identifier for operations using UUID-based handles.
/**
* Handle that identifies a single operation.
*
* <p>In this listing the handle holds exactly one piece of state: a {@link UUID}.
*/
public class OperationHandle {
/** UUID that identifies the operation this handle refers to. */
private final UUID identifier;
/**
* Create a new operation handle with a random UUID.
*
* @return New OperationHandle instance
*/
public static OperationHandle create();
/**
* Create an operation handle with a specific UUID.
*
* @param identifier UUID to use for the operation
*/
public OperationHandle(UUID identifier);
/**
* Get the UUID identifier for this operation.
*
* @return UUID identifier
*/
public UUID getIdentifier();
// NOTE(review): method bodies are not shown in this listing. equals/hashCode are presumably
// derived from the identifier so that handles with the same UUID compare equal — confirm
// against the implementation.
@Override
public boolean equals(Object o);
@Override
public int hashCode();
@Override
public String toString();
}Enumeration representing the complete lifecycle of operations with terminal status checking and transition validation.
/**
* Enumeration of the states an operation passes through during its lifecycle.
*
* <p>NOTE(review): the usage examples in this document treat FINISHED, ERROR, CANCELED,
* TIMEOUT and CLOSED as end states. The authoritative terminal set and the allowed
* transitions are defined by the method bodies, which are not shown here — confirm
* against the implementation.
*/
public enum OperationStatus {
/** Newly created operation, not yet started */
INITIALIZED,
/** Preparing resources for execution */
PENDING,
/** Operation currently executing */
RUNNING,
/** Completed successfully with results available */
FINISHED,
/** Operation was cancelled by user or system */
CANCELED,
/** Resources cleaned up, operation no longer accessible */
CLOSED,
/** Error occurred during execution */
ERROR,
/** Execution timed out */
TIMEOUT;
/**
* Check whether this status represents a terminal state.
*
* @return true if the operation cannot transition to other states
*/
public boolean isTerminalStatus();
/**
* Validate whether a transition from one status to another is allowed.
*
* @param from Source status
* @param to Target status
* @return true if the transition is valid
*/
public static boolean isValidStatusTransition(OperationStatus from, OperationStatus to);
}Information about operation status and any exceptions that occurred during execution.
/**
* Status and error information for an operation.
*
* <p>Pairs an {@link OperationStatus} with an optional exception description.
*/
public class OperationInfo {
/**
* Get the current operation status.
*
* @return OperationStatus representing current state
*/
public OperationStatus getStatus();
/**
* Get exception information if the operation failed.
*
* @return Optional exception message and details
*/
public Optional<String> getException();
/**
* Create an OperationInfo with a status only.
*
* <p>NOTE(review): presumably {@link #getException()} is empty for instances created
* this way — the body is not shown, confirm against the implementation.
*
* @param status Current operation status
* @return OperationInfo instance
*/
public static OperationInfo of(OperationStatus status);
/**
* Create an OperationInfo with a status and an exception description.
*
* @param status Current operation status
* @param exception Exception that occurred
* @return OperationInfo instance
*/
public static OperationInfo of(OperationStatus status, String exception);
}Manages operation lifecycle, execution, and resource cleanup.
/**
* Manages operation lifecycle and execution.
*
* <p>Every method takes the {@code SessionHandle} the operation belongs to; operations are
* addressed by the {@link OperationHandle} returned from {@link #submitOperation}.
*/
public class OperationManager {
/**
* Submit an operation for execution.
*
* @param sessionHandle Session for the operation
* @param executor Callable that produces results
* @return OperationHandle for tracking
*/
public OperationHandle submitOperation(SessionHandle sessionHandle, Callable<ResultSet> executor);
/**
* Cancel a running operation.
*
* @param sessionHandle Session handle
* @param operationHandle Operation to cancel
*/
public void cancelOperation(SessionHandle sessionHandle, OperationHandle operationHandle);
/**
* Close an operation and clean up its resources.
*
* @param sessionHandle Session handle
* @param operationHandle Operation to close
*/
public void closeOperation(SessionHandle sessionHandle, OperationHandle operationHandle);
/**
* Get operation information.
*
* @param sessionHandle Session handle
* @param operationHandle Operation handle
* @return OperationInfo with current status
*/
public OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle operationHandle);
/**
* Get the operation result schema when available.
*
* <p>NOTE(review): the behavior when the schema is not yet available (blocking vs.
* throwing) is not shown in this listing — confirm against the implementation.
*
* @param sessionHandle Session handle
* @param operationHandle Operation handle
* @return ResolvedSchema of operation results
*/
public ResolvedSchema getOperationResultSchema(SessionHandle sessionHandle, OperationHandle operationHandle);
}Executes operations in background threads with timeout and cancellation support.
/**
* Executes operations in background threads.
*
* <p>Lifecycle as listed here: {@link #start()} before submitting work, {@link #stop()} to
* shut the thread pool down.
*/
public class OperationExecutor {
/**
* Start the executor with the configured thread pool.
*/
public void start();
/**
* Stop the executor and shut down the thread pool.
*
* <p>NOTE(review): whether in-flight operations are awaited or interrupted is not shown
* in this listing — confirm against the implementation.
*/
public void stop();
/**
* Submit an operation for asynchronous execution.
*
* @param operation Operation to execute
* @return Future representing the execution
*/
public Future<ResultSet> submitOperation(Operation operation);
/**
* Get the number of active operations.
*
* @return Count of currently executing operations
*/
public int getActiveOperationCount();
}import org.apache.flink.table.gateway.api.operation.OperationHandle;
import org.apache.flink.table.gateway.api.operation.OperationStatus;
import java.util.concurrent.Callable;
// Submit SQL statement as operation
OperationHandle operation = service.executeStatement(
    sessionHandle,
    "SELECT COUNT(*) FROM orders WHERE status = 'COMPLETED'",
    30000L, // 30 second timeout
    new Configuration()
);
try {
    // Check operation status
    OperationInfo info = service.getOperationInfo(sessionHandle, operation);
    System.out.println("Operation status: " + info.getStatus());

    // Wait for completion: poll once per second until the status is terminal
    while (!info.getStatus().isTerminalStatus()) {
        Thread.sleep(1000);
        info = service.getOperationInfo(sessionHandle, operation);
    }

    if (info.getStatus() == OperationStatus.FINISHED) {
        // Fetch results
        ResultSet results = service.fetchResults(sessionHandle, operation, 0L, 100);
        // Process results...
    } else if (info.getStatus() == OperationStatus.ERROR) {
        System.err.println("Operation failed: " + info.getException().orElse("Unknown error"));
    } else {
        // Other terminal states (e.g. TIMEOUT after the 30 second limit, or CANCELED)
        // must not pass silently.
        System.err.println("Operation ended with status: " + info.getStatus());
    }
} finally {
    // Clean up in finally so the operation is released even if polling or fetching throws
    service.closeOperation(sessionHandle, operation);
}
import java.util.concurrent.Callable;
// Submit custom operation with Callable
Callable<ResultSet> customLogic = () -> {
    // Custom processing logic
    List<RowData> data = processData();
    ResolvedSchema schema = createSchema();
    return ResultSet.builder()
        .resultType(ResultType.PAYLOAD)
        .data(data)
        .resultSchema(schema)
        .build();
};
OperationHandle customOp = service.submitOperation(sessionHandle, customLogic);

// Monitor execution until a terminal status is reached. Looping only while the status is
// RUNNING would stop immediately when the operation has not started yet (INITIALIZED or
// PENDING), so the loop waits on "not terminal" instead.
OperationInfo info = service.getOperationInfo(sessionHandle, customOp);
while (!info.getStatus().isTerminalStatus()) {
    Thread.sleep(500);
    info = service.getOperationInfo(sessionHandle, customOp);
}
// Start long-running operation
OperationHandle longOp = service.executeStatement(
    sessionHandle,
    "SELECT * FROM large_table ORDER BY timestamp",
    0L, // No timeout
    new Configuration()
);
try {
    // Check whether the operation is still in flight. A freshly submitted operation may
    // not have reached RUNNING yet, so test for "not terminal" rather than RUNNING only.
    OperationInfo info = service.getOperationInfo(sessionHandle, longOp);
    if (!info.getStatus().isTerminalStatus()) {
        // Cancel the operation
        service.cancelOperation(sessionHandle, longOp);
        // Verify cancellation
        info = service.getOperationInfo(sessionHandle, longOp);
        System.out.println("Operation status after cancel: " + info.getStatus());
    }
} finally {
    // Clean up even if the status check or the cancellation throws
    service.closeOperation(sessionHandle, longOp);
}
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
// Submit multiple operations
List<String> queries = List.of(
    "SELECT COUNT(*) FROM users",
    "SELECT AVG(price) FROM products",
    "SELECT MAX(order_date) FROM orders"
);
Map<String, OperationHandle> operations = new HashMap<>();
for (int i = 0; i < queries.size(); i++) {
    String query = queries.get(i);
    OperationHandle op = service.executeStatement(sessionHandle, query, 60000L, new Configuration());
    operations.put("query_" + i, op);
}

// Monitor all operations; "results" records every operation that reached a terminal status
Map<String, OperationInfo> results = new HashMap<>();
while (operations.size() > results.size()) {
    for (Map.Entry<String, OperationHandle> entry : operations.entrySet()) {
        String key = entry.getKey();
        if (results.containsKey(key)) {
            continue; // already completed and closed in an earlier pass
        }
        OperationHandle op = entry.getValue();
        OperationInfo info = service.getOperationInfo(sessionHandle, op);
        if (!info.getStatus().isTerminalStatus()) {
            continue; // still in flight, check again on the next pass
        }
        results.put(key, info);
        System.out.println(key + " completed with status: " + info.getStatus());
        try {
            if (info.getStatus() == OperationStatus.FINISHED) {
                ResultSet resultSet = service.fetchResults(sessionHandle, op, 0L, 10);
                // Process results...
            }
        } finally {
            // Clean up completed operation, even when fetching its results throws
            service.closeOperation(sessionHandle, op);
        }
    }
    // Pause only while something is still outstanding; no extra wait after the last one
    if (operations.size() > results.size()) {
        Thread.sleep(1000);
    }
}
// Advanced status monitoring with timeout
/**
 * Polls an operation until it reaches an end state or a caller-supplied timeout elapses.
 */
public class OperationMonitor {
    /** Pause between two status polls, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 100L;

    private final SqlGatewayService service;

    /**
     * Creates a monitor that polls through the given gateway service.
     *
     * @param service gateway service used to query, fetch from and cancel operations
     * @throws NullPointerException if {@code service} is null
     */
    public OperationMonitor(SqlGatewayService service) {
        this.service = java.util.Objects.requireNonNull(service, "service");
    }

    /**
     * Waits for an operation to finish, fail, be cancelled, time out or be closed.
     *
     * <p>If {@code timeoutMs} elapses first, the operation is cancelled and a timeout result
     * is returned. A non-positive {@code timeoutMs} cancels immediately without polling.
     *
     * @param session session the operation belongs to
     * @param operation operation to wait for
     * @param timeoutMs maximum time to wait, in milliseconds
     * @return result describing how the wait ended
     */
    public OperationResult waitForCompletion(
            SessionHandle session,
            OperationHandle operation,
            long timeoutMs) {
        long startTime = System.currentTimeMillis();

        // Compare elapsed time against the timeout instead of computing startTime + timeoutMs,
        // which overflows (and times out immediately) for very large timeouts.
        while (System.currentTimeMillis() - startTime < timeoutMs) {
            OperationInfo info = service.getOperationInfo(session, operation);
            switch (info.getStatus()) {
                case FINISHED: {
                    ResultSet results =
                            service.fetchResults(session, operation, 0L, Integer.MAX_VALUE);
                    return OperationResult.success(results);
                }
                case ERROR:
                    return OperationResult.error(info.getException().orElse("Unknown error"));
                case CANCELED:
                    return OperationResult.cancelled();
                case TIMEOUT:
                    return OperationResult.timeout();
                case CLOSED:
                    return OperationResult.error("Operation was closed");
                case RUNNING:
                case PENDING:
                case INITIALIZED:
                default:
                    // Continue waiting. The default arm makes a status this switch does not
                    // know about sleep between polls instead of spinning.
                    try {
                        Thread.sleep(POLL_INTERVAL_MS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return OperationResult.interrupted();
                    }
                    break;
            }
        }

        // Timeout reached
        service.cancelOperation(session, operation);
        return OperationResult.timeout();
    }
}
// Robust operation execution with error handling
/**
 * Executes a SQL statement, retrying attempts that fail in a retryable way.
 *
 * <p>Each attempt submits the statement with a 30 second execution timeout, polls once per
 * second until the operation reaches a terminal status, and closes the operation before the
 * next attempt starts or the method exits. An attempt is retried when the operation ends in
 * ERROR with a message accepted by {@code isRetryableError}, or when a call to the service
 * throws. Non-retryable errors and other terminal statuses fail immediately.
 *
 * @param sql statement to execute
 * @param maxRetries maximum number of attempts; must be at least 1
 * @return results fetched from the first attempt that finishes successfully
 * @throws IllegalArgumentException if {@code maxRetries} is less than 1
 * @throws RuntimeException if the operation fails with a non-retryable error, ends in a
 *     terminal status other than FINISHED or ERROR, the thread is interrupted while waiting,
 *     or all attempts are exhausted
 */
public ResultSet executeWithRetry(String sql, int maxRetries) {
    if (maxRetries < 1) {
        throw new IllegalArgumentException("maxRetries must be at least 1, got: " + maxRetries);
    }

    for (int attempt = 1; attempt <= maxRetries; attempt++) {
        OperationHandle operation = null;
        // Set when the attempt ended in a way that must NOT be retried. It is thrown after
        // the try statement so the generic catch below cannot swallow it and start another
        // attempt.
        RuntimeException fatal = null;
        try {
            operation = service.executeStatement(
                sessionHandle,
                sql,
                30000L,
                new Configuration()
            );

            // Wait for completion
            OperationInfo info;
            do {
                Thread.sleep(1000);
                info = service.getOperationInfo(sessionHandle, operation);
            } while (!info.getStatus().isTerminalStatus());

            if (info.getStatus() == OperationStatus.FINISHED) {
                return service.fetchResults(sessionHandle, operation, 0L, Integer.MAX_VALUE);
            } else if (info.getStatus() == OperationStatus.ERROR) {
                String error = info.getException().orElse("Unknown error");
                if (attempt < maxRetries && isRetryableError(error)) {
                    System.out.println("Attempt " + attempt + " failed, retrying: " + error);
                    continue;
                }
                fatal = new RuntimeException("Operation failed: " + error);
            } else {
                fatal = new RuntimeException("Operation ended with status: " + info.getStatus());
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag and stop: retrying would ignore the interruption.
            Thread.currentThread().interrupt();
            throw new RuntimeException("Interrupted while waiting for operation", e);
        } catch (Exception e) {
            if (attempt == maxRetries) {
                throw new RuntimeException("All retry attempts failed", e);
            }
            System.out.println("Attempt " + attempt + " failed with exception, retrying: " + e.getMessage());
        } finally {
            if (operation != null) {
                try {
                    service.closeOperation(sessionHandle, operation);
                } catch (Exception e) {
                    // Best effort: a failed close must not mask the attempt's own outcome.
                    System.err.println("Failed to close operation: " + e.getMessage());
                }
            }
        }

        if (fatal != null) {
            throw fatal;
        }
    }

    // Unreachable for maxRetries >= 1: the last attempt always returns or throws.
    throw new RuntimeException("Should not reach here");
}
/**
 * Decides whether an error message describes a failure worth retrying.
 *
 * <p>The message is retryable when it contains "timeout", "connection" or "temporary",
 * ignoring case, so messages such as "TimeoutException" or "Connection refused" match too.
 *
 * @param error error message to inspect; may be null
 * @return true if the message contains one of the retryable keywords, false otherwise
 *     (including for a null message)
 */
private boolean isRetryableError(String error) {
    if (error == null) {
        return false;
    }
    // Lower-case with a fixed locale so matching does not depend on the JVM default locale.
    String normalized = error.toLowerCase(java.util.Locale.ROOT);
    return normalized.contains("timeout")
        || normalized.contains("connection")
        || normalized.contains("temporary");
}
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-sql-gateway