Apache Spark YARN Shuffle Service - provides shuffle service functionality for YARN-managed clusters
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn_2-13@4.0.0

Apache Spark YARN Shuffle Service provides shuffle service functionality for YARN-managed clusters. This library implements a shuffle service that runs as an auxiliary service on YARN NodeManagers, allowing Spark executors to fetch shuffle data even after the original executor that wrote the data has terminated.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-yarn_2.13</artifactId>
<version>4.0.0</version>
</dependency>

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.YarnShuffleServiceMetrics;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
// YARN integration imports
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

The YarnShuffleService is typically deployed as a YARN auxiliary service and managed by the NodeManager automatically:
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;
// Create and initialize the shuffle service
YarnShuffleService shuffleService = new YarnShuffleService();
// Service is initialized with YARN configuration
Configuration conf = new Configuration();
shuffleService.serviceInit(conf);
// Service lifecycle is managed by YARN NodeManager
// Applications connect by setting spark.shuffle.service.enabled=true

The Spark YARN Shuffle Service is built around several key components:
Main shuffle service class that extends Hadoop's AuxiliaryService for integration with YARN NodeManager.
public class YarnShuffleService extends AuxiliaryService {
/**
* Default constructor - initializes service with name "spark_shuffle"
*/
public YarnShuffleService();
/**
* Return whether authentication is enabled as specified by the configuration.
* If so, fetch requests will fail unless the appropriate authentication secret
* for the application is provided.
* @return true if authentication is enabled, false otherwise
*/
private boolean isAuthenticationEnabled();
/**
* Initialize application with the shuffle service
* @param context Application initialization context from YARN
*/
public void initializeApplication(ApplicationInitializationContext context);
/**
* Stop and cleanup application resources
* @param context Application termination context from YARN
*/
public void stopApplication(ApplicationTerminationContext context);
/**
* Initialize container with the shuffle service
* @param context Container initialization context from YARN
*/
public void initializeContainer(ContainerInitializationContext context);
/**
* Stop container and cleanup resources
* @param context Container termination context from YARN
*/
public void stopContainer(ContainerTerminationContext context);
/**
* Get service metadata (returns empty buffer)
* @return Empty ByteBuffer for metadata
*/
public ByteBuffer getMetaData();
/**
* Set recovery path for NodeManager restart scenarios
* @param recoveryPath Path where recovery data should be stored
*/
public void setRecoveryPath(Path recoveryPath);
/**
* Initialize service with external configuration
* @param externalConf Hadoop configuration from NodeManager
* @throws Exception if initialization fails
*/
protected void serviceInit(Configuration externalConf) throws Exception;
/**
* Stop service and cleanup all resources
*/
protected void serviceStop() throws Exception;
/**
* Get recovery path for specific file
* @param fileName Name of the recovery file
* @return Path for recovery file storage
*/
protected Path getRecoveryPath(String fileName);
/**
* Initialize recovery database file
* @param dbName Database file name
* @return File object for recovery database
*/
protected File initRecoveryDb(String dbName);
/**
* Set customized MergedShuffleFileManager for unit testing
* @param mergeManager Custom merge manager implementation
*/
@VisibleForTesting
void setShuffleMergeManager(MergedShuffleFileManager mergeManager);
/**
* Create new MergedShuffleFileManager instance for testing
* @param conf Transport configuration
* @param mergeManagerFile Recovery file for merge manager
* @return New MergedShuffleFileManager instance
*/
@VisibleForTesting
static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
TransportConf conf, File mergeManagerFile);
/**
* Load application secrets from the recovery database
* @throws IOException if database operations fail
*/
private void loadSecretsFromDb() throws IOException;
/**
* Parse database key to extract application ID
* @param s Database key string
* @return Application ID
* @throws IOException if parsing fails
*/
private static String parseDbAppKey(String s) throws IOException;
/**
* Create database key for application ID
* @param appExecId Application ID wrapper
* @return Database key as byte array
* @throws IOException if serialization fails
*/
private static byte[] dbAppKey(AppId appExecId) throws IOException;
}

Simple container class for encoding application identifiers with JSON serialization support.
public static class AppId {
/** The application identifier string */
public final String appId;
/**
* Constructor for application ID
* @param appId Application identifier string
*/
public AppId(String appId);
/**
* Check equality with another AppId
* @param o Object to compare
* @return true if equal, false otherwise
*/
public boolean equals(Object o);
/**
* Compute hash code for the application ID
* @return Hash code value
*/
public int hashCode();
/**
* String representation of the application ID
* @return Formatted string representation
*/
public String toString();
}

