Apache Flink connector for Google Cloud Pub/Sub with checkpoint-integrated, at-least-once delivery guarantees for real-time stream processing applications
—
The PubSubSink provides reliable message publishing to Google Cloud Pub/Sub topics with at-least-once delivery guarantees. It synchronizes with Flink checkpoints: all buffered messages are published, and all pending publish operations must complete, before a checkpoint can succeed.
Create a PubSubSink using the builder pattern with required project name, topic name, and serialization schema.
/**
* Creates a new builder for PubSubSink configuration.
* <p>This is the entry point of a staged builder: serialization schema, then project
* name, then topic name, then optional settings, then {@code build()}.
* @return SerializationSchemaBuilder instance to start configuration
*/
public static SerializationSchemaBuilder newBuilder();
/**
* First builder stage: only the serialization schema can be supplied here.
*/
public static class SerializationSchemaBuilder {
/**
* Set serialization schema for converting objects to PubSub message payloads.
* The type parameter chosen here fixes the element type of the resulting
* {@code PubSubSink<IN>}.
* @param <IN> Type of the stream elements to be serialized
* @param serializationSchema Schema for serializing objects to byte arrays
* @return ProjectNameBuilder for next configuration step
*/
public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema);
}
Configure project name, topic name, and optional parameters.
/**
* Second builder stage, reached after the serialization schema has been set.
* @param <IN> Type of the stream elements the sink will publish
*/
public interface ProjectNameBuilder<IN> {
/**
* Set the GCP project name containing the topic
* @param projectName Google Cloud project name
* @return TopicNameBuilder for next configuration step
*/
TopicNameBuilder<IN> withProjectName(String projectName);
}
/**
* Third builder stage, reached after the project name has been set; setting the
* topic unlocks the optional settings and {@code build()}.
* @param <IN> Type of the stream elements the sink will publish
*/
public interface TopicNameBuilder<IN> {
/**
* Set the Pub/Sub topic name to publish to
* @param topicName Pub/Sub topic name
* @return PubSubSinkBuilder for optional configuration
*/
PubSubSinkBuilder<IN> withTopicName(String topicName);
}
/**
* Final builder stage, reached after schema, project and topic have been set.
* Optional settings may be applied before calling {@code build()}.
* @param <IN> Type of the stream elements the sink will publish
*/
public static class PubSubSinkBuilder<IN> {
/**
* Set custom GCP credentials (optional, defaults to environment credentials)
* @param credentials Google Cloud credentials
* @return Current builder instance
*/
public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
/**
* Set emulator host and port for testing (optional, for emulator use only)
* @param hostAndPort Host and port combination (e.g., "localhost:8085")
* @return Current builder instance
*/
public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
/**
* Build the configured PubSubSink instance
* @return Configured PubSubSink ready for use
* @throws IOException If credentials cannot be obtained
* @throws IllegalArgumentException If required fields are missing or topic does not exist
*   (NOTE(review): the staged builder types already enforce the required fields, and a
*   topic-existence check at build time is not shown here — confirm against the
*   implementation)
*/
public PubSubSink<IN> build() throws IOException;
}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
// Note: PubSubSink build() declares IOException, so run this code inside a method
// that propagates (or handles) it.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Create input stream
DataStream<String> inputStream = env.fromElements(
    "Hello PubSub",
    "Message 1",
    "Message 2"
);
// Create and configure sink: schema -> project -> topic -> build
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .build();
// Add sink to stream
inputStream.addSink(sink);
env.execute("Basic PubSub Producer");
import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
/**
 * Serializes {@code User} records to JSON bytes for use as Pub/Sub message payloads.
 *
 * <p>The Jackson mapper lives in a transient field, so it is not shipped with the
 * serialized schema. It is created in {@link #open}, or lazily on first use if
 * {@code open} was never invoked (e.g. when the schema is used outside a Flink task).
 */
public class JsonUserSerializer implements SerializationSchema<User> {
    private transient ObjectMapper objectMapper;

    @Override
    public void open(SerializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    /**
     * Serializes one user to JSON.
     *
     * @param user the record to serialize
     * @return the JSON encoding of {@code user}
     * @throws RuntimeException if Jackson fails to serialize the record (cause preserved)
     */
    @Override
    public byte[] serialize(User user) {
        try {
            return mapper().writeValueAsBytes(user);
        } catch (IOException e) {
            // Jackson's JsonProcessingException is an IOException subtype.
            throw new RuntimeException("Failed to serialize user", e);
        }
    }

    /** Returns the mapper, creating it if {@link #open} has not run yet. */
    private ObjectMapper mapper() {
        if (objectMapper == null) {
            objectMapper = new ObjectMapper();
        }
        return objectMapper;
    }
}
// Usage
// `users` is a placeholder for your own collection of User records; any
// DataStream<User> (from a source or an upstream transformation) works here.
DataStream<User> userStream = env.fromCollection(users);
PubSubSink<User> userSink = PubSubSink.newBuilder()
    .withSerializationSchema(new JsonUserSerializer())
    .withProjectName("my-project")
    .withTopicName("user-events")
    .build();
userStream.addSink(userSink);
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import com.google.auth.Credentials;
// Load service-account credentials from a JSON key file. try-with-resources
// guarantees the file handle is released even if parsing fails.
final Credentials credentials;
try (FileInputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}
// Explicit credentials replace the default environment credentials.
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withTopicName("my-topic")
    .withCredentials(credentials)
    .build();
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import java.util.Locale;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// The sink flushes outstanding publishes on each checkpoint, so enable
// checkpointing (interval in milliseconds: every 30 s).
env.enableCheckpointing(30000);
// Source: Read from one PubSub subscription
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("input-subscription")
    .build();
// Transform: drop short messages, then tag and upper-case the rest.
// The filter must run before the map: once "Processed: " (11 chars) is prepended,
// every message is longer than 10 characters and the filter would be a no-op.
DataStream<String> processedStream = env.addSource(source)
    .filter(message -> message.length() > 10)
    .map(message -> "Processed: " + message.toUpperCase(Locale.ROOT));
// Sink: Publish to another PubSub topic
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withTopicName("output-topic")
    .build();
processedStream.addSink(sink);
env.execute("PubSub Processing Pipeline");
The PubSubSink provides at-least-once delivery guarantees:
The sink integrates with Flink's checkpointing mechanism to ensure reliable delivery:
/**
 * Checkpoint hook: flushes buffered messages and waits for all pending publish
 * operations, so a checkpoint only succeeds once they have completed.
 *
 * @param context the snapshot context supplied by Flink (unused here)
 * @throws Exception the recorded publish failure, if any, which fails the checkpoint
 */
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Flush all buffered messages
    publisher.publishAllOutstanding();
    // Wait for all pending publish operations to complete
    waitForFuturesToComplete();
    // Read the recorded failure once so the null check and the throw see the same value.
    final Exception failure = exceptionAtomicReference.get();
    if (failure != null) {
        throw failure;
    }
}
Published messages contain:
The sink uses the Google Cloud Pub/Sub client's default retry settings:
The Google Cloud Pub/Sub client automatically batches messages for efficiency:
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12