YARN Shuffle Service for Apache Spark: an external shuffle service that runs as a long-lived auxiliary service inside the YARN NodeManager process.
```
npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn-2-12@3.0.0
```

Spark Network YARN provides the YARN Shuffle Service for Apache Spark, enabling external shuffle management in YARN-managed clusters. The service runs as a long-running auxiliary service inside the NodeManager process and lets Spark applications offload shuffle data storage and retrieval, improving performance and reliability by preserving shuffle data even when executors fail or are deallocated.
Maven coordinates: `org.apache.spark:spark-network-yarn_2.12:3.0.1`

```java
import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.apache.hadoop.conf.Configuration;

// Configure and start the YARN Shuffle Service (normally done by the YARN NodeManager)
YarnShuffleService shuffleService = new YarnShuffleService();

// Initialize with a Hadoop configuration
Configuration conf = new Configuration();
conf.setBoolean("spark.authenticate", false);
conf.setInt("spark.shuffle.service.port", 7337);
shuffleService.serviceInit(conf);

// The service then handles the application lifecycle through YARN callbacks:
// initializeApplication, stopApplication, and so on.
```

The Spark Network YARN module consists of three main components: the `YarnShuffleService` auxiliary service, the `YarnShuffleServiceMetrics` bridge to Hadoop's metrics system, and the `HadoopConfigProvider` configuration adapter.
The service operates as an auxiliary service within YARN's NodeManager process, automatically starting and stopping with the NodeManager and handling Spark application lifecycle events.
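Deployments typically register the service in the NodeManager's `yarn-site.xml`. A minimal sketch of that registration is below; `spark_shuffle` is the conventional service name, and any aux services already in use (such as `mapreduce_shuffle`) should be kept in the list:

```xml
<!-- yarn-site.xml: register the Spark shuffle service as a NodeManager aux service -->
<property>
  <name>yarn.nodemanager.aux-services</name>
  <value>mapreduce_shuffle,spark_shuffle</value>
</property>
<property>
  <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
  <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>
```

The NodeManager must be restarted after this change, and the `spark-network-yarn` jar must be on the NodeManager's classpath.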
Core external shuffle service implementation for YARN clusters.
```java
public class YarnShuffleService extends AuxiliaryService {
  // Constructor
  public YarnShuffleService();

  // YARN application/container lifecycle callbacks
  public void initializeApplication(ApplicationInitializationContext context);
  public void stopApplication(ApplicationTerminationContext context);
  public void initializeContainer(ContainerInitializationContext context);
  public void stopContainer(ContainerTerminationContext context);
  public ByteBuffer getMetaData();
  public void setRecoveryPath(Path recoveryPath);

  // Protected service lifecycle methods
  protected void serviceInit(Configuration conf) throws Exception;
  protected void serviceStop();
  protected Path getRecoveryPath(String fileName);
  protected File initRecoveryDb(String dbName);
}
```

Key configuration properties:
- `spark.shuffle.service.port` (default: `7337`) - port the shuffle server listens on
- `spark.authenticate` (default: `false`) - enable authentication
- `spark.yarn.shuffle.stopOnFailure` (default: `false`) - stop the NodeManager on service failure

`YarnShuffleService.AppId` is a utility class for encoding application IDs in the shuffle service context.
```java
// Nested class: YarnShuffleService.AppId
public static class AppId {
  public final String appId;

  // Constructor
  public AppId(String appId);

  // Standard object methods
  public boolean equals(Object o);
  public int hashCode();
  public String toString();
}
```

`YarnShuffleServiceMetrics` forwards shuffle service metrics to Hadoop's metrics system for monitoring and observability.
```java
class YarnShuffleServiceMetrics implements MetricsSource {
  // Constructor
  YarnShuffleServiceMetrics(MetricSet metricSet);

  // MetricsSource implementation
  public void getMetrics(MetricsCollector collector, boolean all);

  // Static utility method
  public static void collectMetric(
      MetricsRecordBuilder metricsRecordBuilder,
      String name,
      Metric metric);
}
```

`HadoopConfigProvider` adapts a Hadoop `Configuration` to Spark's network configuration system.
```java
public class HadoopConfigProvider extends ConfigProvider {
  // Constructor
  public HadoopConfigProvider(Configuration conf);

  // ConfigProvider implementation
  public String get(String name);
  public String get(String name, String defaultValue);
  public Iterable<Map.Entry<String, String>> getAll();
}
```

External types referenced by this module:

```java
// From Hadoop YARN APIs
interface ApplicationInitializationContext {
  ApplicationId getApplicationId();
  ByteBuffer getApplicationDataForService();
}

interface ApplicationTerminationContext {
  ApplicationId getApplicationId();
}

interface ContainerInitializationContext {
  ContainerId getContainerId();
}

interface ContainerTerminationContext {
  ContainerId getContainerId();
}

// From Hadoop Configuration
class Configuration implements Iterable<Map.Entry<String, String>> {
  boolean getBoolean(String name, boolean defaultValue);
  int getInt(String name, int defaultValue);
  String get(String name);
  String[] getTrimmedStrings(String name);
}

// From Hadoop Metrics
interface MetricsSource {
  void getMetrics(MetricsCollector collector, boolean all);
}

interface MetricsCollector {
  MetricsRecordBuilder addRecord(String name);
}

interface MetricsRecordBuilder {
  MetricsRecordBuilder addCounter(MetricsInfo info, long value);
  MetricsRecordBuilder addGauge(MetricsInfo info, Number value);
}

// From Codahale (Dropwizard) Metrics
interface MetricSet {
  Map<String, Metric> getMetrics();
}

interface Metric {
  // Base interface for all metrics
}

interface Timer extends Metric {
  long getCount();
  double getFifteenMinuteRate();
  double getFiveMinuteRate();
  double getOneMinuteRate();
  double getMeanRate();
}

interface Meter extends Metric {
  long getCount();
  double getFifteenMinuteRate();
  double getFiveMinuteRate();
  double getOneMinuteRate();
  double getMeanRate();
}

interface Counter extends Metric {
  long getCount();
}

interface Gauge<T> extends Metric {
  T getValue();
}
```

The service includes robust error handling for common scenarios.
Common exceptions:

- `NoSuchElementException` - thrown by `HadoopConfigProvider` when a required configuration key is missing
- `IOException` - raised by I/O operations during service initialization, recovery, and recovery-database access
- `Exception` - general service lifecycle failures, which are caught and logged

On the application side, Spark jobs opt in to the external shuffle service with `spark.shuffle.service.enabled=true`.
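The missing-key behavior of `HadoopConfigProvider.get(String)` can be sketched with a plain `Map` standing in for Hadoop's `Configuration`; the class below is an illustrative stand-in, not the real provider:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

public class ConfigProviderSketch {
    // Hypothetical stand-in mirroring the HadoopConfigProvider contract:
    // get(name) throws NoSuchElementException for a missing key,
    // while get(name, default) falls back silently.
    static String get(Map<String, String> conf, String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    static String get(Map<String, String> conf, String name, String defaultValue) {
        String value = conf.get(name);
        return value == null ? defaultValue : value;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.shuffle.service.port", "7337");

        // Present key: returned as-is
        System.out.println(get(conf, "spark.shuffle.service.port"));

        // Missing key with a default: the default is used
        System.out.println(get(conf, "spark.authenticate", "false"));

        // Missing key without a default: callers must catch the exception
        try {
            get(conf, "spark.yarn.shuffle.stopOnFailure");
        } catch (NoSuchElementException e) {
            System.out.println("missing key: " + e.getMessage());
        }
    }
}
```

Callers that cannot tolerate the exception should prefer the two-argument overload with an explicit default.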