or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

communication.mdconfiguration.mdcontext-helpers.mdexecutor.mdindex.mdjob-handlers.mdthread-management.md

thread-management.mddocs/

0

# Thread Management and Infrastructure

1

2

Core thread management classes and infrastructure components for job execution, registration, and callback handling in XXL-Job Core.

3

4

## Capabilities

5

6

### JobThread Class

7

8

Core thread class that manages individual job execution with queuing and lifecycle management.

9

10

```java { .api }

11

/**

12

* Core job execution thread with queue management

13

* Extends Thread to handle asynchronous job execution

14

*/

15

public class JobThread extends Thread {

16

17

/**

18

* Push trigger parameter to job execution queue

19

* @param triggerParam Job execution parameters from admin

20

* @return Response indicating queue acceptance status

21

*/

22

public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam);

23

24

/**

25

* Request thread to stop execution

26

* @param stopReason Reason for stopping the thread

27

*/

28

public void toStop(String stopReason);

29

30

/**

31

* Check if thread is running or has queued jobs

32

* @return true if thread is active or has pending jobs

33

*/

34

public boolean isRunningOrHasQueue();

35

36

/**

37

* Get job handler associated with this thread

38

* @return IJobHandler instance

39

*/

40

public IJobHandler getHandler();

41

42

/**

43

* Check if thread is stopped

44

* @return true if thread has been stopped

45

*/

46

public boolean isStopped();

47

}

48

```

49

50

**Usage Examples:**

51

52

```java

53

// JobThread is primarily managed by XxlJobExecutor internally

54

// Direct usage is typically not required for most applications

55

56

// Example of how executor manages job threads:

57

public void manageJobThread(int jobId, IJobHandler handler) {

58

// Register job thread (done by executor automatically)

59

JobThread jobThread = XxlJobExecutor.registJobThread(jobId, handler, "Job registration");

60

61

// Check thread status

62

if (jobThread.isRunningOrHasQueue()) {

63

System.out.println("Job thread is active");

64

}

65

66

// Stop thread (typically done during shutdown)

67

jobThread.toStop("Application shutdown");

68

69

// Remove from executor

70

XxlJobExecutor.removeJobThread(jobId, "Cleanup");

71

}

72

```

73

74

### ExecutorRegistryThread

75

76

Singleton thread that manages executor registration and heartbeat with admin servers.

77

78

```java { .api }

79

/**

80

* Singleton thread for managing executor registration with admin servers

81

* Handles periodic heartbeat and re-registration

82

*/

83

public class ExecutorRegistryThread {

84

85

/**

86

* Get singleton instance of registry thread

87

* @return ExecutorRegistryThread instance

88

*/

89

public static ExecutorRegistryThread getInstance();

90

91

/**

92

* Start registry thread with admin addresses and application info

93

* @param adminAddresses Comma-separated admin server URLs

94

* @param accessToken Authentication token

95

* @param appname Application name for registration

96

* @param address Executor address for callbacks

97

*/

98

public void start(String adminAddresses, String accessToken, String appname, String address);

99

100

/**

101

* Stop registry thread and unregister from admin servers

102

*/

103

public void toStop();

104

}

105

```

106

107

### TriggerCallbackThread

108

109

Manages callback operations to admin servers after job execution completion.

110

111

```java { .api }

112

/**

113

* Thread for handling job execution callbacks to admin servers

114

* Manages result reporting back to admin after job completion

115

*/

116

public class TriggerCallbackThread {

117

118

/**

119

* Get singleton instance of callback thread

120

* @return TriggerCallbackThread instance

121

*/

122

public static TriggerCallbackThread getInstance();

123

124

/**

125

* Push callback parameter for async processing

126

* @param callback Job execution result to report to admin

127

*/

128

public static void pushCallBack(HandleCallbackParam callback);

129

130

/**

131

* Start callback processing thread

132

*/

133

public void start();

134

135

/**

136

* Stop callback thread

137

*/

138

public void toStop();

139

}

140

```

141

142

### JobLogFileCleanThread

143

144

Background thread for cleaning up old job log files based on retention policy.

145

146

```java { .api }

147

/**

148

* Background thread for log file cleanup

149

* Removes old log files based on retention policy

150

*/

151

public class JobLogFileCleanThread {

152

153

/**

154

* Get singleton instance of log cleanup thread

155

* @return JobLogFileCleanThread instance

156

*/

157

public static JobLogFileCleanThread getInstance();

158

159

/**

160

* Start log cleanup thread with retention policy

161

* @param logPath Base directory for log files

162

* @param logRetentionDays Number of days to retain log files

163

*/

164

public void start(String logPath, int logRetentionDays);

165

166

/**

167

* Stop log cleanup thread

168

*/

169

public void toStop();

170

}

171

```

172

173

## Thread Management Patterns

174

175

### Executor Lifecycle Management

176

177

```java

178

// Thread management during executor startup

179

public void startExecutor() {

180

// 1. Start registry thread for heartbeat

181

ExecutorRegistryThread.getInstance().start(

182

adminAddresses, accessToken, appname, address

183

);

184

185

// 2. Start callback thread for result reporting

186

TriggerCallbackThread.getInstance().start();

187

188

// 3. Start log cleanup thread

189

JobLogFileCleanThread.getInstance().start(logPath, logRetentionDays);

190

191

// 4. Job threads are created on-demand when jobs are triggered

192

}

193

194

// Thread management during executor shutdown

195

public void stopExecutor() {

196

// 1. Stop accepting new jobs

197

// 2. Stop all job threads

198

for (JobThread jobThread : activeJobThreads) {

199

jobThread.toStop("Executor shutdown");

200

}

201

202

// 3. Stop infrastructure threads

203

ExecutorRegistryThread.getInstance().toStop();

204

TriggerCallbackThread.getInstance().toStop();

205

JobLogFileCleanThread.getInstance().toStop();

206

}

207

```

