A distributed task scheduling framework core library for Java applications
—
Core thread management classes and infrastructure components for job execution, registration, and callback handling in XXL-Job Core.
Core thread class that manages individual job execution with queuing and lifecycle management.
/**
* Core job execution thread with queue management.
* Extends Thread to handle asynchronous job execution: trigger parameters are
* pushed onto the thread's queue, and the thread can be asked to stop with a reason.
*/
public class JobThread extends Thread {
/**
* Push a trigger parameter onto this thread's job execution queue.
* @param triggerParam Job execution parameters from admin
* @return Response indicating queue acceptance status
*/
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam);
/**
* Request this thread to stop execution.
* @param stopReason Reason for stopping the thread
*/
public void toStop(String stopReason);
/**
* Check whether this thread is running a job or still has queued jobs.
* @return true if thread is active or has pending jobs
*/
public boolean isRunningOrHasQueue();
/**
* Get the job handler associated with this thread.
* @return IJobHandler instance
*/
public IJobHandler getHandler();
/**
* Check whether this thread has been stopped.
* @return true if thread has been stopped
*/
public boolean isStopped();
}Usage Examples:
// JobThread is primarily managed by XxlJobExecutor internally
// Direct usage is typically not required for most applications
// Example of how executor manages job threads:
/**
 * Walks through the lifecycle calls the executor makes on a job thread:
 * register, inspect, stop, and remove.
 *
 * @param jobId   id of the job the thread is registered under
 * @param handler handler the new thread will be associated with
 */
public void manageJobThread(int jobId, IJobHandler handler) {
    // Registration is normally performed by the executor itself.
    final JobThread worker = XxlJobExecutor.registJobThread(jobId, handler, "Job registration");

    // Report whether the thread is busy or still has queued triggers.
    final boolean active = worker.isRunningOrHasQueue();
    if (active) {
        System.out.println("Job thread is active");
    }

    // Request a stop first (usually part of shutdown) ...
    worker.toStop("Application shutdown");

    // ... then drop the thread from the executor.
    XxlJobExecutor.removeJobThread(jobId, "Cleanup");
}Singleton thread that manages executor registration and heartbeat with admin servers.
/**
* Singleton thread for managing executor registration with admin servers.
* Handles periodic heartbeat and re-registration; obtain the instance via getInstance().
*/
public class ExecutorRegistryThread {
/**
* Get the singleton instance of the registry thread.
* @return ExecutorRegistryThread instance
*/
public static ExecutorRegistryThread getInstance();
/**
* Start the registry thread with admin addresses and application info.
* NOTE(review): the parameter list documented here should be checked against the
* xxl-job-core version in use; upstream releases may declare start(appname, address)
* only — confirm before relying on this signature.
* @param adminAddresses Comma-separated admin server URLs
* @param accessToken Authentication token
* @param appname Application name for registration
* @param address Executor address for callbacks
*/
public void start(String adminAddresses, String accessToken, String appname, String address);
/**
* Stop the registry thread and unregister from admin servers.
*/
public void toStop();
}Manages callback operations to admin servers after job execution completion.
/**
* Thread for handling job execution callbacks to admin servers.
* Manages result reporting back to admin after job completion; callbacks are
* handed over through the static pushCallBack method and processed asynchronously.
*/
public class TriggerCallbackThread {
/**
* Get the singleton instance of the callback thread.
* @return TriggerCallbackThread instance
*/
public static TriggerCallbackThread getInstance();
/**
* Push a callback parameter for asynchronous processing.
* @param callback Job execution result to report to admin
*/
public static void pushCallBack(HandleCallbackParam callback);
/**
* Start the callback processing thread.
*/
public void start();
/**
* Stop the callback thread.
*/
public void toStop();
}Background thread for cleaning up old job log files based on retention policy.
/**
* Background thread for log file cleanup.
* Removes old log files based on the configured retention policy; obtain the
* instance via getInstance().
*/
public class JobLogFileCleanThread {
/**
* Get the singleton instance of the log cleanup thread.
* @return JobLogFileCleanThread instance
*/
public static JobLogFileCleanThread getInstance();
/**
* Start the log cleanup thread with a retention policy.
* NOTE(review): verify this parameter list against the xxl-job-core version in use;
* upstream releases may accept only the retention-days value — confirm.
* @param logPath Base directory for log files
* @param logRetentionDays Number of days to retain log files
*/
public void start(String logPath, int logRetentionDays);
/**
* Stop the log cleanup thread.
*/
public void toStop();
}// Thread management during executor startup
/**
 * Starts the executor's infrastructure threads in order: registry, callback,
 * then log cleanup. Job threads are not started here.
 */
