CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-rpc-akka

Pekko-based RPC implementation for Apache Flink's distributed computing framework

Pending
Overview
Eval results
Files

concurrent-utilities.mddocs/

Concurrent Utilities

Utilities for integrating Pekko actor systems with Java's concurrency APIs, converting between Scala and Java futures, and providing scheduled execution capabilities.

Capabilities

ScalaFutureUtils

Utilities for converting between Scala Future types (used by Pekko) and Java CompletableFuture types.

/**
 * Utilities to convert Scala types into Java types, particularly for Future interoperability.
 */
public class ScalaFutureUtils {
    
    /**
     * Converts a Scala Future to a Java CompletableFuture.
     * This is essential for integrating Pekko's Scala-based async operations
     * with Java's CompletableFuture-based async APIs.
     * 
     * @param scalaFuture The Scala Future to convert
     * @return CompletableFuture that completes with the same result
     */
    public static <T, U extends T> CompletableFuture<T> toJava(Future<U> scalaFuture);
}

Usage Examples:

import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.pekko.actor.ActorSelection;
import org.apache.pekko.pattern.Patterns;
import org.apache.pekko.util.Timeout;
import scala.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.time.Duration;

// Convert Pekko ask pattern result to CompletableFuture
ActorSelection actorSelection = actorSystem.actorSelection("/user/someActor");
Timeout timeout = Timeout.create(Duration.ofSeconds(10));

// Pekko ask returns Scala Future
Future<Object> scalaFuture = Patterns.ask(actorSelection, "someMessage", timeout);

// Convert to Java CompletableFuture for easier Java integration
CompletableFuture<Object> javaFuture = ScalaFutureUtils.toJava(scalaFuture);

// Now you can use standard Java CompletableFuture operations
javaFuture
    .thenApply(result -> processResult(result))
    .thenAccept(processedResult -> logger.info("Received: {}", processedResult))
    .exceptionally(throwable -> {
        logger.error("RPC call failed", throwable);
        return null;
    });

ActorSystemScheduledExecutorAdapter

Adapter that allows using a Pekko ActorSystem as a Java ScheduledExecutor, enabling integration with Java's scheduled execution APIs.

/**
 * Adapter to use ActorSystem as ScheduledExecutor.
 * Provides Java ScheduledExecutorService-compatible interface backed by Pekko's scheduler.
 */
public class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
    
    /**
     * Constructor for ActorSystemScheduledExecutorAdapter.
     * @param actorSystem Pekko ActorSystem to use for scheduling
     * @param flinkClassLoader ClassLoader for task execution context
     */
    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader flinkClassLoader);
    
    /**
     * Schedules a Runnable task for execution after a delay.
     * @param command Task to execute
     * @param delay Delay before execution
     * @param unit Time unit for the delay
     * @return ScheduledFuture representing the scheduled task
     */
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);
    
    /**
     * Schedules a Callable task for execution after a delay.
     * @param callable Task to execute that returns a value
     * @param delay Delay before execution
     * @param unit Time unit for the delay
     * @return ScheduledFuture representing the scheduled task and its result
     */
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);
    
    /**
     * Schedules a task to run repeatedly at fixed rate.
     * @param command Task to execute repeatedly
     * @param initialDelay Delay before first execution
     * @param period Period between successive executions
     * @param unit Time unit for delays and period
     * @return ScheduledFuture representing the scheduled repeating task
     */
    public ScheduledFuture<?> scheduleAtFixedRate(
        Runnable command, 
        long initialDelay, 
        long period, 
        TimeUnit unit
    );
    
    /**
     * Schedules a task to run repeatedly with fixed delay between executions.
     * @param command Task to execute repeatedly
     * @param initialDelay Delay before first execution
     * @param delay Delay between end of one execution and start of next
     * @param unit Time unit for delays
     * @return ScheduledFuture representing the scheduled repeating task
     */
    public ScheduledFuture<?> scheduleWithFixedDelay(
        Runnable command, 
        long initialDelay, 
        long delay, 
        TimeUnit unit
    );
    
    /**
     * Executes a command immediately (implements Executor interface).
     * @param command Task to execute
     */
    public void execute(Runnable command);
}

Usage Examples:

import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;
import org.apache.pekko.actor.ActorSystem;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Callable;

// Create scheduled executor adapter
ActorSystem actorSystem = // ... get actor system
ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
ActorSystemScheduledExecutorAdapter scheduler = 
    new ActorSystemScheduledExecutorAdapter(actorSystem, classLoader);

