0
# Load Balancing Utilities
1
2
Client-side health checking utilities for load balancers and connection management. Enables intelligent routing decisions based on server health and performance metrics.
3
4
## Capabilities
5
6
### HealthCheckingLoadBalancerUtil
7
8
Utility for enabling client-side health checking for LoadBalancers, allowing load balancers to automatically route traffic away from unhealthy servers.
9
10
```java { .api }
11
/**
12
* Utility for enabling client-side health checking for LoadBalancers.
13
* Wraps existing load balancer factories with health checking capabilities.
14
*/
15
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/5025")
16
public final class HealthCheckingLoadBalancerUtil {
17
18
/**
19
* Creates a health-checking-capable LoadBalancer
20
* @param factory The underlying load balancer factory to wrap
21
* @param helper The load balancer helper for managing subchannels
22
* @return LoadBalancer instance with health checking enabled
23
*/
24
public static LoadBalancer newHealthCheckingLoadBalancer(
25
LoadBalancer.Factory factory,
26
Helper helper
27
);
28
}
29
```
30
31
**Usage Examples:**
32
33
```java
34
import io.grpc.LoadBalancer;
35
import io.grpc.ManagedChannelBuilder;
36
import io.grpc.protobuf.services.HealthCheckingLoadBalancerUtil;
37
import io.grpc.util.RoundRobinLoadBalancerFactory;
38
39
// Create a health-checking round-robin load balancer
40
public class HealthAwareClient {
41
private final ManagedChannel channel;
42
43
public HealthAwareClient(String target) {
44
// Create channel with health-checking load balancer
45
this.channel = ManagedChannelBuilder.forTarget(target)
46
.defaultLoadBalancingPolicy("health_checking_round_robin")
47
.usePlaintext()
48
.build();
49
}
50
51
// Alternative: Programmatic load balancer configuration
52
public HealthAwareClient(String target, LoadBalancer.Factory baseFactory) {
53
LoadBalancerRegistry registry = LoadBalancerRegistry.getDefaultRegistry();
54
55
// Register health-checking variant
56
registry.register(new LoadBalancer.Factory() {
57
@Override
58
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
59
return HealthCheckingLoadBalancerUtil.newHealthCheckingLoadBalancer(
60
baseFactory, helper);
61
}
62
63
@Override
64
public String getPolicyName() {
65
return "health_checking_custom";
66
}
67
});
68
69
this.channel = ManagedChannelBuilder.forTarget(target)
70
.defaultLoadBalancingPolicy("health_checking_custom")
71
.usePlaintext()
72
.build();
73
}
74
}
75
```
76
77
## Integration with Service Discovery
78
79
Health checking load balancers work seamlessly with service discovery systems:
80
81
### Consul Integration Example
82
83
```java
84
import io.grpc.ManagedChannel;
85
import io.grpc.ManagedChannelBuilder;
86
import io.grpc.protobuf.services.HealthCheckingLoadBalancerUtil;
87
88
public class ConsulAwareHealthCheckingClient {
89
private final ManagedChannel channel;
90
91
public ConsulAwareHealthCheckingClient() {
92
// Service discovery target that resolves to multiple endpoints
93
String consulTarget = "consul://user-service";
94
95
this.channel = ManagedChannelBuilder.forTarget(consulTarget)
96
// Health checking will verify each discovered endpoint
97
.defaultLoadBalancingPolicy("health_checking_round_robin")
98
.usePlaintext()
99
.build();
100
}
101
102
public void makeRequest() {
103
UserServiceGrpc.UserServiceBlockingStub stub =
104
UserServiceGrpc.newBlockingStub(channel);
105
106
try {
107
// Request will be routed only to healthy servers
108
GetUserResponse response = stub.getUser(
109
GetUserRequest.newBuilder()
110
.setUserId(12345)
111
.build()
112
);
113
114
System.out.println("User retrieved: " + response.getUser().getName());
115
116
} catch (StatusRuntimeException e) {
117
System.err.println("Request failed: " + e.getStatus());
118
}
119
}
120
}
121
```
122
123
### Kubernetes Service Integration
124
125
```java
126
public class KubernetesHealthCheckingClient {
127
private final ManagedChannel channel;
128
129
public KubernetesHealthCheckingClient(String serviceName, String namespace) {
130
// Kubernetes DNS target
131
String kubernetesTarget = String.format("dns:///%s.%s.svc.cluster.local:8080",
132
serviceName, namespace);
133
134
this.channel = ManagedChannelBuilder.forTarget(kubernetesTarget)
135
.defaultLoadBalancingPolicy("health_checking_round_robin")
136
// Use keep-alive for better health detection
137
.keepAliveTime(30, TimeUnit.SECONDS)
138
.keepAliveTimeout(5, TimeUnit.SECONDS)
139
.keepAliveWithoutCalls(true)
140
.usePlaintext()
141
.build();
142
}
143
144
public void shutdown() {
145
channel.shutdown();
146
try {
147
if (!channel.awaitTermination(60, TimeUnit.SECONDS)) {
148
channel.shutdownNow();
149
}
150
} catch (InterruptedException e) {
151
channel.shutdownNow();
152
Thread.currentThread().interrupt();
153
}
154
}
155
}
156
```
157
158
## Advanced Health Checking Patterns
159
160
### Custom Health Check Configuration
161
162
```java
163
import io.grpc.ChannelLogger;
164
import io.grpc.ConnectivityState;
165
import io.grpc.LoadBalancer;
166
167
public class AdvancedHealthCheckingClient {
168
169
public static class CustomHealthCheckingFactory extends LoadBalancer.Factory {
170
private final LoadBalancer.Factory delegate;
171
private final String healthServiceName;
172
173
public CustomHealthCheckingFactory(LoadBalancer.Factory delegate,
174
String healthServiceName) {
175
this.delegate = delegate;
176
this.healthServiceName = healthServiceName;
177
}
178
179
@Override
180
public LoadBalancer newLoadBalancer(LoadBalancer.Helper helper) {
181
// Custom helper that configures health checking
182
LoadBalancer.Helper healthCheckingHelper = new LoadBalancer.Helper() {
183
@Override
184
public ManagedChannel createSubchannel(
185
CreateSubchannelArgs args) {
186
187
// Configure health checking for each subchannel
188
CreateSubchannelArgs.Builder argsBuilder = args.toBuilder();
189
190
// Add health check configuration
191
Map<String, Object> healthCheckConfig = new HashMap<>();
192
healthCheckConfig.put("serviceName", healthServiceName);
193
194
// Apply health check configuration to subchannel
195
return helper.createSubchannel(argsBuilder.build());
196
}
197
198
// Delegate other methods to original helper
199
@Override
200
public void updateBalancingState(ConnectivityState newState,
201
SubchannelPicker newPicker) {
202
helper.updateBalancingState(newState, newPicker);
203
}
204
205
@Override
206
public ChannelLogger getChannelLogger() {
207
return helper.getChannelLogger();
208
}
209
210
// ... other delegated methods
211
};
212
213
return HealthCheckingLoadBalancerUtil.newHealthCheckingLoadBalancer(
214
delegate, healthCheckingHelper);
215
}
216
217
@Override
218
public String getPolicyName() {
219
return "custom_health_checking";
220
}
221
}
222
223
public static ManagedChannel createHealthCheckingChannel(String target,
224
String serviceName) {
225
LoadBalancer.Factory baseFactory = new RoundRobinLoadBalancerFactory();
226
LoadBalancer.Factory healthCheckingFactory =
227
new CustomHealthCheckingFactory(baseFactory, serviceName);
228
229
LoadBalancerRegistry.getDefaultRegistry()
230
.register(healthCheckingFactory);
231
232
return ManagedChannelBuilder.forTarget(target)
233
.defaultLoadBalancingPolicy("custom_health_checking")
234
.usePlaintext()
235
.build();
236
}
237
}
238
```
239
240
### Health Check Monitoring
241
242
```java
243
public class HealthCheckMonitor {
244
private final ManagedChannel channel;
245
private final ScheduledExecutorService scheduler;
246
247
public HealthCheckMonitor(ManagedChannel channel) {
248
this.channel = channel;
249
this.scheduler = Executors.newScheduledThreadPool(1);
250
}
251
252
public void startMonitoring() {
253
scheduler.scheduleAtFixedRate(this::checkChannelHealth, 0, 10, TimeUnit.SECONDS);
254
}
255
256
private void checkChannelHealth() {
257
ConnectivityState state = channel.getState(false);
258
259
switch (state) {
260
case READY:
261
System.out.println("Channel is healthy and ready");
262
break;
263
case TRANSIENT_FAILURE:
264
System.out.println("Channel experiencing transient failure");
265
break;
266
case SHUTDOWN:
267
System.out.println("Channel is shutdown");
268
break;
269
case IDLE:
270
System.out.println("Channel is idle");
271
// Trigger connection attempt
272
channel.getState(true);
273
break;
274
case CONNECTING:
275
System.out.println("Channel is connecting");
276
break;
277
}
278
}
279
280
public void shutdown() {
281
scheduler.shutdown();
282
}
283
}
284
```
285
286
## Integration with Metrics
287
288
Health checking load balancers can be combined with metrics collection for comprehensive observability:
289
290
```java
291
public class MetricsAwareHealthCheckingClient {
292
private final ManagedChannel channel;
293
private final CallMetricRecorder metricsRecorder;
294
295
public MetricsAwareHealthCheckingClient(String target) {
296
this.channel = ManagedChannelBuilder.forTarget(target)
297
.defaultLoadBalancingPolicy("health_checking_round_robin")
298
// Interceptor to record client-side metrics
299
.intercept(new ClientMetricsInterceptor())
300
.usePlaintext()
301
.build();
302
}
303
304
private static class ClientMetricsInterceptor implements ClientInterceptor {
305
@Override
306
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
307
MethodDescriptor<ReqT, RespT> method,
308
CallOptions callOptions,
309
Channel next) {
310
311
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) {
312
private long startTime = System.nanoTime();
313
314
@Override
315
public void start(Listener<RespT> responseListener, Metadata headers) {
316
super.start(new SimpleForwardingClientCallListener<RespT>(responseListener) {
317
@Override
318
public void onClose(Status status, Metadata trailers) {
319
long duration = System.nanoTime() - startTime;
320
double durationMs = duration / 1_000_000.0;
321
322
// Record client-side metrics
323
if (status.isOk()) {
324
System.out.println("Successful call duration: " + durationMs + "ms");
325
} else {
326
System.out.println("Failed call duration: " + durationMs + "ms, " +
327
"Status: " + status.getCode());
328
}
329
330
super.onClose(status, trailers);
331
}
332
}, headers);
333
}
334
};
335
}
336
}
337
338
public void makeHealthyRequest() {
339
UserServiceGrpc.UserServiceBlockingStub stub =
340
UserServiceGrpc.newBlockingStub(channel);
341
342
try {
343
// This request will only go to healthy servers
344
// thanks to the health checking load balancer
345
GetUserResponse response = stub.getUser(
346
GetUserRequest.newBuilder().setUserId(123).build());
347
348
System.out.println("Request successful: " + response.getUser().getName());
349
350
} catch (StatusRuntimeException e) {
351
System.err.println("Request failed even with health checking: " +
352
e.getStatus().getCode());
353
}
354
}
355
}
356
```
357
358
## Best Practices
359
360
### Connection Management
361
362
```java
363
public class RobustHealthCheckingClient {
364
private final ManagedChannel channel;
365
366
public RobustHealthCheckingClient(String target) {
367
this.channel = ManagedChannelBuilder.forTarget(target)
368
.defaultLoadBalancingPolicy("health_checking_round_robin")
369
370
// Configure connection parameters for better health detection
371
.keepAliveTime(30, TimeUnit.SECONDS)
372
.keepAliveTimeout(5, TimeUnit.SECONDS)
373
.keepAliveWithoutCalls(true)
374
375
// Configure retry behavior
376
.enableRetry()
377
.maxRetryAttempts(3)
378
379
.usePlaintext()
380
.build();
381
}
382
383
public void gracefulShutdown() {
384
System.out.println("Initiating graceful shutdown...");
385
386
channel.shutdown();
387
388
try {
389
// Wait for ongoing calls to complete
390
if (!channel.awaitTermination(60, TimeUnit.SECONDS)) {
391
System.out.println("Forcing shutdown...");
392
channel.shutdownNow();
393
394
// Final wait for forced shutdown
395
if (!channel.awaitTermination(10, TimeUnit.SECONDS)) {
396
System.err.println("Channel did not terminate cleanly");
397
}
398
}
399
} catch (InterruptedException e) {
400
System.out.println("Shutdown interrupted, forcing immediate shutdown");
401
channel.shutdownNow();
402
Thread.currentThread().interrupt();
403
}
404
}
405
}
406
```