# Command Line Tools

Luigi ships command-line tools for running tasks, managing the central scheduler daemon, analyzing dependencies, and debugging workflows. They support both interactive development and production deployment.

## Capabilities

### Main CLI Commands

The `luigi` command executes workflows, and the `luigid` command runs the central scheduler daemon.

```python { .api }
# Main task execution command
luigi --module mymodule MyTask [--param value] [options]

# Scheduler daemon
luigid [--background] [--pidfile FILE] [--logdir DIR] [--state-path FILE] [--address ADDRESS] [--port PORT]

# Command-line options for the luigi command
class LuigiCLI:
    """Luigi command-line interface options."""

    # Task specification
    module: str  # Module containing the task (--module)
    task: str    # Task class name

    # Scheduler options
    scheduler_host: str = 'localhost'  # Scheduler host
    scheduler_port: int = 8082         # Scheduler port
    local_scheduler: bool = False      # Use an in-process scheduler instead of luigid

    # Worker options (from the [worker] section, so the flags carry a worker- prefix)
    workers: int = 1          # Number of worker processes
    keep_alive: bool = False  # Keep the worker alive while pending tasks remain (--worker-keep-alive)
    timeout: int = 0          # Task timeout in seconds, 0 = no timeout (--worker-timeout)

    # Logging options
    log_level: str = 'DEBUG'  # Logging level
    logging_conf_file: str    # Logging configuration file

    # Execution options
    no_lock: bool = False     # Skip the check for an already-running identical process
    lock_size: int = 1        # Maximum number of processes allowed to run the same command
    lock_pid_dir: str         # Directory for PID lock files
    take_lock: bool = False   # Signal an already-running process to stop taking work

    # Return codes: each defaults to 0 unless set here or in the [retcode] section.
    # The values in the comments are the ones this page uses as examples.
    retcode_already_running: int = 0   # Return code if already running (e.g. 10)
    retcode_missing_data: int = 0      # Return code for missing data (e.g. 20)
    retcode_not_run: int = 0           # Return code if not run (e.g. 25)
    retcode_task_failed: int = 0       # Return code for task failure (e.g. 30)
    retcode_scheduling_error: int = 0  # Return code for scheduling error (e.g. 35)

    # Help and version
    help: bool = False     # Show help message
    version: bool = False  # Show version information
```
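
The option listing above maps onto an argument list in a predictable way. The sketch below assembles such a list in Python so it can be handed to `subprocess.run`. The helper name `build_luigi_argv` and its keyword arguments are hypothetical, and the sketch only handles parameters that take a value (boolean parameters are bare flags and would need separate handling).

```python
from typing import Dict, List, Optional


def build_luigi_argv(
    module: str,
    task: str,
    params: Optional[Dict[str, object]] = None,
    local_scheduler: bool = False,
    workers: int = 1,
) -> List[str]:
    """Assemble an argument list for the `luigi` command (hypothetical helper)."""
    argv = ["luigi", "--module", module, task]
    for name, value in (params or {}).items():
        # Task parameters become dashed flags: start_date -> --start-date
        argv += ["--" + name.replace("_", "-"), str(value)]
    if local_scheduler:
        argv.append("--local-scheduler")
    if workers != 1:
        argv += ["--workers", str(workers)]
    return argv


print(build_luigi_argv("mymodule", "MyTask", {"start_date": "2023-01-01"}, workers=4))
```

Passing the resulting list to `subprocess.run(argv)` avoids shell quoting issues, since each argument stays a separate list element.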

### Scheduler Daemon Options

Command-line options for the Luigi scheduler daemon (`luigid`). Run `luigid --help` to confirm the flags available in your installed version.

```python { .api }
class LuigiDaemon:
    """Luigi scheduler daemon options."""

    # Server options
    address: str      # Listening interface (--address)
    port: int = 8082  # Listening port (--port)

    # Process options
    background: bool = False  # Run in background (--background)
    pidfile: str              # PID file path (--pidfile)

    # Logging options
    logdir: str             # Log directory (--logdir)
    logging_conf_file: str  # Logging configuration, set via [core] logging_conf_file in the config file

    # State persistence
    state_path: str  # Scheduler state file path (--state-path)

    # Security options
    unix_socket: str  # Unix socket path (--unix-socket)
```

### Dependency Analysis Tools

Command-line utilities for analyzing task dependencies and workflow structure. Each module exposes a `main()` entry point that reads its arguments from `sys.argv`; the aliases below only keep the three names distinct.

```python { .api }
# Dependency analysis commands
from luigi.tools.deps import main as deps_main
from luigi.tools.deps_tree import main as deps_tree_main

def deps_main() -> None:
    """
    Analyze task dependencies.

    Usage: python -m luigi.tools.deps MyTask --module mymodule

    Command-line arguments:
        MyTask: Task name, followed by its parameters
        --module: Module containing the task
    """

def deps_tree_main() -> None:
    """
    Display dependency tree visualization.

    Usage: python -m luigi.tools.deps_tree MyTask --module mymodule

    Command-line arguments:
        MyTask: Task name, followed by its parameters
        --module: Module containing the task
    """

# Task search utility (queries a running scheduler, not source files)
from luigi.tools.luigi_grep import main as luigi_grep_main

def luigi_grep_main() -> None:
    """
    Search the scheduler's task graph by task-name prefix or by status.

    Usage: python -m luigi.tools.luigi_grep [--scheduler-host HOST] [--scheduler-port PORT] --prefix PREFIX
           python -m luigi.tools.luigi_grep [--scheduler-host HOST] [--scheduler-port PORT] --status STATUS

    Command-line arguments:
        --prefix: Prefix of the task names to search for
        --status: Status of the tasks to search for
    """
```
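
To make the idea of a dependency tree concrete, here is a minimal sketch that renders one from a plain dictionary using depth-first traversal. It does not use Luigi and does not reproduce the exact output format of `luigi.tools.deps_tree`; the task names and the `render_tree` helper are hypothetical.

```python
from typing import Dict, List

# Hypothetical dependency map: each task lists the tasks it requires
DEPS: Dict[str, List[str]] = {
    "ReportTask": ["ProcessingTask"],
    "ProcessingTask": ["IngestTask", "LookupTask"],
    "IngestTask": [],
    "LookupTask": [],
}


def render_tree(task: str, deps: Dict[str, List[str]], depth: int = 0) -> List[str]:
    """Return one indented line per task, visiting upstream tasks depth-first."""
    lines = ["    " * depth + "|-- " + task]
    for upstream in deps.get(task, []):
        lines.extend(render_tree(upstream, deps, depth + 1))
    return lines


print("\n".join(render_tree("ReportTask", DEPS)))
```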

### Range Task Utilities

Built-in wrapper tasks that schedule another task over a range of dates or times.

```python { .api }
# Range tasks (available on the command line once luigi is imported)
from luigi.tools.range import RangeDaily, RangeHourly, RangeByMinutes

# Usage examples (--stop is exclusive; hourly values use the YYYY-MM-DDTHH format):
# luigi --module mymodule RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-31
# luigi --module mymodule RangeHourly --of MyTask --start 2023-01-01T00 --stop 2023-01-01T23
```
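
Because `--stop` is exclusive, it helps to see which dates a daily range spans. The following sketch computes them with the standard library only. It deliberately ignores any look-back or look-ahead window the range tasks may apply relative to the current time, and `daily_range` is a hypothetical helper, not part of Luigi.

```python
from datetime import date, timedelta
from typing import List


def daily_range(start: date, stop: date) -> List[date]:
    """Dates from start (inclusive) to stop (exclusive), one per day."""
    days = (stop - start).days
    return [start + timedelta(days=offset) for offset in range(max(days, 0))]


dates = daily_range(date(2023, 1, 1), date(2023, 1, 31))
print(len(dates), dates[0], dates[-1])  # 30 2023-01-01 2023-01-30
```

With `--start 2023-01-01 --stop 2023-01-31`, January 31 itself is not part of the range under this reading.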

## Usage Examples

### Basic Task Execution

```bash
# Execute single task with parameters
luigi MyTask --param1 value1 --param2 value2 --module mypackage.tasks

# Execute with local scheduler
luigi MyTask --local-scheduler --module mypackage.tasks

# Execute with remote scheduler
luigi MyTask --scheduler-host scheduler.example.com --scheduler-port 8082 --module mypackage.tasks

# Execute with multiple workers
luigi MyTask --workers 4 --module mypackage.tasks

# Execute with a one-hour task timeout (a [worker] option, hence the worker- prefix)
luigi MyTask --worker-timeout 3600 --module mypackage.tasks
```

### Scheduler Daemon Management

```bash
# Start scheduler daemon
luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi

# Start scheduler on specific address/port
luigid --address 0.0.0.0 --port 8082 --background

# Start with persistent state
luigid --state-path /var/lib/luigi/scheduler.state --background

# Run in the foreground during development, logging to ./logs
luigid --logdir ./logs
```

### Dependency Analysis

```bash
# Analyze task dependencies
python -m luigi.tools.deps MyTask --module mypackage.tasks

# Visualize dependency tree
python -m luigi.tools.deps_tree MyTask --module mypackage.tasks

# Search the running scheduler for tasks whose names start with a prefix
python -m luigi.tools.luigi_grep --prefix ProcessData

# Analyze range task dependencies
python -m luigi.tools.deps RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-07 --module mypackage.tasks
```

### Advanced CLI Usage

```python
#!/usr/bin/env python3
# CLI wrapper script example
"""Custom Luigi CLI wrapper with enhanced functionality."""

import argparse
import sys

from luigi.cmdline import luigi_run
from luigi.configuration import get_config


def custom_luigi_main():
    """Enhanced Luigi CLI with custom options. Returns the exit code."""
    parser = argparse.ArgumentParser(description='Enhanced Luigi CLI')

    # Add custom options
    parser.add_argument('--env', choices=['dev', 'staging', 'prod'],
                        default='dev', help='Environment to run in')
    parser.add_argument('--dry-run', action='store_true',
                        help='Show what would be executed without running')
    parser.add_argument('--notify', help='Notification email for completion')

    # Parse known args, let Luigi handle the rest
    args, luigi_args = parser.parse_known_args()

    # Configure environment
    setup_environment(args.env)

    if args.dry_run:
        print("DRY RUN: Would execute:")
        print(" ".join(['luigi'] + luigi_args))
        return 0

    # Execute Luigi with the remaining arguments.
    # luigi_run() finishes by calling sys.exit(), so capture the exit code.
    try:
        luigi_run(luigi_args)
        exit_code = 0
    except SystemExit as exc:
        exit_code = exc.code or 0

    # Send notification if requested
    if args.notify:
        send_completion_notification(args.notify, exit_code)

    return exit_code


def setup_environment(env: str):
    """Configure Luigi for specific environment."""
    config = get_config()

    if env == 'prod':
        config.set('core', 'default_scheduler_host', 'prod-scheduler.example.com')
        config.set('core', 'log_level', 'WARNING')
    elif env == 'staging':
        config.set('core', 'default_scheduler_host', 'staging-scheduler.example.com')
        config.set('core', 'log_level', 'INFO')
    else:  # dev
        config.set('core', 'local_scheduler', 'true')
        config.set('core', 'log_level', 'DEBUG')


def send_completion_notification(email: str, exit_code):
    """Send email notification on completion."""
    # Implementation would send email with the exit code
    print(f"Would send notification to {email}: exit code {exit_code}")


if __name__ == '__main__':
    sys.exit(custom_luigi_main())
```

### Batch Execution Scripts

```bash
#!/bin/bash
# Luigi batch execution script

# Configuration
LUIGI_MODULE="mypackage.tasks"
SCHEDULER_HOST="scheduler.example.com"
LOG_DIR="/var/log/luigi"
DATE=$(date +%Y-%m-%d)

# Create log directory
mkdir -p "$LOG_DIR"

# Function to run Luigi task with error handling
# Usage: run_luigi_task TaskName [task arguments...]
run_luigi_task() {
    local task_name=$1
    shift  # the remaining arguments are passed through to luigi
    local log_file="$LOG_DIR/${task_name}_${DATE}.log"

    echo "Starting $task_name at $(date)"

    luigi "$task_name" "$@" \
        --module "$LUIGI_MODULE" \
        --scheduler-host "$SCHEDULER_HOST" \
        --workers 2 \
        --worker-timeout 7200 \
        >> "$log_file" 2>&1

    local exit_code=$?

    # A non-zero exit code for failed tasks requires the retcode options to be configured
    if [ $exit_code -eq 0 ]; then
        echo "✓ $task_name completed successfully"
    else
        echo "✗ $task_name failed with exit code $exit_code"
        echo "Check log file: $log_file"
    fi

    return $exit_code
}

# Execute daily tasks
echo "Starting daily Luigi workflow for $DATE"

run_luigi_task DataIngestionTask --date "$DATE"
run_luigi_task ProcessingTask --date "$DATE"
run_luigi_task ReportGenerationTask --date "$DATE"

echo "Daily workflow completed at $(date)"
```
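
A batch script like the one above becomes more useful when the exit code can be turned into a readable reason. The sketch below assumes the return codes have been configured with the example values 10, 20, 25, 30, and 35 listed under Main CLI Commands; the mapping and the `describe_exit_code` helper are hypothetical and need to match whatever values your configuration actually sets.

```python
# Assumed mapping: matches the example return-code values shown on this page,
# not anything Luigi guarantees by default.
RETCODE_MEANINGS = {
    0: "success",
    10: "already running",
    20: "missing data",
    25: "not run",
    30: "task failed",
    35: "scheduling error",
}


def describe_exit_code(code: int) -> str:
    """Translate a luigi exit code into a short description."""
    return RETCODE_MEANINGS.get(code, f"unknown exit code {code}")


for code in (0, 30, 99):
    print(code, "->", describe_exit_code(code))
```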

### Configuration Management

```bash
#!/bin/bash
# Luigi configuration management script

LUIGI_CONFIG_DIR="/etc/luigi"
LUIGI_ENV="${LUIGI_ENV:-development}"

# Function to switch Luigi configuration
switch_config() {
    local env=$1
    local config_file="$LUIGI_CONFIG_DIR/luigi-$env.cfg"

    if [ ! -f "$config_file" ]; then
        echo "Configuration file not found: $config_file"
        exit 1
    fi

    # Link active configuration
    ln -sf "$config_file" "$LUIGI_CONFIG_DIR/luigi.cfg"
    echo "Switched to $env configuration"
}

# Function to validate configuration
validate_config() {
    echo "Validating Luigi configuration..."

    # Check that the luigi CLI is installed and runs
    if luigi --help > /dev/null 2>&1; then
        echo "✓ Luigi CLI is working"
    else
        echo "✗ Luigi CLI not working"
        exit 1
    fi

    # Test configuration parsing (the Python lines must stay unindented)
    python3 -c "
import luigi.configuration
config = luigi.configuration.get_config()
print(f'Scheduler: {config.get(\"core\", \"default_scheduler_host\", fallback=\"localhost\")}')
print(f'Port: {config.getint(\"core\", \"default_scheduler_port\", fallback=8082)}')
"
}

# Main command handling
case "$1" in
    switch)
        switch_config "$2"
        ;;
    validate)
        validate_config
        ;;
    *)
        echo "Usage: $0 {switch|validate} [environment]"
        echo "Environments: development, staging, production"
        exit 1
        ;;
esac
```
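
The validation step above reads two keys from the `[core]` section. As a rough illustration of that lookup without requiring Luigi to be installed, the sketch below parses an INI-style configuration with the standard-library `configparser` and falls back to `localhost` and `8082` when a key is missing. The sample configuration text and the `scheduler_endpoint` helper are hypothetical.

```python
import configparser
from typing import Tuple

# Hypothetical configuration text in the same INI style as luigi.cfg
SAMPLE_CFG = """
[core]
default_scheduler_host = scheduler.example.com
default_scheduler_port = 8082
"""


def scheduler_endpoint(cfg_text: str) -> Tuple[str, int]:
    """Return (host, port) from the [core] section, with fallbacks."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    host = parser.get("core", "default_scheduler_host", fallback="localhost")
    port = parser.getint("core", "default_scheduler_port", fallback=8082)
    return host, port


print(scheduler_endpoint(SAMPLE_CFG))
print(scheduler_endpoint(""))  # no [core] section: fallbacks apply
```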

### Monitoring and Health Checks

```bash
#!/bin/bash
# Luigi health check and monitoring script

SCHEDULER_HOST="${LUIGI_SCHEDULER_HOST:-localhost}"
SCHEDULER_PORT="${LUIGI_SCHEDULER_PORT:-8082}"
HEALTH_CHECK_URL="http://$SCHEDULER_HOST:$SCHEDULER_PORT/api/graph"

# Function to check scheduler health
check_scheduler_health() {
    echo "Checking Luigi scheduler health..."

    # -f makes curl fail on HTTP error responses, not only on connection errors
    if curl -sf --connect-timeout 5 "$HEALTH_CHECK_URL" > /dev/null; then
        echo "✓ Scheduler is responding"
        return 0
    else
        echo "✗ Scheduler is not responding"
        return 1
    fi
}

# Function to get scheduler statistics
get_scheduler_stats() {
    echo "Getting scheduler statistics..."

    # The Python lines must stay unindented at the top level
    local stats=$(curl -s "$HEALTH_CHECK_URL" | python3 -c "
import sys, json
try:
    data = json.load(sys.stdin)
    # The response maps each task ID to its details, including a status field
    tasks = data.get('response', {})
    print(f'Total tasks: {len(tasks)}')

    # Count by status
    status_counts = {}
    for task in tasks.values():
        status = task.get('status', 'UNKNOWN')
        status_counts[status] = status_counts.get(status, 0) + 1

    for status, count in status_counts.items():
        print(f'{status}: {count}')

except Exception as e:
    print(f'Error parsing response: {e}')
")

    echo "$stats"
}

# Function to restart scheduler if unhealthy
restart_scheduler_if_needed() {
    if ! check_scheduler_health; then
        echo "Attempting to restart scheduler..."

        # Kill existing scheduler
        pkill -f luigid
        sleep 5

        # Start new scheduler
        luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi
        sleep 10

        # Check if restart was successful
        if check_scheduler_health; then
            echo "✓ Scheduler restarted successfully"
        else
            echo "✗ Scheduler restart failed"
            exit 1
        fi
    fi
}

# Main execution
case "$1" in
    health)
        check_scheduler_health
        ;;
    stats)
        get_scheduler_stats
        ;;
    restart)
        restart_scheduler_if_needed
        ;;
    monitor)
        # Continuous monitoring loop
        while true; do
            echo "=== $(date) ==="
            check_scheduler_health && get_scheduler_stats
            echo
            sleep 60
        done
        ;;
    *)
        echo "Usage: $0 {health|stats|restart|monitor}"
        exit 1
        ;;
esac
```
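
The status counting embedded in the script above can also live in a standalone Python function, which is easier to test. This sketch assumes the scheduler's graph endpoint returns JSON whose `response` field maps task IDs to objects with a `status` key; the sample payload and the `count_statuses` helper are hypothetical.

```python
import json
from collections import Counter

# Hypothetical payload in the assumed shape of the graph endpoint's response
SAMPLE_PAYLOAD = json.dumps({
    "response": {
        "IngestTask_2023_01_01": {"status": "DONE"},
        "ProcessingTask_2023_01_01": {"status": "RUNNING"},
        "ReportTask_2023_01_01": {"status": "PENDING"},
        "LookupTask_2023_01_01": {"status": "DONE"},
    }
})


def count_statuses(payload: str) -> Counter:
    """Count tasks per status in a graph-style JSON payload."""
    tasks = json.loads(payload).get("response", {})
    return Counter(task.get("status", "UNKNOWN") for task in tasks.values())


counts = count_statuses(SAMPLE_PAYLOAD)
print(sum(counts.values()), dict(counts))
```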