208

209

### Job Execution Flow

210

211

```java

212

// Typical job execution flow involving threads

213

public void executeJobFlow(TriggerParam triggerParam) {

214

int jobId = triggerParam.getJobId();

215

IJobHandler handler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

216

217

// 1. Get or create job thread

218

JobThread jobThread = XxlJobExecutor.loadJobThread(jobId);

219

if (jobThread == null) {

220

jobThread = XxlJobExecutor.registJobThread(jobId, handler, "New job");

221

}

222

223

// 2. Queue job for execution

224

ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

225

226

// Job executes asynchronously in JobThread

227

// 3. After completion, callback is automatically pushed

228

// TriggerCallbackThread handles reporting results back to admin

229

}

230

```

231

232

### Thread Safety Considerations

233

234

```java

235

// Thread-safe access to job context

236

@XxlJob("threadSafeJob")

237

public void threadSafeJobHandler() throws Exception {

238

// Each job execution runs in its own JobThread

239

// XxlJobHelper methods are thread-local and safe

240

long jobId = XxlJobHelper.getJobId();

241

String param = XxlJobHelper.getJobParam();

242

243

// Thread-local logging is safe

244

XxlJobHelper.log("Job {} executing in thread: {}",

245

jobId, Thread.currentThread().getName());

246

247

// Business logic here

248

249

XxlJobHelper.handleSuccess();

250

}

251

```

252

253

## Infrastructure Components

254

255

### EmbedServer

256

257

Embedded HTTP server based on Netty for handling admin-to-executor communication.

258

259

```java { .api }

260

/**

261

* Embedded HTTP server for executor communication

262

* Uses Netty for handling HTTP requests from admin servers

263

*/

264

public class EmbedServer {

265

266

/**

267

* Start embedded HTTP server on specified port

268

* @param address Bind address for server

269

* @param port Port number for server

270

* @param appname Application name for identification

271

* @param accessToken Authentication token

272

* @throws Exception if server startup fails

273

*/

274

public void start(String address, int port, String appname, String accessToken) throws Exception;

275

276

/**

277

* Stop embedded HTTP server

278

* @throws Exception if server shutdown fails

279

*/

280

public void stop() throws Exception;

281

282

/**

283

* Nested HTTP request handler

284

*/

285

public static class EmbedHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {

286

// Handles HTTP requests from admin servers

287

// Processes beat, idleBeat, run, kill, log operations

288

}

289

}

290

```

291

292

### XxlJobRemotingUtil

293

294

HTTP communication utility for RPC operations between components.

295

296

```java { .api }

297

/**

298

* HTTP communication utilities for RPC operations

299

* Handles request/response processing between admin and executor

300

*/

301

public class XxlJobRemotingUtil {

302

303

/**

304

* Access token header name for authentication

305

*/

306

public static final String XXL_JOB_ACCESS_TOKEN = "XXL-JOB-ACCESS-TOKEN";

307

308

/**

309

* Send HTTP POST request with JSON body

310

* @param url Target URL for request

311

* @param accessToken Authentication token

312

* @param timeout Request timeout in milliseconds

313

* @param requestObj Object to serialize as JSON body

314

* @param returnTargClassOfT Response class type

315

* @return Deserialized response object

316

*/

317

public static ReturnT postBody(String url, String accessToken, int timeout,

318

Object requestObj, Class returnTargClassOfT);

319

}

320

```

321

322

**Usage Examples:**

323

324

```java

325

// HTTP communication example

326

public ReturnT<String> sendCallback(String adminAddress, List<HandleCallbackParam> callbacks) {

327

String url = adminAddress + "/api/callback";

328

329

ReturnT<String> response = XxlJobRemotingUtil.postBody(

330

url, // Admin callback URL

331

accessToken, // Authentication token

332

30000, // 30 second timeout

333

callbacks, // Request payload

334

String.class // Response type

335

);

336

337

return response;

338

}

339

```

340

341

## Best Practices

342

343

### Thread Resource Management

344

345

```java

346

// Proper thread resource management

347

public class ExecutorResourceManager {

348

349

public void configureThreadResources() {

350

// 1. Set appropriate thread pool sizes

351

System.setProperty("xxl.job.executor.thread.pool.size", "200");

352

353

// 2. Configure timeout values

354

System.setProperty("xxl.job.executor.timeout", "300000"); // 5 minutes

355

356

// 3. Set log retention to prevent disk overflow

357

System.setProperty("xxl.job.executor.log.retention.days", "7");

358

}

359

360

public void monitorThreadHealth() {

361

// Monitor active job threads

362

Map<Integer, JobThread> activeThreads = getActiveJobThreads();

363

364

for (Map.Entry<Integer, JobThread> entry : activeThreads.entrySet()) {

365

JobThread thread = entry.getValue();

366

367

if (thread.isRunningOrHasQueue()) {

368

System.out.println("Job " + entry.getKey() + " is active");

369

}

370

371

// Check for stuck threads

372

if (isThreadStuck(thread)) {

373

thread.toStop("Thread appears stuck");

374

}

375

}

376

}

377

}

378

```