tessl install tessl/maven-org-apache-spark--spark-network-yarn_2-13@3.5.0

YARN auxiliary service for Apache Spark shuffle operations, providing external shuffle service functionality in YARN environments.
Apache Spark's YARN auxiliary service that provides external shuffle service functionality in YARN environments. This library enables shuffle data to be served by external processes (NodeManagers) rather than executors themselves, improving resource efficiency and allowing shuffle data to survive executor crashes.
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-network-yarn_2.13</artifactId>
<version>3.5.6</version>
</dependency>

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.YarnShuffleServiceMetrics;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
// Create and configure the YARN shuffle service
YarnShuffleService shuffleService = new YarnShuffleService();
// Set up YARN configuration
YarnConfiguration conf = new YarnConfiguration();
conf.set("spark.shuffle.service.port", "7337");
conf.setBoolean("spark.authenticate", false);
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");
// Initialize the service
shuffleService.init(conf);  // public init() invokes the protected serviceInit(conf)
// The service will automatically handle application and container lifecycle
// through YARN NodeManager integration

The service is designed to run as a long-lived auxiliary service within YARN NodeManagers, providing shuffle data persistence and serving capabilities for Spark applications.

The Spark Network YARN module consists of three main components:
Main service implementation that extends Hadoop's AuxiliaryService class to integrate with YARN NodeManagers.
public class YarnShuffleService extends AuxiliaryService {
// Constructor
public YarnShuffleService();
// Service lifecycle methods
protected void serviceInit(Configuration conf) throws Exception;
protected void serviceStop();
// Application lifecycle methods
public void initializeApplication(ApplicationInitializationContext context);
public void stopApplication(ApplicationTerminationContext context);
// Container lifecycle methods
public void initializeContainer(ContainerInitializationContext context);
public void stopContainer(ContainerTerminationContext context);
// Metadata and recovery methods
public ByteBuffer getMetaData();
public void setRecoveryPath(Path recoveryPath);
}
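In production, these lifecycle methods are invoked by the YARN NodeManager rather than by user code. The sketch below illustrates that sequence with authentication disabled; the recovery directory, user name, and application id are illustrative placeholders, and exception handling is omitted.

import java.nio.ByteBuffer;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.spark.network.yarn.YarnShuffleService;

YarnShuffleService service = new YarnShuffleService();

// Optional: hand the service a recovery directory before initialization
// (the NodeManager does this when NM recovery is enabled)
service.setRecoveryPath(new Path("/var/lib/hadoop-yarn/nm-aux-services/spark_shuffle"));

YarnConfiguration conf = new YarnConfiguration();
conf.setBoolean("spark.authenticate", false);
service.init(conf);   // public entry point; invokes serviceInit(conf)
service.start();

// Register an application; with authentication disabled the secret payload is not read
ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1);
service.initializeApplication(
    new ApplicationInitializationContext("spark-user", appId, ByteBuffer.allocate(0)));

// ... shuffle data for appId is served while executors come and go ...

service.stopApplication(new ApplicationTerminationContext(appId));
service.stop();       // invokes serviceStop()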
Configuration Constants:

// Service configuration keys
public static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY =
"spark.yarn.shuffle.service.metrics.namespace";
public static final String SPARK_SHUFFLE_SERVICE_LOGS_NAMESPACE_KEY =
"spark.yarn.shuffle.service.logs.namespace";
public static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
public static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";
public static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED =
"spark.yarn.shuffle.server.recovery.disabled";
public static final String SECRET_KEY = "secret";
public static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME =
"spark-shuffle-site.xml";
// Recovery file names
public static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME =
"sparkShuffleMergeRecovery";

Testing Support:
// Visible for testing
void setShuffleMergeManager(MergedShuffleFileManager mergeManager);
static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
TransportConf conf, File mergeManagerFile);

Encapsulates application identifiers for JSON serialization and database operations.
public static class AppId {
public final String appId;
// Constructor with Jackson JSON annotation
public AppId(String appId);
// Object methods
public boolean equals(Object o);
public int hashCode();
public String toString();
}
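Because AppId carries a Jackson-annotated constructor, it round-trips through JSON, supporting the database operations mentioned above. A minimal sketch, assuming Jackson's default field visibility and omitting checked exception handling; the application id string is a placeholder.

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.spark.network.yarn.YarnShuffleService.AppId;

ObjectMapper mapper = new ObjectMapper();
AppId id = new AppId("application_1700000000000_0001");  // placeholder application id
String json = mapper.writeValueAsString(id);             // {"appId":"application_1700000000000_0001"}
AppId restored = mapper.readValue(json, AppId.class);    // rebuilt through the annotated constructor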
Forwards shuffle service metrics to Hadoop's metrics2 system for JMX exposure and monitoring.

class YarnShuffleServiceMetrics implements MetricsSource {
// Constructor
YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet);
// MetricsSource implementation
public void getMetrics(MetricsCollector collector, boolean all);
// Static utility for metric collection
public static void collectMetric(
MetricsRecordBuilder metricsRecordBuilder,
String name,
Metric metric);
}

Provides Spark network configuration using Hadoop Configuration as the backing store.
public class HadoopConfigProvider extends ConfigProvider {
// Constructor
public HadoopConfigProvider(Configuration conf);
// Configuration access methods
public String get(String name);
public String get(String name, String defaultValue);
public Iterable<Map.Entry<String, String>> getAll();
}
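A minimal sketch of how a Hadoop Configuration can be surfaced to Spark's transport layer through HadoopConfigProvider. The "shuffle" module name mirrors what the shuffle service uses for its own TransportConf; TransportConf itself comes from the spark-network-common module.

import org.apache.hadoop.conf.Configuration;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

Configuration hadoopConf = new Configuration();
hadoopConf.setInt("spark.shuffle.service.port", 7337);

// Expose the Hadoop settings to Spark's network layer
HadoopConfigProvider provider = new HadoopConfigProvider(hadoopConf);
TransportConf transportConf = new TransportConf("shuffle", provider);

int port = transportConf.getInt("spark.shuffle.service.port", 7337);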
// Service port configuration (default: 7337)
conf.setInt("spark.shuffle.service.port", 7337);
// Authentication settings (default: false)
conf.setBoolean("spark.authenticate", false);
// Metrics namespace (default: "sparkShuffleService")
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");
// Logs namespace (optional)
conf.set("spark.yarn.shuffle.service.logs.namespace", "customNamespace");
// Failure handling (default: false)
conf.setBoolean("spark.yarn.shuffle.stopOnFailure", true);
// Recovery settings (default: false)
conf.setBoolean("spark.yarn.shuffle.server.recovery.disabled", false);
// Database backend for recovery (default: LEVELDB)
conf.set("spark.shuffle.service.db.backend", "LEVELDB");

<!-- yarn-site.xml configuration -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name>
<value>/path/to/spark-network-yarn-shuffle.jar</value>
</property>

The service implements comprehensive error handling for common failure scenarios:
// Example error handling pattern
try {
shuffleService.serviceInit(configuration);
} catch (Exception e) {
// Service initialization failed
// Check configuration and dependencies
logger.error("Failed to initialize shuffle service", e);
throw e;
}

All public methods are designed to be thread-safe for concurrent use.
Dependencies:

org.apache.hadoop:hadoop-client-api - Hadoop client API
org.apache.hadoop:hadoop-client-runtime - Hadoop client runtime
org.apache.spark:spark-network-shuffle - Spark network shuffle components

Build the service JAR:
mvn clean package -pl common/network-yarn

Deploy to YARN NodeManagers:
# Copy JAR to all NodeManager hosts
cp target/spark-*-yarn-shuffle.jar $YARN_HOME/share/hadoop/yarn/

Configure YARN (yarn-site.xml):
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

Restart NodeManagers:
$YARN_HOME/sbin/yarn-daemon.sh stop nodemanager
$YARN_HOME/sbin/yarn-daemon.sh start nodemanager

For custom configuration without modifying yarn-site.xml:
Create spark-shuffle-site.xml:
<?xml version="1.0"?>
<configuration>
<property>
<name>spark.shuffle.service.port</name>
<value>7338</value>
</property>
<property>
<name>spark.authenticate</name>
<value>true</value>
</property>
</configuration>

Place on classpath: add the file to the NodeManager classpath or the auxiliary service classpath.
The service will automatically load and apply the overlay configuration on startup.
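On the application side, Spark jobs opt in to this external shuffle service through standard Spark settings (these belong to spark-core's SparkConf, not to this module). A hedged sketch matching the overlay above, which moves the port to 7338 and enables authentication:

import org.apache.spark.SparkConf;

SparkConf sparkConf = new SparkConf()
    .setAppName("shuffle-service-client")          // placeholder application name
    .set("spark.shuffle.service.enabled", "true")  // serve shuffle data via the NodeManager service
    .set("spark.shuffle.service.port", "7338")     // must match the service-side port
    .set("spark.authenticate", "true");            // must match the service-side auth setting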