0
# Metrics Processing Services
1
2
Backend metrics processing infrastructure that consumes metrics data from message queues, processes and persists those metrics to storage systems, and provides status monitoring for the metrics processing pipelines.
3
4
## Capabilities
5
6
### MetricsProcessorStatusService
7
8
Status service for metrics processing with HTTP endpoints for health checks and discovery during CDAP services startup.
9
10
```java { .api }
11
/**
12
* Status service with PingHandler used for discovery during CDAP-services startup
13
* Provides HTTP endpoints for monitoring metrics processor health and status
14
*/
15
public class MetricsProcessorStatusService extends AbstractIdleService {
16
/**
17
* Create metrics processor status service
18
* @param cConf CDAP configuration
19
* @param sConf Security configuration
20
* @param discoveryService Service discovery for registration
21
* @param handlers HTTP handlers for status endpoints
22
* @param commonNettyHttpServiceFactory Factory for creating HTTP service
23
*/
24
public MetricsProcessorStatusService(CConfiguration cConf,
25
SConfiguration sConf,
26
DiscoveryService discoveryService,
27
Set<HttpHandler> handlers,
28
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory);
29
30
/**
31
* Start status service and register with discovery
32
* @throws Exception if service startup fails
33
*/
34
protected void startUp() throws Exception;
35
36
/**
37
* Stop status service and deregister from discovery
38
* @throws Exception if service shutdown fails
39
*/
40
protected void shutDown() throws Exception;
41
}
42
```
43
44
### MessagingMetricsProcessorService
45
46
Core metrics processing service that consumes metrics data from the messaging system, then processes the metrics and persists them to storage.
47
48
```java { .api }
49
/**
50
* Service that consumes metrics from messaging system and processes them
51
* Reads metrics data from message queues, processes and persists to storage systems
52
*/
53
public class MessagingMetricsProcessorService extends AbstractExecutionThreadService {
54
/**
55
* Create messaging metrics processor service
56
* @param cConf CDAP configuration
57
* @param metricsWriter Writer for persisting processed metrics
58
* @param messagingService Messaging service for consuming metrics data
59
* @param topicId Topic identifier for metrics message queue
60
* @param metricsContext Context for processor metrics collection
61
* @param instanceId Unique instance identifier for this processor
62
*/
63
public MessagingMetricsProcessorService(CConfiguration cConf,
64
MetricsWriter metricsWriter,
65
MessagingService messagingService,
66
TopicId topicId,
67
MetricsContext metricsContext,
68
int instanceId);
69
70
/**
71
* Main processing loop for consuming and processing metrics
72
* Continuously reads from messaging system and processes metrics
73
*/
74
protected void run() throws Exception;
75
76
/**
77
* Graceful shutdown of processing service
78
* Stops message consumption and completes in-flight processing
79
*/
80
protected void shutDown() throws Exception;
81
}
82
```
83
84
### MessagingMetricsProcessorManagerService
85
86
Manager service for coordinating multiple metrics processor instances and handling administrative operations.
87
88
```java { .api }
89
/**
90
* Manager service for coordinating multiple metrics processor instances
91
* Handles lifecycle management and administrative operations for processor services
92
*/
93
public class MessagingMetricsProcessorManagerService extends AbstractIdleService {
94
/**
95
* Create metrics processor manager service
96
* @param cConf CDAP configuration
97
* @param messagingService Messaging service for metrics consumption
98
* @param metricsWriterProvider Provider for metrics writers
99
* @param metricsCollectionService Service for collecting processor metrics
100
*/
101
public MessagingMetricsProcessorManagerService(CConfiguration cConf,
102
MessagingService messagingService,
103
Provider<MetricsWriter> metricsWriterProvider,
104
MetricsCollectionService metricsCollectionService);
105
106
/**
107
* Start manager service and processor instances
108
* @throws Exception if startup fails
109
*/
110
protected void startUp() throws Exception;
111
112
/**
113
* Stop manager service and all processor instances
114
* @throws Exception if shutdown fails
115
*/
116
protected void shutDown() throws Exception;
117
}
118
```
119
120
### Service Factory
121
122
Factory for creating metrics processor services with proper configuration and dependencies.
123
124
```java { .api }
125
/**
126
* Factory for creating messaging metrics processor services
127
* Provides configured instances of processor services with proper dependencies
128
*/
129
public interface MessagingMetricsProcessorServiceFactory {
130
/**
131
* Create messaging metrics processor service
132
* @param metricsWriter Writer for persisting metrics data
133
* @param topicId Topic identifier for metrics consumption
134
* @param instanceId Unique instance identifier
135
* @return Configured MessagingMetricsProcessorService instance
136
*/
137
MessagingMetricsProcessorService create(MetricsWriter metricsWriter,
138
TopicId topicId,
139
int instanceId);
140
}
141
```
142
143
### Runtime Services
144
145
Runtime management services for metrics processing in distributed environments.
146
147
```java { .api }
148
/**
149
* Runtime service for messaging metrics processor in distributed environments
150
* Manages processor lifecycle and integration with CDAP runtime systems
151
*/
152
public class MessagingMetricsProcessorRuntimeService extends AbstractIdleService {
153
/**
154
* Create runtime service for metrics processor
155
* @param cConf CDAP configuration
156
* @param sConf Security configuration
157
* @param discoveryService Service discovery
158
* @param messagingService Messaging service
159
* @param metricsCollectionService Metrics collection service
160
*/
161
public MessagingMetricsProcessorRuntimeService(CConfiguration cConf,
162
SConfiguration sConf,
163
DiscoveryService discoveryService,
164
MessagingService messagingService,
165
MetricsCollectionService metricsCollectionService);
166
167
/**
168
* Start runtime service and all managed components
169
* @throws Exception if startup fails
170
*/
171
protected void startUp() throws Exception;
172
173
/**
174
* Stop runtime service and cleanup resources
175
* @throws Exception if shutdown fails
176
*/
177
protected void shutDown() throws Exception;
178
}
179
180
/**
181
* Manager for metrics processor status service instances
182
* Coordinates status services across multiple processor instances
183
*/
184
public class MetricsProcessorStatusServiceManager extends AbstractIdleService {
185
/**
186
* Create status service manager
187
* @param cConf CDAP configuration
188
* @param sConf Security configuration
189
* @param discoveryService Service discovery
190
* @param handlers HTTP handlers for status endpoints
191
* @param httpServiceFactory Factory for creating HTTP services
192
*/
193
public MetricsProcessorStatusServiceManager(CConfiguration cConf,
194
SConfiguration sConf,
195
DiscoveryService discoveryService,
196
Set<HttpHandler> handlers,
197
CommonNettyHttpServiceFactory httpServiceFactory);
198
199
/**
200
* Start status service manager
201
* @throws Exception if startup fails
202
*/
203
protected void startUp() throws Exception;
204
205
/**
206
* Stop status service manager
207
* @throws Exception if shutdown fails
208
*/
209
protected void shutDown() throws Exception;
210
}
211
```
212
213
## Administrative Operations
214
215
Services and data models for metrics processing administration and maintenance.
216
217
```java { .api }
218
/**
219
* Administrative message for metrics processing operations
220
* Used for coordinating administrative actions across processor instances
221
*/
222
public final class MetricsAdminMessage {
223
/**
224
* Get the administrative operation type
225
* @return Type of administrative operation
226
*/
227
public Type getType();
228
229
/**
230
* Get the message payload
231
* @param gson Gson instance for deserialization
232
* @param type Target type for payload deserialization
233
* @return Deserialized payload object
234
*/
235
public <T> T getPayload(Gson gson, java.lang.reflect.Type type);
236
237
/**
238
* Administrative operation types
239
*/
240
public enum Type {
241
/** Delete metrics operation */
242
DELETE
243
}
244
}
245
246
/**
247
* Key provider for subscriber metrics processing
248
* Provides topic-based keys for metrics processing coordination
249
*/
250
public interface TopicSubscriberMetricsKeyProvider {
251
/**
252
* Get metrics key for subscriber processing
253
* @param topicId Topic identifier
254
* @param instanceId Processor instance identifier
255
* @return Metrics key for this subscriber instance
256
*/
257
String getMetricsKey(TopicId topicId, int instanceId);
258
}
259
260
/**
261
* Key provider for topic-based metrics processing
262
* Provides keys for organizing metrics by topic
263
*/
264
public interface TopicIdMetricsKeyProvider {
265
/**
266
* Get metrics key for topic processing
267
* @param topicId Topic identifier
268
* @return Metrics key for this topic
269
*/
270
String getMetricsKey(TopicId topicId);
271
}
272
```
273
274
**Usage Examples:**
275
276
```java
277
import io.cdap.cdap.metrics.process.*;
278
import io.cdap.cdap.messaging.spi.MessagingService;
279
import io.cdap.cdap.api.metrics.MetricsWriter;
280
import io.cdap.cdap.proto.id.TopicId;
281
282
// Create and start status service
283
Set<HttpHandler> statusHandlers = // ... configure handlers
284
MetricsProcessorStatusService statusService = new MetricsProcessorStatusService(
285
cConf, sConf, discoveryService, statusHandlers, httpServiceFactory
286
);
287
statusService.startAsync().awaitRunning();
288
289
// Create processor service factory
290
MessagingMetricsProcessorServiceFactory factory = // ... obtain factory
291
292
// Create processor service for specific topic
293
TopicId metricsTopicId = new TopicId("system", "metrics");
294
MetricsWriter metricsWriter = // ... obtain metrics writer
295
MessagingMetricsProcessorService processor = factory.create(
296
metricsWriter,
297
metricsTopicId,
298
1 // instance ID
299
);
300
301
// Start processor (runs in background thread)
302
processor.startAsync().awaitRunning();
303
304
// Create manager service to coordinate multiple processors
305
MessagingMetricsProcessorManagerService manager = new MessagingMetricsProcessorManagerService(
306
cConf, messagingService, metricsWriterProvider, metricsCollectionService
307
);
308
manager.startAsync().awaitRunning();
309
310
// Administrative operations
311
MetricsAdminMessage deleteMessage = // ... create delete message
312
// Process admin message through appropriate channels
313
314
// Shutdown services
315
processor.stopAsync().awaitTerminated();
316
manager.stopAsync().awaitTerminated();
317
statusService.stopAsync().awaitTerminated();
318
```