0
# Application Development
1
2
Lightning Apps framework for building end-to-end ML systems with components for workflow orchestration, computational work distribution, and cloud deployment. Enables creating everything from research demos to production ML systems.
3
4
## Capabilities
5
6
### LightningApp
7
8
Core class for defining Lightning applications that orchestrate workflows through a root flow component and handle deployment configurations.
9
10
```python { .api }
11
class LightningApp:
12
def __init__(
13
self,
14
root: LightningFlow,
15
info: Optional[dict] = None,
16
flow_cloud_compute: Optional[CloudCompute] = None,
17
**kwargs
18
):
19
"""
20
Initialize a Lightning App.
21
22
Parameters:
23
- root: Root LightningFlow component that orchestrates the app
24
- info: Optional metadata about the app
25
- flow_cloud_compute: Cloud compute configuration for the root flow
26
- kwargs: Additional configuration options
27
"""
28
29
def run(self, host: str = "127.0.0.1", port: int = 7501, **kwargs):
30
"""
31
Run the Lightning App locally.
32
33
Parameters:
34
- host: Host address to bind the app server
35
- port: Port number to bind the app server
36
- kwargs: Additional runtime configuration
37
"""
38
```
39
40
### LightningFlow
41
42
Orchestrates application logic and coordinates work components. Flows define the overall workflow structure and handle state management between components.
43
44
```python { .api }
45
class LightningFlow:
46
def __init__(self, cloud_compute: Optional[CloudCompute] = None, **kwargs):
47
"""
48
Base class for orchestrating application workflows.
49
50
Parameters:
51
- cloud_compute: Cloud compute configuration for this flow
52
- kwargs: Additional initialization parameters
53
"""
54
55
def run(self):
56
"""
57
Define the main execution logic of the flow.
58
This method contains the workflow orchestration code.
59
"""
60
61
def configure_layout(self):
62
"""
63
Configure the UI layout for the flow.
64
65
Returns:
66
Layout configuration for the flow's user interface
67
"""
68
69
def configure_commands(self):
70
"""
71
Configure CLI commands for the flow.
72
73
Returns:
74
List of command configurations
75
"""
76
77
@property
78
def cloud_compute(self) -> Optional[CloudCompute]:
79
"""Cloud compute configuration for this flow."""
80
81
def stop(self):
82
"""Stop the flow execution."""
83
```
84
85
### LightningWork
86
87
Encapsulates computational work in Lightning apps. Work components handle specific tasks like data processing, model training, or inference serving.
88
89
```python { .api }
90
class LightningWork:
91
def __init__(
92
self,
93
cloud_compute: Optional[CloudCompute] = None,
94
cloud_build_config: Optional[BuildConfig] = None,
95
parallel: bool = False,
96
cache_calls: bool = True,
97
raise_exception: bool = True,
98
**kwargs
99
):
100
"""
101
Base class for encapsulating computational work.
102
103
Parameters:
104
- cloud_compute: Cloud compute configuration for this work
105
- cloud_build_config: Build configuration for cloud deployment
106
- parallel: Enable parallel execution
107
- cache_calls: Cache work execution results
108
- raise_exception: Raise exceptions on work failure
109
- kwargs: Additional initialization parameters
110
"""
111
112
def run(self, *args, **kwargs):
113
"""
114
Define the main execution logic of the work.
115
116
Parameters:
117
- args, kwargs: Arguments passed to the work execution
118
"""
119
120
def stop(self):
121
"""Stop the work execution."""
122
123
@property
124
def cloud_compute(self) -> Optional[CloudCompute]:
125
"""Cloud compute configuration for this work."""
126
127
@property
128
def cloud_build_config(self) -> Optional[BuildConfig]:
129
"""Build configuration for this work."""
130
131
@property
132
def status(self) -> str:
133
"""Current execution status of the work."""
134
135
@property
136
def has_succeeded(self) -> bool:
137
"""Whether the work has completed successfully."""
138
139
@property
140
def has_failed(self) -> bool:
141
"""Whether the work has failed."""
142
143
@property
144
def has_stopped(self) -> bool:
145
"""Whether the work has been stopped."""
146
```
147
148
### Cloud Deployment
149
150
#### CloudCompute
151
152
Configuration for cloud compute resources including instance types, storage, and networking settings.
153
154
```python { .api }
155
class CloudCompute:
156
def __init__(
157
self,
158
name: str = "default",
159
disk_size: int = 0,
160
idle_timeout: Optional[int] = None,
161
shm_size: int = 0,
162
mounts: Optional[List] = None,
163
**kwargs
164
):
165
"""
166
Configuration for cloud compute resources.
167
168
Parameters:
169
- name: Name identifier for the compute resource
170
- disk_size: Disk size in GB (0 for default)
171
- idle_timeout: Idle timeout in seconds before shutdown
172
- shm_size: Shared memory size in GB
173
- mounts: List of mount configurations
174
- kwargs: Additional cloud provider specific options
175
"""
176
```
177
178
#### BuildConfig
179
180
Configuration for building Lightning apps including Docker images, requirements, and build steps.
181
182
```python { .api }
183
class BuildConfig:
184
def __init__(
185
self,
186
image: Optional[str] = None,
187
requirements: Optional[Union[List[str], str]] = None,
188
dockerfile: Optional[str] = None,
189
build_commands: Optional[List[str]] = None,
190
**kwargs
191
):
192
"""
193
Configuration for building Lightning apps.
194
195
Parameters:
196
- image: Base Docker image to use
197
- requirements: Python requirements (list or requirements.txt path)
198
- dockerfile: Path to custom Dockerfile
199
- build_commands: Additional build commands to execute
200
- kwargs: Additional build configuration options
201
"""
202
```
203
204
## Usage Examples
205
206
### Simple Lightning App
207
208
```python
209
import lightning as L
210
211
class SimpleFlow(L.LightningFlow):
212
def run(self):
213
print("Hello from Lightning Flow!")
214
215
def configure_layout(self):
216
return {"name": "Simple App", "content": "This is a simple Lightning App"}
217
218
# Create and run the app
219
app = L.LightningApp(SimpleFlow())
220
app.run()
221
```
222
223
### Flow with Work Components
224
225
```python
226
import lightning as L
227
import time
228
229
class DataProcessor(L.LightningWork):
230
def __init__(self):
231
super().__init__()
232
self.processed_data = None
233
234
def run(self, data):
235
print(f"Processing data: {data}")
236
time.sleep(2) # Simulate processing
237
self.processed_data = f"processed_{data}"
238
print(f"Data processing complete: {self.processed_data}")
239
240
class ModelTrainer(L.LightningWork):
241
def __init__(self):
242
super().__init__()
243
self.model_trained = False
244
245
def run(self, processed_data):
246
print(f"Training model with: {processed_data}")
247
time.sleep(3) # Simulate training
248
self.model_trained = True
249
print("Model training complete!")
250
251
class MLPipeline(L.LightningFlow):
252
def __init__(self):
253
super().__init__()
254
self.processor = DataProcessor()
255
self.trainer = ModelTrainer()
256
257
def run(self):
258
# Step 1: Process data
259
self.processor.run("raw_data.csv")
260
261
# Wait for processing to complete
262
if self.processor.processed_data:
263
# Step 2: Train model
264
self.trainer.run(self.processor.processed_data)
265
266
# Check if pipeline is complete
267
if self.trainer.model_trained:
268
print("ML Pipeline completed successfully!")
269
270
# Create and run the ML pipeline
271
app = L.LightningApp(MLPipeline())
272
app.run()
273
```
274
275
### Cloud Deployment Configuration
276
277
```python
278
import lightning as L
279
280
class CloudDataProcessor(L.LightningWork):
281
def __init__(self):
282
# Configure cloud compute with GPU and storage
283
cloud_compute = L.CloudCompute(
284
name="gpu-instance",
285
disk_size=100, # 100GB disk
286
idle_timeout=300 # 5 minute timeout
287
)
288
289
# Configure build with custom requirements
290
build_config = L.BuildConfig(
291
requirements=["torch", "pandas", "scikit-learn"],
292
build_commands=["pip install --upgrade pip"]
293
)
294
295
super().__init__(
296
cloud_compute=cloud_compute,
297
cloud_build_config=build_config
298
)
299
300
def run(self, dataset_path):
301
import torch
302
import pandas as pd
303
304
# Load and process data on cloud GPU
305
print(f"Processing {dataset_path} on {torch.cuda.get_device_name()}")
306
data = pd.read_csv(dataset_path)
307
308
# Simulate GPU processing
309
processed = torch.randn(len(data), 10).cuda()
310
result = processed.mean(dim=0).cpu()
311
312
print(f"Processing complete. Result shape: {result.shape}")
313
return result.tolist()
314
315
class CloudMLFlow(L.LightningFlow):
316
def __init__(self):
317
super().__init__()
318
self.processor = CloudDataProcessor()
319
320
def run(self):
321
result = self.processor.run("s3://my-bucket/dataset.csv")
322
print(f"Final result: {result}")
323
324
# Deploy to cloud
325
app = L.LightningApp(CloudMLFlow())
326
```
327
328
### Parallel Work Execution
329
330
```python
331
import lightning as L
332
333
class ParallelProcessor(L.LightningWork):
334
def __init__(self, work_id):
335
super().__init__(parallel=True) # Enable parallel execution
336
self.work_id = work_id
337
self.result = None
338
339
def run(self, data_chunk):
340
print(f"Worker {self.work_id} processing chunk: {data_chunk}")
341
# Simulate processing
342
import time
343
time.sleep(1)
344
self.result = f"processed_chunk_{data_chunk}_by_worker_{self.work_id}"
345
346
class ParallelFlow(L.LightningFlow):
347
def __init__(self):
348
super().__init__()
349
# Create multiple parallel workers
350
self.workers = [ParallelProcessor(i) for i in range(4)]
351
352
def run(self):
353
# Distribute work across parallel workers
354
data_chunks = ["chunk_1", "chunk_2", "chunk_3", "chunk_4"]
355
356
for worker, chunk in zip(self.workers, data_chunks):
357
worker.run(chunk)
358
359
# Wait for all workers to complete
360
while not all(worker.has_succeeded for worker in self.workers):
361
time.sleep(0.1)
362
363
# Collect results
364
results = [worker.result for worker in self.workers]
365
print(f"All parallel work completed: {results}")
366
367
app = L.LightningApp(ParallelFlow())
368
app.run()
369
```
370
371
### Web Interface Integration
372
373
```python
374
import lightning as L
375
from lightning.app.frontend import StreamlitFrontend
376
377
class DataVisualizerWork(L.LightningWork):
378
def __init__(self):
379
super().__init__()
380
self.data = []
381
382
def run(self):
383
import streamlit as st
384
import pandas as pd
385
import numpy as np
386
387
st.title("Lightning App Data Visualizer")
388
389
# Generate sample data
390
if st.button("Generate Data"):
391
self.data = np.random.randn(100, 3)
392
393
if len(self.data) > 0:
394
df = pd.DataFrame(self.data, columns=['A', 'B', 'C'])
395
st.line_chart(df)
396
397
class VisualizationFlow(L.LightningFlow):
398
def __init__(self):
399
super().__init__()
400
self.visualizer = DataVisualizerWork()
401
402
def run(self):
403
self.visualizer.run()
404
405
def configure_layout(self):
406
return StreamlitFrontend(render_fn=self.visualizer.run)
407
408
app = L.LightningApp(VisualizationFlow())
409
app.run()
410
```