or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

index.mdlog-buffer.mdlogging-context.mdlogging-service.mdmetrics-collection.mdmetrics-processing.mdmetrics-query.md

metrics-processing.mddocs/

0

# Metrics Processing Services

1

2

Backend metrics processing infrastructure for consuming metrics data from message queues, processing and persisting metrics to storage systems, and providing status monitoring for metrics processing pipelines.

3

4

## Capabilities

5

6

### MetricsProcessorStatusService

7

8

Status service for metrics processing with HTTP endpoints for health checks and discovery during CDAP services startup.

9

10

```java { .api }

11

/**

12

* Status service with PingHandler used for discovery during CDAP-services startup

13

* Provides HTTP endpoints for monitoring metrics processor health and status

14

*/

15

public class MetricsProcessorStatusService extends AbstractIdleService {

16

/**

17

* Create metrics processor status service

18

* @param cConf CDAP configuration

19

* @param sConf Security configuration

20

* @param discoveryService Service discovery for registration

21

* @param handlers HTTP handlers for status endpoints

22

* @param commonNettyHttpServiceFactory Factory for creating HTTP service

23

*/

24

public MetricsProcessorStatusService(CConfiguration cConf,

25

SConfiguration sConf,

26

DiscoveryService discoveryService,

27

Set<HttpHandler> handlers,

28

CommonNettyHttpServiceFactory commonNettyHttpServiceFactory);

29

30

/**

31

* Start status service and register with discovery

32

* @throws Exception if service startup fails

33

*/

34

protected void startUp() throws Exception;

35

36

/**

37

* Stop status service and deregister from discovery

38

* @throws Exception if service shutdown fails

39

*/

40

protected void shutDown() throws Exception;

41

}

42

```

43

44

### MessagingMetricsProcessorService

45

46

Core metrics processing service that consumes metrics data from messaging system, processes and persists metrics to storage.

47

48

```java { .api }

49

/**

50

* Service that consumes metrics from messaging system and processes them

51

* Reads metrics data from message queues, processes and persists to storage systems

52

*/

53

public class MessagingMetricsProcessorService extends AbstractExecutionThreadService {

54

/**

55

* Create messaging metrics processor service

56

* @param cConf CDAP configuration

57

* @param metricsWriter Writer for persisting processed metrics

58

* @param messagingService Messaging service for consuming metrics data

59

* @param topicId Topic identifier for metrics message queue

60

* @param metricsContext Context for processor metrics collection

61

* @param instanceId Unique instance identifier for this processor

62

*/

63

public MessagingMetricsProcessorService(CConfiguration cConf,

64

MetricsWriter metricsWriter,

65

MessagingService messagingService,

66

TopicId topicId,

67

MetricsContext metricsContext,

68

int instanceId);

69

70

/**

71

* Main processing loop for consuming and processing metrics

72

* Continuously reads from messaging system and processes metrics

73

*/

74

protected void run() throws Exception;

75

76

/**

77

* Graceful shutdown of processing service

78

* Stops message consumption and completes in-flight processing

79

*/

80

protected void shutDown() throws Exception;

81

}

82

```

83

84

### MessagingMetricsProcessorManagerService

85

86

Manager service for coordinating multiple metrics processor instances and handling administrative operations.

87

88

```java { .api }

89

/**

90

* Manager service for coordinating multiple metrics processor instances

91

* Handles lifecycle management and administrative operations for processor services

92

*/

93

public class MessagingMetricsProcessorManagerService extends AbstractIdleService {

94

/**

95

* Create metrics processor manager service

96

* @param cConf CDAP configuration

97

* @param messagingService Messaging service for metrics consumption

98

* @param metricsWriterProvider Provider for metrics writers

99

* @param metricsCollectionService Service for collecting processor metrics

100

*/

101

public MessagingMetricsProcessorManagerService(CConfiguration cConf,

102

MessagingService messagingService,

103

Provider<MetricsWriter> metricsWriterProvider,

104

MetricsCollectionService metricsCollectionService);

105

106

/**

107

* Start manager service and processor instances

108

* @throws Exception if startup fails

109

*/

110

protected void startUp() throws Exception;

111

112

/**

113

* Stop manager service and all processor instances

114

* @throws Exception if shutdown fails

115

*/

116

protected void shutDown() throws Exception;

117

}

118

```

119

120

### Service Factory

121

122

Factory for creating metrics processor services with proper configuration and dependencies.

123

124

```java { .api }

125

/**

126

* Factory for creating messaging metrics processor services

127

* Provides configured instances of processor services with proper dependencies

128

*/

129

public interface MessagingMetricsProcessorServiceFactory {

130

/**

131

* Create messaging metrics processor service

132

* @param metricsWriter Writer for persisting metrics data

133

* @param topicId Topic identifier for metrics consumption

134

* @param instanceId Unique instance identifier

135

* @return Configured MessagingMetricsProcessorService instance

136

*/

137

MessagingMetricsProcessorService create(MetricsWriter metricsWriter,

138

TopicId topicId,

139

int instanceId);

140

}

141

```

142

143

### Runtime Services

144

145

Runtime management services for metrics processing in distributed environments.

146

147