Forwards shuffle service metrics to Hadoop's metrics2 system for JMX exposure and monitoring.
class YarnShuffleServiceMetrics implements MetricsSource {
/**
* Package-private constructor for metrics integration
* @param metricsNamespace Namespace for metrics collection
* @param metricSet Set of metrics to forward
*/
YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet);
/**
* Collect metrics from the shuffle service
* @param collector Metrics collector to receive metrics
* @param all Whether to return all metrics even if unchanged
*/
public void getMetrics(MetricsCollector collector, boolean all);
/**
* Collect individual metric and add to metrics record
* @param metricsRecordBuilder Builder for metrics record
* @param name Name of the metric
* @param metric The metric object to collect
*/
public static void collectMetric(
MetricsRecordBuilder metricsRecordBuilder,
String name,
Metric metric
);
/**
* Internal record class for metrics information
*/
private record ShuffleServiceMetricsInfo(String name, String description)
implements MetricsInfo;
}

Configuration provider that bridges Hadoop Configuration with Spark's network configuration system.
public class HadoopConfigProvider extends ConfigProvider {
/**
* Constructor that wraps Hadoop configuration
* @param conf Hadoop Configuration instance
*/
public HadoopConfigProvider(Configuration conf);
/**
* Get configuration value by name
* @param name Configuration property name
* @return Configuration value
* @throws NoSuchElementException if property not found
*/
public String get(String name);
/**
* Get configuration value with default fallback
* @param name Configuration property name
* @param defaultValue Default value if property not found
* @return Configuration value or default
*/
public String get(String name, String defaultValue);
/**
* Get all configuration entries as iterable
* @return Iterable of all configuration key-value pairs
*/
public Iterable<Map.Entry<String, String>> getAll();
}The shuffle service supports extensive configuration through Hadoop Configuration properties:
// Service port configuration
public static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
public static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
// Authentication configuration
public static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
public static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
// Metrics configuration
public static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
"spark.yarn.shuffle.service.metrics.namespace";
public static final String DEFAULT_SPARK_SHUFFLE_SERVICE_METRICS_NAME = "sparkShuffleService";
// Logging configuration
public static final String SPARK_SHUFFLE_SERVICE_LOGS_NAMESPACE_KEY =
"spark.yarn.shuffle.service.logs.namespace";
// Failure handling configuration
public static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
public static final boolean DEFAULT_STOP_ON_FAILURE = false;
// Recovery configuration
public static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
"spark.yarn.shuffle.server.recovery.disabled";
// Recovery file names
public static final String RECOVERY_FILE_NAME = "registeredExecutors";
public static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery";
// Configuration overlay resource
public static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME =
"spark-shuffle-site.xml";
// Testing and integration constants
@VisibleForTesting
public static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";
@VisibleForTesting
public static final String SECRET_KEY = "secret";
@VisibleForTesting
public static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME =
"sparkShuffleMergeRecovery";
// Internal database constants
private static final String APP_CREDS_KEY_PREFIX = "AppCreds";
private static final ObjectMapper mapper = new ObjectMapper();

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
// Create configuration with shuffle service settings
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", true);
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");
// Use with configuration provider
HadoopConfigProvider configProvider = new HadoopConfigProvider(conf);
String port = configProvider.get("spark.shuffle.service.port", "7337");

The shuffle service provides comprehensive error handling and recovery capabilities:
// Example error handling configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.yarn.shuffle.stopOnFailure", false); // Continue on failure
conf.setBoolean("spark.yarn.shuffle.server.recovery.disabled", false); // Enable recovery

The shuffle service depends on several key libraries:
- Jackson (com.fasterxml.jackson, shaded)
- Netty (io.netty, shaded with native library renaming)

Dependencies with provided scope are expected to be available in the YARN environment. External dependencies like Jackson and Netty are carefully shaded to avoid classpath conflicts in YARN environments.
// Core YARN context types (from hadoop-yarn-server-api)
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
// Configuration and filesystem types (from hadoop-common)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
// Network and transport types (from spark-network-common)
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.shuffle.MergedShuffleFileManager;
// Metrics types (from hadoop-common and dropwizard-metrics)
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import com.codahale.metrics.Metric;
import com.codahale.metrics.MetricSet;
// Standard Java types
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.NoSuchElementException;
// Jackson types (shaded)
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;