// One-time delayed execution
ScheduledFuture<?> delayedTask = scheduler.schedule(() -> {
    logger.info("Delayed task executed");
    performMaintenanceTask();
}, 30, TimeUnit.SECONDS);

// Scheduled task with return value
ScheduledFuture<String> valuedTask = scheduler.schedule(() -> {
    return "Task completed at " + System.currentTimeMillis();
}, 10, TimeUnit.SECONDS);

String result = valuedTask.get(); // Blocks until completion

// Periodic execution at fixed rate (heartbeat)
ScheduledFuture<?> heartbeat = scheduler.scheduleAtFixedRate(() -> {
    sendHeartbeat();
}, 0, 5, TimeUnit.SECONDS); // Start immediately, repeat every 5 seconds

// Periodic execution with fixed delay (cleanup)
ScheduledFuture<?> cleanup = scheduler.scheduleWithFixedDelay(() -> {
    performCleanup();
}, 60, 30, TimeUnit.SECONDS); // Start after 60s, repeat with 30s gap

// Immediate execution
scheduler.execute(() -> {
    logger.info("Immediate task executed");
});

// Cancel scheduled tasks when done
heartbeat.cancel(false);
cleanup.cancel(true);

Integration with Flink Components:

import org.apache.flink.runtime.concurrent.pekko.ActorSystemScheduledExecutorAdapter;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;

// Use scheduler in Flink components
public class JobManagerServices {
    private final ActorSystemScheduledExecutorAdapter scheduler;
    private final ScheduledFuture<?> checkpointScheduler;
    
    public JobManagerServices(ActorSystem actorSystem) {
        this.scheduler = new ActorSystemScheduledExecutorAdapter(
            actorSystem, 
            JobManagerServices.class.getClassLoader()
        );
        
        // Schedule checkpoint triggering
        this.checkpointScheduler = scheduler.scheduleAtFixedRate(
            this::triggerCheckpoint,
            10, // initial delay
            30, // checkpoint interval
            TimeUnit.SECONDS
        );
    }
    
    private void triggerCheckpoint() {
        // Checkpoint triggering logic
        logger.debug("Triggering periodic checkpoint");
    }
    
    public void shutdown() {
        checkpointScheduler.cancel(false);
    }
}

// Resource cleanup with scheduled executor
public class ResourceManager {
    private final ActorSystemScheduledExecutorAdapter scheduler;
    
    public void scheduleResourceCleanup() {
        // Clean up unused resources every 5 minutes
        scheduler.scheduleWithFixedDelay(() -> {
            cleanupUnusedResources();
        }, 5, 5, TimeUnit.MINUTES);
    }
    
    public void scheduleTaskManagerHeartbeat() {
        // Check TaskManager heartbeats every 10 seconds
        scheduler.scheduleAtFixedRate(() -> {
            checkTaskManagerHeartbeats();
        }, 0, 10, TimeUnit.SECONDS);
    }
    
    private void cleanupUnusedResources() {
        // Resource cleanup implementation
    }
    
    private void checkTaskManagerHeartbeats() {
        // Heartbeat checking implementation
    }
}

Error Handling and Best Practices:

import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;

// Proper error handling with scheduled tasks
public class RobustScheduledTasks {
    private final ActorSystemScheduledExecutorAdapter scheduler;
    
    public void setupRobustScheduling() {
        // Task with proper error handling
        scheduler.scheduleAtFixedRate(() -> {
            try {
                performRiskyOperation();
            } catch (Exception e) {
                logger.error("Scheduled task failed, but continuing", e);
                // Don't rethrow - would stop the scheduled execution
            }
        }, 0, 30, TimeUnit.SECONDS);
        
        // Task with future completion handling
        ScheduledFuture<String> task = scheduler.schedule(() -> {
            return performLongRunningOperation();
        }, 10, TimeUnit.SECONDS);
        
        // Handle completion asynchronously
        CompletableFuture<String> futureResult = CompletableFuture.supplyAsync(() -> {
            try {
                return task.get();
            } catch (ExecutionException e) {
                throw new CompletionException(e.getCause());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new CompletionException(e);
            }
        });
        
        futureResult
            .thenAccept(result -> logger.info("Operation completed: {}", result))
            .exceptionally(throwable -> {
                logger.error("Scheduled operation failed", throwable);
                return null;
            });
    }
    
    private void performRiskyOperation() throws Exception {
        // Implementation that might throw exceptions
    }
    
    private String performLongRunningOperation() {
        // Implementation that takes time to complete
        return "operation result";
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-rpc-akka

docs

actor-system.md

concurrent-utilities.md

exceptions.md

index.md

rpc-configuration.md

rpc-system.md

tile.json