Work with distributed computing frameworks in Domino including Apache Spark, Ray, and Dask clusters. Covers cluster configuration, on-demand clusters, choosing between frameworks, PySpark usage, and scaling workloads. Use when processing large datasets, parallel ML training, or running distributed compute jobs.
80
—
Does it follow best practices?
Impact
—
No eval scenarios have been run
Risky
Do not use without reviewing
This skill helps users work with distributed computing frameworks in Domino — Spark, Ray, and Dask clusters — to scale compute-intensive workloads.
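Activate this skill when users want to process large datasets, run parallel ML training, or run distributed compute jobs. Choose a framework based on the workload: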
| Framework | Best For |
|---|---|
| Apache Spark | Large-scale data processing, SQL, ETL |
| Ray | Distributed ML, hyperparameter tuning, RL |
| Dask | Parallel pandas, NumPy at scale |
| MPI | Scientific computing, HPC workloads |
from domino import Domino

# Client bound to the project that will own the workspace.
domino = Domino("project-owner/project-name")

# Start workspace with Spark cluster
# NOTE(review): confirm that `workspace_start` and these `cluster_config` keys
# exist in the installed python-domino release before relying on this call.
workspace = domino.workspace_start(
    hardware_tier_name="medium",
    cluster_config={
        "clusterType": "Spark",
        "workerCount": 4,
        "workerHardwareTier": "medium",
        "masterHardwareTier": "medium",
    },
)

from pyspark.sql import SparkSession
# Domino auto-configures Spark
spark = SparkSession.builder.getOrCreate()

# Check configuration
print(f"Spark version: {spark.version}")
# defaultParallelism is Spark's default level of parallelism, not the number
# of executors, so it is labelled as such.
print(f"Default parallelism: {spark.sparkContext.defaultParallelism}")

# Read CSV
df = spark.read.csv("/mnt/data/dataset/data.csv", header=True, inferSchema=True)

# Read Parquet
df = spark.read.parquet("/mnt/data/dataset/")

# Read from database
# Credentials are taken from the environment so they never end up in source
# control; DB_USER / DB_PASSWORD must be set before this runs.
import os

df = spark.read.jdbc(
    url="jdbc:postgresql://host:5432/db",
    table="schema.table",
    properties={
        "user": os.environ["DB_USER"],
        "password": os.environ["DB_PASSWORD"],
    },
)

from pyspark.sql import functions as F
# Transformations: keep rows with value > 100, average per category,
# highest average first.
result = (
    df.filter(F.col("value") > 100)
    .groupBy("category")
    .agg(F.mean("value").alias("avg_value"))
    .orderBy("avg_value", ascending=False)
)

# Show results
result.show()

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml import Pipeline

# Prepare features: combine the raw columns into one vector column
assembler = VectorAssembler(
    inputCols=["feature1", "feature2", "feature3"],
    outputCol="features",
)

# Create model
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol="label",
    numTrees=100,
)

# Build pipeline
# train_df / test_df are placeholders for DataFrames prepared by the caller.
pipeline = Pipeline(stages=[assembler, rf])
model = pipeline.fit(train_df)
predictions = model.transform(test_df)

# Write Parquet (recommended)
result.write.parquet("/mnt/artifacts/output/", mode="overwrite")

# Write CSV
# mode="overwrite" matches the Parquet write above; without it a re-run fails
# because the output path already exists.
result.write.csv("/mnt/artifacts/output.csv", header=True, mode="overwrite")

import ray
# Connect to the Ray cluster unless this process is already connected;
# calling ray.init() a second time raises an error.
import os

if not ray.is_initialized():
    # NOTE(review): Domino's on-demand Ray docs expose the head node through
    # these variables — confirm for your deployment.
    head_host = os.environ.get("RAY_HEAD_SERVICE_HOST")
    head_port = os.environ.get("RAY_HEAD_SERVICE_PORT")
    if head_host and head_port:
        ray.init(f"ray://{head_host}:{head_port}")
    else:
        # Fall back to discovering a cluster this machine is already part of.
        ray.init(address="auto")

print(f"Cluster resources: {ray.cluster_resources()}")

import ray
@ray.remote
def process_item(item):
    """Remote task: return the item doubled (stand-in for real processing)."""
    # Your processing logic
    return item * 2


# Run in parallel: submit one task per item, then block until all finish
items = [1, 2, 3, 4, 5]
futures = [process_item.remote(item) for item in items]
results = ray.get(futures)
print(results)  # [2, 4, 6, 8, 10]

from ray import train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer


def train_func():
    """Training loop passed to TorchTrainer; reports the loss after each epoch."""
    # Training logic — create_model / train_epoch are user-supplied placeholders
    model = create_model()
    for epoch in range(10):
        # Capture the epoch loss so the report below has a value to send
        # (assumes train_epoch returns it).
        loss = train_epoch(model)
        train.report({"loss": loss})


trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(num_workers=4, use_gpu=True),
)
result = trainer.fit()

from ray import tune
from ray.tune import CLIReporter


def objective(config):
    """Train with one sampled hyperparameter set and return its accuracy."""
    # Training with hyperparameters — train_model / evaluate_model are
    # user-supplied placeholders.
    model = train_model(
        learning_rate=config["lr"],
        batch_size=config["batch_size"],
    )
    accuracy = evaluate_model(model)
    return {"accuracy": accuracy}


analysis = tune.run(
    objective,
    # metric/mode tell Tune how to rank trials; analysis.best_config needs them.
    metric="accuracy",
    mode="max",
    config={
        "lr": tune.loguniform(1e-4, 1e-1),
        "batch_size": tune.choice([32, 64, 128]),
    },
    num_samples=20,
    progress_reporter=CLIReporter(),
)
print(f"Best config: {analysis.best_config}")

from dask.distributed import Client
# Attach to the on-demand Dask scheduler when its address is advertised.
# A bare Client() with no configured scheduler address starts a local cluster.
import os

# NOTE(review): variable names follow Domino's on-demand Dask docs — confirm
# for your deployment.
scheduler_host = os.environ.get("DASK_SCHEDULER_SERVICE_HOST")
scheduler_port = os.environ.get("DASK_SCHEDULER_SERVICE_PORT")
if scheduler_host and scheduler_port:
    client = Client(address=f"{scheduler_host}:{scheduler_port}")
else:
    client = Client()

print(f"Dashboard: {client.dashboard_link}")
print(f"Workers: {len(client.scheduler_info()['workers'])}")

import dask.dataframe as dd
# Read large CSV files (the glob is loaded as one logical DataFrame)
df = dd.read_csv("/mnt/data/dataset/*.csv")

# Parallel operations (lazy): this only builds the task graph
result = df.groupby("category")["value"].mean()

# Execute
computed_result = result.compute()

import dask.array as da
# Create large array, split into 1000 x 1000 chunks
x = da.random.random((100000, 100000), chunks=(1000, 1000))

# Operations (lazy)
result = x.mean()

# Compute
value = result.compute()

from dask_ml.model_selection import GridSearchCV
from sklearn.ensemble import RandomForestClassifier

# Distributed hyperparameter search
param_grid = {
    "n_estimators": [100, 200, 300],
    "max_depth": [10, 20, 30],
}
grid_search = GridSearchCV(
    RandomForestClassifier(),
    param_grid,
    cv=3,
)
# X_train / y_train are placeholders for the caller's training data.
grid_search.fit(X_train, y_train)
print(f"Best params: {grid_search.best_params_}")

# Use GPU-accelerated Spark
# spark.rapids.sql.enabled only takes effect when the RAPIDS SQL plugin is
# registered, so set spark.plugins as well.
# NOTE(review): the RAPIDS Accelerator jar must already be available in the
# cluster environment, and the plugin cannot be added to a session that
# already exists — confirm both for your setup.
spark = (
    SparkSession.builder
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
    .config("spark.rapids.sql.enabled", "true")
    .getOrCreate()
)

# Operations automatically use GPU
df = spark.read.parquet("/mnt/data/large_dataset/")
result = df.groupBy("category").agg({"value": "mean"})@ray.remote(num_gpus=1)
def train_on_gpu():
import torch
device = torch.device("cuda")
# GPU training logic
return model
# Run on GPU workers
futures = [train_on_gpu.remote() for _ in range(4)]Configure clusters to scale based on workload:
cluster_config = {
"clusterType": "Spark",
"workerCount": 2,
"maxWorkerCount": 10, # Scale up to 10
"autoScaling": True
}View cluster status in Domino UI or via dashboard URLs.
# Keep data close to compute
# Use Domino Datasets or cloud storage in same region
df = spark.read.parquet("/mnt/data/dataset/")

# Cache frequently used DataFrames
df.cache()
df.persist()47c6e0a
If you maintain this skill, you can claim it as your own. Once claimed, you can manage eval scenarios, bundle related skills, attach documentation or rules, and ensure cross-agent compatibility.