0
# Forwarding Utilities
1
2
Base classes for creating decorators and extensions of core gRPC components using the forwarding pattern. These utilities allow developers to add custom behavior while preserving the original functionality.
3
4
## Capabilities
5
6
### Forwarding Load Balancer
7
8
Base class for decorating load balancers with additional functionality.
9
10
```java { .api }
11
/**
12
* Base class for load balancer decorators that forward calls to an underlying
13
* load balancer while allowing customization of specific behaviors.
14
*/
15
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
16
public abstract class ForwardingLoadBalancer extends LoadBalancer {
17
18
/**
19
* Returns the underlying load balancer to which calls are forwarded
20
* @return the delegate load balancer
21
*/
22
protected abstract LoadBalancer delegate();
23
24
/**
25
* Handles resolved addresses by forwarding to the delegate
26
* @param resolvedAddresses the resolved addresses from name resolution
27
*/
28
@Override
29
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses);
30
31
/**
32
* Handles name resolution errors by forwarding to the delegate
33
* @param error the name resolution error status
34
*/
35
@Override
36
public void handleNameResolutionError(Status error);
37
38
/**
39
* Handles subchannel state changes by forwarding to the delegate
40
* @param subchannel the subchannel that changed state
41
* @param stateInfo the new state information
42
* @deprecated Use {@link #handleResolvedAddresses} instead
43
*/
44
@Override
45
@Deprecated
46
public void handleSubchannelState(Subchannel subchannel, ConnectivityStateInfo stateInfo);
47
48
/**
49
* Shuts down the load balancer by forwarding to the delegate
50
*/
51
@Override
52
public void shutdown();
53
54
/**
55
* Checks if the load balancer can handle empty address lists
56
* @return true if empty address lists can be handled
57
*/
58
@Override
59
public boolean canHandleEmptyAddressListFromNameResolution();
60
61
/**
62
* Requests connection by forwarding to the delegate
63
*/
64
@Override
65
public void requestConnection();
66
67
/**
68
* Returns string representation including delegate information
69
* @return string representation for debugging
70
*/
71
@Override
72
public String toString();
73
}
74
```
75
76
**Usage Examples:**
77
78
```java
79
import io.grpc.util.ForwardingLoadBalancer;
80
import io.grpc.LoadBalancer;
81
import io.grpc.Status;
82
83
// Custom load balancer that adds logging
84
public class LoggingLoadBalancer extends ForwardingLoadBalancer {
85
private final LoadBalancer delegate;
86
private final Logger logger;
87
88
public LoggingLoadBalancer(LoadBalancer delegate) {
89
this.delegate = delegate;
90
this.logger = Logger.getLogger(LoggingLoadBalancer.class.getName());
91
}
92
93
@Override
94
protected LoadBalancer delegate() {
95
return delegate;
96
}
97
98
@Override
99
public void handleResolvedAddresses(ResolvedAddresses resolvedAddresses) {
100
logger.info("Handling resolved addresses: " + resolvedAddresses.getAddresses().size() + " endpoints");
101
super.handleResolvedAddresses(resolvedAddresses);
102
}
103
104
@Override
105
public void handleNameResolutionError(Status error) {
106
logger.warning("Name resolution error: " + error);
107
super.handleNameResolutionError(error);
108
}
109
}
110
111
// Load balancer that adds retry logic
112
public class RetryingLoadBalancer extends ForwardingLoadBalancer {
113
private final LoadBalancer delegate;
114
private final int maxRetries;
115
116
public RetryingLoadBalancer(LoadBalancer delegate, int maxRetries) {
117
this.delegate = delegate;
118
this.maxRetries = maxRetries;
119
}
120
121
@Override
122
protected LoadBalancer delegate() {
123
return delegate;
124
}
125
126
@Override
127
public void handleNameResolutionError(Status error) {
128
// Custom retry logic before forwarding
129
if (shouldRetry(error)) {
130
// Implement retry mechanism
131
scheduleRetry();
132
} else {
133
super.handleNameResolutionError(error);
134
}
135
}
136
}
137
```
138
139
### Forwarding Load Balancer Helper
140
141
Base class for decorating load balancer helpers with additional functionality.
142
143
```java { .api }
144
/**
145
* Base class for load balancer helper decorators that forward calls to an
146
* underlying helper while allowing customization of specific behaviors.
147
*/
148
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
149
public abstract class ForwardingLoadBalancerHelper extends LoadBalancer.Helper {
150
151
/**
152
* Returns the underlying helper to which calls are forwarded
153
* @return the delegate helper
154
*/
155
protected abstract Helper delegate();
156
157
/**
158
* Creates a subchannel by forwarding to the delegate
159
* @param args arguments for subchannel creation
160
* @return new Subchannel instance
161
*/
162
@Override
163
public Subchannel createSubchannel(CreateSubchannelArgs args);
164
165
/**
166
* Creates out-of-band channel by forwarding to the delegate
167
* @param eag equivalent address group for the channel
168
* @param authority authority for the channel
169
* @return new ManagedChannel instance
170
*/
171
@Override
172
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority);
173
174
/**
175
* Creates out-of-band channel with multiple address groups
176
* @param eag list of equivalent address groups
177
* @param authority authority for the channel
178
* @return new ManagedChannel instance
179
*/
180
@Override
181
public ManagedChannel createOobChannel(List<EquivalentAddressGroup> eag, String authority);
182
183
/**
184
* Updates out-of-band channel addresses
185
* @param channel the channel to update
186
* @param eag new equivalent address group
187
*/
188
@Override
189
public void updateOobChannelAddresses(ManagedChannel channel, EquivalentAddressGroup eag);
190
191
/**
192
* Updates out-of-band channel addresses with multiple groups
193
* @param channel the channel to update
194
* @param eag list of new equivalent address groups
195
*/
196
@Override
197
public void updateOobChannelAddresses(ManagedChannel channel, List<EquivalentAddressGroup> eag);
198
199
/**
200
* Creates resolving out-of-band channel builder (deprecated)
201
* @param target target URI for name resolution
202
* @return ManagedChannelBuilder instance
203
* @deprecated Use {@link #createResolvingOobChannelBuilder(String, ChannelCredentials)} instead
204
*/
205
@Override
206
@Deprecated
207
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target);
208
209
/**
210
* Creates resolving out-of-band channel builder with credentials
211
* @param target target URI for name resolution
212
* @param creds channel credentials
213
* @return ManagedChannelBuilder instance
214
*/
215
@Override
216
public ManagedChannelBuilder<?> createResolvingOobChannelBuilder(String target, ChannelCredentials creds);
217
218
/**
219
* Creates resolving out-of-band channel
220
* @param target target URI for name resolution
221
* @return new ManagedChannel instance
222
*/
223
@Override
224
public ManagedChannel createResolvingOobChannel(String target);
225
226
/**
227
* Updates the load balancing state
228
* @param newState new connectivity state
229
* @param newPicker new subchannel picker
230
*/
231
@Override
232
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker);
233
234
/**
235
* Refreshes name resolution by forwarding to the delegate
236
*/
237
@Override
238
public void refreshNameResolution();
239
240
/**
241
* Ignores refresh name resolution check (deprecated)
242
* @deprecated No longer used
243
*/
244
@Override
245
@Deprecated
246
public void ignoreRefreshNameResolutionCheck();
247
248
/**
249
* Gets the channel authority
250
* @return authority string
251
*/
252
@Override
253
public String getAuthority();
254
255
/**
256
* Gets the channel target
257
* @return target string
258
*/
259
@Override
260
public String getChannelTarget();
261
262
/**
263
* Gets the channel credentials
264
* @return ChannelCredentials instance
265
*/
266
@Override
267
public ChannelCredentials getChannelCredentials();
268
269
/**
270
* Gets unsafe channel credentials for testing
271
* @return ChannelCredentials instance
272
*/
273
@Override
274
public ChannelCredentials getUnsafeChannelCredentials();
275
276
/**
277
* Gets the synchronization context
278
* @return SynchronizationContext instance
279
*/
280
@Override
281
public SynchronizationContext getSynchronizationContext();
282
283
/**
284
* Gets the scheduled executor service
285
* @return ScheduledExecutorService instance
286
*/
287
@Override
288
public ScheduledExecutorService getScheduledExecutorService();
289
290
/**
291
* Gets the channel logger
292
* @return ChannelLogger instance
293
*/
294
@Override
295
public ChannelLogger getChannelLogger();
296
297
/**
298
* Gets the name resolver arguments
299
* @return NameResolver.Args instance
300
*/
301
@Override
302
public NameResolver.Args getNameResolverArgs();
303
304
/**
305
* Gets the name resolver registry
306
* @return NameResolverRegistry instance
307
*/
308
@Override
309
public NameResolverRegistry getNameResolverRegistry();
310
311
/**
312
* Gets the metric recorder
313
* @return MetricRecorder instance
314
*/
315
@Override
316
public MetricRecorder getMetricRecorder();
317
318
/**
319
* Returns string representation including delegate information
320
* @return string representation for debugging
321
*/
322
@Override
323
public String toString();
324
}
325
```
326
327
**Usage Examples:**
328
329
```java
330
import io.grpc.util.ForwardingLoadBalancerHelper;
331
import io.grpc.LoadBalancer.Helper;
332
import io.grpc.LoadBalancer.Subchannel;
333
import io.grpc.ConnectivityState;
334
import io.grpc.LoadBalancer.SubchannelPicker;
335
336
// Helper that adds metrics collection
337
public class MetricsCollectingHelper extends ForwardingLoadBalancerHelper {
338
private final Helper delegate;
339
private final MetricsCollector metrics;
340
341
public MetricsCollectingHelper(Helper delegate, MetricsCollector metrics) {
342
this.delegate = delegate;
343
this.metrics = metrics;
344
}
345
346
@Override
347
protected Helper delegate() {
348
return delegate;
349
}
350
351
@Override
352
public Subchannel createSubchannel(CreateSubchannelArgs args) {
353
metrics.incrementSubchannelCreations();
354
return super.createSubchannel(args);
355
}
356
357
@Override
358
public void updateBalancingState(ConnectivityState newState, SubchannelPicker newPicker) {
359
metrics.recordStateChange(newState);
360
super.updateBalancingState(newState, newPicker);
361
}
362
}
363
364
// Helper that adds connection pooling
365
public class PoolingHelper extends ForwardingLoadBalancerHelper {
366
private final Helper delegate;
367
private final Map<EquivalentAddressGroup, ManagedChannel> channelPool = new ConcurrentHashMap<>();
368
369
public PoolingHelper(Helper delegate) {
370
this.delegate = delegate;
371
}
372
373
@Override
374
protected Helper delegate() {
375
return delegate;
376
}
377
378
@Override
379
public ManagedChannel createOobChannel(EquivalentAddressGroup eag, String authority) {
380
return channelPool.computeIfAbsent(eag,
381
key -> super.createOobChannel(key, authority));
382
}
383
}
384
```
385
386
### Forwarding Subchannel
387
388
Base class for decorating subchannels with additional functionality.
389
390
```java { .api }
391
/**
392
* Base class for subchannel decorators that forward calls to an underlying
393
* subchannel while allowing customization of specific behaviors.
394
*/
395
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/1771")
396
public abstract class ForwardingSubchannel extends LoadBalancer.Subchannel {
397
398
/**
399
* Returns the underlying subchannel to which calls are forwarded
400
* @return the delegate subchannel
401
*/
402
protected abstract Subchannel delegate();
403
404
/**
405
* Starts the subchannel with a state listener
406
* @param listener the subchannel state listener
407
*/
408
@Override
409
public void start(SubchannelStateListener listener);
410
411
/**
412
* Shuts down the subchannel by forwarding to the delegate
413
*/
414
@Override
415
public void shutdown();
416
417
/**
418
* Requests connection by forwarding to the delegate
419
*/
420
@Override
421
public void requestConnection();
422
423
/**
424
* Gets all addresses by forwarding to the delegate
425
* @return list of equivalent address groups
426
*/
427
@Override
428
public List<EquivalentAddressGroup> getAllAddresses();
429
430
/**
431
* Gets attributes by forwarding to the delegate
432
* @return Attributes instance
433
*/
434
@Override
435
public Attributes getAttributes();
436
437
/**
438
* Converts subchannel to channel by forwarding to the delegate
439
* @return Channel instance
440
*/
441
@Override
442
public Channel asChannel();
443
444
/**
445
* Gets the channel logger by forwarding to the delegate
446
* @return ChannelLogger instance
447
*/
448
@Override
449
public ChannelLogger getChannelLogger();
450
451
/**
452
* Gets internal subchannel object by forwarding to the delegate
453
* @return internal subchannel object
454
*/
455
@Override
456
public Object getInternalSubchannel();
457
458
/**
459
* Updates addresses by forwarding to the delegate
460
* @param addrs new list of equivalent address groups
461
*/
462
@Override
463
public void updateAddresses(List<EquivalentAddressGroup> addrs);
464
465
/**
466
* Gets connected address attributes by forwarding to the delegate
467
* @return Attributes of the connected address
468
*/
469
@Override
470
public Attributes getConnectedAddressAttributes();
471
472
/**
473
* Returns string representation including delegate information
474
* @return string representation for debugging
475
*/
476
@Override
477
public String toString();
478
}
479
```
480
481
**Usage Examples:**
482
483
```java
484
import io.grpc.util.ForwardingSubchannel;
485
import io.grpc.LoadBalancer.Subchannel;
486
import io.grpc.LoadBalancer.SubchannelStateListener;
487
import io.grpc.ConnectivityStateInfo;
488
489
// Subchannel that adds connection retry logic
490
public class RetryingSubchannel extends ForwardingSubchannel {
491
private final Subchannel delegate;
492
private final RetryPolicy retryPolicy;
493
494
public RetryingSubchannel(Subchannel delegate, RetryPolicy retryPolicy) {
495
this.delegate = delegate;
496
this.retryPolicy = retryPolicy;
497
}
498
499
@Override
500
protected Subchannel delegate() {
501
return delegate;
502
}
503
504
@Override
505
public void start(SubchannelStateListener listener) {
506
super.start(new RetryingStateListener(listener, retryPolicy));
507
}
508
509
private static class RetryingStateListener implements SubchannelStateListener {
510
private final SubchannelStateListener delegate;
511
private final RetryPolicy retryPolicy;
512
513
RetryingStateListener(SubchannelStateListener delegate, RetryPolicy retryPolicy) {
514
this.delegate = delegate;
515
this.retryPolicy = retryPolicy;
516
}
517
518
@Override
519
public void onSubchannelState(ConnectivityStateInfo newState) {
520
if (newState.getState() == ConnectivityState.TRANSIENT_FAILURE && retryPolicy.shouldRetry()) {
521
// Schedule retry
522
scheduleRetry();
523
} else {
524
delegate.onSubchannelState(newState);
525
}
526
}
527
}
528
}
529
530
// Subchannel that adds health checking
531
public class HealthCheckingSubchannel extends ForwardingSubchannel {
532
private final Subchannel delegate;
533
private final HealthChecker healthChecker;
534
535
public HealthCheckingSubchannel(Subchannel delegate, HealthChecker healthChecker) {
536
this.delegate = delegate;
537
this.healthChecker = healthChecker;
538
}
539
540
@Override
541
protected Subchannel delegate() {
542
return delegate;
543
}
544
545
@Override
546
public void start(SubchannelStateListener listener) {
547
super.start(new HealthCheckingStateListener(listener, healthChecker));
548
}
549
}
550
```
551
552
### Forwarding Client Stream Tracer
553
554
Base class for decorating client stream tracers with additional functionality.
555
556
```java { .api }
557
/**
558
* Base class for client stream tracer decorators that forward calls to an
559
* underlying tracer while allowing customization of specific behaviors.
560
*/
561
@ExperimentalApi("https://github.com/grpc/grpc-java/issues/2861")
562
public abstract class ForwardingClientStreamTracer extends ClientStreamTracer {
563
564
/**
565
* Returns the underlying tracer to which calls are forwarded
566
* @return the delegate tracer
567
*/
568
protected abstract ClientStreamTracer delegate();
569
570
/**
571
* Called when stream is created, forwards to delegate
572
* @param transportAttrs transport attributes
573
* @param headers request headers
574
*/
575
@Override
576
public void streamCreated(Attributes transportAttrs, Metadata headers);
577
578
/**
579
* Creates pending stream by forwarding to the delegate
580
* @return PendingStream instance
581
*/
582
@Override
583
protected PendingStream createPendingStream();
584
585
/**
586
* Gets outbound headers by forwarding to the delegate
587
* @return request headers
588
*/
589
@Override
590
public Metadata outboundHeaders();
591
592
/**
593
* Called on inbound headers, forwards to delegate
594
*/
595
@Override
596
public void inboundHeaders();
597
598
/**
599
* Called on inbound headers with metadata, forwards to delegate
600
* @param headers response headers
601
*/
602
@Override
603
public void inboundHeaders(Metadata headers);
604
605
/**
606
* Called on inbound trailers, forwards to delegate
607
* @param trailers response trailers
608
*/
609
@Override
610
public void inboundTrailers(Metadata trailers);
611
612
/**
613
* Adds optional label by forwarding to delegate
614
* @param key label key
615
* @param value label value
616
*/
617
@Override
618
public void addOptionalLabel(String key, String value);
619
620
/**
621
* Called when stream is closed, forwards to delegate
622
* @param status final status of the stream
623
*/
624
@Override
625
public void streamClosed(Status status);
626
627
/**
628
* Called on outbound message, forwards to delegate
629
* @param seqNo message sequence number
630
*/
631
@Override
632
public void outboundMessage(int seqNo);
633
634
/**
635
* Called on inbound message, forwards to delegate
636
* @param seqNo message sequence number
637
*/
638
@Override
639
public void inboundMessage(int seqNo);
640
641
/**
642
* Called when outbound message is sent, forwards to delegate
643
* @param seqNo message sequence number
644
* @param optionalWireSize wire size in bytes (or -1 if unknown)
645
* @param optionalUncompressedSize uncompressed size in bytes (or -1 if unknown)
646
*/
647
@Override
648
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize);
649
650
/**
651
* Called when inbound message is read, forwards to delegate
652
* @param seqNo message sequence number
653
* @param optionalWireSize wire size in bytes (or -1 if unknown)
654
* @param optionalUncompressedSize uncompressed size in bytes (or -1 if unknown)
655
*/
656
@Override
657
public void inboundMessageRead(int seqNo, long optionalWireSize, long optionalUncompressedSize);
658
659
/**
660
* Records outbound wire size, forwards to delegate
661
* @param bytes number of bytes
662
*/
663
@Override
664
public void outboundWireSize(long bytes);
665
666
/**
667
* Records outbound uncompressed size, forwards to delegate
668
* @param bytes number of bytes
669
*/
670
@Override
671
public void outboundUncompressedSize(long bytes);
672
673
/**
674
* Records inbound wire size, forwards to delegate
675
* @param bytes number of bytes
676
*/
677
@Override
678
public void inboundWireSize(long bytes);
679
680
/**
681
* Records inbound uncompressed size, forwards to delegate
682
* @param bytes number of bytes
683
*/
684
@Override
685
public void inboundUncompressedSize(long bytes);
686
687
/**
688
* Returns string representation including delegate information
689
* @return string representation for debugging
690
*/
691
@Override
692
public String toString();
693
}
694
```
695
696
**Usage Examples:**
697
698
```java
699
import io.grpc.util.ForwardingClientStreamTracer;
700
import io.grpc.ClientStreamTracer;
701
import io.grpc.Metadata;
702
import io.grpc.Status;
703
704
// Stream tracer that adds detailed logging
705
public class LoggingStreamTracer extends ForwardingClientStreamTracer {
706
private final ClientStreamTracer delegate;
707
private final Logger logger;
708
709
public LoggingStreamTracer(ClientStreamTracer delegate) {
710
this.delegate = delegate;
711
this.logger = Logger.getLogger(LoggingStreamTracer.class.getName());
712
}
713
714
@Override
715
protected ClientStreamTracer delegate() {
716
return delegate;
717
}
718
719
@Override
720
public void streamCreated(Attributes transportAttrs, Metadata headers) {
721
logger.info("Stream created with " + headers.keys().size() + " headers");
722
super.streamCreated(transportAttrs, headers);
723
}
724
725
@Override
726
public void streamClosed(Status status) {
727
logger.info("Stream closed with status: " + status.getCode());
728
super.streamClosed(status);
729
}
730
731
@Override
732
public void outboundMessageSent(int seqNo, long optionalWireSize, long optionalUncompressedSize) {
733
logger.fine("Outbound message " + seqNo + ", wire size: " + optionalWireSize);
734
super.outboundMessageSent(seqNo, optionalWireSize, optionalUncompressedSize);
735
}
736
}
737
738
// Stream tracer that collects metrics
739
public class MetricsStreamTracer extends ForwardingClientStreamTracer {
740
private final ClientStreamTracer delegate;
741
private final StreamMetrics metrics;
742
private long startTime;
743
744
public MetricsStreamTracer(ClientStreamTracer delegate, StreamMetrics metrics) {
745
this.delegate = delegate;
746
this.metrics = metrics;
747
}
748
749
@Override
750
protected ClientStreamTracer delegate() {
751
return delegate;
752
}
753
754
@Override
755
public void streamCreated(Attributes transportAttrs, Metadata headers) {
756
startTime = System.nanoTime();
757
metrics.incrementStreamCount();
758
super.streamCreated(transportAttrs, headers);
759
}
760
761
@Override
762
public void streamClosed(Status status) {
763
long duration = System.nanoTime() - startTime;
764
metrics.recordStreamDuration(duration);
765
metrics.recordStreamStatus(status.getCode());
766
super.streamClosed(status);
767
}
768
}
769
```
770
771
## Common Patterns
772
773
### Creating Custom Decorators
774
775
When creating custom forwarding implementations, follow these patterns:
776
777
1. **Always implement the abstract `delegate()` method**
778
2. **Call `super.methodName()` to forward to the delegate**
779
3. **Add custom logic before or after the forwarded call**
780
4. **Handle exceptions appropriately**
781
5. **Override `toString()` for better debugging**
782
783
### Error Handling
784
785
Forwarding classes typically preserve the error handling behavior of their delegates. Custom implementations should:
786
787
- Catch and handle exceptions appropriately
788
- Forward errors to delegates when appropriate
789
- Log errors for debugging purposes
790
- Not suppress critical errors
791
792
### Resource Management
793
794
When creating forwarding implementations that hold resources:
795
796
- Implement proper cleanup in shutdown methods
797
- Forward shutdown calls to delegates
798
- Close any additional resources opened by the forwarding implementation
799
- Use try-with-resources or similar patterns for automatic cleanup