Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities
npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11@1.14.0

The Apache Flink GCP Pub/Sub Connector provides streaming data integration between Apache Flink applications and the Google Cloud Pub/Sub messaging service. It includes a source with exactly-once processing guarantees (through Flink's checkpointing) and a sink with at-least-once delivery, together with authentication support and comprehensive error handling.
pom.xml:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub_2.11</artifactId>
    <version>1.14.6</version>
</dependency>
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import com.google.auth.Credentials;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Checkpointing is mandatory for PubSubSource, not merely an exactly-once option: the source
// fails at startup when checkpointing is disabled. Messages are acknowledged to Pub/Sub only
// after a checkpoint completes, so keep the interval (here 30 s) well below the subscription's
// acknowledgement deadline; otherwise Pub/Sub redelivers messages that were already read.
env.enableCheckpointing(30000);

// Consume a subscription, decoding each message payload as a String.
PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .build();

env.addSource(source)
        .print();

env.execute("Pub/Sub Consumer");

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// This producer is a separate job (it calls execute() itself), so it declares its own
// environment instead of relying on one from another snippet.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Publish each String element to a topic; PubSubSink provides at-least-once delivery.
PubSubSink<String> sink = PubSubSink.newBuilder()
        .withSerializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withTopicName("my-topic")
        .build();

env.fromElements("Hello", "World", "Pub/Sub")
        .addSink(sink);

env.execute("Pub/Sub Producer");

The connector is built around several key components:
PubSubSource — high-performance message consumption from Pub/Sub subscriptions with exactly-once processing guarantees through Flink's checkpointing mechanism (checkpointing must be enabled). Supports rate limiting, custom deserialization, and flexible subscriber configuration.
/**
 * Entry point of the source builder chain (a static method on {@code PubSubSource}).
 * The chain continues with {@code withDeserializationSchema(...)}, then
 * {@code withProjectName(...)}, then {@code withSubscriptionName(...)}, which returns the
 * {@link PubSubSourceBuilder} below.
 */
public static DeserializationSchemaBuilder newBuilder();

/** Final stage of the PubSubSource builder: optional settings, then {@link #build()}. */
public static class PubSubSourceBuilder<OUT> {

    /**
     * Sets the Google credentials used to access Pub/Sub.
     * NOTE(review): the behavior when this is not called (e.g. a default-credentials
     * fallback) is not shown here — confirm against the connector docs.
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);

    /**
     * Supplies a custom {@code PubSubSubscriberFactory}, which creates the
     * {@code PubSubSubscriber} that pulls and acknowledges messages.
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory factory);

    /**
     * Convenience overload that configures the connector's own subscriber instead of a
     * custom factory (presumably — confirm).
     *
     * @param maxMessagesPerPull upper bound on the number of messages returned by one pull
     * @param timeout timeout applied to requests to Pub/Sub (presumably per pull request)
     * @param retries number of retry attempts
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration timeout, int retries);

    /**
     * Limits consumption to the given number of messages per second.
     * NOTE(review): confirm whether the limit applies to the whole source or to each
     * parallel subtask.
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);

    /**
     * Creates the configured source.
     *
     * @throws IOException as declared by the signature
     */
    public PubSubSource<OUT> build() throws IOException;
}
Reliable message publishing to Pub/Sub topics with at-least-once delivery semantics, automatic batching, and configurable retry policies. Includes emulator support for testing scenarios.
/**
 * Entry point of the sink builder chain (a static method on {@code PubSubSink}).
 * The chain continues with {@code withSerializationSchema(...)}, then
 * {@code withProjectName(...)}, then {@code withTopicName(...)}, which returns the
 * {@link PubSubSinkBuilder} below.
 */
public static SerializationSchemaBuilder newBuilder();

/** Final stage of the PubSubSink builder: optional settings, then {@link #build()}. */
public static class PubSubSinkBuilder<IN> {

    /**
     * Sets the Google credentials used to publish to Pub/Sub.
     * NOTE(review): the behavior when this is not called is not shown here — confirm.
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);

    /**
     * Points the sink at a Pub/Sub emulator instead of the real service (for testing).
     *
     * @param hostAndPort emulator address, presumably in {@code host:port} form — confirm
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);

    /**
     * Creates the configured sink.
     *
     * @throws IOException as declared by the signature
     */
    public PubSubSink<IN> build() throws IOException;
}
Advanced deserialization system providing access to Pub/Sub message metadata including attributes, message ID, and publish time. Essential for applications requiring message metadata or custom deserialization logic.
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
void open(InitializationContext context) throws Exception;
boolean isEndOfStream(T nextElement);
T deserialize(PubsubMessage message) throws Exception;
void deserialize(PubsubMessage message, Collector<T> out) throws Exception;
TypeInformation<T> getProducedType();
}// Core source class
/**
 * The Pub/Sub source function (class body omitted in this reference listing).
 *
 * <p>It implements {@code ParallelSourceFunction}, so it is a parallel source.
 * {@code CheckpointListener} and
 * {@code ListCheckpointed<AcknowledgeIdsForCheckpoint<String>>} tie its state to Flink
 * checkpoints. Judging by the type name, the checkpointed state is the set of (String)
 * acknowledge IDs collected per checkpoint — the same ID type used by
 * {@code PubSubSubscriber extends Acknowledger<String>}.
 */
