Unified deep learning framework integrating PyTorch Lightning, Lightning Fabric, and Lightning Apps for training, deploying, and shipping AI products.
—
Storage abstractions, utility modules, and debugging tools for Lightning Apps. Provides file system operations, cloud storage integrations, and enhanced debugging capabilities.
File system abstractions and cloud storage integrations for Lightning Apps, enabling seamless data management across local and cloud environments.
class Drive:
    """Cloud storage drive abstraction for managing cloud storage resources."""

    def __init__(
        self,
        id: str,
        allow_duplicates: bool = False,
        component_name: Optional[str] = None,
        root_folder: Optional[str] = None,
    ):
        """Initialize a Drive for shared file storage.

        Parameters:
        - id: Unique identifier for this Drive.
        - allow_duplicates: Whether to allow file duplication between components.
        - component_name: Name of the component that owns this drive
          (auto-inferred if None).
        - root_folder: Mount directory for the drive.
        """

    def put(self, source: str, destination: str = "") -> str:
        """Put a file or directory into the drive."""

    def get(self, source: str, destination: str = "", overwrite: bool = True):
        """Get a file or directory from the drive."""

    def list(self, source: str = "") -> List[str]:
        """List files and directories in the drive."""

    def delete(self, source: str):
        """Delete a file or directory from the drive."""
class FileSystem:
    """File system operations interface for local and remote file systems."""

    def __init__(self):
        """Initialize filesystem operations."""

    def get(
        self,
        source: str,
        destination: str,
        overwrite: bool = True,
        on_progress: Optional[Callable] = None,
    ):
        """Copy files from source to destination."""

    def put(
        self,
        source: str,
        destination: str,
        overwrite: bool = True,
        on_progress: Optional[Callable] = None,
    ):
        """Copy files from source to destination."""
class Mount:
    """Mount point configuration for storage resources in Lightning Apps."""

    def __init__(
        self,
        source: str,
        mount_path: str,
        read_only: bool = False,
    ):
        """Initialize a mount configuration.

        Parameters:
        - source: Source path to mount.
        - mount_path: Path where the source will be mounted.
        - read_only: Whether the mount is read-only.
        """
class StorageOrchestrator:
    """Orchestrates storage operations across multiple storage backends."""

    def __init__(
        self,
        app: "LightningApp",
        request_queues: Dict[str, BaseQueue],
        response_queues: Dict[str, BaseQueue],
        copy_request_queue: BaseQueue,
        copy_response_queue: BaseQueue,
    ):
        """Initialize the storage orchestrator for managing storage operations.

        Parameters:
        - app: The LightningApp instance this orchestrator serves.
        - request_queues: Mapping of storage-request queues, keyed by string
          (presumably component name — confirm against caller).
        - response_queues: Mapping of storage-response queues, keyed the same way.
        - copy_request_queue: Queue for incoming copy requests.
        - copy_response_queue: Queue for outgoing copy responses.
        """
class Path:
    """Enhanced path handling utilities optimized for Lightning Apps."""

    def __init__(self, *args):
        """Initialize a Path object with enhanced Lightning Apps functionality."""
class Payload:
    """Data payload abstraction for efficient storage and transfer operations."""

    def __init__(self, value: Any):
        """Initialize a payload for data transfer.

        Parameters:
        - value: The data value to wrap as a payload.
        """

# The storage module provides utilities for: FileSystem, Drive, Path,
# StorageOrchestrator, Mount, and Payload.
# Below: enhanced debugging tools specifically designed for Lightning Apps
# development and production debugging.
# Enhanced debugger for Lightning Apps.
def set_trace():
    """Set a breakpoint for debugging Lightning Apps.

    Enhanced version of pdb.set_trace() that works correctly within the
    Lightning Apps execution context, including forked processes and
    distributed environments.
    """

import lightning as L
class DataManager(L.LightningWork):
    """Example LightningWork showing storage operations inside a Lightning App."""

    def __init__(self):
        super().__init__()

    def run(self, data_path):
        # Storage operations within Lightning Apps; the exact API depends on
        # the storage backend in use.
        print(f"Managing data at: {data_path}")
        # Example storage operations would be implemented here based on the
        # specific storage module functionality.

import lightning as L
from lightning import pdb

class DebuggableWork(L.LightningWork):
    """Example LightningWork demonstrating the enhanced Lightning debugger."""

    def __init__(self):
        super().__init__()

    def run(self, data):
        print("Starting work execution...")
        # Set a breakpoint for debugging.
        pdb.set_trace()  # Enhanced debugger for Lightning Apps.
        # Process the data.
        result = self.process_data(data)
        return result

    def process_data(self, data):
        # Complex processing logic that might need debugging.
        processed = []
        for item in data:
            # Another potential breakpoint location.
            if len(processed) > 10:
                pdb.set_trace()
            processed.append(item * 2)
        return processed
class DebuggableFlow(L.LightningFlow):
    """Example LightningFlow that drives the debuggable work above."""

    def __init__(self):
        super().__init__()
        self.worker = DebuggableWork()

    def run(self):
        test_data = list(range(20))
        result = self.worker.run(test_data)
        print(f"Work completed with result: {result}")

# The enhanced debugger works correctly in the Lightning Apps context.
app = L.LightningApp(DebuggableFlow())
app.run()

# These utilities are automatically available when importing the main
# Lightning package:
import lightning as L
# Storage utilities are available through L.storage.
# Enhanced debugging is available through L.pdb.set_trace().
# The storage and utility modules are designed to work seamlessly within the
# Lightning Apps ecosystem, providing consistent APIs whether running locally
# or in cloud environments.
Install with Tessl CLI
npx tessl i tessl/pypi-pytorch-lightning