tessl/maven-co-cask-cdap--cdap-tms

CDAP Transactional Messaging System provides reliable, ordered message delivery with transaction support for the CDAP platform.

Client Services

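ClientMessagingService is an HTTP-based messaging client for distributed CDAP deployments. It locates messaging service instances through service discovery, maps HTTP error responses to exceptions, and exposes the same MessagingService interface as a local implementation, so calling code does not need to know that the service is remote.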

Capabilities

Client Messaging Service

HTTP client implementation of MessagingService for distributed deployments with automatic service discovery.

class ClientMessagingService implements MessagingService {
    /**
     * Creates a client messaging service with service discovery.
     * @param discoveryServiceClient client for discovering messaging service instances
     */
    ClientMessagingService(DiscoveryServiceClient discoveryServiceClient);
}

Usage Examples:

import co.cask.cdap.messaging.client.ClientMessagingService;
import co.cask.cdap.messaging.MessagingService;
import org.apache.twill.discovery.DiscoveryServiceClient;

// Create client with service discovery
DiscoveryServiceClient discoveryClient = ...; // obtain discovery client
MessagingService messagingService = new ClientMessagingService(discoveryClient);

// Use exactly like local MessagingService
TopicMetadata metadata = new TopicMetadata(topicId, 
    TopicMetadata.TTL_KEY, "3600");
messagingService.createTopic(metadata);

// All MessagingService operations work transparently
StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("Remote message")
    .build();
messagingService.publish(request);

Dependency Injection Setup

Configure client services using Guice modules for clean dependency management.

class MessagingClientModule extends AbstractModule {
    /**
     * Configures MessagingService binding to ClientMessagingService.
     */
    protected void configure();
}

Usage Examples:

import co.cask.cdap.messaging.guice.MessagingClientModule;
import com.google.inject.Guice;
import com.google.inject.Inject;
import com.google.inject.Injector;

// Set up dependency injection
Injector injector = Guice.createInjector(
    new MessagingClientModule()
    // add any other modules here, separated by commas
);

// Inject messaging service
MessagingService messagingService = injector.getInstance(MessagingService.class);
// This will be a ClientMessagingService instance

// Use in application classes
public class MyService {
    private final MessagingService messagingService;
    
    @Inject
    public MyService(MessagingService messagingService) {
        this.messagingService = messagingService;
    }
    
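    // The topic is passed in explicitly; the original snippet referenced an undeclared topicId
    public void publishEvent(TopicId topicId, String event) throws IOException {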
        StoreRequest request = StoreRequestBuilder.of(topicId)
            .addPayload(event)
            .build();
        messagingService.publish(request);
    }
}

Request Building

StoreRequestBuilder constructs StoreRequest objects on the client side through a fluent API.

class StoreRequestBuilder {
    /** Creates a new StoreRequestBuilder instance for the specified topic */
    static StoreRequestBuilder of(TopicId topicId);
    
    /** Adds a single payload string (UTF-8 encoded) */
    StoreRequestBuilder addPayload(String payload);
    
    /** Adds a single byte array payload */
    StoreRequestBuilder addPayload(byte[] payload);
    
    /** Adds multiple payloads from iterator */
    StoreRequestBuilder addPayloads(Iterator<byte[]> payloads);
    
    /** Adds multiple payloads from iterable */
    StoreRequestBuilder addPayloads(Iterable<byte[]> payloads);
    
    /** Sets transaction write pointer for transactional publish */
    StoreRequestBuilder setTransaction(Long txWritePointer);
    
    /** Returns true if builder contains payloads */
    boolean hasPayload();
    
    /** Creates StoreRequest from builder configuration */
    StoreRequest build();
}

Usage Examples:

// Build complex requests
List<String> events = Arrays.asList(
    "user.login",
    "page.view",
    "button.click"
);

StoreRequest request = StoreRequestBuilder.of(topicId)
    .addPayload("session.start")
    .addPayloads(events.stream()
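        // Explicit charset (java.nio.charset.StandardCharsets); String::getBytes would use the JVM default
        .map(event -> event.getBytes(StandardCharsets.UTF_8))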
        .collect(Collectors.toList()))
    .build();

// Conditional payload addition
StoreRequestBuilder builder = StoreRequestBuilder.of(topicId);
if (includeMetadata) {
    builder.addPayload(generateMetadata());
}
builder.addPayload(mainContent);
StoreRequest conditionalRequest = builder.build();

// Transactional request building
Transaction tx = ...; // obtain transaction
StoreRequest txRequest = StoreRequestBuilder.of(topicId)
    .addPayload("transactional event")
    .setTransaction(tx.getWritePointer())
    .build();
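
Because the `addPayloads` overloads take `byte[]` elements, string events have to be encoded before they are added. The stdlib-only sketch below shows one way to do that with an explicit charset. `PayloadEncoding` and `toUtf8Payloads` are hypothetical helper names introduced for this example and are not part of CDAP.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of CDAP: encodes string events as UTF-8 byte arrays.
final class PayloadEncoding {
    private PayloadEncoding() { }

    /** Encodes each event with an explicit charset so the result does not depend on the JVM default. */
    static List<byte[]> toUtf8Payloads(List<String> events) {
        List<byte[]> payloads = new ArrayList<>(events.size());
        for (String event : events) {
            payloads.add(event.getBytes(StandardCharsets.UTF_8));
        }
        return payloads;
    }

    public static void main(String[] args) {
        List<byte[]> payloads = toUtf8Payloads(Arrays.asList("user.login", "page.view"));
        System.out.println(payloads.size() + " payloads encoded");
    }
}
```

The returned list is an `Iterable<byte[]>`, so it could be passed to `addPayloads(Iterable<byte[]>)` as listed in the builder API above.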

Client Rollback Detail

Client-side implementation of RollbackDetail with encoded rollback information.

class ClientRollbackDetail implements RollbackDetail {
    /** 
     * Creates rollback detail from encoded bytes 
     * (typically received from server response)
     */
    ClientRollbackDetail(byte[] encoded);
    
    /** Returns encoded rollback information */
    byte[] getEncoded();
    
    // Implements RollbackDetail interface methods
    long getTransactionWritePointer();
    long getStartTimestamp();
    int getStartSequenceId();
    long getEndTimestamp();
    int getEndSequenceId();
}

Network Communication

HTTP Transport

The client uses HTTP or HTTPS to communicate with messaging service endpoints:

  • Topic Operations: RESTful API for topic management
  • Message Publishing: Binary Avro encoding for efficiency
  • Message Consumption: Streaming HTTP responses with Avro
  • Service Discovery: Automatic endpoint discovery and failover

Connection Configuration:

// HTTP configuration is handled internally
// Timeouts and connection settings use DefaultHttpRequestConfig

// Service discovery automatically finds messaging service instances
// No manual endpoint configuration required

Error Handling and Retries

The client maps HTTP error responses to exceptions, so callers can handle each failure type separately:

try {
    messagingService.publish(request);
} catch (TopicNotFoundException e) {
    // HTTP 404 mapped to TopicNotFoundException
    System.out.println("Topic not found: " + e.getTopicName());
} catch (ServiceUnavailableException e) {
    // HTTP 503 mapped to ServiceUnavailableException
    System.out.println("Service unavailable: " + e.getServiceName());
} catch (IllegalArgumentException e) {
    // HTTP 400 mapped to IllegalArgumentException
    System.out.println("Bad request: " + e.getMessage());
} catch (IOException e) {
    // Other HTTP errors mapped to IOException
    System.out.println("Network error: " + e.getMessage());
}
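
The example above handles each failure once and does not retry. If the caller wants to retry transient failures, a small wrapper around the messaging call is one option. The sketch below is an assumption about how a caller might do this, not a description of the client's internal behavior. `RetrySketch`, `callWithRetry`, and the backoff parameters are hypothetical names, and the caller decides which exceptions are retryable through a predicate.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.function.Predicate;

// Hypothetical caller-side helper, not part of the CDAP client.
final class RetrySketch {
    private RetrySketch() { }

    /**
     * Runs the call and retries it while the thrown exception is retryable,
     * up to maxAttempts attempts in total. The delay doubles after each failed attempt.
     * A non-retryable exception, or the exception from the final attempt, is rethrown as-is.
     */
    static <T> T callWithRetry(Callable<T> call, Predicate<Exception> retryable,
                               int maxAttempts, long initialDelayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (attempt >= maxAttempts || !retryable.test(e)) {
                    throw e;
                }
                Thread.sleep(delay);
                delay *= 2;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] attempts = {0};
        String result = callWithRetry(() -> {
            // Simulate two transient failures before succeeding
            if (++attempts[0] < 3) {
                throw new IOException("simulated transient failure");
            }
            return "published";
        }, e -> e instanceof IOException, 5, 10L);
        System.out.println(result + " after " + attempts[0] + " attempts"); // prints "published after 3 attempts"
    }
}
```

A bad request (the IllegalArgumentException case above) should normally not be retried, since repeating the same request cannot succeed. That is why the predicate in the usage example only accepts IOException.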

Content Type Handling

Client handles multiple content types for different operations:

  • JSON: Topic metadata and configuration
  • Avro Binary: Message payloads and requests
  • HTTP Streaming: Large message consumption

// Content types are handled automatically
// JSON for topic operations
TopicMetadata metadata = messagingService.getTopic(topicId);

// Avro binary for message operations  
RollbackDetail rollback = messagingService.publish(request);

// Streaming for message consumption
MessageFetcher fetcher = messagingService.prepareFetch(topicId);
try (CloseableIterator<RawMessage> messages = fetcher.fetch()) {
    // Streaming consumption
}

Service Discovery Integration

Discovery Service Client

Integration with CDAP service discovery for automatic endpoint resolution:

import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.discovery.ServiceDiscovered;

// Service discovery handles endpoint resolution
DiscoveryServiceClient discoveryClient = ...; // obtain from CDAP
ClientMessagingService messagingService = new ClientMessagingService(discoveryClient);

// Client automatically discovers and connects to messaging service instances
// Handles service failures and endpoint changes transparently

Failover and Load Balancing

Client automatically handles service instance failures and load distribution:

  • Automatic Failover: Switches to healthy service instances
  • Load Distribution: Distributes requests across available instances
  • Health Checking: Monitors service instance availability
  • Circuit Breaking: Prevents cascading failures
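
The client's actual endpoint selection logic is internal and is not shown in this document. As a rough illustration of the first two bullets only (failover and load distribution), the sketch below tries discovered endpoints in random order and moves on to the next one when a call fails with an IOException. `FailoverSketch`, `EndpointCall`, `callAny`, and the use of plain strings for endpoints are all assumptions made for the example.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Illustrative sketch only; not the CDAP client's implementation.
final class FailoverSketch {
    private FailoverSketch() { }

    /** A call against one endpoint; hypothetical stand-in for an HTTP request. */
    interface EndpointCall<T> {
        T apply(String endpoint) throws IOException;
    }

    /**
     * Tries the endpoints in random order (spreading load across instances)
     * and returns the first successful result (failing over on IOException).
     * If every endpoint fails, the last IOException is rethrown.
     */
    static <T> T callAny(List<String> endpoints, Random random, EndpointCall<T> call) throws IOException {
        if (endpoints.isEmpty()) {
            throw new IOException("No endpoints discovered");
        }
        List<String> candidates = new ArrayList<>(endpoints);
        Collections.shuffle(candidates, random);
        IOException last = null;
        for (String endpoint : candidates) {
            try {
                return call.apply(endpoint);
            } catch (IOException e) {
                last = e; // remember the failure and try the next endpoint
            }
        }
        throw last;
    }

    public static void main(String[] args) throws IOException {
        List<String> endpoints = Arrays.asList("host-a:8080", "host-b:8080", "host-c:8080");
        String served = callAny(endpoints, new Random(), endpoint -> {
            // Simulate one unhealthy instance
            if (endpoint.startsWith("host-a")) {
                throw new IOException(endpoint + " is down");
            }
            return endpoint;
        });
        System.out.println("Request served by " + served);
    }
}
```

Health checking and circuit breaking are not covered by this sketch; they would need state that outlives a single call.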

Configuration Examples

Spring Configuration

@Configuration
public class MessagingConfig {
    
    @Bean
    public MessagingService messagingService(DiscoveryServiceClient discoveryClient) {
        return new ClientMessagingService(discoveryClient);
    }
    
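    // MessagePublisher is assumed here to be an application-defined interface
    // with a single publish(TopicId, String) method.
    @Bean
    public MessagePublisher messagePublisher(MessagingService messagingService) {
        return new MessagePublisher() {
            @Override
            public void publish(TopicId topicId, String message) throws IOException {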
                StoreRequest request = StoreRequestBuilder.of(topicId)
                    .addPayload(message)
                    .build();
                messagingService.publish(request);
            }
        };
    }
}

Application Integration

public class EventPublisher {
    private final MessagingService messagingService;
    private final TopicId eventTopic;
    private final Gson gson = new Gson(); // com.google.gson.Gson, used to serialize events to JSON
    
    public EventPublisher(MessagingService messagingService, String namespace, String topic) {
        this.messagingService = messagingService;
        this.eventTopic = new NamespaceId(namespace).topic(topic);
    }
    
    public void publishEvent(Object event) throws IOException {
        String eventJson = gson.toJson(event);
        StoreRequest request = StoreRequestBuilder.of(eventTopic)
            .addPayload(eventJson)
            .build();
        
        messagingService.publish(request);
    }
    
    public void publishEvents(List<Object> events) throws IOException {
        StoreRequestBuilder builder = StoreRequestBuilder.of(eventTopic);
        for (Object event : events) {
            builder.addPayload(gson.toJson(event));
        }
        
        messagingService.publish(builder.build());
    }
}

Performance Optimization

Batch Operations

Publishing several payloads in a single request reduces the number of network round trips:

// Batch multiple messages in single request
List<String> events = collectEvents();
StoreRequest batchRequest = StoreRequestBuilder.of(topicId)
    .addPayloads(events.stream()
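        // Explicit charset (java.nio.charset.StandardCharsets); String::getBytes would use the JVM default
        .map(event -> event.getBytes(StandardCharsets.UTF_8))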
        .collect(Collectors.toList()))
    .build();

messagingService.publish(batchRequest);
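
A single request that holds every pending event can grow without bound. One option, sketched here with stdlib types only, is to split the events into fixed-size chunks and publish each chunk as its own request. `BatchSketch.partition` and the batch size are hypothetical. Suitable limits depend on payload size and server configuration, which this document does not specify.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of CDAP: splits a list into consecutive fixed-size batches.
final class BatchSketch {
    private BatchSketch() { }

    /** Returns consecutive sublists of at most batchSize elements, preserving the original order. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be at least 1");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so each batch is independent of the source list
            batches.add(new ArrayList<>(items.subList(start, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("a", "b", "c", "d", "e");
        System.out.println(partition(events, 2)); // prints [[a, b], [c, d], [e]]
    }
}
```

Each inner list would then be added to its own StoreRequestBuilder and published separately. Note that splitting one batch into several requests means the chunks are no longer sent together, so a failure partway through leaves the earlier chunks published.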

Connection Reuse

Client automatically reuses HTTP connections for efficiency:

// Same MessagingService instance reuses connections
MessagingService client = new ClientMessagingService(discoveryClient);

// Multiple operations reuse connections
client.createTopic(metadata1);
client.createTopic(metadata2);
client.publish(request1);
client.publish(request2);

Async Patterns

The client API is synchronous, but calls can be wrapped in a CompletableFuture and run on an executor:

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;

public class AsyncEventPublisher {
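    private final MessagingService messagingService;
    private final ExecutorService executor;

    // Both dependencies are supplied by the caller, who also owns the executor's lifecycle
    public AsyncEventPublisher(MessagingService messagingService, ExecutorService executor) {
        this.messagingService = messagingService;
        this.executor = executor;
    }
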
    public CompletableFuture<Void> publishEventAsync(TopicId topicId, String event) {
        return CompletableFuture.runAsync(() -> {
            try {
                StoreRequest request = StoreRequestBuilder.of(topicId)
                    .addPayload(event)
                    .build();
                messagingService.publish(request);
            } catch (IOException e) {
                throw new RuntimeException("Failed to publish event", e);
            }
        }, executor);
    }
}
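
When several events are published this way, the resulting futures can be combined so that the caller waits once for the whole group. The sketch below uses a stand-in `Publisher` interface in place of MessagingService so that it runs without CDAP on the classpath. `AsyncFanOutSketch`, `Publisher`, and `publishAll` are hypothetical names.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Illustrative sketch only; Publisher stands in for a synchronous messaging call.
final class AsyncFanOutSketch {
    private AsyncFanOutSketch() { }

    interface Publisher {
        void publish(String event) throws IOException;
    }

    /**
     * Submits one publish task per event and returns a future that completes
     * when all tasks have finished, or completes exceptionally if any task failed.
     */
    static CompletableFuture<Void> publishAll(Publisher publisher, List<String> events,
                                              ExecutorService executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (String event : events) {
            futures.add(CompletableFuture.runAsync(() -> {
                try {
                    publisher.publish(event);
                } catch (IOException e) {
                    // Runnable cannot throw checked exceptions, so wrap it
                    throw new UncheckedIOException(e);
                }
            }, executor));
        }
        return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            publishAll(event -> System.out.println("published " + event),
                       Arrays.asList("user.login", "page.view"), executor).join();
        } finally {
            executor.shutdown();
        }
    }
}
```

Tasks submitted to a multi-threaded executor may complete in any order. If the order of events within a topic matters, a single batched request or a single-threaded executor is the safer choice.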

Install with Tessl CLI

npx tessl i tessl/maven-co-cask-cdap--cdap-tms
