0
# Thread Management and Infrastructure
1
2
Core thread management classes and infrastructure components for job execution, registration, and callback handling in XXL-Job Core.
3
4
## Capabilities
5
6
### JobThread Class
7
8
Core thread class that manages individual job execution with queuing and lifecycle management.
9
10
```java { .api }
11
/**
12
* Core job execution thread with queue management
13
* Extends Thread to handle asynchronous job execution
14
*/
15
public class JobThread extends Thread {
16
17
/**
18
* Push trigger parameter to job execution queue
19
* @param triggerParam Job execution parameters from admin
20
* @return Response indicating queue acceptance status
21
*/
22
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam);
23
24
/**
25
* Request thread to stop execution
26
* @param stopReason Reason for stopping the thread
27
*/
28
public void toStop(String stopReason);
29
30
/**
31
* Check if thread is running or has queued jobs
32
* @return true if thread is active or has pending jobs
33
*/
34
public boolean isRunningOrHasQueue();
35
36
/**
37
* Get job handler associated with this thread
38
* @return IJobHandler instance
39
*/
40
public IJobHandler getHandler();
41
42
/**
43
* Check if thread is stopped
44
* @return true if thread has been stopped
45
*/
46
public boolean isStopped();
47
}
48
```
49
50
**Usage Examples:**
51
52
```java
53
// JobThread is primarily managed by XxlJobExecutor internally
54
// Direct usage is typically not required for most applications
55
56
// Example of how executor manages job threads:
57
public void manageJobThread(int jobId, IJobHandler handler) {
58
// Register job thread (done by executor automatically)
59
JobThread jobThread = XxlJobExecutor.registJobThread(jobId, handler, "Job registration");
60
61
// Check thread status
62
if (jobThread.isRunningOrHasQueue()) {
63
System.out.println("Job thread is active");
64
}
65
66
// Stop thread (typically done during shutdown)
67
jobThread.toStop("Application shutdown");
68
69
// Remove from executor
70
XxlJobExecutor.removeJobThread(jobId, "Cleanup");
71
}
72
```
73
74
### ExecutorRegistryThread
75
76
Singleton thread that manages executor registration and heartbeat with admin servers.
77
78
```java { .api }
79
/**
80
* Singleton thread for managing executor registration with admin servers
81
* Handles periodic heartbeat and re-registration
82
*/
83
public class ExecutorRegistryThread {
84
85
/**
86
* Get singleton instance of registry thread
87
* @return ExecutorRegistryThread instance
88
*/
89
public static ExecutorRegistryThread getInstance();
90
91
/**
92
* Start registry thread with admin addresses and application info
93
* @param adminAddresses Comma-separated admin server URLs
94
* @param accessToken Authentication token
95
* @param appname Application name for registration
96
* @param address Executor address for callbacks
97
*/
98
public void start(String adminAddresses, String accessToken, String appname, String address);
99
100
/**
101
* Stop registry thread and unregister from admin servers
102
*/
103
public void toStop();
104
}
105
```
106
107
### TriggerCallbackThread
108
109
Manages callback operations to admin servers after job execution completion.
110
111
```java { .api }
112
/**
113
* Thread for handling job execution callbacks to admin servers
114
* Manages result reporting back to admin after job completion
115
*/
116
public class TriggerCallbackThread {
117
118
/**
119
* Get singleton instance of callback thread
120
* @return TriggerCallbackThread instance
121
*/
122
public static TriggerCallbackThread getInstance();
123
124
/**
125
* Push callback parameter for async processing
126
* @param callback Job execution result to report to admin
127
*/
128
public static void pushCallBack(HandleCallbackParam callback);
129
130
/**
131
* Start callback processing thread
132
*/
133
public void start();
134
135
/**
136
* Stop callback thread
137
*/
138
public void toStop();
139
}
140
```
141
142
### JobLogFileCleanThread
143
144
Background thread for cleaning up old job log files based on retention policy.
145
146
```java { .api }
147
/**
148
* Background thread for log file cleanup
149
* Removes old log files based on retention policy
150
*/
151
public class JobLogFileCleanThread {
152
153
/**
154
* Get singleton instance of log cleanup thread
155
* @return JobLogFileCleanThread instance
156
*/
157
public static JobLogFileCleanThread getInstance();
158
159
/**
160
* Start log cleanup thread with retention policy
161
* @param logPath Base directory for log files
162
* @param logRetentionDays Number of days to retain log files
163
*/
164
public void start(String logPath, int logRetentionDays);
165
166
/**
167
* Stop log cleanup thread
168
*/
169
public void toStop();
170
}
171
```
172
173
## Thread Management Patterns
174
175
### Executor Lifecycle Management
176
177
```java
178
// Thread management during executor startup
179
public void startExecutor() {
180
// 1. Start registry thread for heartbeat
181
ExecutorRegistryThread.getInstance().start(
182
adminAddresses, accessToken, appname, address
183
);
184
185
// 2. Start callback thread for result reporting
186
TriggerCallbackThread.getInstance().start();
187
188
// 3. Start log cleanup thread
189
JobLogFileCleanThread.getInstance().start(logPath, logRetentionDays);
190
191
// 4. Job threads are created on-demand when jobs are triggered
192
}
193
194
// Thread management during executor shutdown
195
public void stopExecutor() {
196
// 1. Stop accepting new jobs
197
// 2. Stop all job threads
198
for (JobThread jobThread : activeJobThreads) {
199
jobThread.toStop("Executor shutdown");
200
}
201
202
// 3. Stop infrastructure threads
203
ExecutorRegistryThread.getInstance().toStop();
204
TriggerCallbackThread.getInstance().toStop();
205
JobLogFileCleanThread.getInstance().toStop();
206
}
207
```
208
209
### Job Execution Flow
210
211
```java
212
// Typical job execution flow involving threads
213
public void executeJobFlow(TriggerParam triggerParam) {
214
int jobId = triggerParam.getJobId();
215
IJobHandler handler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
216
217
// 1. Get or create job thread
218
JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);
219
if (jobThread == null) {
220
jobThread = XxlJobExecutor.registJobThread(jobId, handler, "New job");
221
}
222
223
// 2. Queue job for execution
224
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
225
226
// Job executes asynchronously in JobThread
227
// 3. After completion, callback is automatically pushed
228
// TriggerCallbackThread handles reporting results back to admin
229
}
230
```
231
232
### Thread Safety Considerations
233
234
```java
235
// Thread-safe access to job context
236
@XxlJob("threadSafeJob")
237
public void threadSafeJobHandler() throws Exception {
238
// Each job execution runs in its own JobThread
239
// XxlJobHelper methods are thread-local and safe
240
long jobId = XxlJobHelper.getJobId();
241
String param = XxlJobHelper.getJobParam();
242
243
// Thread-local logging is safe
244
XxlJobHelper.log("Job {} executing in thread: {}",
245
jobId, Thread.currentThread().getName());
246
247
// Business logic here
248
249
XxlJobHelper.handleSuccess();
250
}
251
```
252
253
## Infrastructure Components
254
255
### EmbedServer
256
257
Embedded HTTP server based on Netty for handling admin-to-executor communication.
258
259
```java { .api }
260
/**
261
* Embedded HTTP server for executor communication
262
* Uses Netty for handling HTTP requests from admin servers
263
*/
264
public class EmbedServer {
265
266
/**
267
* Start embedded HTTP server on specified port
268
* @param address Bind address for server
269
* @param port Port number for server
270
* @param appname Application name for identification
271
* @param accessToken Authentication token
272
* @throws Exception if server startup fails
273
*/
274
public void start(String address, int port, String appname, String accessToken) throws Exception;
275
276
/**
277
* Stop embedded HTTP server
278
* @throws Exception if server shutdown fails
279
*/
280
public void stop() throws Exception;
281
282
/**
283
* Nested HTTP request handler
284
*/
285
public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
286
// Handles HTTP requests from admin servers
287
// Processes beat, idleBeat, run, kill, log operations
288
}
289
}
290
```
291
292
### XxlJobRemotingUtil
293
294
HTTP communication utility for RPC operations between components.
295
296
```java { .api }
297
/**
298
* HTTP communication utilities for RPC operations
299
* Handles request/response processing between admin and executor
300
*/
301
public class XxlJobRemotingUtil {
302
303
/**
304
* Access token header name for authentication
305
*/
306
public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";
307
308
/**
309
* Send HTTP POST request with JSON body
310
* @param url Target URL for request
311
* @param accessToken Authentication token
312
* @param timeout Request timeout in milliseconds
313
* @param requestObj Object to serialize as JSON body
314
* @param returnTargClassOfT Response class type
315
* @return Deserialized response object
316
*/
317
public static ReturnT postBody(String url, String accessToken, int timeout,
318
Object requestObj, Class returnTargClassOfT);
319
}
320
```
321
322
**Usage Examples:**
323
324
```java
325
// HTTP communication example
326
public ReturnT<String> sendCallback(String adminAddress, List<HandleCallbackParam> callbacks) {
327
String url = adminAddress + "/api/callback";
328
329
ReturnT<String> response = XxlJobRemotingUtil.postBody(
330
url, // Admin callback URL
331
accessToken, // Authentication token
332
30000, // 30 second timeout
333
callbacks, // Request payload
334
String.class // Response type
335
);
336
337
return response;
338
}
339
```
340
341
## Best Practices
342
343
### Thread Resource Management
344
345
```java
346
// Proper thread resource management
347
public class ExecutorResourceManager {
348
349
public void configureThreadResources() {
350
// 1. Set appropriate thread pool sizes
351
System.setProperty("xxl.job.executor.thread.pool.size", "200");
352
353
// 2. Configure timeout values
354
System.setProperty("xxl.job.executor.timeout", "300000"); // 5 minutes
355
356
// 3. Set log retention to prevent disk overflow
357
System.setProperty("xxl.job.executor.log.retention.days", "7");
358
}
359
360
public void monitorThreadHealth() {
361
// Monitor active job threads
362
Map<Integer, JobThread> activeThreads = getActiveJobThreads();
363
364
for (Map.Entry<Integer, JobThread> entry : activeThreads.entrySet()) {
365
JobThread thread = entry.getValue();
366
367
if (thread.isRunningOrHasQueue()) {
368
System.out.println("Job " + entry.getKey() + " is active");
369
}
370
371
// Check for stuck threads
372
if (isThreadStuck(thread)) {
373
thread.toStop("Thread appears stuck");
374
}
375
}
376
}
377
}
378
```