CtrlK
Blog · Docs · Log in · Get started
Tessl Logo

tessl/pypi-pytorch-lightning

Unified deep learning framework integrating PyTorch Lightning, Lightning Fabric, and Lightning Apps for training, deploying, and shipping AI products.

Pending
Overview
Eval results
Files

apps.mddocs/

Application Development

Lightning Apps framework for building end-to-end ML systems with components for workflow orchestration, computational work distribution, and cloud deployment. Enables creating everything from research demos to production ML systems.

Capabilities

LightningApp

Core class for defining Lightning applications that orchestrate workflows through a root flow component and handle deployment configurations.

class LightningApp:
    def __init__(
        self,
        root: LightningFlow,
        info: Optional[dict] = None,
        flow_cloud_compute: Optional[CloudCompute] = None,
        **kwargs
    ):
        """Create a Lightning App driven by a single root flow.

        Args:
            root: Top-level ``LightningFlow`` that orchestrates the app.
            info: Optional free-form metadata describing the app.
            flow_cloud_compute: Cloud compute settings applied to the root flow.
            **kwargs: Additional configuration options.
        """

    def run(self, host: str = "127.0.0.1", port: int = 7501, **kwargs):
        """Serve the app on a local host and port.

        Args:
            host: Interface address the app server binds to.
            port: TCP port the app server listens on.
            **kwargs: Additional runtime configuration.
        """

LightningFlow

Orchestrates application logic and coordinates work components. Flows define the overall workflow structure and handle state management between components.

class LightningFlow:
    def __init__(self, cloud_compute: Optional[CloudCompute] = None, **kwargs):
        """Base class for orchestrating application workflows.

        Args:
            cloud_compute: Cloud compute settings applied to this flow.
            **kwargs: Additional initialization options.
        """

    def run(self):
        """Entry point containing the flow's orchestration logic.

        Subclasses implement the coordination of child flows and works here.
        """

    def configure_layout(self):
        """Describe the UI layout for this flow.

        Returns:
            The layout configuration for the flow's user interface.
        """

    def configure_commands(self):
        """Declare CLI commands exposed by this flow.

        Returns:
            A list of command configuration entries.
        """

    @property
    def cloud_compute(self) -> Optional[CloudCompute]:
        """The cloud compute configuration attached to this flow."""

    def stop(self):
        """Halt execution of this flow."""

LightningWork

Encapsulates computational work in Lightning apps. Work components handle specific tasks like data processing, model training, or inference serving.

class LightningWork:
    def __init__(
        self,
        cloud_compute: Optional[CloudCompute] = None,
        cloud_build_config: Optional[BuildConfig] = None,
        parallel: bool = False,
        cache_calls: bool = True,
        raise_exception: bool = True,
        **kwargs
    ):
        """Base class encapsulating a unit of computational work.

        Args:
            cloud_compute: Cloud compute settings applied to this work.
            cloud_build_config: Build settings used for cloud deployment.
            parallel: Enables parallel execution of this work.
            cache_calls: Caches the results of work executions.
            raise_exception: Raises an exception when the work fails.
            **kwargs: Additional initialization options.
        """

    def run(self, *args, **kwargs):
        """Entry point containing the work's execution logic.

        Args:
            *args: Positional arguments forwarded to the work execution.
            **kwargs: Keyword arguments forwarded to the work execution.
        """

    def stop(self):
        """Halt execution of this work."""

    @property
    def cloud_compute(self) -> Optional[CloudCompute]:
        """The cloud compute configuration attached to this work."""

    @property
    def cloud_build_config(self) -> Optional[BuildConfig]:
        """The build configuration attached to this work."""

    @property
    def status(self) -> str:
        """The current execution status of this work."""

    @property
    def has_succeeded(self) -> bool:
        """True when the work has completed successfully."""

    @property
    def has_failed(self) -> bool:
        """True when the work has failed."""

    @property
    def has_stopped(self) -> bool:
        """True when the work has been stopped."""

Cloud Deployment

CloudCompute

Configuration for cloud compute resources including instance types, storage, and networking settings.

