tessl/maven-org-apache-spark--spark-network-yarn_2-11

External shuffle service for Spark on YARN that runs as a long-running auxiliary service in the NodeManager process.

Describes: mavenpkg:maven/org.apache.spark/spark-network-yarn_2.11@2.4.x

To install, run:

npx @tessl/cli install tessl/maven-org-apache-spark--spark-network-yarn_2-11@2.4.0

Spark Network YARN

An external shuffle service implementation for Apache Spark applications running on YARN clusters. This library provides a long-running auxiliary service that operates within the YARN NodeManager process, enabling Spark executors to fetch shuffle data remotely even after the original executor containers have been terminated.

Package Information

  • Package Name: spark-network-yarn_2.11
  • Package Type: Maven
  • Language: Java
  • Installation:
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-network-yarn_2.11</artifactId>
    <version>2.4.8</version>
</dependency>

Core Imports

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.spark.network.yarn.util.HadoopConfigProvider;

Basic Usage

The YarnShuffleService is typically deployed as a YARN auxiliary service rather than used directly in application code. However, for integration testing or custom deployments:

import org.apache.spark.network.yarn.YarnShuffleService;
import org.apache.hadoop.conf.Configuration;

// Create and configure the service
YarnShuffleService shuffleService = new YarnShuffleService();

// Configure Hadoop settings
Configuration conf = new Configuration();
conf.setInt("spark.shuffle.service.port", 7337);
conf.setBoolean("spark.authenticate", false);

// Initialize and start the service through the public AbstractService
// lifecycle methods; serviceInit/serviceStart are protected and are
// normally invoked by the YARN NodeManager
shuffleService.init(conf);
shuffleService.start();

// The service integrates with the YARN application lifecycle:
// applications are initialized and stopped via YARN callbacks

// Shut down and release resources when finished
shuffleService.stop();

Architecture

The Spark Network YARN service is built around several key components:

  • YarnShuffleService: Main auxiliary service that extends YARN's AuxiliaryService base class
  • HadoopConfigProvider: Configuration adapter that bridges Hadoop Configuration to Spark's network layer
  • Authentication Layer: Optional shuffle secret management for multi-tenant security
  • Recovery System: LevelDB-based persistence for handling NodeManager restarts
  • Transport Layer: Netty-based network server for shuffle data retrieval

The service operates as part of the YARN NodeManager process and provides shuffle data access to Spark executors across the cluster, enabling dynamic resource allocation and fault tolerance.

Capabilities

YARN Auxiliary Service Implementation

Core service that extends YARN's AuxiliaryService to provide external shuffle functionality for Spark applications.

public class YarnShuffleService extends AuxiliaryService {
    /**
     * Creates a new YarnShuffleService with the default service name "spark_shuffle".
     * This constructor is typically called by the YARN NodeManager during service initialization.
     */
    public YarnShuffleService();
    
    /**
     * Initialize an application with the shuffle service, registering its shuffle secret.
     * Called by YARN when a new application starts.
     * 
     * @param context Application initialization context containing app ID and shuffle secret
     */
    @Override
    public void initializeApplication(ApplicationInitializationContext context);
    
    /**
     * Stop an application and cleanup its resources from the shuffle service.
     * Called by YARN when an application terminates.
     * 
     * @param context Application termination context containing app ID
     */
    @Override
    public void stopApplication(ApplicationTerminationContext context);
    
    /**
     * Initialize a container (logs the container ID for debugging).
     * Called by YARN when a new container starts.
     * 
     * @param context Container initialization context
     */
    @Override
    public void initializeContainer(ContainerInitializationContext context);
    
    /**
     * Stop a container (logs the container ID for debugging).
     * Called by YARN when a container terminates.
     * 
     * @param context Container termination context
     */
    @Override
    public void stopContainer(ContainerTerminationContext context);
    
    /**
     * Get metadata for the auxiliary service (currently returns an empty buffer).
     * 
     * @return Empty ByteBuffer as metadata is not currently used
     */
    @Override
    public ByteBuffer getMetaData();
    
    /**
     * Set the recovery path for shuffle service recovery when NodeManager restarts.
     * Called by YARN NodeManager if recovery is enabled.
     * 
     * @param recoveryPath Path where recovery data should be stored
     */
    @Override
    public void setRecoveryPath(Path recoveryPath);
}

Service Configuration Keys

Configuration property names used by the shuffle service. These constants are private to the service, but the key names and their defaults matter when configuring it.

// Configuration keys (internal constants, but values are important for configuration)
// "spark.shuffle.service.port" - Port for shuffle service (default: 7337)
// "spark.authenticate" - Enable authentication (default: false)  
// "spark.yarn.shuffle.stopOnFailure" - Stop NM on failure (default: false)
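As a stdlib-only illustration of how these keys resolve against their defaults, here is a hypothetical helper; the real service reads the same keys from a Hadoop Configuration object:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the service's key/default lookup; the real
// service reads these values from org.apache.hadoop.conf.Configuration.
public class ShuffleServiceDefaults {
    static final String PORT_KEY = "spark.shuffle.service.port";
    static final int DEFAULT_PORT = 7337;
    static final String AUTH_KEY = "spark.authenticate";
    static final boolean DEFAULT_AUTH = false;

    // Resolve the shuffle service port, falling back to 7337
    static int port(Map<String, String> conf) {
        String v = conf.get(PORT_KEY);
        return v == null ? DEFAULT_PORT : Integer.parseInt(v);
    }

    // Resolve the authentication flag, falling back to false
    static boolean authenticate(Map<String, String> conf) {
        String v = conf.get(AUTH_KEY);
        return v == null ? DEFAULT_AUTH : Boolean.parseBoolean(v);
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(port(conf));          // default: 7337
        System.out.println(authenticate(conf));  // default: false
        conf.put(PORT_KEY, "7447");
        System.out.println(port(conf));          // overridden: 7447
    }
}
```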

Service Lifecycle Management

Protected methods that handle the service initialization and shutdown lifecycle.

/**
 * Initialize the shuffle server with the given Hadoop configuration.
 * Sets up transport server, authentication, and recovery systems.
 * 
 * @param conf Hadoop configuration containing service settings
 * @throws Exception If service initialization fails
 */
@Override
protected void serviceInit(Configuration conf) throws Exception;

/**
 * Stop the shuffle server and clean up all associated resources.
 * Closes transport server, block handler, and recovery database.
 */
@Override
protected void serviceStop();

/**
 * Get the recovery path specific to this auxiliary service for the given filename.
 * 
 * @param fileName Name of the recovery file
 * @return Path where the recovery file should be located
 */
protected Path getRecoveryPath(String fileName);

/**
 * Initialize recovery database, handling migration from old NodeManager local directories.
 * 
 * @param dbName Name of the database file
 * @return File object pointing to the recovery database location
 */
protected File initRecoveryDb(String dbName);

/**
 * Load shuffle secrets from the recovery database during service initialization.
 * Called when authentication is enabled and recovery is configured.
 * 
 * @throws IOException If database access fails during secret loading
 */
private void loadSecretsFromDb() throws IOException;

/**
 * Parse a database key to extract the application ID.
 * Used internally for database key management.
 * 
 * @param s Database key string with APP_CREDS_KEY_PREFIX
 * @return Application ID extracted from the key
 * @throws IOException If key parsing fails
 */
private static String parseDbAppKey(String s) throws IOException;

/**
 * Generate a database key for storing application credentials.
 * Used internally for database key management.
 * 
 * @param appExecId Application identity object
 * @return Byte array representing the database key
 * @throws IOException If key generation fails
 */
private static byte[] dbAppKey(AppId appExecId) throws IOException;

/**
 * Check if authentication is enabled for the shuffle service.
 * Authentication is enabled when a secret manager is configured.
 * 
 * @return true if authentication is enabled, false otherwise
 */
private boolean isAuthenticationEnabled();

Application Identity Management

Helper class for managing application identities in the shuffle service.

public static class AppId {
    /** The application ID string */
    public final String appId;
    
    /**
     * Create a new AppId with JSON deserialization support.
     * 
     * @param appId The application ID string
     */
    @JsonCreator
    public AppId(@JsonProperty("appId") String appId);
    
    /**
     * Check equality based on application ID.
     * 
     * @param o Object to compare with
     * @return true if the objects represent the same application ID
     */
    @Override
    public boolean equals(Object o);
    
    /**
     * Generate hash code based on application ID.
     * 
     * @return Hash code for this AppId
     */
    @Override
    public int hashCode();
    
    /**
     * String representation of the AppId.
     * 
     * @return Formatted string showing the application ID
     */
    @Override
    public String toString();
}
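To make the equality contract concrete, here is a minimal stand-in with the same value-based semantics; the real AppId additionally carries the Jackson annotations shown above:

```java
import java.util.Objects;

// Simplified stand-in for YarnShuffleService.AppId, showing the
// value-based equals/hashCode contract (Jackson annotations omitted).
public class AppIdSketch {
    public final String appId;

    public AppIdSketch(String appId) {
        this.appId = appId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        return Objects.equals(appId, ((AppIdSketch) o).appId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(appId);
    }

    @Override
    public String toString() {
        return "AppId[appId=" + appId + "]";
    }

    public static void main(String[] args) {
        AppIdSketch a = new AppIdSketch("application_1_0001");
        AppIdSketch b = new AppIdSketch("application_1_0001");
        System.out.println(a.equals(b));                  // true
        System.out.println(a.hashCode() == b.hashCode()); // true
    }
}
```

Because equality is value-based on the application ID string, two AppId instances built from the same ID are interchangeable, which is what makes the class usable as a lookup key for per-application state.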

Hadoop Configuration Integration

Configuration provider that adapts Hadoop Configuration for use with Spark's network layer.

public class HadoopConfigProvider extends ConfigProvider {
    /**
     * Create a new configuration provider wrapping the given Hadoop configuration.
     * 
     * @param conf Hadoop Configuration instance to wrap
     */
    public HadoopConfigProvider(Configuration conf);
    
    /**
     * Get a configuration value by name.
     * 
     * @param name Configuration property name
     * @return Configuration value as String
     * @throws NoSuchElementException If the property is not found
     */
    @Override
    public String get(String name);
    
    /**
     * Get a configuration value by name with a default fallback.
     * 
     * @param name Configuration property name  
     * @param defaultValue Default value if property not found
     * @return Configuration value or default if not found
     */
    @Override
    public String get(String name, String defaultValue);
    
    /**
     * Get all configuration entries as an iterable.
     * 
     * @return Iterable of all configuration key-value pairs
     */
    @Override
    public Iterable<Map.Entry<String, String>> getAll();
}
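The get/get-with-default contract can be sketched with a map-backed provider; this is a hypothetical illustration, not the real class, which wraps org.apache.hadoop.conf.Configuration:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

// Map-backed sketch of the ConfigProvider contract that
// HadoopConfigProvider implements over a Hadoop Configuration.
public class MapConfigProvider {
    private final Map<String, String> entries;

    public MapConfigProvider(Map<String, String> entries) {
        this.entries = entries;
    }

    // Mirrors get(String): a missing key throws NoSuchElementException
    public String get(String name) {
        String value = entries.get(name);
        if (value == null) {
            throw new NoSuchElementException(name);
        }
        return value;
    }

    // Mirrors get(String, String): a missing key falls back to the default
    public String get(String name, String defaultValue) {
        String value = entries.get(name);
        return value == null ? defaultValue : value;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("spark.shuffle.service.port", "7337");
        MapConfigProvider provider = new MapConfigProvider(conf);
        System.out.println(provider.get("spark.shuffle.service.port"));  // 7337
        System.out.println(provider.get("spark.authenticate", "false")); // false
    }
}
```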

Types

Required Hadoop/YARN Types

The service depends on several Hadoop and YARN types that applications must have available:

// From Hadoop YARN
import org.apache.hadoop.yarn.server.api.AuxiliaryService;
import org.apache.hadoop.yarn.server.api.ApplicationInitializationContext;
import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.api.ContainerInitializationContext;
import org.apache.hadoop.yarn.server.api.ContainerTerminationContext;
import org.apache.hadoop.yarn.api.records.ContainerId;

// From Hadoop Core
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

// From Java Standard Library
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// From Jackson JSON Processing
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;

// From Google Guava
import com.google.common.base.Objects;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;

Spark Network Types

Integration with Spark's network layer components:

// These types are used internally but applications typically don't interact with them directly
import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler;
import org.apache.spark.network.sasl.ShuffleSecretManager;
import org.apache.spark.network.server.TransportServer;
import org.apache.spark.network.server.TransportServerBootstrap;
import org.apache.spark.network.crypto.AuthServerBootstrap;
import org.apache.spark.network.TransportContext;
import org.apache.spark.network.util.TransportConf;
import org.apache.spark.network.util.ConfigProvider;
import org.apache.spark.network.util.LevelDBProvider;

// From LevelDB
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;

// From SLF4J Logging
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Configuration

Required Configuration Properties

  • spark.shuffle.service.port (default: 7337): Port for the shuffle service to listen on
  • spark.authenticate (default: false): Enable authentication for shuffle requests
  • yarn.nodemanager.local-dirs: Local directories for NodeManager (used for recovery file placement)

Optional Configuration Properties

  • spark.yarn.shuffle.stopOnFailure (default: false): Whether service initialization failure should stop the NodeManager

Configuration Constants

The service uses the following internal configuration constants:

// Configuration key constants (from source code)
private static final String SPARK_SHUFFLE_SERVICE_PORT_KEY = "spark.shuffle.service.port";
private static final int DEFAULT_SPARK_SHUFFLE_SERVICE_PORT = 7337;
private static final String SPARK_AUTHENTICATE_KEY = "spark.authenticate";  
private static final boolean DEFAULT_SPARK_AUTHENTICATE = false;
private static final String STOP_ON_FAILURE_KEY = "spark.yarn.shuffle.stopOnFailure";
private static final boolean DEFAULT_STOP_ON_FAILURE = false;

// Recovery file names
private static final String RECOVERY_FILE_NAME = "registeredExecutors.ldb";
private static final String SECRETS_RECOVERY_FILE_NAME = "sparkShuffleRecovery.ldb";

YARN Integration Configuration

The service is typically configured in YARN's yarn-site.xml:

<configuration>
  <property>
    <name>yarn.nodemanager.aux-services</name>
    <value>spark_shuffle</value>
  </property>
  <property>
    <name>yarn.nodemanager.aux-services.spark_shuffle.class</name>
    <value>org.apache.spark.network.yarn.YarnShuffleService</value>
  </property>
  <property>
    <name>spark.shuffle.service.port</name>
    <value>7337</value>
  </property>
</configuration>

Error Handling

The service handles several types of errors gracefully:

  • Service Initialization Failures: Logged and optionally stop NodeManager based on configuration
  • Database Errors: Logged but don't prevent service operation
  • Application/Container Lifecycle Errors: Logged and handled to prevent service disruption
  • Configuration Errors: NoSuchElementException thrown for missing required properties
  • Recovery Failures: File system errors during recovery are logged and handled

Thread Safety

The YarnShuffleService is designed to handle concurrent operations safely:

  • Service lifecycle methods are synchronized by YARN NodeManager
  • Database operations use LevelDB's thread-safe implementation
  • Network transport operations are handled by Netty's thread-safe components
  • Multiple applications and containers can be managed concurrently