Python API for Apache Spark, providing distributed computing, data analysis, and machine learning capabilities
—
Quality: Pending (does it follow best practices?)
Impact: Pending (no eval scenarios have been run)
Resource allocation and management for Spark applications including task resources, executor resources, and resource profiles for optimized cluster utilization and performance tuning.
Define resource requirements for different stages of Spark applications.
class ResourceProfile:
    """Immutable bundle of executor-level and task-level resource needs.

    A profile can be attached to a stage so the scheduler requests
    executors sized for that stage's workload.
    """

    def __init__(self, executorResources=None, taskResources=None):
        """Create a resource profile.

        Parameters:
        - executorResources (dict): Per-executor resource requirements
        - taskResources (dict): Per-task resource requirements
        """

    @property
    def id(self):
        """Unique ID assigned to this resource profile."""

    @property
    def executorResources(self):
        """The executor-level resource requirements."""

    @property
    def taskResources(self):
        """The task-level resource requirements."""
class ResourceProfileBuilder:
    """Fluent builder that assembles a ResourceProfile step by step."""

    def __init__(self):
        """Start a builder with no requirements recorded yet."""

    def require(self, resourceRequests):
        """Record a set of resource requirements on this builder.

        Parameters:
        - resourceRequests: Resource requirements to add

        Returns:
        ResourceProfileBuilder
        """

    def build(self):
        """Finalize the builder into a profile.

        Returns:
        ResourceProfile
        """

# Define resource requirements for Spark executors.
class ExecutorResourceRequests:
    """Fluent collector for the resources each executor should receive."""

    def __init__(self):
        """Start with no executor resources requested."""

    def cores(self, amount):
        """Request a number of CPU cores per executor.

        Parameters:
        - amount (int): Number of cores

        Returns:
        ExecutorResourceRequests
        """

    def memory(self, amount):
        """Request heap memory per executor.

        Parameters:
        - amount (str): Memory amount (e.g., '2g', '1024m')

        Returns:
        ExecutorResourceRequests
        """

    def memoryFraction(self, fraction):
        """Request a fraction of available memory.

        Parameters:
        - fraction (float): Memory fraction (0.0 to 1.0)

        Returns:
        ExecutorResourceRequests
        """

    def offHeapMemory(self, amount):
        """Request off-heap memory per executor.

        Parameters:
        - amount (str): Off-heap memory amount

        Returns:
        ExecutorResourceRequests
        """

    def memoryOverhead(self, amount):
        """Request memory overhead per executor.

        Parameters:
        - amount (str): Memory overhead amount

        Returns:
        ExecutorResourceRequests
        """

    def pysparkMemory(self, amount):
        """Request memory reserved for PySpark workers.

        Parameters:
        - amount (str): PySpark memory amount

        Returns:
        ExecutorResourceRequests
        """

    def resource(self, resourceName, amount, discoveryScript=None, vendor=None):
        """Request a custom resource (e.g., GPUs) per executor.

        Parameters:
        - resourceName (str): Name of the resource
        - amount (int): Amount of resource
        - discoveryScript (str): Script to discover resource
        - vendor (str): Vendor of the resource

        Returns:
        ExecutorResourceRequests
        """

    def build(self):
        """Materialize the recorded requests.

        Returns:
        dict: Executor resource requirements
        """
class ExecutorResourceRequest:
    """A single named resource requirement for an executor."""

    def __init__(self, resourceName, amount, discoveryScript=None, vendor=None):
        """Create an executor resource request.

        Parameters:
        - resourceName (str): Name of the resource
        - amount (int): Amount of resource
        - discoveryScript (str): Script to discover resource
        - vendor (str): Vendor of the resource
        """

    @property
    def resourceName(self):
        """The name of the requested resource."""

    @property
    def amount(self):
        """The amount of the requested resource."""

    @property
    def discoveryScript(self):
        """The script used to discover this resource."""

    @property
    def vendor(self):
        """The vendor of this resource."""

# Define resource requirements for individual tasks.
class TaskResourceRequests:
    """Fluent collector for the resources each task should receive."""

    def __init__(self):
        """Start with no task resources requested."""

    def cpus(self, amount):
        """Request CPU cores per task.

        Parameters:
        - amount (float): Number of CPU cores

        Returns:
        TaskResourceRequests
        """

    def resource(self, resourceName, amount):
        """Request a custom resource (e.g., a GPU fraction) per task.

        Parameters:
        - resourceName (str): Name of the resource
        - amount (float): Amount of resource

        Returns:
        TaskResourceRequests
        """

    def build(self):
        """Materialize the recorded requests.

        Returns:
        dict: Task resource requirements
        """
class TaskResourceRequest:
    """A single named resource requirement for a task."""

    def __init__(self, resourceName, amount):
        """Create a task resource request.

        Parameters:
        - resourceName (str): Name of the resource
        - amount (float): Amount of resource
        """

    @property
    def resourceName(self):
        """The name of the requested resource."""

    @property
    def amount(self):
        """The amount of the requested resource."""

# Information about available resources.
class ResourceInformation:
    """Describes a discovered resource and where it can be addressed."""

    def __init__(self, name, addresses):
        """Create resource information.

        Parameters:
        - name (str): Resource name
        - addresses (list): Resource addresses
        """

    @property
    def name(self):
        """The name of this resource."""

    @property
    def addresses(self):
        """The addresses of this resource."""

# Create executor resource requirements
# Build the per-executor requests: 4 cores, 8 GiB of heap, and one GPU
# located through a discovery script.
executor_req = (
    ExecutorResourceRequests()
    .cores(4)
    .memory("8g")
    .resource("gpu", 1, "/path/to/gpu_discovery.sh")
)

# Build the per-task requests: one CPU and half a GPU per task.
task_req = (
    TaskResourceRequests()
    .cpus(1.0)
    .resource("gpu", 0.5)
)

# Combine both request sets into a resource profile.
profile = ResourceProfile(
    executorResources=executor_req.build(),
    taskResources=task_req.build(),
)

# Attach the profile to an RDD so its stages run with these resources.
# NOTE(review): assumes `sc` (SparkContext) and `data` exist in the caller's scope.
rdd = sc.parallelize(data)
rdd_with_profile = rdd.withResources(profile)
# Run the GPU function across the cluster under the attached profile.
# NOTE(review): assumes `rdd_with_profile` and `my_gpu_function` are defined above.
result = rdd_with_profile.map(my_gpu_function).collect()


# Placeholder stubs standing in for the real PySpark implementations.
# NOTE(review): these intentionally mirror (and would shadow) the API
# classes documented earlier in this file.
class ResourceProfile:
    """Resource profile for Spark applications."""
    pass


class ExecutorResourceRequest:
    """Executor resource requirement."""
    pass


class TaskResourceRequest:
    """Task resource requirement."""
    pass


class ResourceInformation:
    """Information about available resources."""
    pass

# Install with Tessl CLI
npx tessl i tessl/pypi-pyspark