or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-tools.mdconfiguration.mdevents.mdexecution.mdindex.mdintegrations.mdparameters.mdscheduler.mdtargets.mdtasks.md

cli-tools.mddocs/

0

# Command Line Tools

1

2

Luigi provides comprehensive command-line utilities for workflow management, task execution, dependency analysis, and debugging. These tools enable both interactive development and production deployment.

3

4

## Capabilities

5

6

### Main CLI Commands

7

8

Primary command-line interfaces for executing Luigi workflows and managing the scheduler daemon.

9

10

```python { .api }

11

# Main task execution command

12

luigi --module mymodule MyTask [--param value] [options]

13

14

# Scheduler daemon

15

luigid [--background] [--pidfile FILE] [--logdir DIR] [--state-path FILE] [--address ADDRESS] [--port PORT]

16

17

# Command-line options for luigi command

18

class LuigiCLI:

19

"""Luigi command-line interface options."""

20

21

# Task specification

22

module: str # Module containing task

23

task: str # Task class name

24

25

# Scheduler options

26

scheduler_host: str = 'localhost' # Scheduler host

27

scheduler_port: int = 8082 # Scheduler port

28

local_scheduler: bool = False # Use local scheduler

29

30

# Worker options

31

workers: int = 1 # Number of worker processes

32

keep_alive: bool = False # Keep worker alive after completion

33

timeout: int = 0 # Task timeout (0 = no timeout)

34

35

# Logging options

36

log_level: str = 'DEBUG' # Logging level

37

logging_conf_file: str # Logging configuration file

38

39

# Execution options

40

no_lock: bool = False # Disable file locking

41

lock_size: int = 1 # Lock file size

42

lock_pid_dir: str # PID lock directory

43

take_lock: bool = False # Take exclusive lock

44

45

# Output options

46

retcode_already_running: int = 10 # Return code if already running

47

retcode_missing_data: int = 20 # Return code for missing data

48

retcode_not_run: int = 25 # Return code if not run

49

retcode_task_failed: int = 30 # Return code for task failure

50

retcode_scheduling_error: int = 35 # Return code for scheduling error

51

52

# Help and version

53

help: bool = False # Show help message

54

version: bool = False # Show version information

55

```

56

57

### Scheduler Daemon Options

58

59

Configuration options for the Luigi scheduler daemon (luigid).

60

61

```python { .api }

62

class LuigiDaemon:

63

"""Luigi scheduler daemon options."""

64

65

# Server options

66

address: str = 'localhost' # Server bind address

67

port: int = 8082 # Server port

68

69

# Process options

70

background: bool = False # Run in background

71

pidfile: str # PID file path

72

user: str # Run as specific user

73

group: str # Run as specific group

74

75

# Logging options

76

logdir: str # Log directory

77

logging_conf_file: str # Logging configuration

78

79

# State persistence

80

state_path: str # Scheduler state file path

81

82

# Security options

83

unix_socket: str # Unix socket path

84

85

# Development options

86

dev: bool = False # Development mode

87

```

88

89

### Dependency Analysis Tools

90

91

Command-line utilities for analyzing task dependencies and workflow structure.

92

93

```python { .api }

94

# Dependency analysis commands

95

from luigi.tools.deps import deps_main

96

from luigi.tools.deps_tree import deps_tree_main

97

98

def deps_main(task_str: str, module: str = None) -> None:

99

"""

100

Analyze task dependencies.

101

102

Usage: python -m luigi.tools.deps MyTask --module mymodule

103

104

Args:

105

task_str: Task name and parameters

106

module: Module containing task

107

"""

108

109

def deps_tree_main(task_str: str, module: str = None) -> None:

110

"""

111

Display dependency tree visualization.

112

113

Usage: python -m luigi.tools.deps_tree MyTask --module mymodule

114

115

Args:

116

task_str: Task name and parameters

117

module: Module containing task

118

"""

119

120

# Task search utility

121

from luigi.tools.luigi_grep import luigi_grep_main

122

123

def luigi_grep_main(pattern: str, paths: list = None) -> None:

124

"""

125

Search for tasks matching pattern.

126

127

Usage: python -m luigi.tools.luigi_grep "pattern" [paths...]

128

129

Args:

130

pattern: Search pattern (regex)

131

paths: Paths to search in

132

"""

133

```

134

135

### Range Task Utilities

136

137

Built-in utilities for working with range tasks and date-based workflows.

138

139

```python { .api }

140

# Range task commands (auto-loaded if enabled)

141

from luigi.tools.range import RangeDaily, RangeHourly, RangeByMinutes

142

143

# Usage examples:

144

# luigi RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-31

145

# luigi RangeHourly --of MyTask --start 2023-01-01-00 --stop 2023-01-01-23

146

```

147

148

## Usage Examples

149

150

### Basic Task Execution

151

152

