Default cluster client implementation for Sentinel cluster flow control
—
The codec system provides request and response encoding/decoding for efficient network serialization with extensible registries for custom data types and protocols.
Registry system for managing request data encoders that serialize different types of cluster requests into ByteBuf format for network transmission.
/**
* Registry for request data writers that encode requests for network transmission
*/
public final class RequestDataWriterRegistry {
/**
* Register a writer for a specific request type
* @param type request type constant (e.g., TYPE_PING, TYPE_FLOW)
* @param writer entity writer implementation for this type
* @return true if writer was registered, false if type already has a writer
*/
public static <T> boolean addWriter(int type, EntityWriter<T, ByteBuf> writer);
/**
* Get writer for a specific request type
* @param type request type constant
* @return writer for the type, or null if no writer registered
*/
public static EntityWriter<Object, ByteBuf> getWriter(int type);
/**
* Remove writer for a specific request type
* @param type request type constant to remove
* @return true if writer was removed, false if no writer existed
*/
public static boolean remove(int type);
}Usage Examples:
import com.alibaba.csp.sentinel.cluster.client.codec.registry.RequestDataWriterRegistry;
import com.alibaba.csp.sentinel.cluster.client.ClientConstants;
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import io.netty.buffer.ByteBuf;
// Register a custom writer for a new request type
EntityWriter<CustomRequestData, ByteBuf> customWriter = new CustomRequestDataWriter();
boolean registered = RequestDataWriterRegistry.addWriter(100, customWriter);
if (registered) {
System.out.println("Custom writer registered");
} else {
System.out.println("Writer for type 100 already exists");
}
// Get writer for existing type
EntityWriter<Object, ByteBuf> flowWriter = RequestDataWriterRegistry.getWriter(ClientConstants.TYPE_FLOW);
if (flowWriter != null) {
// Use writer to encode flow request data
}
// Remove custom writer
RequestDataWriterRegistry.remove(100);Registry system for managing response data decoders that deserialize network responses from ByteBuf format into Java objects.
/**
* Registry for response data decoders that decode responses from network transmission
*/
public final class ResponseDataDecodeRegistry {
/**
* Register a decoder for a specific response type
* @param type response type constant (e.g., TYPE_PING, TYPE_FLOW)
* @param decoder entity decoder implementation for this type
* @return true if decoder was registered, false if type already has a decoder
*/
public static boolean addDecoder(int type, EntityDecoder<ByteBuf, ?> decoder);
/**
* Get decoder for a specific response type
* @param type response type constant
* @return decoder for the type, or null if no decoder registered
*/
public static EntityDecoder<ByteBuf, Object> getDecoder(int type);
/**
* Remove decoder for a specific response type
* @param type response type constant to remove
* @return true if decoder was removed, false if no decoder existed
*/
public static boolean removeDecoder(int type);
}Usage Examples:
import com.alibaba.csp.sentinel.cluster.client.codec.registry.ResponseDataDecodeRegistry;
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import io.netty.buffer.ByteBuf;
// Register custom decoder
EntityDecoder<ByteBuf, CustomResponseData> customDecoder = new CustomResponseDataDecoder();
boolean registered = ResponseDataDecodeRegistry.addDecoder(200, customDecoder);
// Get decoder for ping responses
EntityDecoder<ByteBuf, Object> pingDecoder = ResponseDataDecodeRegistry.getDecoder(ClientConstants.TYPE_PING);
if (pingDecoder != null) {
// Use decoder to parse ping response
}
// Remove custom decoder
ResponseDataDecodeRegistry.removeDecoder(200);The system includes several built-in writers for standard request types that are automatically registered during initialization.
Flow Request Writer:
ClientConstants.TYPE_FLOWFlowRequestData containing flow rule ID, token count, and priorityParameterized Flow Request Writer:
ClientConstants.TYPE_PARAM_FLOWParamFlowRequestData containing flow rule ID, token count, and parametersPing Request Writer:
ClientConstants.TYPE_PINGWriter Usage Examples:
import com.alibaba.csp.sentinel.cluster.request.data.FlowRequestData;
import com.alibaba.csp.sentinel.cluster.request.data.ParamFlowRequestData;
import java.util.Arrays;
// Flow request data
FlowRequestData flowData = new FlowRequestData()
.setFlowId(12345L)
.setCount(1)
.setPriority(false);
// Parameterized flow request data
ParamFlowRequestData paramData = new ParamFlowRequestData()
.setFlowId(67890L)
.setCount(2)
.setParams(Arrays.asList("user123", "premium"));
// These data objects are automatically encoded by registered writersThe system includes built-in decoders for standard response types.
Flow Response Decoder:
ClientConstants.TYPE_FLOW, ClientConstants.TYPE_PARAM_FLOWFlowTokenResponseData containing remaining count and wait timePing Response Decoder:
ClientConstants.TYPE_PINGThe codec system integrates with Netty pipeline handlers for seamless request/response processing.
/**
* Netty encoder for outgoing cluster requests
*/
public class NettyRequestEncoder extends MessageToByteEncoder<ClusterRequest> {
// Automatically encodes ClusterRequest objects using registered writers
}
/**
* Netty decoder for incoming cluster responses
*/
public class NettyResponseDecoder extends ByteToMessageDecoder {
// Automatically decodes network data into ClusterResponse objects using registered decoders
}Pipeline Integration:
// The codec system is automatically integrated into the Netty pipeline:
// 1. LengthFieldBasedFrameDecoder - handles message framing
// 2. NettyResponseDecoder - decodes responses using registry
// 3. LengthFieldPrepender - adds length headers
// 4. NettyRequestEncoder - encodes requests using registry
// 5. TokenClientHandler - handles request/response correlationThe codec system uses an entity provider pattern for accessing writers and decoders through SPI loading.
/**
* Provider for entity codecs using SPI loading mechanism
*/
public final class ClientEntityCodecProvider {
/**
* Get the configured request entity writer
* @return request entity writer instance, or null if not configured
*/
public static RequestEntityWriter getRequestEntityWriter();
/**
* Get the configured response entity decoder
* @return response entity decoder instance, or null if not configured
*/
public static ResponseEntityDecoder getResponseEntityDecoder();
}Usage Examples:
import com.alibaba.csp.sentinel.cluster.client.codec.ClientEntityCodecProvider;
import com.alibaba.csp.sentinel.cluster.codec.request.RequestEntityWriter;
import com.alibaba.csp.sentinel.cluster.codec.response.ResponseEntityDecoder;
// Get configured codecs
RequestEntityWriter writer = ClientEntityCodecProvider.getRequestEntityWriter();
ResponseEntityDecoder decoder = ClientEntityCodecProvider.getResponseEntityDecoder();
if (writer != null) {
System.out.println("Request writer available: " + writer.getClass().getName());
} else {
System.out.println("No request writer configured via SPI");
}You can implement custom codecs for new request/response types.
Custom Request Writer:
import com.alibaba.csp.sentinel.cluster.codec.EntityWriter;
import io.netty.buffer.ByteBuf;
public class CustomRequestDataWriter implements EntityWriter<CustomRequestData, ByteBuf> {
@Override
public void writeTo(ByteBuf target, CustomRequestData source) throws Exception {
// Write custom data fields to ByteBuf
target.writeInt(source.getId());
target.writeLong(source.getTimestamp());
byte[] nameBytes = source.getName().getBytes("UTF-8");
target.writeInt(nameBytes.length);
target.writeBytes(nameBytes);
}
}
// Register the custom writer
RequestDataWriterRegistry.addWriter(CUSTOM_TYPE, new CustomRequestDataWriter());Custom Response Decoder:
import com.alibaba.csp.sentinel.cluster.codec.EntityDecoder;
import io.netty.buffer.ByteBuf;
public class CustomResponseDataDecoder implements EntityDecoder<ByteBuf, CustomResponseData> {
@Override
public CustomResponseData decode(ByteBuf source) throws Exception {
// Read custom data fields from ByteBuf
int id = source.readInt();
long timestamp = source.readLong();
int nameLength = source.readInt();
byte[] nameBytes = new byte[nameLength];
source.readBytes(nameBytes);
String name = new String(nameBytes, "UTF-8");
return new CustomResponseData(id, timestamp, name);
}
}
// Register the custom decoder
ResponseDataDecodeRegistry.addDecoder(CUSTOM_TYPE, new CustomResponseDataDecoder());The codec system is automatically initialized through the DefaultClusterClientInitFunc class during Sentinel startup.
/**
* Initialization function for default cluster client components
* Automatically registers default codecs for built-in request/response types
*/
@InitOrder(0)
public class DefaultClusterClientInitFunc implements InitFunc {
/**
* Initialize default entity writers and decoders
* @throws Exception if initialization fails
*/
public void init() throws Exception;
}Automatic Registration Process:
// The initialization process automatically registers:
// Request writers
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PING, new PingRequestDataWriter());
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_FLOW, new FlowRequestDataWriter());
// Parameterized flow writer with optional size limit
Integer maxParamByteSize = ClusterClientStartUpConfig.getMaxParamByteSize();
if (maxParamByteSize == null) {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter());
} else {
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, new ParamFlowRequestDataWriter(maxParamByteSize));
}
// Response decoders
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PING, new PingResponseDataDecoder());
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_FLOW, new FlowResponseDataDecoder());
ResponseDataDecodeRegistry.addDecoder(ClientConstants.TYPE_PARAM_FLOW, new FlowResponseDataDecoder());The codec system supports configuration parameters for customization.
Parameter Serialization Limits:
import com.alibaba.csp.sentinel.cluster.client.config.ClusterClientStartUpConfig;
// Configure maximum parameter byte size
Integer maxParamBytes = ClusterClientStartUpConfig.getMaxParamByteSize();
if (maxParamBytes != null) {
// Use custom limit for parameter serialization
ParamFlowRequestDataWriter writer = new ParamFlowRequestDataWriter(maxParamBytes);
RequestDataWriterRegistry.addWriter(ClientConstants.TYPE_PARAM_FLOW, writer);
}
// System property: -Dcsp.sentinel.cluster.max.param.byte.size=2048All registry operations are thread-safe and can be called from multiple threads simultaneously. However, it's recommended to register custom codecs during application initialization before starting the cluster client.
Codec Efficiency:
Registry Lookup:
Install with Tessl CLI
npx tessl i tessl/maven-com-alibaba-csp--sentinel-cluster-client-default