public void startExecutor() {
    // Step 1: registry thread, which provides the heartbeat to the admin servers.
    final ExecutorRegistryThread registry = ExecutorRegistryThread.getInstance();
    registry.start(adminAddresses, accessToken, appname, address);

    // Step 2: callback thread, which reports job results.
    final TriggerCallbackThread callbacks = TriggerCallbackThread.getInstance();
    callbacks.start();

    // Step 3: log cleanup thread, driven by the retention setting.
    final JobLogFileCleanThread logCleaner = JobLogFileCleanThread.getInstance();
    logCleaner.start(logPath, logRetentionDays);

    // Step 4: nothing to do here; job threads are created on demand when jobs are triggered.
}
// Thread management during executor shutdown
/**
 * Shuts the executor down: asks every active job thread to stop, then stops
 * the registry, callback, and log cleanup threads in that order.
 */
public void stopExecutor() {
    // Step 1: stop accepting new jobs (this example shows no code for that step).

    // Step 2: request a stop on every job thread.
    for (JobThread worker : activeJobThreads) {
        worker.toStop("Executor shutdown");
    }

    // Step 3: stop the infrastructure threads one after another.
    final ExecutorRegistryThread registry = ExecutorRegistryThread.getInstance();
    registry.toStop();

    final TriggerCallbackThread callbacks = TriggerCallbackThread.getInstance();
    callbacks.toStop();

    final JobLogFileCleanThread logCleaner = JobLogFileCleanThread.getInstance();
    logCleaner.toStop();
}// Typical job execution flow involving threads
/**
 * Routes a trigger to the job thread registered for its job id, creating and
 * registering that thread first when none exists yet.
 *
 * @param triggerParam trigger received from admin; supplies the job id and handler name
 * @throws IllegalStateException if a new job thread is needed but no handler is
 *         registered under the requested handler name
 */
public void executeJobFlow(TriggerParam triggerParam) {
    int jobId = triggerParam.getJobId();

    // 1. Get or create job thread
    JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
    if (jobThread == null) {
        // The handler is only needed when a new thread has to be registered.
        String handlerName = triggerParam.getExecutorHandler();
        IJobHandler handler = XxlJobExecutor.loadJobHandler(handlerName);
        // Assumes loadJobHandler yields null for an unknown name (TODO confirm);
        // fail fast rather than register a job thread that has no handler to run.
        if (handler == null) {
            throw new IllegalStateException("job handler [" + handlerName + "] not found.");
        }
        jobThread = XxlJobExecutor.registJobThread(jobId, handler, "New job");
    }

    // 2. Queue job for execution; the returned status tells whether the queue accepted it
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

    // Job executes asynchronously in JobThread
    // 3. After completion, callback is automatically pushed
    // TriggerCallbackThread handles reporting results back to admin
}// Thread-safe access to job context
/**
 * Example handler that reads its job context through XxlJobHelper, logs the
 * executing thread, and reports success.
 *
 * @throws Exception declared so business logic may propagate failures
 */
