Describes: mavenpkg:maven/org.apache.spark/spark-network-yarn_2.13@3.5.x

tessl/maven-org-apache-spark--spark-network-yarn_2-13

tessl install tessl/maven-org-apache-spark--spark-network-yarn_2-13@3.5.0

YARN auxiliary service for Apache Spark shuffle operations that provides external shuffle service functionality in YARN environments


Spark Network YARN

Apache Spark's YARN auxiliary service, which runs the external shuffle service inside YARN NodeManagers. Shuffle files written by executors are served by this long-running service rather than by the executors themselves, so the files stay available after an executor exits, whether it crashed or was released by dynamic allocation. This lets Spark free idle executors without losing their shuffle output.

Package Information

  • Package Name: spark-network-yarn_2.13
  • Package Type: Maven
  • Language: Java
  • Group ID: org.apache.spark
  • Artifact ID: spark-network-yarn_2.13
  • Installation: Add Maven dependency
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-yarn_2.13</artifactId>
    <version>3.5.6</version>
</dependency>

Core Imports

import org.apache.spark.network.yarn.YarnShuffleService;
// YarnShuffleServiceMetrics is package-private and cannot be imported from outside org.apache.spark.network.yarn
import org.apache.spark.network.yarn.util.HadoopConfigProvider;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;

Basic Usage

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

// Create the YARN shuffle service. In a cluster the NodeManager instantiates it
// from yarn-site.xml; creating it by hand is mainly useful in tests.
YarnShuffleService shuffleService = new YarnShuffleService();

// Set up YARN configuration
YarnConfiguration conf = new YarnConfiguration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", false);
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");

// serviceInit() is protected, so go through the public Hadoop Service lifecycle.
// The shuffle server is created and bound to the port during init().
shuffleService.init(conf);
shuffleService.start();

// Application and container callbacks (initializeApplication, stopApplication, ...)
// are then driven by the YARN NodeManager.

// Close the server and release its port when done
shuffleService.stop();

Architecture

The Spark Network YARN module consists of three main components:

  1. YarnShuffleService: The main auxiliary service that integrates with YARN NodeManagers
  2. YarnShuffleServiceMetrics: Metrics collection and forwarding to Hadoop's metrics system
  3. HadoopConfigProvider: Configuration provider that bridges Spark and Hadoop configuration systems

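The service is designed to run as a long-lived auxiliary service within YARN NodeManagers. It serves shuffle blocks that executors have written to local disk and, when NodeManager recovery is enabled, persists executor registrations and application state so they survive NodeManager restarts.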

Capabilities

YARN Auxiliary Service Integration

Main service implementation; it extends Hadoop's AuxiliaryService abstract class to integrate with YARN NodeManagers.

public class YarnShuffleService extends AuxiliaryService {
    // Constructor
    public YarnShuffleService();
    
    // Service lifecycle hooks (protected; invoked through the public init()/stop()
    // inherited from Hadoop's AbstractService)
    protected void serviceInit(Configuration conf) throws Exception;
    protected void serviceStop();

    // Application lifecycle methods
    public void initializeApplication(ApplicationInitializationContext context);
    public void stopApplication(ApplicationTerminationContext context);

    // Container lifecycle methods
    public void initializeContainer(ContainerInitializationContext context);
    public void stopContainer(ContainerTerminationContext context);

    // Metadata and recovery methods
    public ByteBuffer getMetaData();                 // not currently used; returns an empty buffer
    public void setRecoveryPath(Path recoveryPath);  // org.apache.hadoop.fs.Path; set by the NodeManager when NM recovery is enabled
}
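The NodeManager, not user code, drives these methods. As we understand Hadoop's AuxServices, the order is: construction, setRecoveryPath (only when NodeManager recovery is enabled), init/start, then initializeApplication and stopApplication (with the container callbacks in between) for each application that runs containers on the node, and finally stop. The sketch below is a stdlib-only model of that ordering and of the set of applications being tracked. LifecycleSketch, its State enum, and its checks are hypothetical and do not reproduce the service's internals; a plain String stands in for Hadoop's Path.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;

// Hypothetical model (not Spark code) of the callback order seen by an auxiliary service.
final class LifecycleSketch {
    enum State { CREATED, STARTED, STOPPED }

    private State state = State.CREATED;
    private String recoveryPath;                 // set only when NM recovery is enabled
    private final Set<String> activeApps = new HashSet<>();

    void setRecoveryPath(String path) {
        requireState(State.CREATED);             // must happen before init
        recoveryPath = path;
    }

    void initAndStart() {
        requireState(State.CREATED);
        state = State.STARTED;
    }

    void initializeApplication(String appId) {
        requireState(State.STARTED);
        activeApps.add(appId);
    }

    void stopApplication(String appId) {
        requireState(State.STARTED);
        activeApps.remove(appId);
    }

    void stop() {
        state = State.STOPPED;
        activeApps.clear();                      // model only: forget tracked apps
    }

    boolean recoveryEnabled() {
        return recoveryPath != null;
    }

    Set<String> activeApps() {
        return Collections.unmodifiableSet(activeApps);
    }

    private void requireState(State expected) {
        if (state != expected) {
            throw new IllegalStateException("expected " + expected + " but was " + state);
        }
    }
}
```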

Configuration Constants:

// Service configuration keys
public static final String SPARK_SHUFFLE_SERVICE_METRICS_NAMESPACE_KEY = 
    "spark.yarn.shuffle.service.metrics.namespace";
public static final String SPARK_SHUFFLE_SERVICE_LOGS_NAMESPACE_KEY = 
    "spark.yarn.shuffle.service.logs.namespace";
public static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
public static final String INTEGRATION_TESTING = "spark.yarn.shuffle.testing";
public static final String SPARK_SHUFFLE_SERVER_RECOVERY_DISABLED = 
    "spark.yarn.shuffle.server.recovery.disabled";
public static final String SECRET_KEY = "secret";  // JSON field carrying the app secret in the application's service data, not a config key
public static final String SHUFFLE_SERVICE_CONF_OVERLAY_RESOURCE_NAME = 
    "spark-shuffle-site.xml";

// Recovery file names  
public static final String SPARK_SHUFFLE_MERGE_RECOVERY_FILE_NAME = 
    "sparkShuffleMergeRecovery";

Testing Support:

// Visible for testing
void setShuffleMergeManager(MergedShuffleFileManager mergeManager);
static MergedShuffleFileManager newMergedShuffleFileManagerInstance(
    TransportConf conf, File mergeManagerFile);

Application ID Management

Encapsulates application identifiers for JSON serialization and database operations.

public static class AppId {
    public final String appId;
    
    // Constructor, annotated for Jackson (@JsonCreator, @JsonProperty("appId"))
    public AppId(String appId);
    
    // Object methods
    public boolean equals(Object o);
    public int hashCode();
    public String toString();
}
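Because AppId is serialized to JSON for use in recovery-database operations, what matters is that two instances with the same appId compare equal and hash alike. The following stdlib-only sketch shows those value semantics. AppIdSketch is a hypothetical stand-in, and its hand-written toJson() only approximates the single-field JSON shape Jackson would produce; the real class relies on Jackson annotations instead.

```java
import java.util.Objects;

// Hypothetical stand-in for YarnShuffleService.AppId, showing value semantics only.
final class AppIdSketch {
    final String appId;

    AppIdSketch(String appId) {
        this.appId = appId;
    }

    // Equal when the appId strings are equal, so a reloaded id matches the original.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return Objects.equals(appId, ((AppIdSketch) o).appId);
    }

    @Override
    public int hashCode() {
        return Objects.hashCode(appId);
    }

    // Hand-rolled stand-in for Jackson output; assumes the id needs no JSON escaping.
    String toJson() {
        return "{\"appId\":\"" + appId + "\"}";
    }

    @Override
    public String toString() {
        return "AppIdSketch{appId=" + appId + "}";
    }
}
```

With value-based equality, an id rebuilt from stored JSON matches the one created when the application first registered, which is what lookups and removals need.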

Metrics Integration

Forwards shuffle service metrics to Hadoop's metrics2 system for JMX exposure and monitoring.

class YarnShuffleServiceMetrics implements MetricsSource {
    // Constructor
    YarnShuffleServiceMetrics(String metricsNamespace, MetricSet metricSet);
    
    // MetricsSource implementation
    public void getMetrics(MetricsCollector collector, boolean all);
    
    // Static utility for metric collection
    public static void collectMetric(
        MetricsRecordBuilder metricsRecordBuilder, 
        String name, 
        Metric metric);
}
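getMetrics adds a single metrics record named after the configured namespace and passes every entry of the wrapped MetricSet to collectMetric. The sketch below models that flow with plain JDK types. MetricsForwardingSketch, the MetricValue interface, and the Map-based "record" are hypothetical stand-ins for the Dropwizard Metric and Hadoop MetricsRecordBuilder types, and the sketch covers only counter- and gauge-like values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical model of YarnShuffleServiceMetrics.getMetrics():
// one record per namespace, one entry per metric in the set.
final class MetricsForwardingSketch {

    // Stand-in for a metric exposing a numeric reading.
    interface MetricValue {
        long read();
    }

    // Stand-in for a counter: the reading is its running count.
    static final class CounterValue implements MetricValue {
        private long count;
        void inc(long n) { count += n; }
        public long read() { return count; }
    }

    // Stand-in for a gauge: the reading is computed on demand.
    static final class GaugeValue implements MetricValue {
        private final LongSupplier supplier;
        GaugeValue(LongSupplier supplier) { this.supplier = supplier; }
        public long read() { return supplier.getAsLong(); }
    }

    // Builds namespace -> (metric name -> value), mimicking
    // collector.addRecord(namespace) followed by one collectMetric call per entry.
    static Map<String, Map<String, Long>> collect(
            String namespace, Map<String, MetricValue> metricSet) {
        Map<String, Long> record = new LinkedHashMap<>();
        for (Map.Entry<String, MetricValue> e : metricSet.entrySet()) {
            record.put(e.getKey(), e.getValue().read());
        }
        Map<String, Map<String, Long>> out = new LinkedHashMap<>();
        out.put(namespace, record);
        return out;
    }
}
```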

Configuration Management

Provides Spark network configuration using Hadoop Configuration as the backing store.

public class HadoopConfigProvider extends ConfigProvider {
    // Constructor
    public HadoopConfigProvider(Configuration conf);
    
    // Configuration access methods
    public String get(String name);
    public String get(String name, String defaultValue);
    public Iterable<Map.Entry<String, String>> getAll();
}
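HadoopConfigProvider adapts a Hadoop Configuration to Spark's ConfigProvider lookups. The sketch below models those lookup rules with a plain Map, assuming (as in Spark's network-common ConfigProvider implementations) that the one-argument get throws NoSuchElementException for a missing key while the two-argument form falls back to the default. MapBackedProvider is a hypothetical name; the real class takes a Configuration instead.

```java
import java.util.Map;
import java.util.NoSuchElementException;

// Hypothetical Map-backed model of HadoopConfigProvider's lookup rules.
final class MapBackedProvider {
    private final Map<String, String> conf;

    MapBackedProvider(Map<String, String> conf) {
        this.conf = conf;
    }

    // Required lookup: a missing key is an error, not a null.
    String get(String name) {
        String value = conf.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // Optional lookup: a missing key yields the caller's default.
    String get(String name, String defaultValue) {
        String value = conf.get(name);
        return value == null ? defaultValue : value;
    }

    // Exposes every entry, as getAll() does for the backing Configuration.
    Iterable<Map.Entry<String, String>> getAll() {
        return conf.entrySet();
    }
}
```

In the service, the provider is wrapped in a TransportConf, so Spark network settings for the shuffle server are read from the NodeManager's configuration.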

Configuration Reference

Service Configuration

// Service port configuration (default: 7337)
conf.setInt("spark.shuffle.service.port", 7337);

// Authentication settings (default: false)
conf.setBoolean("spark.authenticate", false);

// Metrics namespace (default: "sparkShuffleService")
conf.set("spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");

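// Logs namespace (optional); appended to the class name to form the logger name,
// e.g. org.apache.spark.network.yarn.YarnShuffleService.customNamespace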
conf.set("spark.yarn.shuffle.service.logs.namespace", "customNamespace");

// Stop the NodeManager if the shuffle service fails to initialize (default: false)
conf.setBoolean("spark.yarn.shuffle.stopOnFailure", true);

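// Recovery opt-out (default: false, so recovery stays enabled). Spark documents this as
// an application-level setting: an app that sets it to true keeps its secret out of the
// recovery database, and its shuffle data is not recovered after a restart.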
conf.setBoolean("spark.yarn.shuffle.server.recovery.disabled", false);

// Database backend for recovery (default: LEVELDB; ROCKSDB is also supported)
conf.set("spark.shuffle.service.db.backend", "LEVELDB");
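To make the defaults above concrete, here is a stdlib-only sketch that resolves these service-side keys against a Map of overrides and falls back to the defaults listed in this section. ShuffleServiceSettings and its fields are hypothetical; the real service reads the keys from the Hadoop Configuration during initialization.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical holder for the service-side settings documented above.
final class ShuffleServiceSettings {
    final int port;
    final boolean authenticate;
    final String metricsNamespace;
    final boolean stopOnFailure;
    final String dbBackend;

    private ShuffleServiceSettings(Map<String, String> c) {
        port = Integer.parseInt(c.getOrDefault("spark.shuffle.service.port", "7337"));
        authenticate = Boolean.parseBoolean(c.getOrDefault("spark.authenticate", "false"));
        metricsNamespace = c.getOrDefault(
            "spark.yarn.shuffle.service.metrics.namespace", "sparkShuffleService");
        stopOnFailure = Boolean.parseBoolean(
            c.getOrDefault("spark.yarn.shuffle.stopOnFailure", "false"));
        // Backend names are normalized to upper case in this sketch.
        dbBackend = c.getOrDefault("spark.shuffle.service.db.backend", "LEVELDB")
            .toUpperCase(Locale.ROOT);
    }

    // Unset keys fall back to the documented defaults.
    static ShuffleServiceSettings from(Map<String, String> overrides) {
        return new ShuffleServiceSettings(overrides);
    }
}
```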

YARN Integration Configuration

<!-- yarn-site.xml configuration -->
<property>
    <name>yarn.nodemanager.aux-services</name>
    <!-- append to any existing services, e.g. mapreduce_shuffle,spark_shuffle -->
    <value>spark_shuffle</value>
</property>

<property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
</property>

<!-- Optional: load the service from its own classpath instead of the NodeManager's -->
<property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.classpath</name>
    <value>/path/to/spark-network-yarn-shuffle.jar</value>
</property>

Error Handling

The main failure scenarios the service has to deal with are:

Service Initialization Errors

  • Configuration validation failures
  • Port binding conflicts
  • Authentication setup failures
  • Database initialization problems

Runtime Errors

  • Application authentication failures
  • Network transport errors
  • Database corruption/recovery issues
  • Container lifecycle management errors

Recovery Scenarios

  • NodeManager restart recovery
  • Database migration across local directories
  • Application state reconstruction
  • Shuffle data preservation
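// Example error handling pattern. serviceInit() is protected, so callers use the
// public init(), which declares no checked exceptions: Hadoop's AbstractService
// rethrows failures as RuntimeExceptions (checked ones wrapped in ServiceStateException).
// With spark.yarn.shuffle.stopOnFailure=false (the default), the service records most
// initialization failures instead of throwing them, so init() returns normally.
try {
    shuffleService.init(configuration);
} catch (RuntimeException e) {
    // Service initialization failed: check configuration and dependencies.
    // `logger` is assumed to be an SLF4J Logger in the enclosing class.
    logger.error("Failed to initialize shuffle service", e);
    throw e;
}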
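The effect of spark.yarn.shuffle.stopOnFailure on the pattern above can be modeled without Hadoop. In this sketch, InitPolicySketch and its methods are hypothetical: a failure during initialization is rethrown only when stopOnFailure is true; otherwise it is recorded and initialization returns normally. As we understand it, that mirrors how the service hands such errors to Hadoop's noteFailure instead of failing the NodeManager.

```java
import java.util.Optional;

// Hypothetical model of how stopOnFailure changes initialization failures.
final class InitPolicySketch {
    // Stand-in for the failure AbstractService would record via noteFailure().
    private RuntimeException recordedFailure;

    void init(Runnable body, boolean stopOnFailure) {
        try {
            body.run();
        } catch (RuntimeException e) {
            if (stopOnFailure) {
                throw e;             // propagate: NodeManager startup fails
            }
            recordedFailure = e;     // swallow: NodeManager runs without the service
        }
    }

    Optional<RuntimeException> failure() {
        return Optional.ofNullable(recordedFailure);
    }
}
```

The lenient default keeps the NodeManager up for non-Spark workloads. The strict setting prevents applications from landing on a node whose shuffle service never started.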

Thread Safety

All public methods are designed to be thread-safe for concurrent operations:

  • Multiple application initialization/termination
  • Concurrent container lifecycle management
  • Parallel metrics collection
  • Database operations with proper locking
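Concurrent NodeManager callbacks of the kinds listed above touch shared per-application state. A common way to make such state safe is a ConcurrentHashMap, whose single-key put and remove are atomic without external locking. The sketch below is a hypothetical illustration of that pattern; AppRegistrySketch is not the service's code. Several threads register applications and then remove half of them, and the final count is deterministic.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.IntConsumer;

// Hypothetical registry of per-application secrets (illustration only, not Spark code).
final class AppRegistrySketch {
    private final Map<String, String> secrets = new ConcurrentHashMap<>();

    void register(String appId, String secret) { secrets.put(appId, secret); }

    void unregister(String appId) { secrets.remove(appId); }

    int size() { return secrets.size(); }

    // Registers n apps concurrently, then concurrently removes the even-numbered ones.
    static AppRegistrySketch simulate(int n) {
        AppRegistrySketch registry = new AppRegistrySketch();
        runConcurrently(n, i -> registry.register("app_" + i, "secret-" + i));
        runConcurrently(n, i -> {
            if (i % 2 == 0) {
                registry.unregister("app_" + i);
            }
        });
        return registry;
    }

    // Runs task(i) for every i in [0, n) on a small thread pool and waits for completion.
    private static void runConcurrently(int n, IntConsumer task) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < n; i++) {
            final int id = i;
            pool.submit(() -> task.accept(id));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }
}
```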

Integration Points

With Apache Spark

  • Integrates with Spark's ExternalBlockHandler for shuffle block serving
  • Supports push-based shuffle through MergedShuffleFileManager
  • Uses Spark's authentication and security mechanisms
  • Handles executor registration and cleanup automatically

With Hadoop YARN

  • Extends the AuxiliaryService base class for NodeManager integration
  • Participates in YARN application and container lifecycle
  • Publishes metrics through Hadoop's metrics2 system for monitoring
  • Uses the recovery path supplied by the NodeManager (setRecoveryPath) to restore state after NodeManager restarts

With Database Systems

  • Supports LevelDB and RocksDB backends for state persistence
  • Handles database migration and recovery operations
  • Provides configurable database backend selection
  • Implements proper database lifecycle management
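Backend selection comes down to mapping the spark.shuffle.service.db.backend value to a backend and deriving the recovery file name from it. The sketch below is a hypothetical stand-in for Spark's internal backend enum. The case-insensitive lookup and the .ldb/.rdb file suffixes are assumptions about that enum, not a documented contract.

```java
import java.util.Locale;

// Hypothetical model of recovery-DB backend selection (suffixes are assumed).
enum BackendSketch {
    LEVELDB(".ldb"),
    ROCKSDB(".rdb");

    private final String fileSuffix;

    BackendSketch(String fileSuffix) {
        this.fileSuffix = fileSuffix;
    }

    // Case-insensitive lookup; unknown names throw IllegalArgumentException.
    static BackendSketch byName(String value) {
        return valueOf(value.toUpperCase(Locale.ROOT));
    }

    // Recovery file name for a given prefix, e.g. the merge-recovery file.
    String fileName(String prefix) {
        return prefix + fileSuffix;
    }
}
```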

Dependencies

Required Dependencies

  • org.apache.hadoop:hadoop-client-api - Hadoop client API
  • org.apache.hadoop:hadoop-client-runtime - Hadoop client runtime
  • org.apache.spark:spark-network-shuffle - Spark network shuffle components

Shaded Dependencies (Internal)

  • Jackson JSON processing libraries
  • Netty networking libraries
  • Guava utilities

Deployment

As YARN Auxiliary Service

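  1. Build the service JAR (the network-yarn module is part of the yarn Maven profile):

    ./build/mvn -Pyarn -pl common/network-yarn -am -DskipTests clean package
  2. Deploy to YARN NodeManagers:

    # Copy the JAR to all NodeManager hosts (prebuilt Spark distributions ship it under $SPARK_HOME/yarn/)
    cp common/network-yarn/target/scala-2.13/spark-*-yarn-shuffle.jar $YARN_HOME/share/hadoop/yarn/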
  3. Configure YARN (yarn-site.xml):

    <property>
      <name>yarn.nodemanager.aux-services</name>
      <value>spark_shuffle</value>
    </property>
    <property>
      <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
      <value>org.apache.spark.network.yarn.YarnShuffleService</value>
    </property>
  4. Restart NodeManagers:

    $YARN_HOME/sbin/yarn-daemon.sh stop nodemanager
    $YARN_HOME/sbin/yarn-daemon.sh start nodemanager

Configuration Overlay

For custom configuration without modifying yarn-site.xml:

  1. Create spark-shuffle-site.xml:

    <?xml version="1.0"?>
    <configuration>
      <property>
        <name>spark.shuffle.service.port</name>
        <value>7338</value>
      </property>
      <property>
        <name>spark.authenticate</name>
        <value>true</value>
      </property>
    </configuration>
  2. Place the file on the shuffle service's classpath, which by default is the NodeManager's classpath (or the auxiliary-service classpath, if one is configured).

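During initialization, the service looks up spark-shuffle-site.xml on its classpath and, if the file is present, layers it over the configuration supplied by the NodeManager, so values in the overlay take precedence.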
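The precedence rule can be shown with a minimal stdlib sketch. It assumes, as with Hadoop's Configuration.addResource, that a later resource overrides earlier values for the same key (properties marked final in Hadoop are an exception this sketch ignores). OverlaySketch and mergeWithOverlay are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of applying spark-shuffle-site.xml over the NodeManager config.
final class OverlaySketch {
    // Returns a new map so the NodeManager's own settings are left untouched:
    // NodeManager values first, then overlay values on top.
    static Map<String, String> mergeWithOverlay(
            Map<String, String> nodeManagerConf, Map<String, String> overlay) {
        Map<String, String> effective = new HashMap<>(nodeManagerConf);
        effective.putAll(overlay); // overlay wins for keys present in both
        return effective;
    }
}
```

Keys the overlay does not mention keep their NodeManager values. This makes it practical to change only the shuffle port or authentication setting without touching yarn-site.xml.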