or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

cli-commands.md configuration.md index.md node-management.md request-routing.md security.md session-distribution.md session-queuing.md session-storage.md

session-queuing.mddocs/

0

# Session Queuing

1

2

The session queuing system manages incoming session creation requests when nodes are at capacity, providing fair scheduling, timeout handling, and priority-based request processing.

3

4

## Capabilities

5

6

### Core NewSessionQueue Interface

7

8

The main abstract class for managing session creation request queues.

9

10

```java { .api }

11

/**

12

* Abstract class for queuing new session requests when nodes are busy

13

*/

14

abstract class NewSessionQueue implements HasReadyState, Routable {

15

/** Protected constructor with tracer and registration secret */

16

protected NewSessionQueue(Tracer tracer, Secret registrationSecret);

17

18

/** Fast-path to detect if the queue is empty */

19

abstract boolean peekEmpty();

20

21

/** Add a session request to the queue */

22

abstract HttpResponse addToQueue(SessionRequest request);

23

24

/** Retry adding a request to the queue */

25

abstract boolean retryAddToQueue(SessionRequest request);

26

27

/** Remove a specific request from the queue */

28

abstract Optional<SessionRequest> remove(RequestId reqId);

29

30

/** Get requests that match available node stereotypes */

31

abstract List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);

32

33

/** Complete a request with success or failure result */

34

abstract boolean complete(RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result);

35

36

/** Clear all pending requests from the queue */

37

abstract int clearQueue();

38

39

/** Get current queue contents */

40

abstract List<SessionRequestCapability> getQueueContents();

41

}

42

```

43

44

### Local NewSessionQueue Implementation

45

46

In-memory queue implementation for single-process deployments.

47

48

```java { .api }

49

/**

50

* In-memory new session queue implementation

51

*/

52

class LocalNewSessionQueue extends NewSessionQueue {

53

/** Create a local session queue with the given request timeout and retry period */

54

LocalNewSessionQueue(Tracer tracer, Duration requestTimeout, Duration retryPeriod);

55

56

/** Factory method to create from configuration */

57

static NewSessionQueue create(Config config);

58

59

boolean isReady();

60

boolean offerLast(SessionRequest request, RequestId requestId);

61

Optional<SessionRequest> poll(Duration timeout);

62

int clear();

63

List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);

64

65

/** Get current queue size */

66

int getQueueSize();

67

68

/** Get queue statistics */

69

QueueStatistics getStatistics();

70

}

71

```

72

73

**Usage Example:**

74

75

```java

76

// Create local session queue

77

NewSessionQueue sessionQueue = new LocalNewSessionQueue(

78

tracer,

79

Duration.ofMinutes(5), // request timeout

80

Duration.ofSeconds(5) // retry period

81

);

82

83

// Add session request to queue

84

SessionRequest request = new SessionRequest(

85

new RequestId(UUID.randomUUID()),

86

Instant.now().plus(Duration.ofMinutes(5)), // enqueued time + timeout

87

Set.of(W3C), // WebDriver dialects

88

new ImmutableCapabilities("browserName", "chrome")

89

);

90

91

boolean queued = sessionQueue.offerLast(request, request.getRequestId());

92

if (queued) {

93

System.out.println("Request queued: " + request.getRequestId());

94

}

95

96

// Poll for next request (used by distributor)

97

Optional<SessionRequest> next = sessionQueue.poll(Duration.ofSeconds(10));

98

if (next.isPresent()) {

99

System.out.println("Processing request: " + next.get().getRequestId());

100

}

101

```

102

103

### Remote NewSessionQueue Client

104

105

Client for accessing session queues running in remote processes.

106

107

```java { .api }

108

/**

109

* Remote session queue client for distributed deployments

110

*/

111

class RemoteNewSessionQueue extends NewSessionQueue {

112

RemoteNewSessionQueue(HttpClient.Factory httpClientFactory, URI queueUri);

113

114

// All operations implemented via HTTP calls

115

boolean isReady();

116

boolean offerLast(SessionRequest request, RequestId requestId);

117

Optional<SessionRequest> poll(Duration timeout);

118

int clear();

119

List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);

120

}

121

```

122

123

### Configuration Options

124

125

NewSessionQueue-specific configuration settings.

126

127

