CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.
HTTP-based messaging client for distributed environments with service discovery, comprehensive error handling, and transparent network communication. Enables messaging operations across distributed CDAP deployments.
HTTP client implementation of MessagingService for distributed deployments with automatic service discovery.
class ClientMessagingService implements MessagingService {
  /**
   * Creates a client messaging service with service discovery.
   * @param discoveryServiceClient client for discovering messaging service instances
   */
  ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);
}

Usage Examples:
import co.cask.cdap.messaging.client.ClientMessagingService;
import co.cask.cdap.messaging.MessagingService;
import org.apache.twill.discovery.DiscoveryServiceClient;

// Create client with service discovery
DiscoveryServiceClient discoveryClient = // obtain discovery client
MessagingService messagingService = new ClientMessagingService(discoveryClient);

// Use exactly like local MessagingService
TopicMetadata metadata = new TopicMetadata(topicId,
    TopicMetadata.TTL_KEY, "3600");
messagingService.createTopic(metadata);

// All MessagingService operations work transparently
StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("Remote message")
    .build();
messagingService.publish(request);

Configure client services using Guice modules for clean dependency management.
class MessagingClientModule extends AbstractModule {
  /**
   * Configures MessagingService binding to ClientMessagingService.
   */
  protected void configure();
}

Usage Examples:
import co.cask.cdap.messaging.guice.MessagingClientModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

// Set up dependency injection
Injector injector = Guice.createInjector(
    new MessagingClientModule()
    // add other modules here, separated by commas
);

// Inject messaging service
MessagingService messagingService = injector.getInstance(MessagingService.class);
// This will be a ClientMessagingService instance

// Use in application classes
public class MyService {
  private final MessagingService messagingService;

  @Inject
  public MyService(MessagingService messagingService) {
    this.messagingService = messagingService;
  }

  public void publishEvent(String event) throws IOException {
    StoreRequest request = StoreRequestBuilder.of(topicId)
        .addPayload(event)
        .build();
    messagingService.publish(request);
  }
}

Enhanced StoreRequestBuilder for client-side request construction with a fluent API.
class StoreRequestBuilder {
  /** Creates a new StoreRequestBuilder instance for the specified topic */
  static StoreRequestBuilder of(TopicId topicId);

  /** Adds a single payload string (UTF-8 encoded) */
  StoreRequestBuilder addPayload(String payload);

  /** Adds a single byte array payload */
  StoreRequestBuilder addPayload(byte[] payload);

  /** Adds multiple payloads from iterator */
  StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);

  /** Adds multiple payloads from iterable */
  StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);

  /** Sets transaction write pointer for transactional publish */
  StoreRequestBuilder setTransaction(Long txWritePointer);

  /** Returns true if builder contains payloads */
  boolean hasPayload();

  /** Creates StoreRequest from builder configuration */
  StoreRequest build();
}

Usage Examples:
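`addPayloads` takes byte arrays, while `addPayload(String)` is documented above as encoding its argument as UTF-8. When converting strings yourself, encoding them the same way avoids depending on the platform default charset. The following is a minimal sketch using only the JDK; `PayloadEncoding` and `toUtf8Payloads` are hypothetical helper names for illustration, not part of the CDAP API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper (not part of CDAP): encodes strings as UTF-8 byte arrays
final class PayloadEncoding {
  private PayloadEncoding() { }

  // Returns one UTF-8 encoded byte array per input message, in the same order
  static List<byte[]> toUtf8Payloads(List<String> messages) {
    List<byte[]> payloads = new ArrayList<>(messages.size());
    for (String message : messages) {
      payloads.add(message.getBytes(StandardCharsets.UTF_8));
    }
    return payloads;
  }

  public static void main(String[] args) {
    // "caf\u00e9" ends in one non-ASCII character, which takes two bytes in UTF-8
    List<byte[]> payloads = toUtf8Payloads(Arrays.asList("user.login", "caf\u00e9"));
    for (byte[] payload : payloads) {
      System.out.println(payload.length + " bytes");
    }
  }
}
```

The resulting list could then be passed to `addPayloads(Iterable<byte[]>)` in place of a stream that calls `String::getBytes` without a charset.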
// Build complex requests
List<String> events = Arrays.asList(
    "user.login",
    "page.view",
    "button.click"
);
StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("session.start")
    .addPayloads(events.stream()
        // Encode explicitly as UTF-8 to match addPayload(String)
        .map(event -> event.getBytes(StandardCharsets.UTF_8))
        .collect(Collectors.toList()))
    .build();

// Conditional payload addition
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
if (includeMetadata) {
  builder.addPayload(generateMetadata());
}
builder.addPayload(mainContent);
StoreRequest conditionalRequest = builder.build();

// Transactional request building
Transaction tx = // obtain transaction
StoreRequest txRequest = StoreRequestBuilder.of(topicId)
    .addPayload("transactional event")
    .setTransaction(tx.getWritePointer())
    .build();

Client-side implementation of RollbackDetail with encoded rollback information.
class ClientRollbackDetail implements RollbackDetail {
  /**
   * Creates rollback detail from encoded bytes
   * (typically received from server response)
   */
  ClientRollbackDetail(byte[] encoded);

  /** Returns encoded rollback information */
  byte[] getEncoded();

  // Implements RollbackDetail interface methods
  long getTransactionWritePointer();
  long getStartTimestamp();
  int getStartSequenceId();
  long getEndTimestamp();
  int getEndSequenceId();
}

Client uses HTTP/HTTPS for communication with messaging service endpoints:
Connection Configuration:
// HTTP configuration is handled internally
// Timeouts and connection settings use DefaultHttpRequestConfig
// Service discovery automatically finds messaging service instances
// No manual endpoint configuration required

Comprehensive error handling with appropriate exception mapping:
try {
  messagingService.publish(request);
} catch (TopicNotFoundException e) {
  // HTTP 404 mapped to TopicNotFoundException
  System.out.println("Topic not found: " + e.getTopicName());
} catch (ServiceUnavailableException e) {
  // HTTP 503 mapped to ServiceUnavailableException
  System.out.println("Service unavailable: " + e.getServiceName());
} catch (IllegalArgumentException e) {
  // HTTP 400 mapped to IllegalArgumentException
  System.out.println("Bad request: " + e.getMessage());
} catch (IOException e) {
  // Other HTTP errors mapped to IOException
  System.out.println("Network error: " + e.getMessage());
}

Client handles multiple content types for different operations:
// Content types are handled automatically
// JSON for topic operations
TopicMetadata metadata = messagingService.getTopic(topicId);

// Avro binary for message operations
RollbackDetail rollback = messagingService.publish(request);

// Streaming for message consumption
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
  // Streaming consumption
}

Integration with CDAP service discovery for automatic endpoint resolution:
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;
// Service discovery handles endpoint resolution
DiscoveryServiceClient discoveryClient = // obtain from CDAP
ClientMessagingService messagingService = new ClientMessagingService(discoveryClient);
// Client automatically discovers and connects to messaging service instances
// Handles service failures and endpoint changes transparently

Client automatically handles service instance failures and load distribution.

Spring-style configuration that exposes the client as a bean:
// MessagePublisher is assumed to be an application-defined interface
@Configuration
public class MessagingConfig {
  @Bean
  public MessagingService messagingService(DiscoveryServiceClient discoveryClient) {
    return new ClientMessagingService(discoveryClient);
  }

  @Bean
  public MessagePublisher messagePublisher(MessagingService messagingService) {
    return new MessagePublisher() {
      @Override
      public void publish(TopicId topicId, String message) throws IOException {
        StoreRequest request = StoreRequestBuilder.of(topicId)
            .addPayload(message)
            .build();
        messagingService.publish(request);
      }
    };
  }
}

Event publisher built on the client:
public class EventPublisher {
  private final MessagingService messagingService;
  private final TopicId eventTopic;
  // Gson instance used to serialize events to JSON
  private final Gson gson = new Gson();

  public EventPublisher(MessagingService messagingService, String namespace, String topic) {
    this.messagingService = messagingService;
    this.eventTopic = new NamespaceId(namespace).topic(topic);
  }

  public void publishEvent(Object event) throws IOException {
    String eventJson = gson.toJson(event);
    StoreRequest request = StoreRequestBuilder.of(eventTopic)
        .addPayload(eventJson)
        .build();
    messagingService.publish(request);
  }

  public void publishEvents(List<Object> events) throws IOException {
    // Collect all events into a single request
    StoreRequestBuilder builder = StoreRequestBuilder.of(eventTopic);
    for (Object event : events) {
      builder.addPayload(gson.toJson(event));
    }
    messagingService.publish(builder.build());
  }
}

Optimize network usage with batch operations:
// Batch multiple messages in single request
List<String> events = collectEvents();
StoreRequest batchRequest = StoreRequestBuilder.of(topicId)
    .addPayloads(events.stream()
        // Encode explicitly as UTF-8 to match addPayload(String)
        .map(event -> event.getBytes(StandardCharsets.UTF_8))
        .collect(Collectors.toList()))
    .build();
messagingService.publish(batchRequest);

Client automatically reuses HTTP connections for efficiency:
// Same MessagingService instance reuses connections
MessagingService client = new ClientMessagingService(discoveryClient);
// Multiple operations reuse connections
client.createTopic(metadata1);
client.createTopic(metadata2);
client.publish(request1);
client.publish(request2);

While the client API is synchronous, you can implement async patterns:
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class AsyncEventPublisher {
  private final MessagingService messagingService;
  private final ExecutorService executor;

  public AsyncEventPublisher(MessagingService messagingService, ExecutorService executor) {
    this.messagingService = messagingService;
    this.executor = executor;
  }

  public CompletableFuture<Void> publishEventAsync(TopicId topicId, String event) {
    // Run the blocking publish call on the supplied executor
    return CompletableFuture.runAsync(() -> {
      try {
        StoreRequest request = StoreRequestBuilder.of(topicId)
            .addPayload(event)
            .build();
        messagingService.publish(request);
      } catch (IOException e) {
        throw new RuntimeException("Failed to publish event", e);
      }
    }, executor);
  }
}
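With the asynchronous pattern above, a failed publish does not throw on the calling thread; the failure surfaces when the returned future is joined or inspected. The sketch below uses only the JDK to show that behaviour. `BlockingPublisher` is a hypothetical stand-in for a synchronous call such as `messagingService.publish(request)`, and `AsyncPublishSketch` is likewise an illustrative name rather than part of CDAP.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class AsyncPublishSketch {
  // Hypothetical stand-in for a blocking publish call that may fail with IOException
  interface BlockingPublisher {
    void publish(String message) throws IOException;
  }

  // Runs the blocking call on the executor, wrapping checked failures as in the example above
  static CompletableFuture<Void> publishAsync(BlockingPublisher publisher, String message,
                                              ExecutorService executor) {
    return CompletableFuture.runAsync(() -> {
      try {
        publisher.publish(message);
      } catch (IOException e) {
        throw new RuntimeException("Failed to publish event", e);
      }
    }, executor);
  }

  // join() rethrows a task failure as CompletionException; its cause is the RuntimeException
  static String describeOutcome(CompletableFuture<Void> future) {
    try {
      future.join();
      return "published";
    } catch (CompletionException e) {
      return "failed: " + e.getCause().getMessage();
    }
  }

  public static void main(String[] args) {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    try {
      BlockingPublisher working = message -> { };
      BlockingPublisher broken = message -> {
        throw new IOException("service unreachable");
      };
      System.out.println(describeOutcome(publishAsync(working, "event-1", executor)));
      System.out.println(describeOutcome(publishAsync(broken, "event-2", executor)));
    } finally {
      executor.shutdown();
    }
  }
}
```

Callers that prefer not to block could attach `exceptionally` or `whenComplete` to the future instead of calling `join()`. Shutting down the executor when the publisher is no longer needed keeps its threads from outliving the application.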