CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-com-xuxueli--xxl-job-core

A distributed task scheduling framework core library for Java applications

Pending
Overview
Eval results
Files

thread-management.mddocs/

Thread Management and Infrastructure

Core thread management classes and infrastructure components for job execution, registration, and callback handling in XXL-Job Core.

Capabilities

JobThread Class

Core thread class that manages individual job execution with queuing and lifecycle management.

/**
 * Core job execution thread with queue management
 * Extends Thread to handle asynchronous job execution
 */
public class JobThread extends Thread {
    
    /**
     * Push trigger parameter to job execution queue
     * @param triggerParam Job execution parameters from admin
     * @return Response indicating queue acceptance status
     */
    public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam);
    
    /**
     * Request thread to stop execution
     * @param stopReason Reason for stopping the thread
     */
    public void toStop(String stopReason);
    
    /**
     * Check if thread is running or has queued jobs
     * @return true if thread is active or has pending jobs
     */
    public boolean isRunningOrHasQueue();
    
    /**
     * Get job handler associated with this thread
     * @return IJobHandler instance
     */
    public IJobHandler getHandler();
    
    /**
     * Check if thread is stopped
     * @return true if thread has been stopped
     */
    public boolean isStopped();
}

Usage Examples:

// JobThread is primarily managed by XxlJobExecutor internally
// Direct usage is typically not required for most applications

// Example of how executor manages job threads:
public void manageJobThread(int jobId, IJobHandler handler) {
    // Register job thread (done by executor automatically)
    JobThread jobThread = XxlJobExecutor.registJobThread(jobId, handler, "Job registration");
    
    // Check thread status
    if (jobThread.isRunningOrHasQueue()) {
        System.out.println("Job thread is active");
    }
    
    // Stop thread (typically done during shutdown)
    jobThread.toStop("Application shutdown");
    
    // Remove from executor
    XxlJobExecutor.removeJobThread(jobId, "Cleanup");
}

ExecutorRegistryThread

Singleton thread that manages executor registration and heartbeat with admin servers.

/**
 * Singleton thread for managing executor registration with admin servers
 * Handles periodic heartbeat and re-registration
 */
public class ExecutorRegistryThread {
    
    /**
     * Get singleton instance of registry thread
     * @return ExecutorRegistryThread instance
     */
    public static ExecutorRegistryThread getInstance();
    
    /**
     * Start registry thread with admin addresses and application info
     * @param adminAddresses Comma-separated admin server URLs
     * @param accessToken Authentication token
     * @param appname Application name for registration
     * @param address Executor address for callbacks
     */
    public void start(String adminAddresses, String accessToken, String appname, String address);
    
    /**
     * Stop registry thread and unregister from admin servers
     */
    public void toStop();
}

TriggerCallbackThread

Manages callback operations to admin servers after job execution completion.

/**
 * Thread for handling job execution callbacks to admin servers
 * Manages result reporting back to admin after job completion
 */
public class TriggerCallbackThread {
    
    /**
     * Get singleton instance of callback thread
     * @return TriggerCallbackThread instance
     */
    public static TriggerCallbackThread getInstance();
    
    /**
     * Push callback parameter for async processing
     * @param callback Job execution result to report to admin
     */
    public static void pushCallBack(HandleCallbackParam callback);
    
    /**
     * Start callback processing thread
     */
    public void start();
    
    /**
     * Stop callback thread
     */
    public void toStop();
}

JobLogFileCleanThread

Background thread for cleaning up old job log files based on retention policy.

/**
 * Background thread for log file cleanup
 * Removes old log files based on retention policy
 */
public class JobLogFileCleanThread {
    
    /**
     * Get singleton instance of log cleanup thread
     * @return JobLogFileCleanThread instance
     */
    public static JobLogFileCleanThread getInstance();
    
    /**
     * Start log cleanup thread with retention policy
     * @param logPath Base directory for log files
     * @param logRetentionDays Number of days to retain log files
     */
    public void start(String logPath, int logRetentionDays);
    
    /**
     * Stop log cleanup thread
     */
    public void toStop();
}

Thread Management Patterns

Executor Lifecycle Management

// Thread management during executor startup
public void startExecutor() {
    // 1. Start registry thread for heartbeat
    ExecutorRegistryThread.getInstance().start(
        adminAddresses, accessToken, appname, address
    );
    
    // 2. Start callback thread for result reporting
    TriggerCallbackThread.getInstance().start();
    
    // 3. Start log cleanup thread
    JobLogFileCleanThread.getInstance().start(logPath, logRetentionDays);
    
    // 4. Job threads are created on-demand when jobs are triggered
}

