Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12@1.14.0

The Apache Flink connector for Google Cloud Pub/Sub enables consuming messages from Pub/Sub subscriptions and publishing messages to Pub/Sub topics with exactly-once processing guarantees. The connector provides both source and sink capabilities for real-time stream processing applications, with automatic acknowledgment management through Flink's checkpointing mechanism.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
<version>1.14.6</version>
</dependency>

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Collector;

For credentials management:
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;

For Google Cloud Pub/Sub types:
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.List;
import java.time.Duration;

For emulator testing:
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing (required for exactly-once guarantees)
env.enableCheckpointing(30000); // checkpoint every 30 seconds
// Create PubSubSource
PubSubSource<String> pubsubSource = PubSubSource.newBuilder()
.withDeserializationSchema(new SimpleStringSchema())
.withProjectName("my-gcp-project")
.withSubscriptionName("my-subscription")
.build();
// Add source to stream
DataStream<String> stream = env.addSource(pubsubSource);
stream.print();
env.execute("PubSub Consumer");

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create data stream
DataStream<String> inputStream = env.fromElements("Hello", "World", "PubSub");
// Create PubSubSink
PubSubSink<String> pubsubSink = PubSubSink.newBuilder()
.withSerializationSchema(new SimpleStringSchema())
.withProjectName("my-gcp-project")
.withTopicName("my-topic")
.build();
// Add sink to stream
inputStream.addSink(pubsubSink);
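env.execute("PubSub Producer");

The Apache Flink GCP Pub/Sub connector is built around several key components, each described in the sections below: the source (PubSubSource), the sink (PubSubSink), the emulator credential classes, the PubSubDeserializationSchema interface, the PubSubSubscriberFactory interface, and the checkpoint acknowledgment classes (Acknowledger, AcknowledgeIdsForCheckpoint, AcknowledgeOnCheckpoint).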
The connector ensures data consistency through Flink's distributed checkpointing mechanism, where Pub/Sub messages are only acknowledged after successful checkpoint completion, preventing message loss during failure scenarios.
Source functionality for consuming messages from Google Cloud Pub/Sub subscriptions with exactly-once processing guarantees and configurable rate limiting.
public static DeserializationSchemaBuilder newBuilder();
public static class PubSubSourceBuilder<OUT> {
public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory factory);
public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);
public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
public PubSubSource<OUT> build();
}

Sink functionality for publishing messages to Google Cloud Pub/Sub topics with reliable delivery and checkpoint synchronization.
public static SerializationSchemaBuilder newBuilder();
public static class PubSubSinkBuilder<IN> {
public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
public PubSubSink<IN> build();
}

Emulator support for local development and testing scenarios without requiring actual Google Cloud Pub/Sub infrastructure.
public final class EmulatorCredentials extends OAuth2Credentials {
public static EmulatorCredentials getInstance();
}
public final class EmulatorCredentialsProvider implements CredentialsProvider {
public static EmulatorCredentialsProvider create();
}

Interface for custom deserialization with access to full Pub/Sub message metadata.
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
default void open(DeserializationSchema.InitializationContext context) throws Exception;
boolean isEndOfStream(T nextElement);
T deserialize(PubsubMessage message) throws Exception;
default void deserialize(PubsubMessage message, Collector<T> out) throws Exception;
}

Factory interface for creating custom Pub/Sub subscribers with specialized configurations.
public interface PubSubSubscriberFactory extends Serializable {
PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}

Core classes for managing checkpoint-based message acknowledgment.
public interface Acknowledger<AcknowledgeId> {
void acknowledge(List<AcknowledgeId> ids);
}
public interface PubSubSubscriber extends Acknowledger<String> {
List<ReceivedMessage> pull();
void close() throws Exception;
}
public class AcknowledgeIdsForCheckpoint<AcknowledgeId> implements Serializable {
AcknowledgeIdsForCheckpoint(long checkpointId, List<AcknowledgeId> acknowledgeIds);
public long getCheckpointId();
public void setCheckpointId(long checkpointId);
public List<AcknowledgeId> getAcknowledgeIds();
public void setAcknowledgeIds(List<AcknowledgeId> acknowledgeIds);
}
public class AcknowledgeOnCheckpoint<ACKID extends Serializable>
implements ListCheckpointed<AcknowledgeIdsForCheckpoint<ACKID>>, CheckpointListener {
public AcknowledgeOnCheckpoint(Acknowledger<ACKID> acknowledger);
public void addAcknowledgeId(ACKID acknowledgeId);
public void acknowledgeIdsUpToCheckpoint(long checkpointId);
}