Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications
—
The Apache Flink GCP Pub/Sub connector provides built-in support for the Google Cloud Pub/Sub emulator, enabling local development and testing without requiring actual Google Cloud infrastructure.
Special credentials implementation for emulator scenarios that bypasses actual authentication.
/**
 * Placeholder credentials for talking to the Pub/Sub emulator.
 *
 * <p>Extends {@link OAuth2Credentials} but supplies a dummy access token instead of performing
 * real authentication, so local emulator tests need no Google Cloud account.
 */
public final class EmulatorCredentials extends OAuth2Credentials {
    /**
     * Returns the shared emulator credentials instance.
     *
     * @return the {@code EmulatorCredentials} singleton
     */
    public static EmulatorCredentials getInstance();

    /**
     * Returns a dummy access token for emulator "authentication".
     *
     * @return an {@link AccessToken} with a dummy value and a far-future expiration
     * @throws IOException declared by the overridden method; never thrown by this implementation
     */
    @Override
    public AccessToken refreshAccessToken() throws IOException;
}
CredentialsProvider implementation that supplies EmulatorCredentials for Google Cloud client libraries.
/**
 * {@link CredentialsProvider} for emulator scenarios.
 *
 * <p>Implements Google Cloud's {@code CredentialsProvider} interface so that client libraries
 * expecting a provider can be handed emulator credentials instead of real ones.
 */
public final class EmulatorCredentialsProvider implements CredentialsProvider {
    /**
     * Creates a new provider instance.
     *
     * @return a new {@code EmulatorCredentialsProvider}
     */
    public static EmulatorCredentialsProvider create();

    /**
     * Returns the emulator credentials.
     *
     * @return {@link EmulatorCredentials} used to "authenticate" against the emulator
     */
    @Override
    public Credentials getCredentials();
}
Specialized subscriber factory for connecting to the Pub/Sub emulator with plain-text communication.
/**
 * {@link PubSubSubscriberFactory} that connects to a Pub/Sub emulator.
 *
 * <p>Communication with the emulator is plain text (no SSL/TLS).
 */
public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory {
    /**
     * Creates an emulator subscriber factory.
     *
     * @param hostAndPort emulator host and port, e.g. {@code "localhost:8085"}
     * @param project project ID the subscription was created under in the emulator; no real GCP
     *     project is needed, but emulator resources are scoped by project, so this must match the
     *     project used when creating the subscription
     * @param subscription subscription name
     * @param retries number of retries for failed requests
     * @param timeout timeout for pull requests
     * @param maxMessagesPerPull maximum number of messages per pull request
     */
    public PubSubSubscriberFactoryForEmulator(
            String hostAndPort,
            String project,
            String subscription,
            int retries,
            Duration timeout,
            int maxMessagesPerPull
    );

    /**
     * Creates a subscriber configured for the emulator connection.
     *
     * @param credentials ignored for the emulator (plain-text connection)
     * @return a {@link PubSubSubscriber} connected to the emulator
     * @throws IOException if the subscriber cannot be created
     */
    @Override
    public PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}
Install the Google Cloud SDK and Pub/Sub emulator:
# Install Google Cloud SDK
curl https://sdk.cloud.google.com | bash
# Reload the shell so the freshly installed gcloud is on PATH
exec -l $SHELL
# Install the Pub/Sub emulator component (and the beta command group used to start it)
gcloud components install beta pubsub-emulator
# Start emulator on localhost:8085 (runs in the foreground; use a second terminal for the next steps)
gcloud beta emulators pubsub start --host-port=localhost:8085
Set environment variables to point to the emulator:
# Read by the Google Cloud client libraries (the gcloud CLI does not use these two)
export PUBSUB_EMULATOR_HOST=localhost:8085
export PUBSUB_PROJECT_ID=test-project
# Route gcloud's Pub/Sub calls to the emulator and skip real authentication;
# without these, the commands below would target the real Pub/Sub service.
export CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=http://localhost:8085/
export CLOUDSDK_AUTH_DISABLE_CREDENTIALS=true
# Create topic
gcloud pubsub topics create test-topic --project=test-project
# Create subscription (its project must match the project passed to the Flink subscriber factory)
gcloud pubsub subscriptions create test-subscription \
  --topic=test-topic \
  --project=test-project
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10000);
// Create emulator subscriber factory
PubSubSubscriberFactoryForEmulator emulatorFactory =
    new PubSubSubscriberFactoryForEmulator(
        "localhost:8085",        // emulator host:port
        "test-project",          // must match the project the subscription was created under
        "test-subscription",     // subscription name
        3,                       // retries
        Duration.ofSeconds(15),  // timeout
        100                      // max messages per pull
    );
