Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications
—
The PubSubSource provides exactly-once message consumption from Google Cloud Pub/Sub subscriptions with automatic acknowledgment management through Flink's checkpointing mechanism.
Create a PubSubSource with the staged builder. First supply the required deserialization schema, project name, and subscription name, in that order. Then add any optional settings.
/**
 * Starts the configuration of a {@link PubSubSource}.
 *
 * <p>The builder is staged. The deserialization schema is set first, then the project name,
 * then the subscription name. After that, only optional settings remain before {@code build()}.
 *
 * @return the first builder stage, which accepts the deserialization schema
 */
public static DeserializationSchemaBuilder newBuilder();
public static class DeserializationSchemaBuilder {

    /**
     * Uses a Pub/Sub-specific schema that receives the full message rather than only its payload.
     *
     * @param deserializationSchema schema with access to the Pub/Sub message and its metadata
     * @param <OUT> type of the records emitted by the source
     * @return the next builder stage, which takes the project name
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema);

    /**
     * Uses a standard Flink schema, which only sees the message payload.
     *
     * @param deserializationSchema schema that turns the message payload into a record
     * @param <OUT> type of the records emitted by the source
     * @return the next builder stage, which takes the project name
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);
}

Configure the project name, the subscription name, and any optional parameters.
public interface ProjectNameBuilder<OUT> {

    /**
     * Second builder stage: names the Google Cloud project that contains the subscription.
     *
     * @param projectName name of the Google Cloud project
     * @return the next builder stage, which takes the subscription name
     */
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}
public interface SubscriptionNameBuilder<OUT> {

    /**
     * Third builder stage: names the Pub/Sub subscription the source reads from.
     *
     * <p>After this call only optional settings remain before the source can be built.
     *
     * @param subscriptionName name of the Pub/Sub subscription
     * @return the final builder stage, which holds the optional settings and {@code build()}
     */
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}
public static class PubSubSourceBuilder<OUT> {

    /**
     * Uses the given Google Cloud credentials (optional).
     *
     * <p>When this is not called, the credentials available in the environment are used.
     *
     * @param credentials Google Cloud credentials
     * @return this builder
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);

    /**
     * Uses a custom factory to create the Pub/Sub subscriber, for advanced configuration (optional).
     *
     * @param pubSubSubscriberFactory factory that creates the subscriber
     * @return this builder
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory pubSubSubscriberFactory);

    /**
     * Configures the default subscriber factory with explicit pull settings (optional).
     *
     * <p>The defaults below are the values used when no subscriber factory is configured.
     *
     * @param maxMessagesPerPull number of messages pulled per request (default: 100)
     * @param perRequestTimeout timeout of a single pull request (default: 15 seconds)
     * @param retries number of retries for a failed request (default: 3)
     * @return this builder
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);

    /**
     * Limits the message rate of each parallel source instance (optional).
     *
     * @param messagePerSecondRateLimit maximum messages per second per parallel instance (default: 100000)
     * @return this builder
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);

    /**
     * Builds the configured PubSubSource.
     *
     * <p>This method receives no {@code StreamExecutionEnvironment}, so it cannot tell whether
     * checkpointing is enabled. Enable checkpointing on the environment yourself.
     * NOTE(review): a job without checkpointing is presumably rejected when the source starts
     * running rather than here. Confirm before documenting it as a build-time failure.
     *
     * @return the configured PubSubSource, ready to be added to the environment
     * @throws IOException if credentials cannot be obtained
     * @throws IllegalArgumentException if the supplied configuration is invalid
     */
    public PubSubSource<OUT> build() throws IOException;
}

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Required for exactly-once guarantees: acknowledgments are driven by checkpoints.
env.enableCheckpointing(30000); // checkpoint interval in milliseconds (30 s)

// Minimal source: plain UTF-8 strings read from one subscription.
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .build();

DataStream<String> stream = env.addSource(source);
stream.print();
env.execute("Basic PubSub Consumer");

import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import com.google.pubsub.v1.PubsubMessage;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;

/**
 * Deserializes the UTF-8 JSON payload of each Pub/Sub message into a {@code User}.
 *
 * <p>{@code User} stands for your own POJO type. The stream never ends on its own:
 * {@link #isEndOfStream} always returns {@code false}.
 */
public class JsonUserDeserializer implements PubSubDeserializationSchema<User> {

    // Transient and created in open(), so the mapper is not serialized together with the schema.
    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public User deserialize(PubsubMessage message) throws Exception {
        String json = message.getData().toStringUtf8();
        return objectMapper.readValue(json, User.class);
    }

    @Override
    public boolean isEndOfStream(User nextElement) {
        return false;
    }

    @Override
    public TypeInformation<User> getProducedType() {
        return TypeInformation.of(User.class);
    }
}

// Usage
PubSubSource<User> userSource = PubSubSource.newBuilder()
    .withDeserializationSchema(new JsonUserDeserializer())
    .withProjectName("my-project")
    .withSubscriptionName("user-events")
    .build();

import java.time.Duration;
// Pull settings for the default subscriber factory on a high-volume subscription.
int maxMessagesPerPull = 500;
Duration perRequestTimeout = Duration.ofSeconds(30);
int retries = 5;
// Cap on messages per second for each parallel source subtask (10K).
int messagesPerSecondPerSubtask = 10000;

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("high-volume-subscription")
    .withPubSubSubscriberFactory(maxMessagesPerPull, perRequestTimeout, retries)
    .withMessageRateLimit(messagesPerSecondPerSubtask)
    .build();

import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;
import com.google.auth.Credentials;

// Read the service-account key and close the file as soon as the credentials are loaded.
Credentials credentials;
try (InputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("my-subscription")
    .withCredentials(credentials)
    .build();

The PubSubSource automatically manages message acknowledgment through Flink's checkpointing mechanism: a message is acknowledged to Pub/Sub only after a checkpoint that covers it has completed.
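Acknowledgment is tied to completed checkpoints. Messages that were read but not yet checkpointed are therefore redelivered by Pub/Sub after a failure instead of being lost, and Flink restores its state from the last checkpoint. For this exactly-once guarantee to hold, keep the checkpoint interval well below the subscription's acknowledgement deadline. Otherwise, Pub/Sub redelivers messages that are still waiting for a checkpoint, and they may be processed more than once.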
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12