public class PubSubSource<OUT> extends RichSourceFunction<OUT>
implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>,
CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>>
// Core sink class
/**
 * The Pub/Sub sink function (class body omitted in this reference listing).
 *
 * <p>It implements {@code CheckpointedFunction}, which gives it a hook into Flink
 * checkpoints. The sink is described on this page as providing at-least-once delivery.
 */
public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction
// Builder interfaces for source
/**
 * Source builder step that sets the GCP project. In the source example, this step follows
 * {@code withDeserializationSchema(...)}.
 * NOTE(review): this shares its simple name with the sink-side {@code ProjectNameBuilder<IN>};
 * presumably each is nested in its own outer class — confirm.
 */
public interface ProjectNameBuilder<OUT> {
/** Sets the project and advances to the subscription step. */
SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}
/** Source builder step that sets the subscription to consume from. */
public interface SubscriptionNameBuilder<OUT> {
/** Sets the subscription and returns the builder for optional settings and {@code build()}. */
PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}
// Builder interfaces for sink
/**
 * Sink builder step that sets the GCP project. In the sink example, this step follows
 * {@code withSerializationSchema(...)}.
 * NOTE(review): this shares its simple name with the source-side {@code ProjectNameBuilder<OUT>};
 * presumably each is nested in its own outer class — confirm.
 */
public interface ProjectNameBuilder<IN> {
/** Sets the project and advances to the topic step. */
TopicNameBuilder<IN> withProjectName(String projectName);
}
/** Sink builder step that sets the topic to publish to. */
public interface TopicNameBuilder<IN> {
/** Sets the topic and returns the builder for optional settings and {@code build()}. */
PubSubSinkBuilder<IN> withTopicName(String topicName);
}
// Subscriber factory interface
/**
 * Creates the {@link PubSubSubscriber} used by the source. A factory can be passed to
 * {@code PubSubSourceBuilder.withPubSubSubscriberFactory(...)}. It extends
 * {@code Serializable}, so implementations and their fields must be serializable
 * (presumably because the factory is shipped with the source function — confirm).
 */
public interface PubSubSubscriberFactory extends Serializable {
/**
 * Creates a subscriber.
 *
 * @param credentials credentials to authenticate with
 * @return a new subscriber
 * @throws IOException if the subscriber cannot be created
 */
PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}
// Subscriber interface
/**
 * Pulls messages from a subscription and acknowledges them by String acknowledge ID
 * (through {@code Acknowledger<String>}).
 */
public interface PubSubSubscriber extends Acknowledger<String> {
/** Pulls a batch of messages. */
List<ReceivedMessage> pull();
/** Releases the subscriber's resources; may throw any exception. */
void close() throws Exception;
}
// Acknowledger interface
/**
 * Acknowledges a batch of messages by ID.
 *
 * @param <AcknowledgeId> type of the acknowledge IDs ({@code String} for {@code PubSubSubscriber})
 */
public interface Acknowledger<AcknowledgeId> {
/** Acknowledges every ID in {@code ids}. */
void acknowledge(List<AcknowledgeId> ids);
}
// Emulator subscriber factory
/**
 * {@link PubSubSubscriberFactory} for a Pub/Sub emulator reachable at {@code hostAndPort}
 * (constructor body omitted in this reference listing).
 *
 * <p>Note the argument order: {@code retries, timeout, maxMessagesPerPull}. This is the
 * reverse of the builder overload
 * {@code withPubSubSubscriberFactory(int maxMessagesPerPull, Duration timeout, int retries)}.
 */
public class PubSubSubscriberFactoryForEmulator implements PubSubSubscriberFactory {
public PubSubSubscriberFactoryForEmulator(String hostAndPort, String project, String subscription,
int retries, Duration timeout, int maxMessagesPerPull);
}