```java { .api }

148

/**

149

* Runtime service for messaging metrics processor in distributed environments

150

* Manages processor lifecycle and integration with CDAP runtime systems

151

*/

152

public class MessagingMetricsProcessorRuntimeService extends AbstractIdleService {

153

/**

154

* Create runtime service for metrics processor

155

* @param cConf CDAP configuration

156

* @param sConf Security configuration

157

* @param discoveryService Service discovery

158

* @param messagingService Messaging service

159

* @param metricsCollectionService Metrics collection service

160

*/

161

public MessagingMetricsProcessorRuntimeService(CConfiguration cConf,

162

SConfiguration sConf,

163

DiscoveryService discoveryService,

164

MessagingService messagingService,

165

MetricsCollectionService metricsCollectionService);

166

167

/**

168

* Start runtime service and all managed components

169

* @throws Exception if startup fails

170

*/

171

protected void startUp() throws Exception;

172

173

/**

174

* Stop runtime service and cleanup resources

175

* @throws Exception if shutdown fails

176

*/

177

protected void shutDown() throws Exception;

178

}

179

180

/**

181

* Manager for metrics processor status service instances

182

* Coordinates status services across multiple processor instances

183

*/

184

public class MetricsProcessorStatusServiceManager extends AbstractIdleService {

185

/**

186

* Create status service manager

187

* @param cConf CDAP configuration

188

* @param sConf Security configuration

189

* @param discoveryService Service discovery

190

* @param handlers HTTP handlers for status endpoints

191

* @param httpServiceFactory Factory for creating HTTP services

192

*/

193

public MetricsProcessorStatusServiceManager(CConfiguration cConf,

194

SConfiguration sConf,

195

DiscoveryService discoveryService,

196

Set<HttpHandler> handlers,

197

CommonNettyHttpServiceFactory httpServiceFactory);

198

199

/**

200

* Start status service manager

201

* @throws Exception if startup fails

202

*/

203

protected void startUp() throws Exception;

204

205

/**

206

* Stop status service manager

207

* @throws Exception if shutdown fails

208

*/

209

protected void shutDown() throws Exception;

210

}

211

```

212

213

## Administrative Operations

214

215

Services and data models for metrics processing administration and maintenance.

216

217

```java { .api }

218

/**

219

* Administrative message for metrics processing operations

220

* Used for coordinating administrative actions across processor instances

221

*/

222

public final class MetricsAdminMessage {

223

/**

224

* Get the administrative operation type

225

* @return Type of administrative operation

226

*/

227

public Type getType();

228

229

/**

230

* Get the message payload

231

* @param gson Gson instance for deserialization

232

* @param type Target type for payload deserialization

233

* @return Deserialized payload object

234

*/

235

public <T> T getPayload(Gson gson, Type type);

236

237

/**

238

* Administrative operation types

239

*/

240

public enum Type {

241

/** Delete metrics operation */

242

DELETE

243

}

244

}

245

246

/**

247

* Key provider for subscriber metrics processing

248

* Provides topic-based keys for metrics processing coordination

249

*/

250

public interface TopicSubscriberMetricsKeyProvider {

251

/**

252

* Get metrics key for subscriber processing

253

* @param topicId Topic identifier

254

* @param instanceId Processor instance identifier

255

* @return Metrics key for this subscriber instance

256

*/

257

String getMetricsKey(TopicId topicId, int instanceId);

258

}

259

260

/**

261

* Key provider for topic-based metrics processing

262

* Provides keys for organizing metrics by topic

263

*/

264

public interface TopicIdMetricsKeyProvider {

265

/**

266

* Get metrics key for topic processing

267

* @param topicId Topic identifier

268

* @return Metrics key for this topic

269

*/

270

String getMetricsKey(TopicId topicId);

271

}

272

```

273

274

**Usage Examples:**

275

276

```java

277

import io.cdap.cdap.metrics.process.*;

278

import io.cdap.cdap.messaging.spi.MessagingService;

279

import io.cdap.cdap.api.metrics.MetricsWriter;

280

import io.cdap.cdap.proto.id.TopicId;

281

282

// Create and start status service

283

Set<HttpHandler> statusHandlers = // ... configure handlers

284

MetricsProcessorStatusService statusService = new MetricsProcessorStatusService(

285

cConf, sConf, discoveryService, statusHandlers, httpServiceFactory

286

);

287

statusService.startUp();

288

289

// Create processor service factory

290

MessagingMetricsProcessorServiceFactory factory = // ... obtain factory

291

292

// Create processor service for specific topic

293

TopicId metricsTopicId = new TopicId("system", "metrics");

294

MetricsWriter metricsWriter = // ... obtain metrics writer

295

MessagingMetricsProcessorService processor = factory.create(

296

metricsWriter,

297

metricsTopicId,

298

1 // instance ID

299

);

300

301

// Start processor (runs in background thread)

302

processor.startAsync().awaitRunning();

303

304

// Create manager service to coordinate multiple processors

305

MessagingMetricsProcessorManagerService manager = new MessagingMetricsProcessorManagerService(

306

cConf, messagingService, metricsWriterProvider, metricsCollectionService

307

);

308

manager.startUp();

309

310

// Administrative operations

311

MetricsAdminMessage deleteMessage = // ... create delete message

312

// Process admin message through appropriate channels

313

314

// Shutdown services

315

processor.stopAsync().awaitTerminated();

316

manager.shutDown();

317

statusService.shutDown();

318

```