0
# Session Queuing
1
2
The session queuing system manages incoming session creation requests when nodes are at capacity, providing fair scheduling, timeout handling, and priority-based request processing.
3
4
## Capabilities
5
6
### Core NewSessionQueue Interface
7
8
The main abstract class for managing session creation request queues.
9
10
```java { .api }
11
/**
12
* Abstract class for queuing new session requests when nodes are busy
13
*/
14
abstract class NewSessionQueue implements HasReadyState, Routable {
15
/** Protected constructor with tracer and registration secret */
16
protected NewSessionQueue(Tracer tracer, Secret registrationSecret);
17
18
/** Fast-path to detect if the queue is empty */
19
abstract boolean peekEmpty();
20
21
/** Add a session request to the queue */
22
abstract HttpResponse addToQueue(SessionRequest request);
23
24
/** Retry adding a request to the queue */
25
abstract boolean retryAddToQueue(SessionRequest request);
26
27
/** Remove a specific request from the queue */
28
abstract Optional<SessionRequest> remove(RequestId reqId);
29
30
/** Get requests that match available node stereotypes */
31
abstract List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);
32
33
/** Complete a request with success or failure result */
34
abstract boolean complete(RequestId reqId, Either<SessionNotCreatedException, CreateSessionResponse> result);
35
36
/** Clear all pending requests from the queue */
37
abstract int clearQueue();
38
39
/** Get current queue contents */
40
abstract List<SessionRequestCapability> getQueueContents();
41
}
42
```
43
44
### Local NewSessionQueue Implementation
45
46
In-memory queue implementation for single-process deployments.
47
48
```java { .api }
49
/**
50
* In-memory new session queue implementation
51
*/
52
class LocalNewSessionQueue extends NewSessionQueue {
53
/** Create a local session queue with event bus integration */
54
LocalNewSessionQueue(Tracer tracer, Duration requestTimeout, Duration retryPeriod);
55
56
/** Factory method to create from configuration */
57
static NewSessionQueue create(Config config);
58
59
boolean isReady();
60
boolean offerLast(SessionRequest request, RequestId requestId);
61
Optional<SessionRequest> poll(Duration timeout);
62
int clear();
63
List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);
64
65
/** Get current queue size */
66
int getQueueSize();
67
68
/** Get queue statistics */
69
QueueStatistics getStatistics();
70
}
71
```
72
73
**Usage Example:**
74
75
```java
76
// Create local session queue
77
NewSessionQueue sessionQueue = new LocalNewSessionQueue(
78
tracer,
79
Duration.ofMinutes(5), // request timeout
80
Duration.ofSeconds(5) // retry period
81
);
82
83
// Add session request to queue
84
SessionRequest request = new SessionRequest(
85
new RequestId(UUID.randomUUID()),
86
Instant.now().plus(Duration.ofMinutes(5)), // enqueued time + timeout
87
Set.of(W3C), // WebDriver dialects
88
new ImmutableCapabilities("browserName", "chrome")
89
);
90
91
boolean queued = sessionQueue.offerLast(request, request.getRequestId());
92
if (queued) {
93
System.out.println("Request queued: " + request.getRequestId());
94
}
95
96
// Poll for next request (used by distributor)
97
Optional<SessionRequest> next = sessionQueue.poll(Duration.ofSeconds(10));
98
if (next.isPresent()) {
99
System.out.println("Processing request: " + next.get().getRequestId());
100
}
101
```
102
103
### Remote NewSessionQueue Client
104
105
Client for accessing session queues running in remote processes.
106
107
```java { .api }
108
/**
109
* Remote session queue client for distributed deployments
110
*/
111
class RemoteNewSessionQueue extends NewSessionQueue {
112
RemoteNewSessionQueue(HttpClient.Factory httpClientFactory, URI queueUri);
113
114
// All operations implemented via HTTP calls
115
boolean isReady();
116
boolean offerLast(SessionRequest request, RequestId requestId);
117
Optional<SessionRequest> poll(Duration timeout);
118
int clear();
119
List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes);
120
}
121
```
122
123
### Configuration Options
124
125
NewSessionQueue-specific configuration settings.
126
127
```java { .api }
128
/**
129
* Configuration options for session queue behavior
130
*/
131
class NewSessionQueueOptions {
132
static final String SESSION_QUEUE_SECTION = "sessionqueue";
133
134
/** Get session queue implementation class */
135
String getSessionQueueImplementation(Config config);
136
137
/** Get request timeout duration */
138
Duration getRequestTimeout(Config config);
139
140
/** Get retry period for polling requests */
141
Duration getRetryPeriod(Config config);
142
143
/** Get maximum queue size */
144
int getMaxQueueSize(Config config);
145
146
/** Get queue cleanup interval */
147
Duration getCleanupInterval(Config config);
148
}
149
```
150
151
## Queue Management
152
153
### Request Lifecycle
154
155
```java
156
// 1. Router receives new session request
157
@POST
158
@Path("/session")
159
public Response createSession(NewSessionPayload payload) {
160
SessionRequest sessionRequest = new SessionRequest(
161
new RequestId(UUID.randomUUID()),
162
Instant.now().plus(requestTimeout),
163
payload.getDownstreamDialects(),
164
payload.getDesiredCapabilities()
165
);
166
167
// 2. Try immediate session creation
168
Either<SessionNotCreatedException, CreateSessionResponse> result =
169
distributor.newSession(sessionRequest);
170
171
if (result.isRight()) {
172
// Session created immediately
173
return Response.ok(result.right()).build();
174
}
175
176
// 3. Queue the request if nodes are busy
177
boolean queued = sessionQueue.offerLast(sessionRequest, sessionRequest.getRequestId());
178
if (!queued) {
179
return Response.status(503).entity("Queue full").build();
180
}
181
182
// 4. Wait for session creation
183
return waitForSession(sessionRequest.getRequestId());
184
}
185
```
186
187
### Distributor Integration
188
189
```java
190
// Distributor processes queued requests
191
public class QueueProcessingDistributor extends LocalDistributor {
192
@Scheduled(fixedRate = 1000) // Every second
193
public void processQueue() {
194
// Get available node capabilities
195
Map<Capabilities, Long> availableSlots = getAvailableSlots();
196
197
// Get matching requests from queue
198
List<SessionRequest> availableRequests =
199
sessionQueue.getNextAvailable(availableSlots);
200
201
for (SessionRequest request : availableRequests) {
202
Either<SessionNotCreatedException, CreateSessionResponse> result =
203
newSession(request);
204
205
if (result.isRight()) {
206
// Notify waiting client
207
notifySessionCreated(request.getRequestId(), result.right());
208
} else {
209
// Check if request has expired
210
if (request.getEnqueued().isBefore(Instant.now().minus(requestTimeout))) {
211
notifySessionFailed(request.getRequestId(), "Request timeout");
212
} else {
213
// Put back in queue for retry
214
sessionQueue.offerLast(request, request.getRequestId());
215
}
216
}
217
}
218
}
219
}
220
```
221
222
### Request Timeout Handling
223
224
```java
225
// Cleanup expired requests
226
@Scheduled(fixedRate = 30000) // Every 30 seconds
227
public void cleanupExpiredRequests() {
228
Instant cutoff = Instant.now();
229
List<SessionRequest> expiredRequests = new ArrayList<>();
230
231
// Check all queued requests
232
SessionRequest request;
233
while ((request = sessionQueue.poll(Duration.ZERO)) != null) {
234
if (request.getEnqueued().isAfter(cutoff)) {
235
// Request still valid, put back in queue
236
sessionQueue.offerLast(request, request.getRequestId());
237
} else {
238
// Request expired
239
expiredRequests.add(request);
240
}
241
}
242
243
// Notify clients of expired requests
244
for (SessionRequest expired : expiredRequests) {
245
notifySessionFailed(expired.getRequestId(), "Request timeout");
246
}
247
}
248
```
249
250
## Queue Strategies
251
252
### Priority-Based Queuing
253
254
```java
255
// Custom queue with priority support
256
public class PrioritySessionQueue implements NewSessionQueue {
257
private final PriorityQueue<PrioritizedRequest> queue;
258
259
static class PrioritizedRequest implements Comparable<PrioritizedRequest> {
260
final SessionRequest request;
261
final int priority;
262
final Instant enqueued;
263
264
@Override
265
public int compareTo(PrioritizedRequest other) {
266
// Higher priority first, then FIFO for same priority
267
int priorityCompare = Integer.compare(other.priority, this.priority);
268
if (priorityCompare != 0) return priorityCompare;
269
return this.enqueued.compareTo(other.enqueued);
270
}
271
}
272
273
@Override
274
public boolean offerLast(SessionRequest request, RequestId requestId) {
275
int priority = extractPriority(request.getDesiredCapabilities());
276
return queue.offer(new PrioritizedRequest(request, priority, Instant.now()));
277
}
278
279
private int extractPriority(Capabilities caps) {
280
// Extract priority from capabilities or use default
281
return (Integer) caps.getCapability("se:priority", 0);
282
}
283
}
284
```
285
286
### Load-Based Queuing
287
288
```java
289
// Queue that considers current grid load
290
public class LoadAwareSessionQueue extends LocalNewSessionQueue {
291
@Override
292
public List<SessionRequest> getNextAvailable(Map<Capabilities, Long> stereotypes) {
293
List<SessionRequest> available = super.getNextAvailable(stereotypes);
294
295
// Sort by grid load - prefer requests for less loaded browser types
296
return available.stream()
297
.sorted((r1, r2) -> {
298
String browser1 = r1.getDesiredCapabilities().getBrowserName();
299
String browser2 = r2.getDesiredCapabilities().getBrowserName();
300
301
long load1 = getCurrentLoad(browser1);
302
long load2 = getCurrentLoad(browser2);
303
304
return Long.compare(load1, load2);
305
})
306
.collect(Collectors.toList());
307
}
308
309
private long getCurrentLoad(String browserName) {
310
// Calculate current load for browser type
311
return distributor.getStatus().getNodes().stream()
312
.mapToLong(node -> node.getSlots().stream()
313
.filter(slot -> slot.getStereotype().getBrowserName().equals(browserName))
314
.filter(slot -> slot.getSession() != null)
315
.count())
316
.sum();
317
}
318
}
319
```
320
321
## Monitoring and Metrics
322
323
```java
324
// Queue monitoring
325
public class QueueMetrics {
326
public int getQueueSize() {
327
return sessionQueue.getQueueSize();
328
}
329
330
public Duration getAverageWaitTime() {
331
// Track wait times for completed requests
332
return averageWaitTime;
333
}
334
335
public Map<String, Integer> getQueuedRequestsByBrowser() {
336
// Get breakdown of queued requests by browser
337
return queuedRequests.stream()
338
.collect(Collectors.groupingBy(
339
request -> request.getDesiredCapabilities().getBrowserName(),
340
Collectors.summingInt(request -> 1)
341
));
342
}
343
344
public int getExpiredRequestCount() {
345
return expiredRequestCount.get();
346
}
347
}
348
```
349
350
## Error Handling
351
352
```java
353
// Handle queue errors
try {
    boolean queued = sessionQueue.offerLast(request, requestId);
    if (!queued) {
        // Queue full or request rejected
        return Response.status(503)
            .entity(Map.of("error", "Unable to queue request - queue may be full"))
            .build();
    }
} catch (Exception e) {
    // Queue service unavailable. Log the full exception: the response only
    // carries the message, so the cause would otherwise be lost.
    log.warn("Error adding request to session queue", e);
    return Response.status(503)
        .entity(Map.of("error", "Queue service unavailable: " + e.getMessage()))
        .build();
}

// Handle polling errors
try {
    Optional<SessionRequest> next = sessionQueue.poll(Duration.ofSeconds(5));
    // Process request...
} catch (InterruptedException e) {
    // Restore the interrupt flag before propagating
    Thread.currentThread().interrupt();
    throw new RuntimeException("Queue polling interrupted", e);
} catch (Exception e) {
    // Log error and continue processing
    log.warn("Error polling session queue", e);
}
380
```