// Thread management during executor shutdown
public void stopExecutor() {
    // 1. Stop accepting new jobs
    // 2. Stop all job threads
    for (JobThread jobThread : activeJobThreads) {
        jobThread.toStop("Executor shutdown");
    }
    
    // 3. Stop infrastructure threads
    ExecutorRegistryThread.getInstance().toStop();
    TriggerCallbackThread.getInstance().toStop();
    JobLogFileCleanThread.getInstance().toStop();
}

Job Execution Flow

// Typical job execution flow involving threads
public void executeJobFlow(TriggerParam triggerParam) {
    int jobId = triggerParam.getJobId();
    IJobHandler handler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
    
    // 1. Get or create job thread
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(jobId, handler, "New job");
    }
    
    // 2. Queue job for execution
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    
    // Job executes asynchronously in JobThread
    // 3. After completion, callback is automatically pushed
    // TriggerCallbackThread handles reporting results back to admin
}

Thread Safety Considerations

// Thread-safe access to job context
@XxlJob("threadSafeJob")
public void threadSafeJobHandler() throws Exception {
    // Each job execution runs in its own JobThread
    // XxlJobHelper methods are thread-local and safe
    long jobId = XxlJobHelper.getJobId();
    String param = XxlJobHelper.getJobParam();
    
    // Thread-local logging is safe
    XxlJobHelper.log("Job {} executing in thread: {}", 
                     jobId, Thread.currentThread().getName());
    
    // Business logic here
    
    XxlJobHelper.handleSuccess();
}

Infrastructure Components

EmbedServer

Embedded HTTP server based on Netty for handling admin-to-executor communication.

/**
 * Embedded HTTP server for executor communication
 * Uses Netty for handling HTTP requests from admin servers
 */
public class EmbedServer {
    
    /**
     * Start embedded HTTP server on specified port
     * @param address Bind address for server
     * @param port Port number for server
     * @param appname Application name for identification
     * @param accessToken Authentication token
     * @throws Exception if server startup fails
     */
    public void start(String address, int port, String appname, String accessToken) throws Exception;
    
    /**
     * Stop embedded HTTP server
     * @throws Exception if server shutdown fails
     */
    public void stop() throws Exception;
    
    /**
     * Nested HTTP request handler
     */
    public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
        // Handles HTTP requests from admin servers
        // Processes beat, idleBeat, run, kill, log operations
    }
}

XxlJobRemotingUtil

HTTP communication utility for RPC operations between components.

/**
 * HTTP communication utilities for RPC operations
 * Handles request/response processing between admin and executor
 */
public class XxlJobRemotingUtil {
    
    /**
     * Access token header name for authentication
     */
    public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";
    
    /**
     * Send HTTP POST request with JSON body
     * @param url Target URL for request
     * @param accessToken Authentication token
     * @param timeout Request timeout in milliseconds
     * @param requestObj Object to serialize as JSON body
     * @param returnTargClassOfT Response class type
     * @return Deserialized response object
     */
    public static ReturnT postBody(String url, String accessToken, int timeout, 
                                   Object requestObj, Class returnTargClassOfT);
}

Usage Examples:

// HTTP communication example
public ReturnT<String> sendCallback(String adminAddress, List<HandleCallbackParam> callbacks) {
    String url = adminAddress + "/api/callback";
    
    ReturnT<String> response = XxlJobRemotingUtil.postBody(
        url,                    // Admin callback URL
        accessToken,            // Authentication token
        30000,                  // 30 second timeout
        callbacks,              // Request payload
        String.class            // Response type
    );
    
    return response;
}

Best Practices

Thread Resource Management

// Proper thread resource management
public class ExecutorResourceManager {
    
    public void configureThreadResources() {
        // 1. Set appropriate thread pool sizes
        System.setProperty("xxl.job.executor.thread.pool.size", "200");
        
        // 2. Configure timeout values
        System.setProperty("xxl.job.executor.timeout", "300000"); // 5 minutes
        
        // 3. Set log retention to prevent disk overflow
        System.setProperty("xxl.job.executor.log.retention.days", "7");
    }
    
    public void monitorThreadHealth() {
        // Monitor active job threads
        Map<Integer, JobThread> activeThreads = getActiveJobThreads();
        
        for (Map.Entry<Integer, JobThread> entry : activeThreads.entrySet()) {
            JobThread thread = entry.getValue();
            
            if (thread.isRunningOrHasQueue()) {
                System.out.println("Job " + entry.getKey() + " is active");
            }
            
            // Check for stuck threads
            if (isThreadStuck(thread)) {
                thread.toStop("Thread appears stuck");
            }
        }
    }
}

Install with Tessl CLI

npx tessl i tessl/maven-com-xuxueli--xxl-job-core

docs

communication.md

configuration.md

context-helpers.md

executor.md

index.md

job-handlers.md

thread-management.md

tile.json