```bash

153

# Execute single task with parameters

154

luigi MyTask --param1 value1 --param2 value2 --module mypackage.tasks

155

156

# Execute with local scheduler

157

luigi MyTask --local-scheduler --module mypackage.tasks

158

159

# Execute with remote scheduler

160

luigi MyTask --scheduler-host scheduler.example.com --scheduler-port 8082 --module mypackage.tasks

161

162

# Execute with multiple workers

163

luigi MyTask --workers 4 --module mypackage.tasks

164

165

# Execute with timeout

166

luigi MyTask --timeout 3600 --module mypackage.tasks

167

```

168

169

### Scheduler Daemon Management

170

171

```bash

172

# Start scheduler daemon

173

luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi

174

175

# Start scheduler on specific address/port

176

luigid --address 0.0.0.0 --port 8082 --background

177

178

# Start with persistent state

179

luigid --state-path /var/lib/luigi/scheduler.state --background

180

181

# Development mode with verbose logging

182

luigid --dev --logdir ./logs

183

```

184

185

### Dependency Analysis

186

187

```bash

188

# Analyze task dependencies

189

python -m luigi.tools.deps MyTask --module mypackage.tasks

190

191

# Visualize dependency tree

192

python -m luigi.tools.deps_tree MyTask --module mypackage.tasks

193

194

# Search for tasks

195

python -m luigi.tools.luigi_grep "ProcessData" ./tasks/

196

197

# Analyze range task dependencies

198

python -m luigi.tools.deps RangeDaily --of MyTask --start 2023-01-01 --stop 2023-01-07

199

```

200

201

### Advanced CLI Usage

202

203

```python

204

# CLI wrapper script example

205

#!/usr/bin/env python3

206

"""Custom Luigi CLI wrapper with enhanced functionality."""

207

208

import luigi

209

import sys

210

import argparse

211

from luigi.cmdline import luigi_run

212

from luigi.configuration import get_config

213

214

def custom_luigi_main():

215

"""Enhanced Luigi CLI with custom options."""

216

217

parser = argparse.ArgumentParser(description='Enhanced Luigi CLI')

218

219

# Add custom options

220

parser.add_argument('--env', choices=['dev', 'staging', 'prod'],

221

default='dev', help='Environment to run in')

222

parser.add_argument('--dry-run', action='store_true',

223

help='Show what would be executed without running')

224

parser.add_argument('--notify', help='Notification email for completion')

225

226

# Parse known args, let Luigi handle the rest

227

args, luigi_args = parser.parse_known_args()

228

229

# Configure environment

230

setup_environment(args.env)

231

232

if args.dry_run:

233

print("DRY RUN: Would execute:")

234

print(" ".join(['luigi'] + luigi_args))

235

return

236

237

# Execute Luigi with remaining arguments

238

sys.argv = ['luigi'] + luigi_args

239

result = luigi_run()

240

241

# Send notification if requested

242

if args.notify:

243

send_completion_notification(args.notify, result)

244

245

return result

246

247

def setup_environment(env: str):

248

"""Configure Luigi for specific environment."""

249

config = get_config()

250

251

if env == 'prod':

252

config.set('core', 'scheduler_host', 'prod-scheduler.example.com')

253

config.set('core', 'log_level', 'WARNING')

254

elif env == 'staging':

255

config.set('core', 'scheduler_host', 'staging-scheduler.example.com')

256

config.set('core', 'log_level', 'INFO')

257

else: # dev

258

config.set('core', 'local_scheduler', 'true')

259

config.set('core', 'log_level', 'DEBUG')

260

261

def send_completion_notification(email: str, result):

262

"""Send email notification on completion."""

263

# Implementation would send email with result status

264

print(f"Would send notification to {email}: Status {result.status}")

265

266

if __name__ == '__main__':

267

custom_luigi_main()

268

```

269

270

### Batch Execution Scripts

271

272

```bash

273

#!/bin/bash

274

# Luigi batch execution script

275

276

# Configuration

277

LUIGI_MODULE="mypackage.tasks"

278

SCHEDULER_HOST="scheduler.example.com"

279

LOG_DIR="/var/log/luigi"

280

DATE=$(date +%Y-%m-%d)

281

282

# Create log directory

283

mkdir -p "$LOG_DIR"

284

285

# Function to run Luigi task with error handling

286

run_luigi_task() {

287

local task_name=$1

288

local log_file="$LOG_DIR/${task_name}_${DATE}.log"

289

290

echo "Starting $task_name at $(date)"

291

292

luigi "$task_name" \

293

--module "$LUIGI_MODULE" \

294

--scheduler-host "$SCHEDULER_HOST" \

295

--workers 2 \

296

--timeout 7200 \

297

>> "$log_file" 2>&1

298

299

local exit_code=$?

300

301

if [ $exit_code -eq 0 ]; then

302

echo "✓ $task_name completed successfully"

303

else

304

echo "✗ $task_name failed with exit code $exit_code"

305

echo "Check log file: $log_file"

306

fi

307

308

return $exit_code

309

}

310

311

# Execute daily tasks

312

echo "Starting daily Luigi workflow for $DATE"

313

314

run_luigi_task "DataIngestionTask --date $DATE"

315

run_luigi_task "ProcessingTask --date $DATE"

316

run_luigi_task "ReportGenerationTask --date $DATE"

317

318

echo "Daily workflow completed at $(date)"

319

```

