Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities
—
The PubSubSink provides reliable message publishing to Google Cloud Pub/Sub topics with at-least-once delivery semantics, automatic batching, and automatic retries that follow the Pub/Sub client's default retry policy.
Creates a new PubSubSink using the builder pattern. The builder enforces required parameters through type-safe interfaces.
/**
* Creates a new builder for PubSubSink
* @return SerializationSchemaBuilder for setting serialization schema
*/
public static SerializationSchemaBuilder newBuilder();

Configure how messages are serialized for Pub/Sub.
public static class SerializationSchemaBuilder {
/**
 * Sets the serialization schema used to convert objects to byte arrays.
 *
 * @param serializationSchema schema for serializing message data
 * @param <IN> element type handled by the schema and by the builders returned from here on
 * @return a {@code ProjectNameBuilder} for the next configuration step
 */
public <IN> ProjectNameBuilder<IN> withSerializationSchema(
SerializationSchema<IN> serializationSchema);
}

Configure the GCP project and Pub/Sub topic.
/**
 * Builder step that collects the GCP project name.
 *
 * <p>Returned by {@code withSerializationSchema}; its only method hands over to
 * {@code TopicNameBuilder}, so the project name has to be supplied before the topic name.
 *
 * @param <IN> element type of the sink being built
 */
public interface ProjectNameBuilder<IN> {
/**
 * Sets the GCP project name containing the topic.
 *
 * @param projectName GCP project name
 * @return a {@code TopicNameBuilder} for the next configuration step
 */
TopicNameBuilder<IN> withProjectName(String projectName);
}
public interface TopicNameBuilder<IN> {
/**
 * Sets the Pub/Sub topic name to publish to.
 *
 * @param topicName topic name
 * @return a {@code PubSubSinkBuilder} for additional, optional configuration
 */
PubSubSinkBuilder<IN> withTopicName(String topicName);
}

Main builder class for configuring optional parameters.
public static class PubSubSinkBuilder<IN> {
/**
 * Sets the authentication credentials.
 *
 * <p>Optional: default credentials are used if this is not set.
 *
 * @param credentials Google Cloud credentials
 * @return the current builder instance
 */
public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
/**
 * Sets a custom hostname/port so the sink talks to a Pub/Sub emulator (testing only).
 *
 * @param hostAndPort host and port combination in the form {@code "hostname:1234"},
 *     for example {@code "localhost:8085"}
 * @return the current builder instance
 */
public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
/**
 * Builds the configured {@code PubSubSink}.
 *
 * @return the configured {@code PubSubSink} instance
 * @throws IOException if credentials cannot be obtained
 * @throws IllegalArgumentException if required fields are missing
 */
public PubSubSink<IN> build() throws IOException;
}

Key methods of the PubSubSink class.
public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
/**
 * Processes a message and publishes it to Pub/Sub.
 *
 * @param message message to publish
 * @param context sink context (provides processing time, etc.)
 */
public void invoke(IN message, SinkFunction.Context context);
/**
 * Called during checkpointing; ensures all pending messages are published.
 *
 * @param context checkpoint context
 * @throws Exception if publishing fails
 */
public void snapshotState(FunctionSnapshotContext context) throws Exception;
/**
 * Initializes state; called once per subtask.
 *
 * @param context initialization context
 */
public void initializeState(FunctionInitializationContext context);
}import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
PubSubSink<String> sink = PubSubSink.newBuilder()
.withSerializationSchema(new SimpleStringSchema())
.withProjectName("my-gcp-project")
.withTopicName("my-topic")
.build();
env.fromElements("Hello", "World", "Pub/Sub")
.addSink(sink);
env.execute("Basic Pub/Sub Producer");import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.SerializationSchema;
/**
 * {@link SerializationSchema} that writes each element as JSON bytes using Jackson.
 *
 * @param <T> type of the elements to serialize
 */
public class JsonSerializationSchema<T> implements SerializationSchema<T> {

    private static final long serialVersionUID = 1L;

    // One shared mapper for all instances: it is never reconfigured after creation,
    // and being static it is built once and is not part of the schema's serialized form.
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Converts {@code element} to its JSON byte representation.
     *
     * @param element object to serialize
     * @return the JSON encoding of {@code element}
     * @throws RuntimeException wrapping the original failure if the element cannot be written
     */
    @Override
    public byte[] serialize(T element) {
        try {
            return MAPPER.writeValueAsBytes(element);
        } catch (Exception e) {
            // Keep the cause so the offending element/field can be diagnosed from the stack trace.
            throw new RuntimeException("Failed to serialize object", e);
        }
    }
}
/**
 * Sample event type for this guide; instances are published through a
 * {@code PubSubSink<MyEvent>} configured with {@code JsonSerializationSchema<MyEvent>}.
 */
