External shuffle service for YARN NodeManagers that serves shuffle data to Spark applications. It runs as an auxiliary service inside each NodeManager process, so the shuffle files an executor writes stay available after that executor exits or is removed. This makes jobs more resilient to executor loss and lets dynamic allocation release idle executors without losing their shuffle output.
Main YARN auxiliary service class, which connects Spark's external shuffle service to the YARN NodeManager lifecycle.
public class YarnShuffleService extends AuxiliaryService {
// Service lifecycle methods
protected void serviceInit(Configuration conf) throws Exception;
protected void serviceStart() throws Exception;
protected void serviceStop() throws Exception;
// Application lifecycle callbacks
public void initializeApplication(ApplicationInitializationContext context) throws Exception;
public void stopApplication(ApplicationTerminationContext context) throws Exception;
// Container lifecycle callbacks
public void initializeContainer(ContainerInitializationContext context) throws Exception;
public void stopContainer(ContainerTerminationContext context) throws Exception;
// Recovery and metadata
public void setRecoveryPath(Path recoveryPath);
public ByteBuffer getMetaData();
}
Service Lifecycle Methods:
serviceInit(Configuration conf): void
serviceStart(): void
serviceStop(): void
Application Lifecycle:
initializeApplication(ApplicationInitializationContext context): void
stopApplication(ApplicationTerminationContext context): void
Container Lifecycle:
initializeContainer(ContainerInitializationContext context): void
stopContainer(ContainerTerminationContext context): void
Recovery and Metadata:
setRecoveryPath(Path recoveryPath): void
getMetaData(): ByteBuffer
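The callbacks above follow the Hadoop service model: the service is initialized and started once per NodeManager, receives per-application and per-container notifications while running, and is stopped with the NodeManager. The sketch below models that ordering with a hypothetical LifecycleSketch class. It replaces the Hadoop context objects with plain ID strings and enforces a simplified state machine, so it illustrates the call sequence rather than the real YarnShuffleService implementation.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of an auxiliary service's callback order.
// The real YarnShuffleService receives Hadoop context objects; plain IDs are used here.
class LifecycleSketch {
    enum State { NOTINITED, INITED, STARTED, STOPPED }

    private State state = State.NOTINITED;
    private final Set<String> apps = new LinkedHashSet<>();
    private final Set<String> containers = new LinkedHashSet<>();
    // Records each accepted callback, in order.
    final List<String> events = new ArrayList<>();

    // Rejects a callback that arrives in the wrong service state.
    private void require(State expected, String call) {
        if (state != expected) {
            throw new IllegalStateException(call + " called in state " + state);
        }
        events.add(call);
    }

    // Service lifecycle (invoked once per NodeManager).
    void serviceInit()  { require(State.NOTINITED, "serviceInit"); state = State.INITED; }
    void serviceStart() { require(State.INITED, "serviceStart"); state = State.STARTED; }
    void serviceStop() {
        require(State.STARTED, "serviceStop");
        state = State.STOPPED;
        apps.clear();
        containers.clear();
    }

    // Application lifecycle (only accepted while the service is running).
    void initializeApplication(String appId) { require(State.STARTED, "initializeApplication"); apps.add(appId); }
    void stopApplication(String appId)       { require(State.STARTED, "stopApplication"); apps.remove(appId); }

    // Container lifecycle (only accepted while the service is running).
    void initializeContainer(String containerId) { require(State.STARTED, "initializeContainer"); containers.add(containerId); }
    void stopContainer(String containerId)       { require(State.STARTED, "stopContainer"); containers.remove(containerId); }

    int activeApplications() { return apps.size(); }
    int activeContainers()   { return containers.size(); }
}
```

In this sketch, calling initializeApplication before serviceStart, or after serviceStop, throws IllegalStateException; the real NodeManager's guarantees may differ.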
<!-- yarn-site.xml configuration -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>spark_shuffle</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
<value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
<property>
<name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name>
<value>/path/to/spark-*-yarn-shuffle.jar</value>
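</property>
import org.apache.spark.SparkConf

// Application-side settings
val sparkConf = new SparkConf()
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.shuffle.service.port", "7337") // must match the port the NodeManagers use
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // not needed when the external shuffle service is enabled
Key Configuration Options: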
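spark.shuffle.service.enabled: fetch shuffle files from the external shuffle service instead of from executors (default false)
spark.shuffle.service.port: port the shuffle service listens on (default 7337); NodeManagers and applications must use the same value
spark.dynamicAllocation.enabled: scale the number of executors with the workload (default false)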
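The NodeManager-side properties and the application-side options have to agree with each other. The sketch below is a hypothetical pre-flight check (ShuffleConfigCheck is not part of Spark or YARN) that parses a yarn-site.xml style document with the JDK's built-in DOM parser and flags the most common mismatches: the aux-service not being listed, a wrong service class, dynamic allocation without a shuffle source, and differing ports. It assumes an unset shuffle-tracking flag means false, which may not match the default of your Spark version.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;

// Hypothetical pre-flight check (not part of Spark or YARN) for the
// NodeManager-side and application-side settings described above.
class ShuffleConfigCheck {
    static final String SERVICE_NAME = "spark_shuffle";
    static final String SERVICE_CLASS = "org.apache.spark.network.yarn.YarnShuffleService";
    static final String DEFAULT_PORT = "7337";

    // Collects <property><name>...</name><value>...</value></property> pairs
    // from a yarn-site.xml style document.
    static Map<String, String> parse(String xml) throws Exception {
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
        Map<String, String> props = new HashMap<>();
        NodeList nodes = doc.getElementsByTagName("property");
        for (int i = 0; i < nodes.getLength(); i++) {
            Element p = (Element) nodes.item(i);
            String name = p.getElementsByTagName("name").item(0).getTextContent().trim();
            String value = p.getElementsByTagName("value").item(0).getTextContent().trim();
            props.put(name, value);
        }
        return props;
    }

    // Returns human-readable problems; an empty list means no issue was found.
    static List<String> validate(Map<String, String> nm, Map<String, String> app) {
        List<String> problems = new ArrayList<>();
        String services = nm.getOrDefault("yarn.nodemanager.aux-services", "");
        boolean listed = Arrays.stream(services.split(","))
                .map(String::trim)
                .anyMatch(SERVICE_NAME::equals);
        if (!listed) {
            problems.add(SERVICE_NAME + " is missing from yarn.nodemanager.aux-services");
        }
        String cls = nm.get("yarn.nodemanager.aux-services." + SERVICE_NAME + ".class");
        if (!SERVICE_CLASS.equals(cls)) {
            problems.add("unexpected aux-service class: " + cls);
        }
        // Assumption: an unset shuffleTracking flag is treated as false here.
        boolean dynAlloc = Boolean.parseBoolean(app.getOrDefault("spark.dynamicAllocation.enabled", "false"));
        boolean ess = Boolean.parseBoolean(app.getOrDefault("spark.shuffle.service.enabled", "false"));
        boolean tracking = Boolean.parseBoolean(
                app.getOrDefault("spark.dynamicAllocation.shuffleTracking.enabled", "false"));
        if (dynAlloc && !ess && !tracking) {
            problems.add("dynamic allocation needs the external shuffle service or shuffle tracking");
        }
        String nmPort = nm.getOrDefault("spark.shuffle.service.port", DEFAULT_PORT);
        String appPort = app.getOrDefault("spark.shuffle.service.port", DEFAULT_PORT);
        if (!nmPort.equals(appPort)) {
            problems.add("port mismatch: NodeManager " + nmPort + ", application " + appPort);
        }
        return problems;
    }
}
```

A caller would read yarn-site.xml into a string, pass it to parse, and compare the result against the application's SparkConf entries converted to a map.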
<!-- Authentication: enables SASL (shared-secret) authentication in the shuffle service -->
<property>
<name>spark.authenticate</name>
<value>true</value>
</property>
<property>
<name>spark.shuffle.service.sasl.timeout</name>
<value>30000</value>
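</property>
<!-- Shuffle service state store used for recovery after restarts. spark.shuffle.service.db.enabled only affects standalone mode (YARN always has this behavior enabled); spark.shuffle.service.db.backend selects the store (LEVELDB or ROCKSDB) -->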
<property>
<name>spark.shuffle.service.db.enabled</name>
<value>true</value>
</property>
<property>
<name>spark.shuffle.service.db.backend</name>
<value>LEVELDB</value>
</property>
<!-- Network and I/O optimization -->
<property>
<name>spark.shuffle.io.serverThreads</name>
<value>8</value>
</property>
<property>
<name>spark.shuffle.io.clientThreads</name>
<value>8</value>
</property>
<property>
<name>spark.shuffle.service.index.cache.size</name>
<value>2048m</value>
</property>
Copy Shuffle Service JAR:
# In a Spark binary distribution the JAR is under $SPARK_HOME/yarn/
cp $SPARK_HOME/yarn/spark-*-yarn-shuffle.jar $YARN_HOME/share/hadoop/yarn/lib/
Configure YARN NodeManager:
<!-- Add to yarn-site.xml on all NodeManager nodes -->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle,spark_shuffle</value>
</property>
Restart NodeManagers:
$YARN_HOME/sbin/yarn-daemon.sh stop nodemanager
$YARN_HOME/sbin/yarn-daemon.sh start nodemanager
# Check NodeManager logs for shuffle service initialization
grep "YarnShuffleService" $YARN_LOG_DIR/yarn-*-nodemanager-*.log
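The same log check can be scripted when many nodes need to be inspected. The sketch below is a hypothetical helper (ShuffleLogScan is not a Spark or Hadoop class) that mirrors the grep: it keeps every line mentioning YarnShuffleService and separately collects those logged at ERROR level. It assumes a log layout in which the level appears as a space-separated token, as in the default Hadoop layout; adjust the match if your log4j pattern differs.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper mirroring the grep above: collects NodeManager log lines
// that mention YarnShuffleService and separates out those logged at ERROR level.
class ShuffleLogScan {
    final List<String> mentions = new ArrayList<>();
    final List<String> errors = new ArrayList<>();

    static ShuffleLogScan scan(BufferedReader reader) throws IOException {
        ShuffleLogScan result = new ShuffleLogScan();
        String line;
        while ((line = reader.readLine()) != null) {
            if (!line.contains("YarnShuffleService")) {
                continue;
            }
            result.mentions.add(line);
            // Assumes a layout where the level appears as a space-separated token.
            if (line.contains(" ERROR ")) {
                result.errors.add(line);
            }
        }
        return result;
    }

    // Convenience overload for log text already held in memory.
    static ShuffleLogScan scan(String text) throws IOException {
        return scan(new BufferedReader(new StringReader(text)));
    }
}
```

To scan a log file directly, pass java.nio.file.Files.newBufferedReader(path) for the NodeManager log; an empty mentions list suggests the service was never loaded, while a non-empty errors list points to the troubleshooting cases that follow.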
# Verify the service is listening on its port (7337 by default)
ss -ltn | grep ':7337'
Service Not Starting:
ERROR: Failed to initialize YarnShuffleService
Authentication Failures:
ERROR: SASL authentication failed for shuffle service
Port Conflicts:
ERROR: Failed to bind shuffle service to port 7337
The shuffle service exposes metrics through JMX:
// Key metrics available via JMX
"spark.shuffle.service:type=ExternalShuffleBlockHandler"
- OpenBlockRequestCount
- RegisterExecutorRequestCount
- RemoveBlocksRequestCount
- TotalBlockTransferTime
# log4j.properties for shuffle service logging
log4j.logger.org.apache.spark.network.yarn=INFO
log4j.logger.org.apache.spark.network.shuffle=DEBUG
log4j.logger.org.apache.spark.network.server=WARN
Common Exceptions:
ServiceStateException
IOException
SecurityException
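A port conflict such as the "Failed to bind" case originates as a java.net.BindException, which is a subclass of IOException. The sketch below is a hypothetical diagnostic (ShufflePortCheck is not part of Spark) that tries to bind a port on all interfaces the way a server socket would and reports whether the bind succeeded. It only covers the port-conflict case, not ServiceStateException or SecurityException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.BindException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

// Hypothetical diagnostic (not part of Spark): tries to bind the shuffle port
// on all interfaces, the way a server socket would, and reports whether it is free.
class ShufflePortCheck {
    static boolean isFree(int port) {
        try (ServerSocket socket = new ServerSocket()) {
            socket.setReuseAddress(false);
            socket.bind(new InetSocketAddress(port));
            return true;
        } catch (BindException e) {
            // The port is in use (or, for ports below 1024, may require privileges).
            return false;
        } catch (IOException e) {
            // Any other I/O failure is unexpected for this check.
            throw new UncheckedIOException(e);
        }
    }
}
```

Running isFree(7337) after stopping the NodeManager and getting false suggests that another process still holds the port.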
Shuffle Data Corruption:
Performance Degradation: