
tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12

Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications


Emulator Testing

The Apache Flink GCP Pub/Sub connector provides built-in support for the Google Cloud Pub/Sub emulator, enabling local development and testing without requiring actual Google Cloud infrastructure.

Capabilities

Emulator Credentials

A placeholder credentials implementation for emulator scenarios. It supplies a dummy access token, so no real authentication takes place.

/**
 * Placeholder credentials for emulator testing scenarios
 * Extends OAuth2Credentials but provides dummy authentication
 */
public final class EmulatorCredentials extends OAuth2Credentials {
    /**
     * Get singleton instance of emulator credentials
     * @return EmulatorCredentials instance
     */
    public static EmulatorCredentials getInstance();
    
    /**
     * Returns dummy access token for emulator authentication
     * @return AccessToken with dummy value and far-future expiration
     * @throws IOException Never thrown in emulator implementation
     */
    @Override
    public AccessToken refreshAccessToken() throws IOException;
}
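Conceptually, EmulatorCredentials is a shared instance whose "refresh" never contacts an authorization server and always yields the same placeholder token with a far-future expiry. The JDK-only sketch below illustrates that pattern. PlaceholderToken, its token string, and its expiry date are hypothetical and are not the connector's actual implementation.

```java
import java.time.Instant;

/** Hypothetical JDK-only illustration of the emulator-credentials pattern (not the connector's code). */
final class PlaceholderToken {
    // Eagerly created singleton, in the spirit of EmulatorCredentials.getInstance()
    private static final PlaceholderToken INSTANCE = new PlaceholderToken();

    // Fixed, meaningless token value: the emulator does not validate it
    private final String value = "dummy-emulator-token";
    // Far-future expiry so callers never treat the token as stale
    private final Instant expiresAt = Instant.parse("9999-12-31T23:59:59Z");

    private PlaceholderToken() {}

    static PlaceholderToken getInstance() {
        return INSTANCE;
    }

    String value() {
        return value;
    }

    Instant expiresAt() {
        return expiresAt;
    }

    /** A "refresh" just returns the same placeholder; no network call is made. */
    PlaceholderToken refresh() {
        return INSTANCE;
    }
}
```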

Emulator Credentials Provider

CredentialsProvider implementation that supplies EmulatorCredentials for Google Cloud client libraries.

/**
 * CredentialsProvider for emulator scenarios
 * Implements Google Cloud's CredentialsProvider interface
 */
public final class EmulatorCredentialsProvider implements CredentialsProvider {
    /**
     * Create new EmulatorCredentialsProvider instance
     * @return New EmulatorCredentialsProvider
     */
    public static EmulatorCredentialsProvider create();
    
    /**
     * Get emulator credentials instance
     * @return EmulatorCredentials for emulator authentication
     */
    @Override
    public Credentials getCredentials();
}

Emulator Subscriber Factory

Specialized subscriber factory for connecting to the Pub/Sub emulator with plain-text communication.

/**
 * PubSubSubscriberFactory for connecting to Pub/Sub emulator
 * Configures plain-text communication without SSL/TLS
 */
public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory {
    /**
     * Create emulator subscriber factory
     * @param hostAndPort Emulator host and port (e.g., "localhost:8085")
     * @param project GCP project ID; any value works with the emulator, but it must match the project the subscription was created under
     * @param subscription Subscription name
     * @param retries Number of retries for failed requests
     * @param timeout Timeout for pull requests
     * @param maxMessagesPerPull Maximum messages per pull request
     */
    public PubSubSubscriberFactoryForEmulator(
        String hostAndPort,
        String project,
        String subscription,
        int retries,
        Duration timeout,
        int maxMessagesPerPull
    );
    
    /**
     * Create subscriber configured for emulator connection
     * @param credentials Ignored for emulator (uses plain-text)
     * @return PubSubSubscriber configured for emulator
     */
    @Override
    public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}
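The retries and timeout parameters together bound how long a pull may stall. As a rough, generic illustration (an assumption, not the connector's internal code; RetryingPull is a hypothetical name), a retry budget is often applied as one initial attempt plus up to retries further attempts, where each attempt fails with a timeout once its per-request deadline passes.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.TimeoutException;

/** Hypothetical, generic retry helper. NOT the connector's internal implementation. */
final class RetryingPull {
    private RetryingPull() {}

    /**
     * Calls {@code pull} once, then retries up to {@code retries} more times
     * as long as each attempt fails with a TimeoutException.
     * Any other exception is propagated immediately.
     */
    static <T> T pullWithRetries(Callable<T> pull, int retries) throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        TimeoutException lastTimeout = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return pull.call();
            } catch (TimeoutException e) {
                lastTimeout = e; // remember the failure and try again if budget remains
            }
        }
        throw lastTimeout; // every attempt timed out
    }
}
```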

Setting Up Pub/Sub Emulator

Installation

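Install the Google Cloud SDK (gcloud CLI) and the Pub/Sub emulator component, then start the emulator. The emulator runs on the JVM, so a Java runtime must also be installed: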

# Install Google Cloud SDK
curl https://sdk.cloud.google.com | bash

# Install Pub/Sub emulator component
gcloud components install pubsub-emulator

# Start emulator on localhost:8085
gcloud beta emulators pubsub start --host-port=localhost:8085
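The start command keeps running in the foreground until the emulator exits, so test scripts usually launch it in the background and wait until its port accepts connections. A minimal JDK-only sketch, assuming the emulator listens on a known host and TCP port; EmulatorReadiness is a hypothetical helper. An open port is only a heuristic: it shows that something is listening, not that the emulator has finished initializing.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
import java.time.Instant;

/** Hypothetical helper that waits until something accepts TCP connections on host:port. */
final class EmulatorReadiness {
    private EmulatorReadiness() {}

    /** Returns true as soon as a connection succeeds, false if the timeout elapses first. */
    static boolean waitForPort(String host, int port, Duration timeout) throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            try (Socket socket = new Socket()) {
                // Short per-attempt connect timeout so the overall deadline is re-checked often
                socket.connect(new InetSocketAddress(host, port), 200);
                return true; // something is listening
            } catch (IOException e) {
                Thread.sleep(100); // not up yet; back off briefly
            }
        }
        return false;
    }
}
```

In a setup script this would run right after launching the emulator, for example waitForPort("localhost", 8085, Duration.ofSeconds(30)).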

Environment Setup

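Set these environment variables so that tools and client libraries that honor them talk to the emulator (running $(gcloud beta emulators pubsub env-init) sets PUBSUB_EMULATOR_HOST for you). The Flink connector examples below do not depend on them; they pass the emulator address to the connector explicitly.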

export PUBSUB_EMULATOR_HOST=localhost:8085
export PUBSUB_PROJECT_ID=test-project
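Test code can read the same variable, with a local default, so that one setting serves both the shell tooling and the Flink job. A sketch under that assumption; EmulatorHost and the localhost:8085 fallback are hypothetical choices, not part of the connector.

```java
import java.util.Map;

/** Hypothetical helper: resolve the emulator address from the environment, with a local default. */
final class EmulatorHost {
    static final String DEFAULT_HOST_AND_PORT = "localhost:8085";

    private EmulatorHost() {}

    /** Returns PUBSUB_EMULATOR_HOST if set (validated as host:port), else the default. */
    static String resolve(Map<String, String> env) {
        String value = env.get("PUBSUB_EMULATOR_HOST");
        if (value == null || value.isBlank()) {
            return DEFAULT_HOST_AND_PORT;
        }
        value = value.trim();
        int colon = value.lastIndexOf(':');
        if (colon <= 0 || colon == value.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + value);
        }
        // Fails fast (NumberFormatException is an IllegalArgumentException) on a non-numeric port
        Integer.parseInt(value.substring(colon + 1));
        return value;
    }
}
```

Usage: String hostAndPort = EmulatorHost.resolve(System.getenv()); then pass hostAndPort to PubSubSubscriberFactoryForEmulator and withHostAndPortForEmulator.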

Create Topics and Subscriptions

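The emulator does not support gcloud pubsub commands, so create resources through its REST endpoint instead (curl shown here; any HTTP client works):

# Create topic
curl -X PUT http://localhost:8085/v1/projects/test-project/topics/test-topic

# Create subscription
curl -X PUT http://localhost:8085/v1/projects/test-project/subscriptions/test-subscription \
  -H "Content-Type: application/json" \
  -d '{"topic": "projects/test-project/topics/test-topic"}'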
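Test setup code can create the same resources from Java with the JDK's java.net.http client (Java 11+). A sketch, assuming the emulator serves the Pub/Sub REST resource paths PUT /v1/projects/{project}/topics/{topic} and PUT /v1/projects/{project}/subscriptions/{subscription}; EmulatorAdmin is a hypothetical helper. Creating a resource that already exists returns an error status (typically 409), which this sketch surfaces as an IOException.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical test helper that creates topics and subscriptions via the emulator's REST endpoint. */
final class EmulatorAdmin {
    private final HttpClient http = HttpClient.newHttpClient();
    private final String baseUrl; // e.g. "http://localhost:8085"

    EmulatorAdmin(String hostAndPort) {
        this.baseUrl = "http://" + hostAndPort;
    }

    static String topicPath(String project, String topic) {
        return "projects/" + project + "/topics/" + topic;
    }

    static String subscriptionPath(String project, String subscription) {
        return "projects/" + project + "/subscriptions/" + subscription;
    }

    /** JSON body that attaches a new subscription to an existing topic. */
    static String subscriptionBody(String project, String topic) {
        return "{\"topic\":\"" + topicPath(project, topic) + "\"}";
    }

    void createTopic(String project, String topic) throws IOException, InterruptedException {
        put(topicPath(project, topic), "{}");
    }

    void createSubscription(String project, String subscription, String topic)
            throws IOException, InterruptedException {
        put(subscriptionPath(project, subscription), subscriptionBody(project, topic));
    }

    private void put(String resourcePath, String jsonBody) throws IOException, InterruptedException {
        HttpRequest request = HttpRequest.newBuilder(URI.create(baseUrl + "/v1/" + resourcePath))
                .header("Content-Type", "application/json")
                .PUT(HttpRequest.BodyPublishers.ofString(jsonBody))
                .build();
        HttpResponse<String> response = http.send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() / 100 != 2) {
            throw new IOException("PUT " + resourcePath + " failed: HTTP "
                    + response.statusCode() + " " + response.body());
        }
    }
}
```

Usage: EmulatorAdmin admin = new EmulatorAdmin("localhost:8085"); admin.createTopic("test-project", "test-topic"); admin.createSubscription("test-project", "test-subscription", "test-topic");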

Usage Examples

Source with Emulator

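import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import java.time.Duration;

public class EmulatorSourceTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // PubSubSource requires checkpointing; messages are acknowledged on completed checkpoints
        env.enableCheckpointing(10000);

        // Create emulator subscriber factory
        PubSubSubscriberFactoryForEmulator emulatorFactory =
            new PubSubSubscriberFactoryForEmulator(
                "localhost:8085",       // emulator host:port
                "test-project",         // must match the project the subscription was created under
                "test-subscription",    // subscription name
                3,                      // retries
                Duration.ofSeconds(15), // timeout per pull request
                100                     // max messages per pull
            );

        // Create source with emulator factory. Passing EmulatorCredentials explicitly keeps
        // build() from looking up real Google application default credentials.
        PubSubSource<String> source = PubSubSource.newBuilder()
            .withDeserializationSchema(new SimpleStringSchema())
            .withProjectName("test-project")
            .withSubscriptionName("test-subscription")
            .withCredentials(EmulatorCredentials.getInstance())
            .withPubSubSubscriberFactory(emulatorFactory)
            .build();

        DataStream<String> stream = env.addSource(source);
        stream.print();

        env.execute("Emulator Source Test");
    }
}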
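To feed the source during a test, messages can be published to the topic behind test-subscription through the REST publish method, which expects base64-encoded payloads. A minimal sketch with the JDK HTTP client; EmulatorPublisher is hypothetical, and the topic name you pass in (for example test-topic) is an assumption about how the subscription was set up.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical test helper that publishes string messages to an emulator topic over REST. */
final class EmulatorPublisher {
    private EmulatorPublisher() {}

    /** Builds the JSON body for the REST publish call; payloads must be base64-encoded. */
    static String publishBody(List<String> messages) {
        // Base64 output never contains quotes or backslashes, so plain concatenation is valid JSON
        return messages.stream()
                .map(m -> Base64.getEncoder().encodeToString(m.getBytes(StandardCharsets.UTF_8)))
                .map(data -> "{\"data\":\"" + data + "\"}")
                .collect(Collectors.joining(",", "{\"messages\":[", "]}"));
    }

    /** Publishes the messages and returns the HTTP status code (200 on success). */
    static int publish(String hostAndPort, String project, String topic, List<String> messages)
            throws Exception {
        URI uri = URI.create("http://" + hostAndPort + "/v1/projects/" + project
                + "/topics/" + topic + ":publish");
        HttpRequest request = HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(publishBody(messages)))
                .build();
        return HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString())
                .statusCode();
    }
}
```

Usage: EmulatorPublisher.publish("localhost:8085", "test-project", "test-topic", List.of("hello", "world")); while the source job is running.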

Sink with Emulator

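import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;

public class EmulatorSinkTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create test data
        DataStream<String> inputStream = env.fromElements(
            "Test message 1",
            "Test message 2",
            "Test message 3"
        );

        // Create sink with emulator configuration. Create a subscription on test-topic
        // before running the job: Pub/Sub does not retain messages published to a topic
        // that has no subscriptions.
        PubSubSink<String> sink = PubSubSink.newBuilder()
            .withSerializationSchema(new SimpleStringSchema())
            .withProjectName("test-project")
            .withTopicName("test-topic")
            .withHostAndPortForEmulator("localhost:8085")
            .withCredentials(EmulatorCredentials.getInstance()) // dummy credentials, no real Google auth
            .build();

        inputStream.addSink(sink);

        env.execute("Emulator Sink Test");
    }
}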
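To check what the sink wrote, pull from a subscription attached to test-topic (created before the job ran) and decode the base64 payloads. The sketch below uses the REST pull method and a deliberately naive regex instead of a JSON library, which is adequate for simple test payloads but not for general responses. EmulatorPull is hypothetical, and it does not acknowledge the messages, so the emulator redelivers them after the ack deadline.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical test helper for reading back what a job published to the emulator. */
final class EmulatorPull {
    // Naive: matches every "data":"<base64>" pair in the response body
    private static final Pattern DATA_FIELD =
            Pattern.compile("\"data\"\\s*:\\s*\"([A-Za-z0-9+/=]*)\"");

    private EmulatorPull() {}

    static String pullUrl(String hostAndPort, String project, String subscription) {
        return "http://" + hostAndPort + "/v1/projects/" + project
                + "/subscriptions/" + subscription + ":pull";
    }

    static String pullBody(int maxMessages) {
        return "{\"maxMessages\":" + maxMessages + "}";
    }

    /** Extracts and decodes every base64 "data" field found in a pull response. */
    static List<String> decodePayloads(String pullResponseJson) {
        List<String> payloads = new ArrayList<>();
        Matcher matcher = DATA_FIELD.matcher(pullResponseJson);
        while (matcher.find()) {
            byte[] raw = Base64.getDecoder().decode(matcher.group(1));
            payloads.add(new String(raw, StandardCharsets.UTF_8));
        }
        return payloads;
    }

    /** Pulls up to maxMessages messages (without acknowledging them) and returns their payloads. */
    static List<String> pull(String hostAndPort, String project, String subscription, int maxMessages)
            throws Exception {
        HttpRequest request = HttpRequest.newBuilder(URI.create(pullUrl(hostAndPort, project, subscription)))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(pullBody(maxMessages)))
                .build();
        HttpResponse<String> response =
                HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        if (response.statusCode() != 200) {
            throw new IllegalStateException("pull failed with HTTP " + response.statusCode());
        }
        return decodePayloads(response.body());
    }
}
```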

Complete Test Pipeline

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import java.time.Duration;

public class EmulatorTestPipeline {
    public static void main(String[] args) throws Exception {
        // Assumes input-subscription and output-topic already exist in the emulator
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // required by PubSubSource

        // Configure emulator source
        PubSubSubscriberFactoryForEmulator sourceFactory =
            new PubSubSubscriberFactoryForEmulator(
                "localhost:8085", "test-project", "input-subscription",
                3, Duration.ofSeconds(10), 50
            );

        PubSubSource<String> source = PubSubSource.newBuilder()
            .withDeserializationSchema(new SimpleStringSchema())
            .withProjectName("test-project")
            .withSubscriptionName("input-subscription")
            .withCredentials(EmulatorCredentials.getInstance())
            .withPubSubSubscriberFactory(sourceFactory)
            .build();

        // Configure emulator sink
        PubSubSink<String> sink = PubSubSink.newBuilder()
            .withSerializationSchema(new SimpleStringSchema())
            .withProjectName("test-project")
            .withTopicName("output-topic")
            .withHostAndPortForEmulator("localhost:8085")
            .withCredentials(EmulatorCredentials.getInstance())
            .build();

        // Create processing pipeline
        env.addSource(source)
            .map(msg -> "Processed: " + msg.toUpperCase())
            .addSink(sink);

        env.execute("Emulator Test Pipeline");
    }
}
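Keeping the transformation in a named static method lets it be unit-tested without the emulator or a Flink cluster. A sketch; ProcessingLogic is a hypothetical name, and it uses toUpperCase(Locale.ROOT) so results do not depend on the JVM's default locale (under a Turkish default locale, a plain toUpperCase() turns "i" into "İ").

```java
import java.util.Locale;

/** Hypothetical home for the pipeline's map logic so it can be tested in isolation. */
final class ProcessingLogic {
    private ProcessingLogic() {}

    /** Same transformation as the pipeline's map step, but locale-independent. */
    static String process(String msg) {
        return "Processed: " + msg.toUpperCase(Locale.ROOT);
    }
}
```

In the pipeline, the map step would then read .map(msg -> ProcessingLogic.process(msg)).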

Testing Best Practices

Test Environment Setup

  1. Isolated Emulator: Start a fresh emulator instance for each test (or test class)
  2. Clean State: Delete topics and subscriptions between tests, or use unique names per test
  3. Port Management: Give each emulator instance its own port when tests run in parallel
  4. Resource Cleanup: Shut down the emulator properly after tests
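For port management, a common approach is to ask the operating system for a free ephemeral port and pass it to --host-port. A minimal JDK-only sketch; FreePorts is hypothetical. There is an inherent race: another process can grab the port between this check and the emulator binding it, so a real suite should retry on bind failure.

```java
import java.io.IOException;
import java.net.ServerSocket;

/** Hypothetical helper for giving each emulator instance its own port. */
final class FreePorts {
    private FreePorts() {}

    /** Asks the OS for an unused ephemeral port, then releases it for the emulator to bind. */
    static int findFreePort() throws IOException {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        }
    }

    /** Formats the flag passed to "gcloud beta emulators pubsub start". */
    static String hostPortFlag(int port) {
        return "--host-port=localhost:" + port;
    }
}
```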

Integration Testing

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

public class PubSubConnectorIntegrationTest {
    private static final String EMULATOR_HOST = "localhost:8085";
    private static final String TEST_PROJECT = "test-project";
    
    @BeforeEach
    void setupEmulator() throws Exception {
        // Start emulator programmatically
        // Create test topics and subscriptions
    }
    
    @AfterEach
    void cleanupEmulator() throws Exception {
        // Stop emulator
        // Clean up resources
    }
    
    @Test
    void testSourceSinkIntegration() throws Exception {
        // Create Flink job with emulator configuration
        // Publish test messages
        // Verify message consumption and processing
    }
}
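One way to fill in the setup and cleanup placeholders is to launch the gcloud command with ProcessBuilder. A sketch under these assumptions: gcloud and the emulator component are on the PATH, and EmulatorProcess is a hypothetical helper. gcloud starts the emulator as a separate child process, so close() terminates the descendants before the gcloud process itself.

```java
import java.io.IOException;
import java.util.List;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper that runs the Pub/Sub emulator as an external process for tests. */
final class EmulatorProcess implements AutoCloseable {
    private final Process process;

    private EmulatorProcess(Process process) {
        this.process = process;
    }

    /** The gcloud command line used to start the emulator. */
    static List<String> command(String project, int port) {
        return List.of("gcloud", "beta", "emulators", "pubsub", "start",
                "--project=" + project, "--host-port=localhost:" + port);
    }

    /** Starts the emulator in the background; output is discarded to keep the sketch short. */
    static EmulatorProcess start(String project, int port) throws IOException {
        ProcessBuilder builder = new ProcessBuilder(command(project, port));
        builder.redirectErrorStream(true);
        builder.redirectOutput(ProcessBuilder.Redirect.DISCARD);
        return new EmulatorProcess(builder.start());
    }

    /** Stops the emulator's child processes first, then gcloud itself. */
    @Override
    public void close() throws InterruptedException {
        process.descendants().forEach(ProcessHandle::destroy);
        process.destroy();
        if (!process.waitFor(10, TimeUnit.SECONDS)) {
            process.destroyForcibly();
        }
    }
}
```

Usage: in @BeforeEach, emulator = EmulatorProcess.start(TEST_PROJECT, 8085) followed by a wait until the port accepts connections; in @AfterEach, emulator.close().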

Docker Testing

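# Dockerfile for emulator testing
# The :emulators image variant ships with the emulator components and the Java
# runtime they need; the alpine variant has no Java, so the emulator cannot start there.
FROM google/cloud-sdk:emulators

# Expose emulator port
EXPOSE 8085

# Start emulator on all interfaces so it is reachable from outside the container
CMD ["gcloud", "beta", "emulators", "pubsub", "start", "--project=test-project", "--host-port=0.0.0.0:8085"]

# docker-compose.yml for test environment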
version: '3.8'
services:
  pubsub-emulator:
    build: .
    ports:
      - "8085:8085"
    environment:
      - PUBSUB_PROJECT_ID=test-project

Emulator Limitations

Feature Differences

  • Authentication: No actual authentication required
  • IAM: Access control not enforced
  • Monitoring: No Cloud Monitoring metrics for emulator resources
  • Persistence: Messages are not persisted across emulator restarts
  • Performance: Throughput and latency differ from production

Configuration Differences

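  • Network: Plain-text (unencrypted) connections instead of TLS
  • Credentials: Dummy credentials instead of service account keys
  • Endpoints: Local endpoints instead of Google Cloud endpoints
  • Retries: Transient production failures (throttling, service unavailability) are not reproduced, so retry paths are rarely exercised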

Best Practices

  1. Production Parity: Keep the emulator configuration as close to production as possible
  2. Error Scenarios: Test failure scenarios that the emulator may not simulate
  3. Performance Testing: Use the real Pub/Sub service for performance benchmarks
  4. Security Testing: Verify that authentication works with real credentials
  5. End-to-End Testing: Include tests against the real Google Cloud Pub/Sub service
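For failure scenarios the emulator does not simulate, faults can be injected in test code instead. A small JDK-only sketch; FlakyCall is a hypothetical utility that fails the first N invocations with an IOException before delegating, which is useful for exercising retry and error-handling paths in your own helpers. It does not inject faults into the connector's internal gRPC calls.

```java
import java.io.IOException;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical test utility: fails the first N calls, then delegates to the real call. */
final class FlakyCall<T> implements Callable<T> {
    private final Callable<T> delegate;
    private final int failuresBeforeSuccess;
    private final AtomicInteger attempts = new AtomicInteger();

    FlakyCall(Callable<T> delegate, int failuresBeforeSuccess) {
        this.delegate = delegate;
        this.failuresBeforeSuccess = failuresBeforeSuccess;
    }

    @Override
    public T call() throws Exception {
        int attempt = attempts.incrementAndGet();
        if (attempt <= failuresBeforeSuccess) {
            // Simulated transient failure, e.g. a dropped connection
            throw new IOException("injected failure on attempt " + attempt);
        }
        return delegate.call();
    }

    /** Number of times call() has been invoked so far. */
    int attempts() {
        return attempts.get();
    }
}
```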

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12
