Unified deep learning framework integrating PyTorch Lightning, Lightning Fabric, and Lightning Apps for training, deploying, and shipping AI products.
—
Lightning Apps framework for building end-to-end ML systems with components for workflow orchestration, computational work distribution, and cloud deployment. Enables creating everything from research demos to production ML systems.
Core class for defining Lightning applications that orchestrate workflows through a root flow component and handle deployment configurations.
class LightningApp:
    """Entry point of a Lightning application.

    Wraps a root ``LightningFlow`` that orchestrates the whole workflow and
    carries app-level configuration (metadata and the cloud compute used by
    the root flow).
    """

    def __init__(
        self,
        root: LightningFlow,
        info: Optional[dict] = None,
        flow_cloud_compute: Optional[CloudCompute] = None,
        **kwargs,
    ):
        """Initialize a Lightning App.

        Parameters:
            root: Root LightningFlow component that orchestrates the app.
            info: Optional metadata about the app.
            flow_cloud_compute: Cloud compute configuration for the root flow.
            kwargs: Additional configuration options.
        """

    def run(self, host: str = "127.0.0.1", port: int = 7501, **kwargs):
        """Run the Lightning App locally.

        Parameters:
            host: Host address to bind the app server.
            port: Port number to bind the app server.
            kwargs: Additional runtime configuration.
        """

# Orchestrates application logic and coordinates work components. Flows define
# the overall workflow structure and handle state management between components.
class LightningFlow:
    """Base class for orchestrating application workflows.

    A flow coordinates child flows and works, manages shared state between
    components, and optionally exposes a UI layout and CLI commands.
    """

    def __init__(self, cloud_compute: Optional[CloudCompute] = None, **kwargs):
        """Create a flow.

        Parameters:
            cloud_compute: Cloud compute configuration for this flow.
            kwargs: Additional initialization parameters.
        """

    def run(self):
        """Define the main execution logic of the flow.

        This method contains the workflow orchestration code.
        """

    def configure_layout(self):
        """Configure the UI layout for the flow.

        Returns:
            Layout configuration for the flow's user interface.
        """

    def configure_commands(self):
        """Configure CLI commands for the flow.

        Returns:
            List of command configurations.
        """

    @property
    def cloud_compute(self) -> Optional[CloudCompute]:
        """Cloud compute configuration for this flow."""

    def stop(self):
        """Stop the flow execution."""

# Encapsulates computational work in Lightning apps. Work components handle
# specific tasks like data processing, model training, or inference serving.
class LightningWork:
    """Base class for encapsulating computational work.

    A work runs one well-defined task (data processing, model training,
    inference serving) and exposes status properties the orchestrating flow
    can poll.
    """

    def __init__(
        self,
        cloud_compute: Optional[CloudCompute] = None,
        cloud_build_config: Optional[BuildConfig] = None,
        parallel: bool = False,
        cache_calls: bool = True,
        raise_exception: bool = True,
        **kwargs,
    ):
        """Create a work.

        Parameters:
            cloud_compute: Cloud compute configuration for this work.
            cloud_build_config: Build configuration for cloud deployment.
            parallel: Enable parallel execution.
            cache_calls: Cache work execution results.
            raise_exception: Raise exceptions on work failure.
            kwargs: Additional initialization parameters.
        """

    def run(self, *args, **kwargs):
        """Define the main execution logic of the work.

        Parameters:
            args, kwargs: Arguments passed to the work execution.
        """

    def stop(self):
        """Stop the work execution."""

    @property
    def cloud_compute(self) -> Optional[CloudCompute]:
        """Cloud compute configuration for this work."""

    @property
    def cloud_build_config(self) -> Optional[BuildConfig]:
        """Build configuration for this work."""

    @property
    def status(self) -> str:
        """Current execution status of the work."""

    @property
    def has_succeeded(self) -> bool:
        """Whether the work has completed successfully."""

    @property
    def has_failed(self) -> bool:
        """Whether the work has failed."""

    @property
    def has_stopped(self) -> bool:
        """Whether the work has been stopped."""

# Configuration for cloud compute resources including instance types, storage,
# and networking settings.
class CloudCompute:
    """Configuration for cloud compute resources.

    Describes the machine a flow or work should run on: instance name, disk
    and shared-memory sizes, idle shutdown, and mounts.
    """

    def __init__(
        self,
        name: str = "default",
        disk_size: int = 0,
        idle_timeout: Optional[int] = None,
        shm_size: int = 0,
        mounts: Optional[List] = None,
        **kwargs,
    ):
        """Create a compute configuration.

        Parameters:
            name: Name identifier for the compute resource.
            disk_size: Disk size in GB (0 for default).
            idle_timeout: Idle timeout in seconds before shutdown.
            shm_size: Shared memory size in GB.
            mounts: List of mount configurations.
            kwargs: Additional cloud provider specific options.
        """

# Configuration for building Lightning apps including Docker images,
# requirements, and build steps.
class BuildConfig:
    """Configuration for building Lightning apps.

    Controls the container image, Python requirements, optional Dockerfile,
    and extra build commands used when deploying to the cloud.
    """

    def __init__(
        self,
        image: Optional[str] = None,
        requirements: Optional[Union[List[str], str]] = None,
        dockerfile: Optional[str] = None,
        build_commands: Optional[List[str]] = None,
        **kwargs,
    ):
        """Create a build configuration.

        Parameters:
            image: Base Docker image to use.
            requirements: Python requirements (list or requirements.txt path).
            dockerfile: Path to custom Dockerfile.
            build_commands: Additional build commands to execute.
            kwargs: Additional build configuration options.
        """

