Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities
—
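The PubSubSource provides high-performance message consumption from Google Cloud Pub/Sub subscriptions and ties message acknowledgement to Flink's checkpointing mechanism. A message is acknowledged only after the checkpoint that covers it completes, which yields at-least-once processing guarantees.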
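A PubSubSource is created with a staged builder. The builder's type-safe interfaces enforce the required parameters: the deserialization schema, the project name, and the subscription name must be set in that order. Only then do the optional settings and build() become available.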
/**
* Creates a new builder for PubSubSource.
* <p>This is the first stage of a staged builder. After the deserialization schema, the
* project name and then the subscription name must be supplied. Only then is a builder
* offering the optional settings and {@code build()} returned.
* @return DeserializationSchemaBuilder for setting deserialization schema
*/
public static DeserializationSchemaBuilder newBuilder();Configure how messages are deserialized from Pub/Sub.
/**
* First builder stage: chooses how each Pub/Sub message is turned into an element of type OUT.
* Both overloads fix the output type OUT and advance to the project-name stage.
*/
public static class DeserializationSchemaBuilder {
/**
* Set standard Flink DeserializationSchema for message data only
* <p>Only the message data is handed to the schema. Use the
* PubSubDeserializationSchema overload when message metadata is needed.
* @param deserializationSchema Schema for deserializing message data
* @return ProjectNameBuilder for next configuration step
*/
public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
DeserializationSchema<OUT> deserializationSchema);
/**
* Set PubSub-specific deserialization schema with metadata access
* <p>The schema receives the full PubsubMessage, so it can read metadata in addition to the data.
* @param deserializationSchema Schema with access to full PubsubMessage
* @return ProjectNameBuilder for next configuration step
*/
public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
PubSubDeserializationSchema<OUT> deserializationSchema);
}Configure the GCP project and Pub/Sub subscription.
/**
* Second builder stage: selects the GCP project that owns the subscription.
* @param <OUT> element type fixed by the deserialization schema chosen in the previous stage
*/
public interface ProjectNameBuilder<OUT> {
/**
* Set the GCP project name containing the subscription
* @param projectName GCP project name
* @return SubscriptionNameBuilder for next configuration step
*/
SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}
/**
* Third and last required builder stage: selects the subscription within the chosen project.
* Completing it yields the builder for optional settings and {@code build()}.
* @param <OUT> element type fixed by the deserialization schema
*/
public interface SubscriptionNameBuilder<OUT> {
/**
* Set the Pub/Sub subscription name to consume from
* @param subscriptionName Subscription name
* @return PubSubSourceBuilder for additional configuration
*/
PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}Main builder class for configuring optional parameters.
/**
* Final builder stage. All required parameters (schema, project, subscription) are set, so only
* optional settings remain; every setter returns this builder for chaining.
* @param <OUT> element type produced by the resulting source
*/
public static class PubSubSourceBuilder<OUT> {
/**
* Set authentication credentials (optional - uses default credentials if not set)
* <p>When no credentials are set, the default credentials are obtained during {@link #build()}.
* A failure to obtain them is reported there as an IOException.
* @param credentials Google Cloud credentials
* @return Current builder instance
*/
public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
/**
* Set custom subscriber factory for advanced configuration
* <p>Use this when the default subscriber factory cannot be tuned enough through
* {@link #withPubSubSubscriberFactory(int, Duration, int)}.
* @param pubSubSubscriberFactory Custom factory implementation
* @return Current builder instance
*/
public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
PubSubSubscriberFactory pubSubSubscriberFactory);
/**
* Configure default subscriber factory with custom parameters
* <p>This is the lightweight alternative to supplying a custom PubSubSubscriberFactory.
* @param maxMessagesPerPull Maximum messages per pull request (default: 100)
* @param perRequestTimeout Timeout per request (default: 15 seconds)
* @param retries Number of retries on failure (default: 3)
* @return Current builder instance
*/
public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
int maxMessagesPerPull, Duration perRequestTimeout, int retries);
/**
* Set message rate limit per parallel instance (default: 100000 messages/second)
* <p>The limit applies to each parallel subtask, so the job-wide maximum is this value
* multiplied by the source parallelism.
* @param messagePerSecondRateLimit Rate limit per parallel subtask
* @return Current builder instance
*/
public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
/**
* Build the configured PubSubSource
* @return Configured PubSubSource instance
* @throws IOException If credentials cannot be obtained
* @throws IllegalArgumentException If required fields are missing
*/
public PubSubSource<OUT> build() throws IOException;
}Key methods of the PubSubSource class.
/**
* Parallel Flink source that consumes messages from a Google Cloud Pub/Sub subscription.
* <p>Messages are acknowledged to Pub/Sub when a checkpoint completes (see
* {@link #notifyCheckpointComplete(long)}). Checkpointing must therefore be enabled on the job.
* The operator state snapshotted through {@link ListCheckpointed} is a list of
* AcknowledgeIdsForCheckpoint&lt;String&gt;, presumably the ack ids still pending per
* checkpoint (TODO confirm).
* @param <OUT> type of the elements emitted by this source
*/
public class PubSubSource<OUT> extends RichSourceFunction<OUT>
implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>,
CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
/**
* Get type information for elements produced by this source
* @return TypeInformation for output elements
*/
public TypeInformation<OUT> getProducedType();
/**
* Called when checkpoint completes - acknowledges messages
* <p>This is the only point at which messages are acknowledged to Pub/Sub.
* @param checkpointId Completed checkpoint ID
*/
public void notifyCheckpointComplete(long checkpointId) throws Exception;
/**
* Cancel the source function
*/
public void cancel();
}import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpointing is mandatory: PubSubSource throws an IllegalArgumentException when it is
// disabled, and messages are acknowledged only when a checkpoint completes (at-least-once).
// Keep the interval well below the subscription's acknowledgement deadline (Pub/Sub default:
// 10 s). Otherwise unacknowledged messages are redelivered and processed again. With this
// 30 s interval, raise the subscription's deadline accordingly.
env.enableCheckpointing(30000);

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .build();

env.addSource(source)
        .print();

env.execute("Basic Pub/Sub Consumer");
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import java.io.IOException;

/**
 * Deserializes each Pub/Sub message payload from JSON into {@code MyObject}.
 * {@code MyObject} is a placeholder for your own Jackson-mappable class.
 */
public class JsonDeserializationSchema extends AbstractDeserializationSchema<MyObject> {
    // Shared, static mapper: an ObjectMapper is safe to reuse for reads once configured.
    // Keeping it static means it is not serialized and shipped along with the schema.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public MyObject deserialize(byte[] message) throws IOException {
        return MAPPER.readValue(message, MyObject.class);
    }
}

PubSubSource<MyObject> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new JsonDeserializationSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("json-messages")
        .build();
import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.auth.Credentials;
import java.io.FileInputStream;
import java.io.InputStream;
import java.time.Duration;

// Load service account credentials. try-with-resources guarantees the key file is closed,
// even if parsing fails.
Credentials credentials;
try (InputStream keyFile = new FileInputStream("path/to/service-account.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyFile);
}

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .withCredentials(credentials)
        // Default subscriber factory tuned to: at most 200 messages per pull,
        // a 30 s timeout per pull request, and 5 retries on failure.
        .withPubSubSubscriberFactory(200, Duration.ofSeconds(30), 5)
        .withMessageRateLimit(50000) // Limit to 50k messages/second per subtask
        .build();
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import com.google.pubsub.v1.PubsubMessage;
import java.util.HashMap;
import java.util.Map;

/** POJO carrying the decoded payload together with selected Pub/Sub message metadata. */
public class MessageWithMetadata {
    public String data;
    public String messageId;
    /** Publish time in epoch seconds; sub-second precision is dropped. */
    public long publishTime;
    public Map<String, String> attributes;
}

/**
 * Deserializes a full PubsubMessage: the payload is decoded as UTF-8 and returned together with
 * the message id, publish time, and attributes.
 */
public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {
    @Override
    public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {
        MessageWithMetadata result = new MessageWithMetadata();
        result.data = message.getData().toStringUtf8();
        result.messageId = message.getMessageId();
        // Only the seconds component of the publish timestamp is kept.
        result.publishTime = message.getPublishTime().getSeconds();
        // getAttributesMap() returns a read-only map. Copy it into a mutable HashMap so that
        // Flink's generic (Kryo) serializer can copy and deserialize the record.
        result.attributes = new HashMap<>(message.getAttributesMap());
        return result;
    }

    @Override
    public boolean isEndOfStream(MessageWithMetadata nextElement) {
        // Never signals end-of-stream: the source keeps consuming the subscription.
        return false;
    }

    @Override
    public TypeInformation<MessageWithMetadata> getProducedType() {
        return TypeInformation.of(MessageWithMetadata.class);
    }
}

PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new MetadataDeserializationSchema())
        .withProjectName("my-gcp-project")
        .withSubscriptionName("my-subscription")
        .build();
Checkpointing Required: PubSubSource requires checkpointing to be enabled, because messages are acknowledged only when a checkpoint completes. The source will throw an IllegalArgumentException if checkpointing is disabled.
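Acknowledgment Strategy: Messages are acknowledged only after the checkpoint that includes them completes successfully. This gives at-least-once (not exactly-once) guarantees. If a checkpoint fails, or the time between successful checkpoints exceeds the subscription's acknowledgement deadline, Pub/Sub redelivers the messages and they are processed again. Keep the checkpoint interval well below the acknowledgement deadline.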
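Rate Limiting: The rate limit applies per parallel subtask, so the maximum total throughput is rate limit × parallelism. For example, 50,000 messages/second with a parallelism of 4 allows up to 200,000 messages/second.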
Retries: Failed pull requests are retried automatically, up to the number of retries configured through withPubSubSubscriberFactory(maxMessagesPerPull, perRequestTimeout, retries) (default: 3).
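Parallel Execution: The source implements ParallelSourceFunction, so it can run with a parallelism greater than 1. Each parallel subtask consumes from the configured subscription and applies its own rate limit.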
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11