```java { .api }

128

/**

129

* Configuration options for session queue behavior

130

*/

131

class NewSessionQueueOptions {

132

static final String SESSION_QUEUE_SECTION = "sessionqueue";

133

134

/** Get session queue implementation class */

135

String getSessionQueueImplementation(Config config);

136

137

/** Get request timeout duration */

138

Duration getRequestTimeout(Config config);

139

140

/** Get retry period for polling requests */

141

Duration getRetryPeriod(Config config);

142

143

/** Get maximum queue size */

144

int getMaxQueueSize(Config config);

145

146

/** Get queue cleanup interval */

147

Duration getCleanupInterval(Config config);

148

}

149

```

150

151

## Queue Management

152

153

### Request Lifecycle

154

155

```java

156

// 1. Router receives new session request

157

@POST

158

@Path("/session")

159

public Response createSession(NewSessionPayload payload) {

160

SessionRequest sessionRequest = new SessionRequest(

161

new RequestId(UUID.randomUUID()),

162

Instant.now().plus(requestTimeout),

163

payload.getDownstreamDialects(),

164

payload.getDesiredCapabilities()

165

);

166

167

// 2. Try immediate session creation

168

Either<SessionNotCreatedException, CreateSessionResponse> result =

169

distributor.newSession(sessionRequest);

170

171

if (result.isRight()) {

172

// Session created immediately

173

return Response.ok(result.right()).build();

174

}

175

176

// 3. Queue the request if nodes are busy

177

boolean queued = sessionQueue.offerLast(sessionRequest, sessionRequest.getRequestId());

178

if (!queued) {

179

return Response.status(503).entity("Queue full").build();

180

}

181

182

// 4. Wait for session creation

183

return waitForSession(sessionRequest.getRequestId());

184

}

185

```

186

187

### Distributor Integration

188

189

```java

190

// Distributor processes queued requests

191

public class QueueProcessingDistributor extends LocalDistributor {

192

@Scheduled(fixedRate = 1000) // Every second

193

public void processQueue() {

194

// Get available node capabilities

195

Map<Capabilities, Long> availableSlots = getAvailableSlots();

196

197

// Get matching requests from queue

198

List<SessionRequest> availableRequests =

199

sessionQueue.getNextAvailable(availableSlots);

200

201

for (SessionRequest request : availableRequests) {

202

Either<SessionNotCreatedException, CreateSessionResponse> result =

203

newSession(request);

204

205

if (result.isRight()) {

206

// Notify waiting client

207

notifySessionCreated(request.getRequestId(), result.right());

208

} else {

209

// Check if request has expired

210

if (request.getEnqueued().isBefore(Instant.now().minus(requestTimeout))) {

211

notifySessionFailed(request.getRequestId(), "Request timeout");

212

} else {

213

// Put back in queue for retry

214

sessionQueue.offerLast(request, request.getRequestId());

215

}

216

}

217

}

218

}

219

}

220

```

221

222

### Request Timeout Handling

223

224

```java

225

// Cleanup expired requests

226

@Scheduled(fixedRate = 30000) // Every 30 seconds

227

public void cleanupExpiredRequests() {

228

Instant cutoff = Instant.now();

229

List<SessionRequest> expiredRequests = new ArrayList<>();

230

231

// Check all queued requests

232

SessionRequest request;

233

while ((request = sessionQueue.poll(Duration.ZERO)) != null) {

234

if (request.getEnqueued().isAfter(cutoff)) {

235

// Request still valid, put back in queue

236

sessionQueue.offerLast(request, request.getRequestId());

237

} else {

238

// Request expired

239

expiredRequests.add(request);

240

}

241

}

242

243

// Notify clients of expired requests

244

for (SessionRequest expired : expiredRequests) {

245

notifySessionFailed(expired.getRequestId(), "Request timeout");

246

}

247

}

248

```

249

250

## Queue Strategies

251

252

### Priority-Based Queuing

253

254

```java

255

// Custom queue with priority support

256

public class PrioritySessionQueue implements NewSessionQueue {

257

private final PriorityQueue<PrioritizedRequest> queue;

258

259

static class PrioritizedRequest implements Comparable<PrioritizedRequest> {

260

final SessionRequest request;

261

final int priority;

262

final Instant enqueued;

263

264

@Override

265

public int compareTo(PrioritizedRequest other) {

266

// Higher priority first, then FIFO for same priority

267

int priorityCompare = Integer.compare(other.priority, this.priority);

268

if (priorityCompare != 0) return priorityCompare;

269

return this.enqueued.compareTo(other.enqueued);

270

}

271

}

272

273

@Override

274

public boolean offerLast(SessionRequest request, RequestId requestId) {

275

int priority = extractPriority(request.getDesiredCapabilities());

276

return queue.offer(new PrioritizedRequest(request, priority, Instant.now()));

277

}

278

279

private int extractPriority(Capabilities caps) {

280

// Extract priority from capabilities or use default

281

return (Integer) caps.getCapability("se:priority", 0);

282

}

283

}

284

```

