A Python clone of Foreman for managing Procfile-based applications with process management and export capabilities
—
Core functionality for managing and orchestrating multiple concurrent processes with lifecycle management, output handling, and signal forwarding. The process management system enables running multiple services simultaneously with proper coordination and cleanup.
The Manager class orchestrates multiple processes, handles signals, and manages process lifecycle events. It provides a high-level interface for running multiple processes concurrently with proper event handling and cleanup.
class Manager:
"""
Manager is responsible for running multiple external processes in parallel
managing the events that result (starting, stopping, printing).
"""
def __init__(self, printer=None):
"""
Initialize manager with optional printer.
Parameters:
- printer: Printer instance for output formatting (defaults to stdout printer)
"""
def add_process(self, name, cmd, quiet=False, env=None, cwd=None):
"""
Add a process to this manager instance. The process will not be started
until loop() is called.
Parameters:
- name: str, unique process name
- cmd: str, command to execute
- quiet: bool, whether to suppress process output
- env: dict, environment variables for the process
- cwd: str, working directory for the process
Returns:
Process: The created process object
"""
def loop(self):
"""
Start all the added processes and multiplex their output onto the bound
printer. If one process terminates, all others will be terminated.
This method blocks until all processes have terminated.
"""
def terminate(self):
"""
Terminate all processes managed by this ProcessManager.
"""
def kill(self):
"""
Kill all processes managed by this ProcessManager forcefully.
"""
# Properties
returncode: Optional[int] # Return code after loop() finishesWrapper around subprocess with event forwarding and output handling. Provides a clean interface for individual process management with lifecycle events.
class Process:
"""
A utility wrapper around subprocess.Popen that stores attributes needed
by Honcho and supports forwarding process lifecycle events and output to a queue.
"""
def __init__(self, cmd, name=None, colour=None, quiet=False, env=None, cwd=None):
"""
Initialize a process wrapper.
Parameters:
- cmd: str, command to execute
- name: str, process name for identification
- colour: str, ANSI color code for output
- quiet: bool, whether to suppress output
- env: dict, environment variables
- cwd: str, working directory
"""
def run(self, events=None, ignore_signals=False):
"""
Run the process and forward events to the queue.
Parameters:
- events: multiprocessing.Queue, event queue for forwarding messages
- ignore_signals: bool, whether to ignore SIGINT/SIGTERM
"""
# Properties
cmd: str
colour: Optional[str]
quiet: bool
name: Optional[str]
env: Dict[str, str]
cwd: Optional[str]Custom Popen subclass with platform-specific optimizations and proper signal handling for process groups.
class Popen(subprocess.Popen):
"""
Enhanced subprocess.Popen with platform-specific optimizations.
"""
def __init__(self, cmd, **kwargs):
"""
Initialize enhanced Popen with default options for Honcho.
Parameters:
- cmd: str, command to execute
- **kwargs: additional subprocess options
"""Cross-platform process management utilities for terminating and killing process groups.
class ProcessManager:
    """
    Cross-platform process management utilities.
    """

    def terminate(self, pid):
        """
        Terminate process group by PID.

        Parameters:
        - pid: int, process ID to terminate
        """

    def kill(self, pid):
        """
        Kill process group by PID forcefully.

        Parameters:
        - pid: int, process ID to kill
        """

import sys
from honcho.manager import Manager
from honcho.printer import Printer

# Create manager with custom printer settings
manager = Manager(Printer(sys.stdout, colour=True, prefix=True))

# Add multiple processes
manager.add_process('web', 'python app.py', env={'PORT': '5000'})
manager.add_process('worker', 'python worker.py', quiet=True)
manager.add_process('scheduler', 'python scheduler.py', cwd='/app/scheduler')

# Start all processes and wait for completion
manager.loop()

# Exit with appropriate return code
sys.exit(manager.returncode)

from honcho.process import Process
import multiprocessing
import queue

# Create event queue for process communication
events = multiprocessing.Queue()

# Create and run individual process
process = Process(
    cmd='python long_running_task.py',
    name='task',
    colour='32',  # Green
    env={'DEBUG': '1'}
)

# Run process in background thread or multiprocessing
p = multiprocessing.Process(target=process.run, args=(events, False))
p.start()

# Handle events from queue
while True:
    try:
        # get() raises queue.Empty when nothing arrives within the timeout,
        # which is why the standard-library `queue` module is imported above.
        msg = events.get(timeout=1.0)
        print(f"Event: {msg.type}, Data: {msg.data}")
        if msg.type == 'stop':
            break
    except queue.Empty:
        continue
p.join()

import signal
import sys

from honcho.manager import Manager

manager = Manager()
manager.add_process('app', 'python app.py')

# The manager automatically handles SIGINT and SIGTERM
# to gracefully terminate all child processes
try:
    manager.loop()
except KeyboardInterrupt:
    print("Received interrupt, shutting down...")
finally:
    # Manager automatically cleans up processes
    # `or 0` covers the case where returncode is still None
    sys.exit(manager.returncode or 0)

KILL_WAIT = 5  # Seconds to wait before forceful kill
# Maps each handled signal to its display name and its `rc` value.
SIGNALS = {
    signal.SIGINT: dict(name='SIGINT', rc=130),
    signal.SIGTERM: dict(name='SIGTERM', rc=143),
}
SYSTEM_PRINTER_NAME = 'system'  # Name for system messages
ON_WINDOWS = 'win32' in str(sys.platform).lower()  # Platform detection

Install with Tessl CLI
npx tessl i tessl/pypi-honcho