public class MyEvent {
// Kind of event (free-form string in this example).
public String eventType;
// Event time as a long; the unit is not specified in this example — TODO confirm (presumably epoch millis).
public long timestamp;
// Identifier of the user the event belongs to.
public String userId;
// ... other fields
}
PubSubSink<MyEvent> sink = PubSubSink.newBuilder()
.withSerializationSchema(new JsonSerializationSchema<MyEvent>())
.withProjectName("my-gcp-project")
.withTopicName("events")
.build();
DataStream<MyEvent> events = env.addSource(/* some source */);
events.addSink(sink);import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
// Load service account credentials
Credentials credentials = ServiceAccountCredentials
.fromStream(new FileInputStream("path/to/service-account.json"));
PubSubSink<String> sink = PubSubSink.newBuilder()
.withSerializationSchema(new SimpleStringSchema())
.withProjectName("my-gcp-project")
.withTopicName("my-topic")
.withCredentials(credentials)
.build();
env.addSource(/* some source */)
.addSink(sink);// Start Pub/Sub emulator first:
// gcloud beta emulators pubsub start --host-port=localhost:8085
PubSubSink<String> sink = PubSubSink.newBuilder()
.withSerializationSchema(new SimpleStringSchema())
.withProjectName("test-project")
.withTopicName("test-topic")
.withHostAndPortForEmulator("localhost:8085")
.build();
env.fromElements("test-message-1", "test-message-2")
.addSink(sink);

For advanced use cases requiring message attributes, you would need to implement a custom serialization approach or use the Pub/Sub client library directly within a custom sink function:
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.v1.PubsubMessage;
import com.google.protobuf.ByteString;
public class CustomPubSubSink extends RichSinkFunction<MyEventWithAttributes> {
private transient Publisher publisher;
/**
 * Creates the Pub/Sub publisher that {@code invoke} sends messages through.
 *
 * <p>NOTE(review): {@code Configuration} and {@code TopicName} do not appear in this
 * snippet's import list (presumably {@code org.apache.flink.configuration.Configuration}
 * and {@code com.google.pubsub.v1.TopicName}) — confirm and add them when copying the example.
 *
 * @param parameters Flink configuration passed to the function (not read here)
 * @throws Exception if the publisher cannot be built
 */
@Override
public void open(Configuration parameters) throws Exception {
    // Project and topic are hard-coded placeholders in this example.
    TopicName target = TopicName.of("project", "topic");
    publisher = Publisher.newBuilder(target).build();
}
/**
 * Publishes one event to Pub/Sub, attaching its event type and source as message attributes.
 *
 * @param event event whose data becomes the UTF-8 message payload
 * @param context sink context (not used here)
 * @throws Exception if building or handing off the message fails
 */
@Override
public void invoke(MyEventWithAttributes event, Context context) throws Exception {
    PubsubMessage.Builder outgoing = PubsubMessage.newBuilder();
    outgoing.setData(ByteString.copyFromUtf8(event.getData()));
    outgoing.putAttributes("eventType", event.getEventType());
    outgoing.putAttributes("source", event.getSource());

    // NOTE(review): the future returned by publish() is discarded, so a failed
    // publish is not reported back from this method.
    publisher.publish(outgoing.build());
}
/**
 * Shuts the publisher down and waits, for a bounded time, for outstanding publishes to finish.
 *
 * @throws Exception if shutdown fails or the wait is interrupted
 */
@Override
public void close() throws Exception {
    if (publisher == null) {
        // open() never created a publisher, so there is nothing to release.
        return;
    }
    publisher.shutdown();
    // Follow the Pub/Sub client's documented shutdown sequence: after shutdown(),
    // give in-flight messages up to one minute to complete before the task exits.
    publisher.awaitTermination(1, java.util.concurrent.TimeUnit.MINUTES);
}
}

The PubSubSink provides at-least-once delivery guarantees. Messages may be delivered multiple times in case of failures, but no messages are lost.
The sink automatically retries failed publish operations according to the Google Cloud Pub/Sub client's default retry policy.
The sink integrates with Flink's checkpointing mechanism: when a checkpoint is taken, `snapshotState` ensures all pending messages are published.
The sink handles back-pressure scenarios.
The underlying Google Cloud Pub/Sub publisher automatically batches messages for optimal throughput while respecting latency requirements.
For high-throughput scenarios, consider tuning the publisher settings by implementing a custom sink based on the PubSubSink pattern with explicit publisher configuration.
Message Ordering: Pub/Sub does not guarantee message ordering by default. Use message ordering keys if ordering is required.
Message Size Limits: Pub/Sub has a maximum message size limit (10 MB). Ensure your serialized messages are within this limit.
Topic Creation: Topics must exist before publishing. The sink does not create topics automatically.
Authentication: Uses Google Cloud default authentication if no explicit credentials are provided. Ensure proper service account configuration in production environments.
Install with Tessl CLI
npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11