285

286

### Load-Based Queuing

287

288

```java

289

// Queue that considers current grid load

290

public class LoadAwareSessionQueue extends LocalNewSessionQueue {

291

@Override

292

public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {

293

List<SessionRequest> available = super.getNextAvailable(stereotypes);

294

295

// Sort by grid load - prefer requests for less loaded browser types

296

return available.stream()

297

.sorted((r1, r2) -> {

298

String browser1 = r1.getDesiredCapabilities().getBrowserName();

299

String browser2 = r2.getDesiredCapabilities().getBrowserName();

300

301

long load1 = getCurrentLoad(browser1);

302

long load2 = getCurrentLoad(browser2);

303

304

return Long.compare(load1, load2);

305

})

306

.collect(Collectors.toList());

307

}

308

309

private long getCurrentLoad(String browserName) {

310

// Calculate current load for browser type

311

return distributor.getStatus().getNodes().stream()

312

.mapToLong(node -> node.getSlots().stream()

313

.filter(slot -> slot.getStereotype().getBrowserName().equals(browserName))

314

.filter(slot -> slot.getSession() != null)

315

.count())

316

.sum();

317

}

318

}

319

```

320

321

## Monitoring and Metrics

322

323

```java

324

// Queue monitoring

325

/**
 * Read-only accessors for queue monitoring values. The fields these methods
 * read (sessionQueue, averageWaitTime, queuedRequests, expiredRequestCount)
 * are not declared in this snippet.
 */
public class QueueMetrics {

326

/** Returns the size reported by sessionQueue.getQueueSize(). */
public int getQueueSize() {

327

return sessionQueue.getQueueSize();

328

}

329

330

/**
 * Returns the stored average wait time. How that value is maintained is not
 * shown in this snippet.
 */
public Duration getAverageWaitTime() {

331

// Track wait times for completed requests

332

return averageWaitTime;

333

}

334

335

/**
 * Returns a map from browser name to the number of queued requests asking
 * for that browser.
 */
public Map<String, Integer> getQueuedRequestsByBrowser() {

336

// Get breakdown of queued requests by browser

337

return queuedRequests.stream()

338

.collect(Collectors.groupingBy(

339

request -> request.getDesiredCapabilities().getBrowserName(),

340

// Each request contributes 1, so the sum is the per-browser count
Collectors.summingInt(request -> 1)

341

));

342

}

343

344

/**
 * Returns the current value of expiredRequestCount.
 * NOTE(review): presumably an AtomicInteger-style counter - confirm.
 */
public int getExpiredRequestCount() {

345

return expiredRequestCount.get();

346

}

347

}

348

```

349

350

## Error Handling

351

352

```java

353

// Handle queue errors
try {
    boolean queued = sessionQueue.offerLast(request, requestId);
    if (!queued) {
        // Queue full or request rejected
        return Response.status(503)
            .entity(Map.of("error", "Unable to queue request - queue may be full"))
            .build();
    }
} catch (Exception e) {
    // Queue service unavailable. Log the full exception (including its stack
    // trace) before answering, so the cause is not lost once the client has
    // received only the message text.
    log.warn("Error adding request to session queue", e);
    return Response.status(503)
        .entity(Map.of("error", "Queue service unavailable: " + e.getMessage()))
        .build();
}

// Handle polling errors
try {
    Optional<SessionRequest> next = sessionQueue.poll(Duration.ofSeconds(5));
    // Process request...
} catch (InterruptedException e) {
    // NOTE(review): poll(Duration) as listed in this document declares no
    // checked exceptions - confirm the implementation can throw
    // InterruptedException, otherwise this catch clause will not compile.
    // Restore the interrupt flag so callers further up can still observe it.
    Thread.currentThread().interrupt();
    throw new RuntimeException("Queue polling interrupted", e);
} catch (Exception e) {
    // Log error and continue processing
    log.warn("Error polling session queue", e);
}

380

```