External shuffle service for Spark on YARN that runs as a long-running auxiliary service in the NodeManager process
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn_2-11@2.4.0

An external shuffle service implementation for Apache Spark applications running on YARN clusters. This library provides a long-running auxiliary service that operates within the YARN NodeManager process, enabling Spark executors to fetch shuffle data remotely even after the original executor containers have been terminated.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-yarn_2.11</artifactId>
<version>2.4.8</version>
</dependency>

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

The YarnShuffleService is typically deployed as a YARN auxiliary service rather than used directly in application code. However, for integration testing or custom deployments:
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;
// Create and configure the service
YarnShuffleService shuffleService = new YarnShuffleService();
// Configure Hadoop settings
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", false);
// Initialize the service (typically done by the YARN NodeManager).
// init(conf) is the public lifecycle entry point; it invokes the
// protected serviceInit(conf) internally.
shuffleService.init(conf);
// The service integrates with YARN application lifecycle
// Applications are initialized/stopped via YARN callbacks

The Spark Network YARN service is built around several key components:
The service operates as part of the YARN NodeManager process and provides shuffle data access to Spark executors across the cluster, enabling dynamic resource allocation and fault tolerance.
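On the Spark application side, jobs opt in to the external shuffle service through standard Spark properties. A typical spark-defaults.conf fragment (the port must match the service-side setting shown later) looks like:

```properties
# Route shuffle reads through the NodeManager-hosted shuffle service
spark.shuffle.service.enabled    true
spark.shuffle.service.port       7337
# Dynamic allocation relies on the external shuffle service to keep
# shuffle data available after executors are released
spark.dynamicAllocation.enabled  true
```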
Core service that extends YARN's AuxiliaryService to provide external shuffle functionality for Spark applications.
public class YarnShuffleService extends AuxiliaryService {
/**
* Creates a new YarnShuffleService with the default service name "spark_shuffle".
* This constructor is typically called by the YARN NodeManager during service initialization.
*/
public YarnShuffleService();
/**
* Initialize an application with the shuffle service, registering its shuffle secret.
* Called by YARN when a new application starts.
*
* @param context Application initialization context containing app ID and shuffle secret
*/
@Override
public void initializeApplication(ApplicationInitializationContext context);
/**
* Stop an application and cleanup its resources from the shuffle service.
* Called by YARN when an application terminates.
*
* @param context Application termination context containing app ID
*/
@Override
public void stopApplication(ApplicationTerminationContext context);
/**
* Initialize a container (logs the container ID for debugging).
* Called by YARN when a new container starts.
*
* @param context Container initialization context
*/
@Override
public void initializeContainer(ContainerInitializationContext context);
/**
* Stop a container (logs the container ID for debugging).
* Called by YARN when a container terminates.
*
* @param context Container termination context
*/
@Override
public void stopContainer(ContainerTerminationContext context);
/**
* Get metadata for the auxiliary service (currently returns empty buffer).
*
* @return Empty ByteBuffer as metadata is not currently used
*/
@Override
public ByteBuffer getMetaData();
/**
* Set the recovery path for shuffle service recovery when NodeManager restarts.
* Called by YARN NodeManager if recovery is enabled.
*
* @param recoveryPath Path where recovery data should be stored
*/
@Override
public void setRecoveryPath(Path recoveryPath);
}

Configuration property names used by the shuffle service. Note that these constants are primarily for internal use, but knowing the keys is important for configuration.
// Configuration keys (internal constants, but values are important for configuration)
// "spark.shuffle.service.port" - Port for shuffle service (default: 7337)
// "spark.authenticate" - Enable authentication (default: false)
// "spark.yarn.shuffle.stopOnFailure" - Stop NM on failure (default: false)

Protected methods that handle the service initialization and shutdown lifecycle.
/**
* Initialize the shuffle server with the given Hadoop configuration.
* Sets up transport server, authentication, and recovery systems.
*
* @param conf Hadoop configuration containing service settings
* @throws Exception If service initialization fails
*/
@Override
protected void serviceInit(Configuration conf) throws Exception;
/**
* Stop the shuffle server and clean up all associated resources.
* Closes transport server, block handler, and recovery database.
*/
@Override
protected void serviceStop();
/**
* Get the recovery path specific to this auxiliary service for the given filename.
*
* @param fileName Name of the recovery file
* @return Path where the recovery file should be located
*/
protected Path getRecoveryPath(String fileName);
/**
* Initialize recovery database, handling migration from old NodeManager local directories.
*
* @param dbName Name of the database file
* @return File object pointing to the recovery database location
*/
protected File initRecoveryDb(String dbName);
/**
* Load shuffle secrets from the recovery database during service initialization.
* Called when authentication is enabled and recovery is configured.
*
* @throws IOException If database access fails during secret loading
*/
private void loadSecretsFromDb() throws IOException;
/**
* Parse a database key to extract the application ID.
* Used internally for database key management.
*
* @param s Database key string with APP_CREDS_KEY_PREFIX
* @return Application ID extracted from the key
* @throws IOException If key parsing fails
*/
private static String parseDbAppKey(String s) throws IOException;
/**
* Generate a database key for storing application credentials.
* Used internally for database key management.
*
* @param appExecId Application identity object
* @return Byte array representing the database key
* @throws IOException If key generation fails
*/
private static byte[] dbAppKey(AppId appExecId) throws IOException;
/**
* Check if authentication is enabled for the shuffle service.
* Authentication is enabled when a secret manager is configured.
*
* @return true if authentication is enabled, false otherwise
*/
private boolean isAuthenticationEnabled();

Helper class for managing application identities in the shuffle service.
public static class AppId {
/** The application ID string */
public final String appId;
/**
* Create a new AppId with JSON deserialization support.
*
* @param appId The application ID string
*/
@JsonCreator
public AppId(@JsonProperty("appId") String appId);
/**
* Check equality based on application ID.
*
* @param o Object to compare with
* @return true if the objects represent the same application ID
*/
@Override
public boolean equals(Object o);
/**
* Generate hash code based on application ID.
*
* @return Hash code for this AppId
*/
@Override
public int hashCode();
/**
* String representation of the AppId.
*
* @return Formatted string showing the application ID
*/
@Override
public String toString();
}

Configuration provider that adapts Hadoop Configuration for use with Spark's network layer.
public class HadoopConfigProvider extends ConfigProvider {
/**
* Create a new configuration provider wrapping the given Hadoop configuration.
*
* @param conf Hadoop Configuration instance to wrap
*/
public HadoopConfigProvider(Configuration conf);
/**
* Get a configuration value by name.
*
* @param name Configuration property name
* @return Configuration value as String
* @throws NoSuchElementException If the property is not found
*/
@Override
public String get(String name);
/**
* Get a configuration value by name with a default fallback.
*
* @param name Configuration property name
* @param defaultValue Default value if property not found
* @return Configuration value or default if not found
*/
@Override
public String get(String name, String defaultValue);
/**
* Get all configuration entries as an iterable.
*
* @return Iterable of all configuration key-value pairs
*/
@Override
public Iterable<Map.Entry<String, String>> getAll();
}

The service depends on several Hadoop and YARN types that applications must have available:
// From Hadoop YARN
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.api.records.ContainerId;
// From Hadoop Core
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
// From Java Standard Library
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
// From Jackson JSON Processing
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
// From Google Guava
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

Integration with Spark's network layer components:
// These types are used internally but applications typically don't interact with them directly
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.crypto.AuthServerBootstrap;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.LevelDBProvider;
// From LevelDB
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
// From SLF4J Logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

spark.shuffle.service.port (default: 7337): Port for the shuffle service to listen on
spark.authenticate (default: false): Enable authentication for shuffle requests
yarn.nodemanager.local-dirs: Local directories for NodeManager (used for recovery file placement)
spark.yarn.shuffle.stopOnFailure (default: false): Whether service initialization failure should stop the NodeManager

The service uses the following internal configuration constants:
// Configuration key constants (from source code)
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
private static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;
// Recovery file names
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";

The service is typically configured in YARN's yarn-site.xml:
<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>spark.shuffle.service.port</name>
<value>7337</value>
</property>
</configuration>

The service handles several types of errors gracefully:
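For example, spark.yarn.shuffle.stopOnFailure controls whether an initialization error is rethrown (stopping the NodeManager) or merely logged so the NodeManager keeps running. The sketch below illustrates that decision logic only; it is not the actual Spark implementation, and serviceInitSketch is a hypothetical stand-in:

```java
// Illustrative sketch of "stop on failure" semantics; not the real
// YarnShuffleService code. serviceInitSketch is a hypothetical stand-in.
public class StopOnFailureSketch {

    // Simulates service startup; failStartup forces an error, and
    // stopOnFailure decides whether that error propagates to the caller.
    public static boolean serviceInitSketch(boolean stopOnFailure, boolean failStartup)
            throws Exception {
        try {
            if (failStartup) {
                throw new IllegalStateException("shuffle port already in use");
            }
            return true; // server started successfully
        } catch (Exception e) {
            if (stopOnFailure) {
                throw e; // propagate: the NodeManager would stop
            }
            // log-and-continue: the NodeManager keeps running without the service
            System.err.println("shuffle service init failed: " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(serviceInitSketch(false, true)); // prints "false"
    }
}
```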
The YarnShuffleService is designed to handle concurrent operations safely:
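Since YARN may deliver initializeApplication/stopApplication callbacks for different applications on different threads, the service's per-application state must be thread-safe. A minimal sketch of that kind of bookkeeping, assuming a ConcurrentHashMap-backed registry (the class and method names below are hypothetical, not Spark's actual internals):

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch: a thread-safe per-application secret registry of the
// kind an external shuffle service needs when YARN lifecycle callbacks for
// different applications arrive concurrently.
public class AppSecretRegistry {
    private final ConcurrentMap<String, String> secrets = new ConcurrentHashMap<>();

    // Called when an application starts (cf. initializeApplication)
    public void register(String appId, String secret) {
        secrets.put(appId, secret);
    }

    // Called when an application terminates (cf. stopApplication)
    public void unregister(String appId) {
        secrets.remove(appId);
    }

    // Called on each authenticated shuffle fetch; null if unknown app
    public String lookup(String appId) {
        return secrets.get(appId);
    }

    public static void main(String[] args) {
        AppSecretRegistry registry = new AppSecretRegistry();
        registry.register("application_0001", "s3cret");
        System.out.println(registry.lookup("application_0001")); // prints "s3cret"
        registry.unregister("application_0001");
        System.out.println(registry.lookup("application_0001")); // prints "null"
    }
}
```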