320

321

### Configuration Management

322

323

```bash

324

#!/bin/bash

325

# Luigi configuration management script

326

327

LUIGI_CONFIG_DIR="/etc/luigi"

328

LUIGI_ENV="${LUIGI_ENV:-development}"

329

330

# Function to switch Luigi configuration

331

switch_config() {

332

local env=$1

333

local config_file="$LUIGI_CONFIG_DIR/luigi-$env.cfg"

334

335

if [ ! -f "$config_file" ]; then

336

echo "Configuration file not found: $config_file"

337

exit 1

338

fi

339

340

# Link active configuration

341

ln -sf "$config_file" "$LUIGI_CONFIG_DIR/luigi.cfg"

342

echo "Switched to $env configuration"

343

}

344

345

# Function to validate configuration

346

validate_config() {

347

echo "Validating Luigi configuration..."

348

349

# Test scheduler connection

350

if luigi --help > /dev/null 2>&1; then

351

echo "✓ Luigi CLI is working"

352

else

353

echo "✗ Luigi CLI not working"

354

exit 1

355

fi

356

357

# Test configuration parsing

358

python3 -c "

359

import luigi.configuration

360

config = luigi.configuration.get_config()

361

print(f'Scheduler: {config.get(\"core\", \"default_scheduler_host\", fallback=\"localhost\")}')

362

print(f'Port: {config.getint(\"core\", \"default_scheduler_port\", fallback=8082)}')

363

"

364

}

365

366

# Main command handling

367

case "$1" in

368

switch)

369

switch_config "$2"

370

;;

371

validate)

372

validate_config

373

;;

374

*)

375

echo "Usage: $0 {switch|validate} [environment]"

376

echo "Environments: development, staging, production"

377

exit 1

378

;;

379

esac

380

```

381

382

### Monitoring and Health Checks

383

384

```bash

385

#!/bin/bash

386

# Luigi health check and monitoring script

387

388

SCHEDULER_HOST="${LUIGI_SCHEDULER_HOST:-localhost}"

389

SCHEDULER_PORT="${LUIGI_SCHEDULER_PORT:-8082}"

390

HEALTH_CHECK_URL="http://$SCHEDULER_HOST:$SCHEDULER_PORT/api/graph"

391

392

# Function to check scheduler health

393

check_scheduler_health() {

394

echo "Checking Luigi scheduler health..."

395

396

if curl -s --connect-timeout 5 "$HEALTH_CHECK_URL" > /dev/null; then

397

echo "✓ Scheduler is responding"

398

return 0

399

else

400

echo "✗ Scheduler is not responding"

401

return 1

402

fi

403

}

404

405

# Function to get scheduler statistics

406

get_scheduler_stats() {

407

echo "Getting scheduler statistics..."

408

409

local stats=$(curl -s "$HEALTH_CHECK_URL" | python3 -c "

410

import sys, json

411

try:

412

data = json.load(sys.stdin)

413

nodes = data.get('response', {}).get('nodes', [])

414

print(f'Total tasks: {len(nodes)}')

415

416

# Count by status

417

status_counts = {}

418

for node in nodes:

419

status = node.get('status', 'UNKNOWN')

420

status_counts[status] = status_counts.get(status, 0) + 1

421

422

for status, count in status_counts.items():

423

print(f'{status}: {count}')

424

425

except Exception as e:

426

print(f'Error parsing response: {e}')

427

")

428

429

echo "$stats"

430

}

431

432

# Function to restart scheduler if unhealthy

433

restart_scheduler_if_needed() {

434

if ! check_scheduler_health; then

435

echo "Attempting to restart scheduler..."

436

437

# Kill existing scheduler

438

pkill -f luigid

439

sleep 5

440

441

# Start new scheduler

442

luigid --background --pidfile /var/run/luigi.pid --logdir /var/log/luigi

443

sleep 10

444

445

# Check if restart was successful

446

if check_scheduler_health; then

447

echo "✓ Scheduler restarted successfully"

448

else

449

echo "✗ Scheduler restart failed"

450

exit 1

451

fi

452

fi

453

}

454

455

# Main execution

456

case "$1" in

457

health)

458

check_scheduler_health

459

;;

460

stats)

461

get_scheduler_stats

462

;;

463

restart)

464

restart_scheduler_if_needed

465

;;

466

monitor)

467

# Continuous monitoring loop

468

while true; do

469

echo "=== $(date) ==="

470

check_scheduler_health && get_scheduler_stats

471

echo

472

sleep 60

473

done

474

;;

475

*)

476

echo "Usage: $0 {health|stats|restart|monitor}"

477

exit 1

478

;;

479

esac

480

```