import lightning as L
class SimpleFlow(L.LightningFlow):
    """Minimal flow that prints a greeting and exposes a static layout."""

    def run(self):
        print("Hello from Lightning Flow!")

    def configure_layout(self):
        # Static layout description rendered by the app UI.
        return {"name": "Simple App", "content": "This is a simple Lightning App"}


# Create and run the app
app = L.LightningApp(SimpleFlow())
app.run()

import lightning as L
import time


class DataProcessor(L.LightningWork):
    """Work that simulates processing an input dataset."""

    def __init__(self):
        super().__init__()
        # Set by run() once processing finishes; None until then.
        self.processed_data = None

    def run(self, data):
        print(f"Processing data: {data}")
        time.sleep(2)  # Simulate processing
        self.processed_data = f"processed_{data}"
        print(f"Data processing complete: {self.processed_data}")


class ModelTrainer(L.LightningWork):
    """Work that simulates training a model on processed data."""

    def __init__(self):
        super().__init__()
        self.model_trained = False

    def run(self, processed_data):
        print(f"Training model with: {processed_data}")
        time.sleep(3)  # Simulate training
        self.model_trained = True
        print("Model training complete!")


class MLPipeline(L.LightningFlow):
    """Flow chaining data processing and model training sequentially."""

    def __init__(self):
        super().__init__()
        self.processor = DataProcessor()
        self.trainer = ModelTrainer()

    def run(self):
        # Step 1: Process data
        self.processor.run("raw_data.csv")
        # Wait for processing to complete
        if self.processor.processed_data:
            # Step 2: Train model
            self.trainer.run(self.processor.processed_data)
        # Check if pipeline is complete
        if self.trainer.model_trained:
            print("ML Pipeline completed successfully!")


# Create and run the ML pipeline
app = L.LightningApp(MLPipeline())
app.run()

import lightning as L
class CloudDataProcessor(L.LightningWork):
    """Work configured to run on a cloud GPU instance with a custom build."""

    def __init__(self):
        # Configure cloud compute with GPU and storage
        cloud_compute = L.CloudCompute(
            name="gpu-instance",
            disk_size=100,  # 100GB disk
            idle_timeout=300,  # 5 minute timeout
        )
        # Configure build with custom requirements
        build_config = L.BuildConfig(
            requirements=["torch", "pandas", "scikit-learn"],
            build_commands=["pip install --upgrade pip"],
        )
        super().__init__(
            cloud_compute=cloud_compute,
            cloud_build_config=build_config,
        )

    def run(self, dataset_path):
        # Imported lazily: these only need to exist on the cloud machine,
        # where the BuildConfig above installs them.
        import torch
        import pandas as pd

        # Load and process data on cloud GPU
        print(f"Processing {dataset_path} on {torch.cuda.get_device_name()}")
        data = pd.read_csv(dataset_path)
        # Simulate GPU processing
        processed = torch.randn(len(data), 10).cuda()
        result = processed.mean(dim=0).cpu()
        print(f"Processing complete. Result shape: {result.shape}")
        return result.tolist()


class CloudMLFlow(L.LightningFlow):
    """Flow that delegates dataset processing to the cloud work."""

    def __init__(self):
        super().__init__()
        self.processor = CloudDataProcessor()

    def run(self):
        result = self.processor.run("s3://my-bucket/dataset.csv")
        print(f"Final result: {result}")


# Deploy to cloud
app = L.LightningApp(CloudMLFlow())

import lightning as L
# Module-level import: both ParallelProcessor.run and ParallelFlow.run use
# time.sleep. (Previously imported only inside the worker's run, which left
# ParallelFlow.run with a NameError on `time`.)
import time


class ParallelProcessor(L.LightningWork):
    """Worker that processes one data chunk; runs in parallel with its peers."""

    def __init__(self, work_id):
        super().__init__(parallel=True)  # Enable parallel execution
        self.work_id = work_id
        self.result = None

    def run(self, data_chunk):
        print(f"Worker {self.work_id} processing chunk: {data_chunk}")
        # Simulate processing
        time.sleep(1)
        self.result = f"processed_chunk_{data_chunk}_by_worker_{self.work_id}"


class ParallelFlow(L.LightningFlow):
    """Flow that fans chunks out to parallel workers and gathers the results."""

    def __init__(self):
        super().__init__()
        # Create multiple parallel workers
        self.workers = [ParallelProcessor(i) for i in range(4)]

    def run(self):
        # Distribute work across parallel workers
        data_chunks = ["chunk_1", "chunk_2", "chunk_3", "chunk_4"]
        for worker, chunk in zip(self.workers, data_chunks):
            worker.run(chunk)
        # Wait for all workers to complete
        while not all(worker.has_succeeded for worker in self.workers):
            time.sleep(0.1)
        # Collect results
        results = [worker.result for worker in self.workers]
        print(f"All parallel work completed: {results}")


app = L.LightningApp(ParallelFlow())
app.run()

import lightning as L
from lightning.app.frontend import StreamlitFrontend
class DataVisualizerWork(L.LightningWork):
def __init__(self):
super().__init__()
self.data = []
def run(self):
import streamlit as st
import pandas as pd
import numpy as np
st.title("Lightning App Data Visualizer")
# Generate sample data
if st.button("Generate Data"):
self.data = np.random.randn(100, 3)
if len(self.data) > 0:
df = pd.DataFrame(self.data, columns=['A', 'B', 'C'])
st.line_chart(df)
class VisualizationFlow(L.LightningFlow):
def __init__(self):
super().__init__()
self.visualizer = DataVisualizerWork()
def run(self):
self.visualizer.run()
def configure_layout(self):
return StreamlitFrontend(render_fn=self.visualizer.run)
app = L.LightningApp(VisualizationFlow())
app.run()Install with Tessl CLI
npx tessl i tessl/pypi-pytorch-lightning