class CloudCompute:
    def __init__(
        self,
        name: str = "default",
        disk_size: int = 0,
        idle_timeout: Optional[int] = None,
        shm_size: int = 0,
        mounts: Optional[List] = None,
        **kwargs
    ):
        """Describe the cloud compute resources to provision.

        Args:
            name: Identifier for the compute resource.
            disk_size: Disk size in GB; 0 selects the default size.
            idle_timeout: Seconds of idleness before the resource shuts down.
            shm_size: Shared memory size in GB.
            mounts: Mount configurations to attach to the resource.
            **kwargs: Additional cloud-provider-specific options.
        """

BuildConfig

Configuration for building Lightning apps including Docker images, requirements, and build steps.

class BuildConfig:
    def __init__(
        self,
        image: Optional[str] = None,
        requirements: Optional[Union[List[str], str]] = None,
        dockerfile: Optional[str] = None,
        build_commands: Optional[List[str]] = None,
        **kwargs
    ):
        """Describe how a Lightning app image is built.

        Args:
            image: Base Docker image to build on.
            requirements: Python requirements, either a list of package
                specifiers or a path to a requirements.txt file.
            dockerfile: Path to a custom Dockerfile.
            build_commands: Extra shell commands executed during the build.
            **kwargs: Additional build configuration options.
        """

Usage Examples

Simple Lightning App

import lightning as L

class SimpleFlow(L.LightningFlow):
    """Minimal flow that prints a greeting and exposes a static layout."""

    def run(self):
        print("Hello from Lightning Flow!")

    def configure_layout(self):
        # Single static tab with plain text content.
        layout = {
            "name": "Simple App",
            "content": "This is a simple Lightning App",
        }
        return layout

# Wrap the root flow in an app and serve it locally.
app = L.LightningApp(SimpleFlow())
app.run()

Flow with Work Components

import lightning as L
import time

class DataProcessor(L.LightningWork):
    """Simulated data-processing work; stores its output on the instance."""

    def __init__(self):
        super().__init__()
        # None until run() has produced output.
        self.processed_data = None

    def run(self, data):
        print(f"Processing data: {data}")
        time.sleep(2)  # stand-in for real processing work
        self.processed_data = f"processed_{data}"
        print(f"Data processing complete: {self.processed_data}")

class ModelTrainer(L.LightningWork):
    """Simulated model-training work; flips a flag when finished."""

    def __init__(self):
        super().__init__()
        self.model_trained = False

    def run(self, processed_data):
        print(f"Training model with: {processed_data}")
        time.sleep(3)  # stand-in for real training work
        self.model_trained = True
        print("Model training complete!")

class MLPipeline(L.LightningFlow):
    """Two-stage pipeline: process raw data, then train on the result."""

    def __init__(self):
        super().__init__()
        self.processor = DataProcessor()
        self.trainer = ModelTrainer()

    def run(self):
        # Stage 1: process the raw input file.
        self.processor.run("raw_data.csv")

        # Stage 2: train only once processed output is available.
        if self.processor.processed_data:
            self.trainer.run(self.processor.processed_data)

        # Report once training has finished.
        if self.trainer.model_trained:
            print("ML Pipeline completed successfully!")

# Build the app around the pipeline flow and run it locally.
app = L.LightningApp(MLPipeline())
app.run()

Cloud Deployment Configuration

import lightning as L

class CloudDataProcessor(L.LightningWork):
    """Work configured to run on a cloud GPU instance with a custom build."""

    def __init__(self):
        # GPU instance: 100 GB disk, shut down after 5 idle minutes.
        compute = L.CloudCompute(
            name="gpu-instance",
            disk_size=100,  # 100GB disk
            idle_timeout=300  # 5 minute timeout
        )

        # Image build: extra Python packages plus a pip upgrade step.
        build = L.BuildConfig(
            requirements=["torch", "pandas", "scikit-learn"],
            build_commands=["pip install --upgrade pip"]
        )

        super().__init__(
            cloud_compute=compute,
            cloud_build_config=build
        )

    def run(self, dataset_path):
        import torch
        import pandas as pd

        # Load the dataset and report which GPU is handling it.
        print(f"Processing {dataset_path} on {torch.cuda.get_device_name()}")
        data = pd.read_csv(dataset_path)

        # Stand-in for real GPU work: random tensor reduced to column means.
        processed = torch.randn(len(data), 10).cuda()
        result = processed.mean(dim=0).cpu()

        print(f"Processing complete. Result shape: {result.shape}")
        return result.tolist()