// Create source with emulator factory. The emulator factory ignores credentials, so pass the
// dummy EmulatorCredentials explicitly instead of leaving the builder to resolve real GCP ones.
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withSubscriptionName("test-subscription")
    .withCredentials(EmulatorCredentials.getInstance())
    .withPubSubSubscriberFactory(emulatorFactory)
    .build();
DataStream<String> stream = env.addSource(source);
stream.print();
env.execute("Emulator Source Test");
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create test data
DataStream<String> inputStream = env.fromElements(
    "Test message 1",
    "Test message 2",
    "Test message 3"
);
// Create sink with emulator configuration; "test-topic" must already exist in the emulator
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withTopicName("test-topic")
    .withHostAndPortForEmulator("localhost:8085")
    .build();
inputStream.addSink(sink);
env.execute("Emulator Sink Test");
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import java.time.Duration;
import java.util.Locale;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(5000);
// Configure emulator source ("input-subscription" must exist under "test-project" in the emulator)
PubSubSubscriberFactoryForEmulator sourceFactory =
    new PubSubSubscriberFactoryForEmulator(
        "localhost:8085", "test-project", "input-subscription",
        3, Duration.ofSeconds(10), 50
    );
// The emulator factory ignores credentials; supply dummy ones so no real GCP credentials are needed
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withSubscriptionName("input-subscription")
    .withCredentials(EmulatorCredentials.getInstance())
    .withPubSubSubscriberFactory(sourceFactory)
    .build();
// Configure emulator sink ("output-topic" must exist in the emulator)
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withTopicName("output-topic")
    .withHostAndPortForEmulator("localhost:8085")
    .build();
// Create processing pipeline
env.addSource(source)
    // Locale.ROOT keeps the upper-casing independent of the JVM's default locale
    .map(msg -> "Processed: " + msg.toUpperCase(Locale.ROOT))
    .addSink(sink);
env.execute("Emulator Test Pipeline");
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;

/**
 * Skeleton for an end-to-end test of the Pub/Sub source and sink against a local emulator.
 *
 * <p>The method bodies are placeholders that list the intended steps. Fill them in before relying
 * on this test: an empty test method passes without verifying anything.
 */
public class PubSubConnectorIntegrationTest {
    /** Emulator address (host:port). */
    private static final String EMULATOR_HOST = "localhost:8085";
    /** Project ID under which test topics and subscriptions are created in the emulator. */
    private static final String TEST_PROJECT = "test-project";

    /** Prepares the emulator before each test. */
    @BeforeEach
    void setupEmulator() throws Exception {
        // Start emulator programmatically
        // Create test topics and subscriptions
    }

    /** Tears down emulator state after each test. */
    @AfterEach
    void cleanupEmulator() throws Exception {
        // Stop emulator
        // Clean up resources
    }

    /** Intended to run a source-to-sink Flink job against the emulator and check the output. */
    @Test
    void testSourceSinkIntegration() throws Exception {
        // Create Flink job with emulator configuration
        // Publish test messages
        // Verify message consumption and processing
    }
}
# Dockerfile for emulator testing
FROM google/cloud-sdk:alpine
# The Pub/Sub emulator runs on Java, and the Alpine image ships without a JRE.
# Pick a JRE package available in your base image's Alpine release.
RUN apk add --no-cache openjdk17-jre-headless
# Install Pub/Sub emulator plus the beta command group used by CMD below (--quiet: no prompts during build)
RUN gcloud components install --quiet beta pubsub-emulator
# Expose emulator port
EXPOSE 8085
# Start emulator, bound to 0.0.0.0 so it is reachable from outside the container
CMD ["gcloud", "beta", "emulators", "pubsub", "start", "--host-port=0.0.0.0:8085"]
# docker-compose.yml for test environment
version: '3.8'
services:
  pubsub-emulator:
    build: .
    ports:
      - "8085:8085"
    environment:
      - PUBSUB_PROJECT_ID=test-project
Install with Tessl CLI
# Install this connector documentation tile via the Tessl CLI (run through npx)
npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12