Task Scheduling

Flink's Mesos integration uses Netflix Fenzo to match task requests against Mesos resource offers. The scheduling system covers task placement, resource optimization, and task lifecycle management.

Capabilities

Launchable Task Interface

Core interface defining task requirements and launch capabilities for Mesos scheduler integration.

/**
 * Interface for tasks that can be launched on Mesos
 * Defines resource requirements and launch operations
 */
public interface LaunchableTask {
    /**
     * Get Fenzo task requirements for resource scheduling
     * Specifies CPU, memory, disk, and constraint requirements
     * @return TaskRequest with resource and placement requirements
     */
    TaskRequest taskRequest();
    
    /**
     * Launch the task on the specified Mesos slave with allocated resources
     * @param slaveId - Target Mesos slave for task execution
     * @param allocation - Allocated resources including CPU, memory, disk
     * @return TaskInfo containing complete task specification for Mesos
     */
    Protos.TaskInfo launch(Protos.SlaveID slaveId, MesosResourceAllocation allocation);
}
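The two-step contract above (declare requirements first, build the launch description once resources are allocated) can be sketched without Mesos or Fenzo on the classpath. Everything in this sketch is a hypothetical stand-in: `Requirements` replaces `TaskRequest`, a plain map replaces `Protos.TaskInfo`, and `WorkerTask` with its 2 CPU / 2048 MB figures is an invented example task.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class LaunchSketch {
    /** Hypothetical stand-in for a Fenzo TaskRequest: CPU and memory only. */
    static final class Requirements {
        final double cpus;
        final double memoryMB;

        Requirements(double cpus, double memoryMB) {
            this.cpus = cpus;
            this.memoryMB = memoryMB;
        }
    }

    /** Hypothetical stand-in for LaunchableTask using simplified types. */
    interface SimpleLaunchableTask {
        Requirements taskRequest();

        Map<String, String> launch(String slaveId, double allocatedCpus, double allocatedMemoryMB);
    }

    /** Example task that needs 2 CPU cores and 2048 MB of memory. */
    static final class WorkerTask implements SimpleLaunchableTask {
        private final String taskId;

        WorkerTask(String taskId) {
            this.taskId = taskId;
        }

        @Override
        public Requirements taskRequest() {
            return new Requirements(2.0, 2048.0);
        }

        @Override
        public Map<String, String> launch(String slaveId, double allocatedCpus, double allocatedMemoryMB) {
            Requirements needed = taskRequest();
            // Refuse to launch if the allocation does not cover the declared requirements
            if (allocatedCpus < needed.cpus || allocatedMemoryMB < needed.memoryMB) {
                throw new IllegalArgumentException("allocation below requirements");
            }
            // A plain map stands in for the TaskInfo that would be sent to Mesos
            Map<String, String> info = new LinkedHashMap<>();
            info.put("taskId", taskId);
            info.put("slaveId", slaveId);
            info.put("cpus", String.valueOf(allocatedCpus));
            info.put("memoryMB", String.valueOf(allocatedMemoryMB));
            return info;
        }
    }
}
```

The scheduler would call `taskRequest()` while matching offers and `launch(...)` only after a host has been chosen, so the task never has to know how placement was decided.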

Resource Offer Management

Adapter class that transforms Mesos resource offers into Fenzo VirtualMachineLease objects for intelligent scheduling.

/**
 * Adapter transforming Mesos resource offers to Fenzo VirtualMachineLease
 * Provides resource availability information for task scheduling decisions
 */
public class Offer implements VirtualMachineLease {
    /**
     * Create offer from Mesos resource offer
     * @param offer - Mesos resource offer to wrap
     */
    public Offer(Protos.Offer offer);
    
    /**
     * Create offer with specific network resource name
     * @param offer - Mesos resource offer to wrap  
     * @param networkResourceName - Name of network resource to use
     */
    public Offer(Protos.Offer offer, String networkResourceName);
    
    /**
     * Get available CPU cores from this offer
     * @return Number of CPU cores available
     */
    public double cpuCores();
    
    /**
     * Get available GPU units from this offer
     * @return Number of GPU units available
     */
    public double gpus();
    
    /**
     * Get available memory in megabytes
     * @return Memory available in MB
     */
    public double memoryMB();
    
    /**
     * Get available network bandwidth in Mbps
     * @return Network bandwidth in megabits per second
     */
    public double networkMbps();
    
    /**
     * Get available disk space in megabytes
     * @return Disk space available in MB
     */
    public double diskMB();
    
    /**
     * Get hostname of the Mesos slave offering resources
     * @return Hostname string
     */
    public String hostname();
    
    /**
     * Get virtual machine ID (slave ID)
     * @return Unique VM identifier
     */
    public String getVMID();
    
    /**
     * Get all available resources from this offer
     * @return List of Mesos Resource objects
     */
    public List<Protos.Resource> getResources();
    
    /**
     * Get the underlying Mesos offer
     * @return Original Mesos Offer object
     */
    public Protos.Offer getOffer();
}

Offer Processing Example:

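import java.util.List;

import org.apache.flink.mesos.scheduler.Offer;
import org.apache.mesos.Protos;

// Process incoming Mesos offers.
// scheduleTask and declineOffer are application-defined helpers, not part of the Flink API.
public void processOffers(List<Protos.Offer> mesosOffers) {
    for (Protos.Offer mesosOffer : mesosOffers) {
        // Wrap the raw Mesos offer so its resources can be read through the Fenzo lease API
        Offer offer = new Offer(mesosOffer);
        
        // Check resource availability: at least 2 CPU cores and 2048 MB of memory
        if (offer.cpuCores() >= 2.0 && offer.memoryMB() >= 2048) {
            // Suitable for TaskManager placement
            scheduleTask(offer);
        } else {
            // Decline insufficient offer so Mesos can hand it to another framework
            declineOffer(mesosOffer);
        }
    }
}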

Scheduler Proxy

Mesos scheduler implementation that bridges Mesos scheduler callbacks with Flink's Akka actor system for event processing.

/**
 * Mesos scheduler proxy forwarding events to Akka actors
 * Handles all Mesos scheduler lifecycle events and state management
 */
public class SchedulerProxy implements Scheduler {
    /**
     * Handle framework registration with Mesos master
     * @param driver - Scheduler driver instance
     * @param frameworkId - Assigned framework ID
     * @param masterInfo - Mesos master information
     */
    public void registered(SchedulerDriver driver, 
                          Protos.FrameworkID frameworkId, 
                          Protos.MasterInfo masterInfo);
    
    /**
     * Handle framework re-registration after failover
     * @param driver - Scheduler driver instance  
     * @param masterInfo - New master information
     */
    public void reregistered(SchedulerDriver driver, Protos.MasterInfo masterInfo);
    
    /**
     * Handle resource offers from Mesos
     * @param driver - Scheduler driver instance
     * @param offers - List of resource offers
     */
    public void resourceOffers(SchedulerDriver driver, List<Protos.Offer> offers);
    
    /**
     * Handle task status updates from Mesos
     * @param driver - Scheduler driver instance
     * @param status - Task status update
     */
    public void statusUpdate(SchedulerDriver driver, Protos.TaskStatus status);
    
    /**
     * Handle framework disconnection from master
     * @param driver - Scheduler driver instance
     */
    public void disconnected(SchedulerDriver driver);
    
    /**
     * Handle unrecoverable framework errors
     * @param driver - Scheduler driver instance
     * @param message - Error message
     */
    public void error(SchedulerDriver driver, String message);
}
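The forwarding pattern behind the proxy can be sketched without Mesos or Akka. In this minimal sketch a `Consumer<Object>` stands in for the actor reference, and the `Registered`, `StatusUpdate`, and `Disconnected` classes are simplified stand-ins that carry strings instead of `Protos` types. The class and method names are assumptions made for illustration, not the Flink implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ProxySketch {
    /** Simplified stand-in for the Registered message. */
    static final class Registered {
        final String frameworkId;
        final String masterHost;

        Registered(String frameworkId, String masterHost) {
            this.frameworkId = frameworkId;
            this.masterHost = masterHost;
        }
    }

    /** Simplified stand-in for the StatusUpdate message. */
    static final class StatusUpdate {
        final String taskId;
        final String state;

        StatusUpdate(String taskId, String state) {
            this.taskId = taskId;
            this.state = state;
        }
    }

    /** Simplified stand-in for the Disconnected message. */
    static final class Disconnected { }

    /** Callbacks do no work of their own: they wrap their arguments and forward them. */
    static final class Proxy {
        private final Consumer<Object> mailbox; // stands in for an actor reference

        Proxy(Consumer<Object> mailbox) {
            this.mailbox = mailbox;
        }

        void registered(String frameworkId, String masterHost) {
            mailbox.accept(new Registered(frameworkId, masterHost));
        }

        void statusUpdate(String taskId, String state) {
            mailbox.accept(new StatusUpdate(taskId, state));
        }

        void disconnected() {
            mailbox.accept(new Disconnected());
        }
    }

    /** Drives the proxy through a short callback sequence and returns what was forwarded. */
    static List<Object> demo() {
        List<Object> received = new ArrayList<>();
        Proxy proxy = new Proxy(received::add);
        proxy.registered("framework-1", "master-1");
        proxy.statusUpdate("task-1", "TASK_RUNNING");
        proxy.disconnected();
        return received;
    }
}
```

Keeping the callbacks this thin means the driver thread returns quickly, and all state changes happen in whatever component consumes the messages.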

Task Scheduler Builder

Builder class for configuring Fenzo task scheduler with custom constraints, fitness functions, and optimization strategies.

/**
 * Builder for Fenzo task scheduler configuration
 * Provides fluent API for scheduler customization
 */
public class TaskSchedulerBuilder {
    /**
     * Create new task scheduler builder
     * @return Builder instance for configuration
     */
    public static TaskSchedulerBuilder newBuilder();
    
    /**
     * Set lease rejection action for unsuitable offers
     * @param action - Action to take when rejecting offers
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withLeaseRejectAction(Action1<VirtualMachineLease> action);
    
    /**
     * Set lease offer expiry handler
     * @param handler - Handler for expired offers
     * @return Builder instance for chaining  
     */
    public TaskSchedulerBuilder withLeaseOfferExpiry(Action1<VirtualMachineLease> handler);
    
    /**
     * Add fitness calculator for task placement optimization
     * @param calculator - Fitness function for placement decisions
     * @return Builder instance for chaining
     */
    public TaskSchedulerBuilder withFitnessCalculator(VMTaskFitnessCalculator calculator);
    
    /**
     * Build configured task scheduler
     * @return Configured TaskScheduler instance
     */
    public TaskScheduler build();
}

Scheduler Configuration Example:

import org.apache.flink.mesos.scheduler.TaskSchedulerBuilder;
import com.netflix.fenzo.TaskRequest;
import com.netflix.fenzo.TaskScheduler;
import com.netflix.fenzo.TaskTrackerState;
import com.netflix.fenzo.VMTaskFitnessCalculator;
import com.netflix.fenzo.VirtualMachineCurrentState;
import com.netflix.fenzo.VirtualMachineLease;

// Configure advanced task scheduler
// (logger is assumed to be an SLF4J Logger defined by the enclosing class)
TaskScheduler scheduler = TaskSchedulerBuilder.newBuilder()
    .withLeaseRejectAction(offer -> {
        // Log rejected offers for monitoring
        logger.info("Rejecting offer from {}: insufficient resources", offer.hostname());
    })
    .withLeaseOfferExpiry(offer -> {
        // Handle expired offers
        logger.warn("Offer from {} expired", offer.hostname());
    })
    .withFitnessCalculator(new VMTaskFitnessCalculator() {
        @Override
        public String getName() {
            return "cpu-memory-fit";
        }

        @Override
        public double calculateFitness(TaskRequest taskRequest, 
                                       VirtualMachineCurrentState targetVM, 
                                       TaskTrackerState taskTrackerState) {
            // Fenzo expects a fitness between 0.0 and 1.0; here a tighter fit scores higher
            VirtualMachineLease available = targetVM.getCurrAvailableResources();
            double cpuFitness = Math.min(1.0, taskRequest.getCPUs() / available.cpuCores());
            double memoryFitness = Math.min(1.0, taskRequest.getMemory() / available.memoryMB());
            return Math.min(cpuFitness, memoryFitness);
        }
    })
    .build();
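Fenzo expects fitness values between 0.0 and 1.0, with higher values marking a better placement. The arithmetic behind a bin-packing style score can be sketched in plain Java as follows. `FitnessSketch` and its method names are hypothetical, and the formula is one common choice rather than the calculator Flink ships.

```java
public class FitnessSketch {
    /**
     * Bin-packing style fitness in [0.0, 1.0]: the fuller the host would be
     * after placing the task, the higher the score.
     * Returns 0.0 if the task does not fit or the inputs are invalid.
     */
    static double binPackFitness(double used, double requested, double total) {
        if (total <= 0.0 || requested < 0.0 || used + requested > total) {
            return 0.0;
        }
        return (used + requested) / total;
    }

    /** Combine per-resource scores by taking the lower of the two. */
    static double combined(double cpuFitness, double memoryFitness) {
        return Math.min(cpuFitness, memoryFitness);
    }
}
```

For a host with 8 cores of which 2 are in use, a 2-core request scores (2 + 2) / 8 = 0.5, while a request that would exceed the host's capacity scores 0.0.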

Scheduler Messages

Akka actor messages for scheduler event handling and coordination between scheduler components.

Offer Management Messages

/**
 * Message to accept resource offers and launch tasks
 */
public class AcceptOffers implements Serializable {
    public AcceptOffers(List<TaskRequest> taskRequests, List<Offer> offers);
    public List<TaskRequest> getTaskRequests();
    public List<Offer> getOffers();
}

/**
 * Message containing new resource offers from Mesos
 */
public class ResourceOffers implements Serializable {
    public ResourceOffers(List<Offer> offers);
    public List<Offer> getOffers();
}

/**
 * Message indicating an offer was rescinded by Mesos
 */
public class OfferRescinded implements Serializable {
    public OfferRescinded(Protos.OfferID offerId);
    public Protos.OfferID getOfferId();
}

Connection Status Messages

/**
 * Message indicating scheduler connected to Mesos master
 */
public class Connected implements Serializable {
    public Connected(Protos.MasterInfo masterInfo);
    public Protos.MasterInfo getMasterInfo();
}

/**
 * Message indicating scheduler disconnected from Mesos master
 */
public class Disconnected implements Serializable {
    public Disconnected();
}

/**
 * Message indicating framework registered with Mesos
 */
public class Registered implements Serializable {
    public Registered(Protos.FrameworkID frameworkId, Protos.MasterInfo masterInfo);
    public Protos.FrameworkID getFrameworkId();
    public Protos.MasterInfo getMasterInfo();
}

/**
 * Message indicating framework re-registered after failover
 */
public class ReRegistered implements Serializable {
    public ReRegistered(Protos.MasterInfo masterInfo);
    public Protos.MasterInfo getMasterInfo();
}

Task Status Messages

/**
 * Message containing task status update from Mesos
 */
public class StatusUpdate implements Serializable {
    public StatusUpdate(Protos.TaskStatus status);
    public Protos.TaskStatus getStatus();
    public Protos.TaskID getTaskId();
    public Protos.TaskState getState();
}

/**
 * Message indicating an executor was lost
 */
public class ExecutorLost implements Serializable {
    public ExecutorLost(Protos.ExecutorID executorId, Protos.SlaveID slaveId, int status);
    public Protos.ExecutorID getExecutorId();
    public Protos.SlaveID getSlaveId();
    public int getStatus();
}

/**
 * Message indicating a Mesos slave was lost
 */
public class SlaveLost implements Serializable {
    public SlaveLost(Protos.SlaveID slaveId);
    public Protos.SlaveID getSlaveId();
}
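A handler for status messages usually has to decide two things: whether the task has reached a terminal state, and whether a replacement should be scheduled. The sketch below uses a local enum as a stand-in for a subset of `Protos.TaskState`. The replacement policy (reschedule on failure or loss only) is an assumption for illustration, not Flink's actual behavior.

```java
import java.util.EnumSet;
import java.util.Set;

public class StatusSketch {
    /** Local stand-in for a subset of the Mesos task states. */
    enum State {
        TASK_STAGING, TASK_STARTING, TASK_RUNNING,
        TASK_FINISHED, TASK_FAILED, TASK_KILLED, TASK_LOST, TASK_ERROR
    }

    // States after which the task will never run again
    private static final Set<State> TERMINAL = EnumSet.of(
            State.TASK_FINISHED, State.TASK_FAILED, State.TASK_KILLED,
            State.TASK_LOST, State.TASK_ERROR);

    // Illustrative policy: only unexpected terminations trigger a replacement
    private static final Set<State> REPLACE = EnumSet.of(State.TASK_FAILED, State.TASK_LOST);

    static boolean isTerminal(State state) {
        return TERMINAL.contains(state);
    }

    static boolean shouldReplace(State state) {
        return REPLACE.contains(state);
    }
}
```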

Error Handling Messages

/**
 * Message for unrecoverable scheduler/driver errors
 */
public class Error implements Serializable {
    /**
     * Create error message
     * @param message - Error description
     */
    public Error(String message);
    
    /**
     * Get error message
     * @return Error description string
     */
    public String message();
}

/**
 * Message containing framework messages from Mesos
 */
public class FrameworkMessage implements Serializable {
    public FrameworkMessage(Protos.ExecutorID executorId, 
                           Protos.SlaveID slaveId, 
                           byte[] data);
    public Protos.ExecutorID getExecutorId();
    public Protos.SlaveID getSlaveId();
    public byte[] getData();
}

Scheduling Strategies

Constraint-Based Scheduling

Configure placement constraints for optimal resource utilization:

Configuration config = new Configuration();

// Attribute-based constraints
config.setString("mesos.constraints.hard.attribute", "rack:LIKE:rack-[12]");
config.setString("mesos.constraints.soft.attribute", "datacenter:EQUALS:us-west");

// Resource constraints
config.setString("mesos.resourcemanager.tasks.cpus", "2.0");
config.setString("mesos.resourcemanager.tasks.mem", "2048");
config.setString("mesos.resourcemanager.tasks.disk", "1024");

// Host placement constraints
config.setString("mesos.constraints.hard.hostname", "UNIQUE");

Resource Optimization

Advanced resource allocation strategies for cluster efficiency:

// Configure resource optimization
Configuration config = new Configuration();

// Bin packing strategy
config.setString("mesos.scheduler.placement.strategy", "BIN_PACK");

// Resource utilization thresholds
config.setDouble("mesos.scheduler.cpu.utilization.threshold", 0.8);
config.setDouble("mesos.scheduler.memory.utilization.threshold", 0.85);

// Offer management
config.setLong("mesos.scheduler.offer.expiry.duration", 30000L);
config.setInteger("mesos.scheduler.offer.batch.size", 10);
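The idea behind a bin packing strategy is to fill already-used hosts before touching empty ones. A minimal best-fit sketch, considering CPU only, might look like this. `BinPackSketch`, `place`, and the host names are hypothetical, and a real scheduler would weigh memory and disk as well.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BinPackSketch {
    /**
     * Best-fit placement: returns the host whose free CPU is smallest while still
     * covering the request, or null if no host fits.
     * The request is deducted from the chosen host's free capacity.
     */
    static String place(Map<String, Double> freeCpus, double requestedCpus) {
        String best = null;
        double bestFree = Double.MAX_VALUE;
        for (Map.Entry<String, Double> entry : freeCpus.entrySet()) {
            double free = entry.getValue();
            if (free >= requestedCpus && free < bestFree) {
                best = entry.getKey();
                bestFree = free;
            }
        }
        if (best != null) {
            freeCpus.put(best, bestFree - requestedCpus);
        }
        return best;
    }

    /** Places three requests against two hosts and reports the chosen hosts in order. */
    static String demo() {
        Map<String, Double> free = new LinkedHashMap<>();
        free.put("host-a", 8.0);
        free.put("host-b", 3.0);
        String first = place(free, 2.0);   // host-b is the tightest fit
        String second = place(free, 2.0);  // host-b has 1.0 left, so host-a is used
        String third = place(free, 10.0);  // no host has 10 free cores
        return first + "," + second + "," + third;
    }
}
```

Best-fit keeps large hosts free for large requests, at the cost of concentrating load on fewer machines.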

Task Lifecycle Management

Comprehensive task state management and recovery:

// Configure task lifecycle settings
Configuration config = new Configuration();

// Task restart policy: up to 3 restart attempts, 10 seconds apart
config.setString("restart-strategy", "fixed-delay");
config.setInteger("restart-strategy.fixed-delay.attempts", 3);
config.setString("restart-strategy.fixed-delay.delay", "10 s");

// Health checking
config.setString("mesos.task.health.check.enabled", "true");
config.setString("mesos.task.health.check.interval", "30s");
config.setString("mesos.task.health.check.timeout", "10s");

Performance Optimization

Batch Task Scheduling

Efficient handling of multiple task launches:

  • Offer batching: Group offers for bulk processing
  • Task batching: Launch multiple tasks simultaneously
  • Resource reservation: Pre-allocate resources for predictable workloads
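Offer batching can be illustrated by aggregating a batch of offers per host before any placement decision is made, so several small offers from one machine are treated as one larger pool. The sketch below is a simplification under stated assumptions: `SimpleOffer` is a hypothetical stand-in for the `Offer` adapter and tracks CPU only.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class OfferBatchSketch {
    /** Hypothetical stand-in for an offer: hostname and CPU cores only. */
    static final class SimpleOffer {
        final String hostname;
        final double cpus;

        SimpleOffer(String hostname, double cpus) {
            this.hostname = hostname;
            this.cpus = cpus;
        }
    }

    /** Sums the CPU cores offered per host across one batch of offers. */
    static Map<String, Double> cpusByHost(List<SimpleOffer> batch) {
        Map<String, Double> totals = new TreeMap<>();
        for (SimpleOffer offer : batch) {
            totals.merge(offer.hostname, offer.cpus, Double::sum);
        }
        return totals;
    }
}
```

With offers of 1.5 and 2.0 cores from the same host, the aggregated view shows 3.5 cores, which is enough for a 3-core task that neither offer could host alone.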

Constraint Optimization

  • Hard constraints: Mandatory placement requirements
  • Soft constraints: Preferred placement with fallback options
  • Fitness functions: Custom scoring for optimal placement decisions
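The difference between hard and soft constraints can be sketched as a filter followed by a ranking: hard constraints remove hosts from consideration, soft constraints only order the hosts that remain. The class, method, and host names below are hypothetical, and the attribute values are illustrative.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;
import java.util.function.ToDoubleFunction;

public class ConstraintSketch {
    /**
     * Picks a host: the hard constraint filters candidates, the soft constraint
     * ranks the survivors. Returns null if no host passes the hard constraint.
     */
    static String choose(Map<String, Map<String, String>> hosts,
                         Predicate<Map<String, String>> hard,
                         ToDoubleFunction<Map<String, String>> soft) {
        String best = null;
        double bestScore = Double.NEGATIVE_INFINITY;
        for (Map.Entry<String, Map<String, String>> entry : hosts.entrySet()) {
            if (!hard.test(entry.getValue())) {
                continue; // a hard constraint violation excludes the host outright
            }
            double score = soft.applyAsDouble(entry.getValue());
            if (score > bestScore) {
                best = entry.getKey();
                bestScore = score;
            }
        }
        return best;
    }

    private static Map<String, String> attrs(String rack, String datacenter) {
        Map<String, String> attributes = new LinkedHashMap<>();
        attributes.put("rack", rack);
        attributes.put("datacenter", datacenter);
        return attributes;
    }

    /** Hard: rack must be rack-1 or rack-2. Soft: prefer the given datacenter. */
    static String demo(String preferredDatacenter) {
        Map<String, Map<String, String>> hosts = new LinkedHashMap<>();
        hosts.put("host-a", attrs("rack-1", "us-east"));
        hosts.put("host-b", attrs("rack-2", "us-west"));
        hosts.put("host-c", attrs("rack-9", "us-west"));
        return choose(hosts,
                a -> a.get("rack").matches("rack-[12]"),
                a -> preferredDatacenter.equals(a.get("datacenter")) ? 1.0 : 0.0);
    }
}
```

When no host matches the preferred datacenter, the soft constraint scores every candidate equally and the first host that passes the hard constraint is used, which is the fallback behavior the list above describes.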

Error Handling

The scheduling system handles the following failure cases:

  • Task failure recovery: Automatic restart with backoff strategies
  • Offer timeout handling: Graceful cleanup of expired offers
  • Scheduler disconnection: Automatic reconnection and state recovery
  • Resource constraint violations: Intelligent fallback and rescheduling
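Restart with backoff, as mentioned in the first item, typically means that the wait before each restart grows with the number of consecutive failures up to a cap. A minimal sketch of that calculation follows. The method name and parameters are hypothetical and do not correspond to Flink configuration keys.

```java
public class BackoffSketch {
    /**
     * Delay before restart attempt number `attempt` (0-based):
     * initialMillis * multiplier^attempt, capped at maxMillis.
     */
    static long delayMillis(int attempt, long initialMillis, double multiplier, long maxMillis) {
        double delay = initialMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(delay, (double) maxMillis);
    }
}
```

With an initial delay of 10 seconds, a multiplier of 2, and a 60 second cap, successive attempts wait 10, 20, 40, and then 60 seconds.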

Deprecation Notice

All task scheduling classes are deprecated as of Flink 1.13, and Mesos support was removed in Flink 1.14. Migration alternatives:

  • Kubernetes: Use Kubernetes-native scheduling with org.apache.flink.kubernetes.*
  • YARN: Use YARN resource management with org.apache.flink.yarn.*

Types

/**
 * Task placement request with resource requirements
 */
public class TaskPlacementRequest {
    public String taskId();
    public double cpuCores();
    public double memoryMB();
    public double diskMB();
    public Map<String, String> constraints();
    public List<String> preferredHosts();
}

/**
 * Scheduling result with placement decisions
 */
public class SchedulingResult {
    public List<TaskAssignment> assignments();
    public List<Offer> unusedOffers();
    public Map<String, String> failures();
}

/**
 * Task assignment to specific resource offer
 */
public class TaskAssignment {
    public TaskRequest task();
    public Offer offer();
    public Map<String, String> assignmentDetails();
}