class CloudMLFlow(L.LightningFlow):
    """Root flow that drives a single cloud data-processing work."""

    def __init__(self):
        super().__init__()
        self.processor = CloudDataProcessor()

    def run(self):
        result = self.processor.run("s3://my-bucket/dataset.csv")
        print(f"Final result: {result}")

# Deploy to cloud
app = L.LightningApp(CloudMLFlow())

Parallel Work Execution

import lightning as L
import time  # module-level: needed by ParallelFlow.run's wait loop below

class ParallelProcessor(L.LightningWork):
    """Work that processes one data chunk, running in parallel with siblings."""

    def __init__(self, work_id):
        super().__init__(parallel=True)  # Enable parallel execution
        self.work_id = work_id
        self.result = None

    def run(self, data_chunk):
        print(f"Worker {self.work_id} processing chunk: {data_chunk}")
        # Simulate processing
        time.sleep(1)
        self.result = f"processed_chunk_{data_chunk}_by_worker_{self.work_id}"

class ParallelFlow(L.LightningFlow):
    """Fans a fixed set of chunks out to parallel workers, then gathers results."""

    def __init__(self):
        super().__init__()
        # Create multiple parallel workers
        self.workers = [ParallelProcessor(i) for i in range(4)]

    def run(self):
        # Distribute work across parallel workers
        data_chunks = ["chunk_1", "chunk_2", "chunk_3", "chunk_4"]

        for worker, chunk in zip(self.workers, data_chunks):
            worker.run(chunk)

        # Wait for all workers to complete.
        # Bug fix: `time` was previously imported only inside
        # ParallelProcessor.run, so this line raised NameError; it is now
        # imported at module scope.
        while not all(worker.has_succeeded for worker in self.workers):
            time.sleep(0.1)

        # Collect results
        results = [worker.result for worker in self.workers]
        print(f"All parallel work completed: {results}")

app = L.LightningApp(ParallelFlow())
app.run()

Web Interface Integration

import lightning as L
from lightning.app.frontend import StreamlitFrontend

class DataVisualizerWork(L.LightningWork):
    # Work whose run() renders a Streamlit page: a button that generates
    # random data and a line chart of that data once it exists.
    def __init__(self):
        super().__init__()
        # Starts as an empty list; replaced with a (100, 3) ndarray by run().
        self.data = []
        
    def run(self):
        # Imports kept local so the module loads without Streamlit installed.
        import streamlit as st
        import pandas as pd
        import numpy as np
        
        st.title("Lightning App Data Visualizer")
        
        # Generate sample data
        if st.button("Generate Data"):
            self.data = np.random.randn(100, 3)
            
        if len(self.data) > 0:
            df = pd.DataFrame(self.data, columns=['A', 'B', 'C'])
            st.line_chart(df)

class VisualizationFlow(L.LightningFlow):
    # Root flow owning a single visualization work.
    def __init__(self):
        super().__init__()
        self.visualizer = DataVisualizerWork()
        
    def run(self):
        self.visualizer.run()
        
    def configure_layout(self):
        # NOTE(review): StreamlitFrontend's render_fn conventionally receives
        # the app state as its argument; passing the bound work method here
        # may not match that contract — confirm against the Lightning docs.
        return StreamlitFrontend(render_fn=self.visualizer.run)

app = L.LightningApp(VisualizationFlow())
app.run()

Install with Tessl CLI

npx tessl i tessl/pypi-pytorch-lightning

docs

apps.md

fabric.md

index.md

training.md

utilities.md

tile.json