@XxlJob("threadSafeJob")
public void threadSafeJobHandler() throws Exception {
    // Job threads are looked up and registered per job id, and this handler
    // runs on such a thread. The context accessors below are described by
    // this guide as thread-local.
    long currentJobId = XxlJobHelper.getJobId();
    String jobParam = XxlJobHelper.getJobParam();

    // Logging goes through the same helper, so it is tied to the current execution.
    final String threadName = Thread.currentThread().getName();
    XxlJobHelper.log("Job {} executing in thread: {}", currentJobId, threadName);

    // Business logic here

    XxlJobHelper.handleSuccess();
}Embedded HTTP server based on Netty for handling admin-to-executor communication.
/**
* Embedded HTTP server for executor communication.
* Uses Netty for handling HTTP requests from admin servers; request processing is
* delegated to the nested EmbedHttpServerHandler.
*/
public class EmbedServer {
/**
* Start the embedded HTTP server on the specified port.
* @param address Bind address for server
* @param port Port number for server
* @param appname Application name for identification
* @param accessToken Authentication token
* @throws Exception if server startup fails
*/
public void start(String address, int port, String appname, String accessToken) throws Exception;
/**
* Stop the embedded HTTP server.
* @throws Exception if server shutdown fails
*/
public void stop() throws Exception;
/**
* Nested HTTP request handler for full HTTP requests received by the server.
*/
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
// Handles HTTP requests from admin servers
// Processes beat, idleBeat, run, kill, log operations
}
}HTTP communication utility for RPC operations between components.
/**
* HTTP communication utilities for RPC operations.
* Handles request/response processing between admin and executor.
*/
public class XxlJobRemotingUtil {
/**
* Access token header name for authentication.
*/
public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";
/**
* Send an HTTP POST request with a JSON body.
* @param url Target URL for request
* @param accessToken Authentication token
* @param timeout Request timeout. NOTE(review): documented here as milliseconds, but
*                upstream xxl-job-core appears to multiply this value by 1000 before
*                applying it as the read timeout, i.e. the unit is seconds — confirm
*                against the library version in use
* @param requestObj Object to serialize as JSON body
* @param returnTargClassOfT Response class type
* @return Deserialized response object
*/
public static ReturnT postBody(String url, String accessToken, int timeout,
Object requestObj, Class returnTargClassOfT);
}Usage Examples:
// HTTP communication example
/**
 * Posts a batch of job execution results to an admin server's callback endpoint.
 *
 * @param adminAddress base URL of the admin server, with or without a trailing slash
 * @param callbacks    execution results to report
 * @return the admin server's response as returned by postBody
 */
public ReturnT<String> sendCallback(String adminAddress, List<HandleCallbackParam> callbacks) {
    // Normalize the base URL so a trailing slash does not produce "//api/callback".
    String baseUrl = adminAddress.endsWith("/") ? adminAddress : adminAddress + "/";
    String url = baseUrl + "api/callback";

    ReturnT<String> response = XxlJobRemotingUtil.postBody(
        url,          // Admin callback URL
        accessToken,  // Authentication token
        30,           // 30 second timeout: postBody is understood to take seconds, so 30000 would be ~8.3 hours
        callbacks,    // Request payload
        String.class  // Response type
    );
    return response;
}// Proper thread resource management
/**
 * Example of configuring executor resources and watching job thread health.
 */
public class ExecutorResourceManager {

    /**
     * Publishes pool size, timeout, and log retention settings as system properties.
     * NOTE(review): nothing in this guide shows these property keys being read by the
     * executor — verify they are honoured before relying on them.
     */
    public void configureThreadResources() {
        // Thread pool sizing
        System.setProperty("xxl.job.executor.thread.pool.size", "200");

        // Timeout value (5 minutes)
        System.setProperty("xxl.job.executor.timeout", "300000");

        // Log retention, to keep log files from filling the disk
        System.setProperty("xxl.job.executor.log.retention.days", "7");
    }

    /**
     * Reports each active job thread and asks any thread that looks stuck to stop.
     */
    public void monitorThreadHealth() {
        final Map<Integer, JobThread> threadsByJobId = getActiveJobThreads();

        for (Map.Entry<Integer, JobThread> registered : threadsByJobId.entrySet()) {
            final JobThread worker = registered.getValue();

            // Report threads that are running or still have queued work.
            final boolean busy = worker.isRunningOrHasQueue();
            if (busy) {
                System.out.println("Job " + registered.getKey() + " is active");
            }

            // Independently of the report above, stop threads that appear stuck.
            final boolean stuck = isThreadStuck(worker);
            if (stuck) {
                worker.toStop("Thread appears stuck");
            }
        }
    }
}Install with Tessl CLI
npx tessl i tessl/maven-com-xuxueli--xxl-job-core