tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12

Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications

Describes: pkg:maven/org.apache.flink/flink-connector-gcp-pubsub_2.12@1.14.x

To install, run

npx @tessl/cli install tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12@1.14.0


Apache Flink GCP Pub/Sub Connector

The Apache Flink connector for Google Cloud Pub/Sub consumes messages from Pub/Sub subscriptions and publishes messages to Pub/Sub topics in real-time stream processing applications. The source offers exactly-once processing guarantees by acknowledging messages automatically through Flink's checkpointing mechanism.

Package Information

  • Package Name: flink-connector-gcp-pubsub_2.12
  • Package Type: Maven
  • Group ID: org.apache.flink
  • Language: Java
  • Installation:
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
      <version>1.14.6</version>
    </dependency>

Core Imports

import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Collector;

For credentials management:

import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;

For Google Cloud Pub/Sub types:

import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.List;
import java.time.Duration;

For emulator testing:

import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;

Basic Usage

Consuming Messages (Source)

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Enable checkpointing (required for exactly-once guarantees)
env.enableCheckpointing(30000); // checkpoint every 30 seconds

// Create PubSubSource
PubSubSource<String> pubsubSource = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .build();

// Add source to stream
DataStream<String> stream = env.addSource(pubsubSource);
stream.print();

env.execute("PubSub Consumer");

Publishing Messages (Sink)

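import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;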

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create data stream
DataStream<String> inputStream = env.fromElements("Hello", "World", "PubSub");

// Create PubSubSink
PubSubSink<String> pubsubSink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .build();

// Add sink to stream
inputStream.addSink(pubsubSink);

env.execute("PubSub Producer");

Architecture

The Apache Flink GCP Pub/Sub connector is built around several key components:

  • PubSubSource: Source function for consuming messages with exactly-once processing guarantees
  • PubSubSink: Sink function for publishing messages with at-least-once delivery semantics
  • Builder Pattern: Fluent API for configuring both sources and sinks with optional parameters
  • Checkpointing Integration: Automatic message acknowledgment tied to Flink checkpoint completion
  • Rate Limiting: Configurable message consumption rate limits per parallel subtask
  • Emulator Support: Built-in support for Google Pub/Sub emulator for testing scenarios

The connector ensures data consistency through Flink's distributed checkpointing mechanism: Pub/Sub messages are acknowledged only after the checkpoint that covers them completes. If a failure occurs before that point, the messages remain unacknowledged and Pub/Sub redelivers them, so they are not lost.
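The acknowledgment flow described above can be illustrated with a small, stdlib-only model. This is a sketch under stated assumptions, not the connector's implementation: the class `CheckpointAckSketch` and its methods are hypothetical names, and the "acknowledge" step is reduced to appending IDs to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical, simplified model of checkpoint-driven acknowledgment.
// It is not the connector's code; every name here is illustrative.
class CheckpointAckSketch {
    // Ack IDs of messages emitted since the last snapshot.
    private final List<String> pending = new ArrayList<>();
    // Ack IDs frozen per checkpoint, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();
    // Stands in for the acknowledge call sent to Pub/Sub.
    private final List<String> acknowledged = new ArrayList<>();

    void onMessage(String ackId) {
        pending.add(ackId);
    }

    // A checkpoint is being taken: bind the pending IDs to it.
    void snapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    // A checkpoint completed: acknowledge all IDs up to and including it.
    void notifyCheckpointComplete(long checkpointId) {
        NavigableMap<Long, List<String>> done = byCheckpoint.headMap(checkpointId, true);
        for (List<String> ids : done.values()) {
            acknowledged.addAll(ids);
        }
        done.clear(); // removes the acknowledged checkpoints from byCheckpoint
    }

    List<String> acknowledged() {
        return acknowledged;
    }

    public static void main(String[] args) {
        CheckpointAckSketch source = new CheckpointAckSketch();
        source.onMessage("ack-1");
        source.onMessage("ack-2");
        source.snapshot(1L);
        source.onMessage("ack-3"); // arrives after checkpoint 1 was taken
        System.out.println(source.acknowledged()); // prints []
        source.notifyCheckpointComplete(1L);
        System.out.println(source.acknowledged()); // prints [ack-1, ack-2]
    }
}
```

In this model, a crash before `notifyCheckpointComplete` leaves every ID unacknowledged, which corresponds to the redelivery case: nothing is confirmed to Pub/Sub until the covering checkpoint has completed.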

Capabilities

PubSub Message Source

Source functionality for consuming messages from Google Cloud Pub/Sub subscriptions with exactly-once processing guarantees and configurable rate limiting.

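// PubSubSource entry point. The required calls withDeserializationSchema,
// withProjectName and withSubscriptionName are chained as shown in Basic Usage.
public static DeserializationSchemaBuilder newBuilder();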

public static class PubSubSourceBuilder<OUT> {
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory factory);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
    public PubSubSource<OUT> build();
}
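The listing above returns a different builder type from `newBuilder()` than the one that exposes `build()`. One way to read this is as a staged builder, where required settings must be supplied before optional ones become available. The following stdlib-only sketch illustrates that pattern. All types here (`SourceConfig` and its stage interfaces) are hypothetical stand-ins, strings replace the real schema and credential objects, and the default of 100,000 is taken from the rate-limit note in this document.

```java
// Hypothetical staged-builder sketch; not the connector's classes.
final class SourceConfig {
    final String schema;
    final String project;
    final String subscription;
    final int rateLimit;

    private SourceConfig(String schema, String project, String subscription, int rateLimit) {
        this.schema = schema;
        this.project = project;
        this.subscription = subscription;
        this.rateLimit = rateLimit;
    }

    // Each stage exposes only the next required call.
    interface SchemaStage { ProjectStage withDeserializationSchema(String schema); }
    interface ProjectStage { SubscriptionStage withProjectName(String project); }
    interface SubscriptionStage { OptionalStage withSubscriptionName(String subscription); }

    // Optional settings and build() become reachable only after the required stages.
    interface OptionalStage {
        OptionalStage withMessageRateLimit(int messagesPerSecond);
        SourceConfig build();
    }

    static SchemaStage newBuilder() {
        return new Builder();
    }

    private static final class Builder
            implements SchemaStage, ProjectStage, SubscriptionStage, OptionalStage {
        private String schema;
        private String project;
        private String subscription;
        private int rateLimit = 100_000; // default quoted in this document

        public ProjectStage withDeserializationSchema(String s) { schema = s; return this; }
        public SubscriptionStage withProjectName(String p) { project = p; return this; }
        public OptionalStage withSubscriptionName(String s) { subscription = s; return this; }
        public OptionalStage withMessageRateLimit(int r) { rateLimit = r; return this; }
        public SourceConfig build() {
            return new SourceConfig(schema, project, subscription, rateLimit);
        }
    }

    public static void main(String[] args) {
        SourceConfig config = SourceConfig.newBuilder()
                .withDeserializationSchema("string")
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .withMessageRateLimit(5_000)
                .build();
        System.out.println(config.project + " " + config.rateLimit); // prints my-gcp-project 5000
    }
}
```

With this shape, calling `build()` before the project and subscription are set does not compile, which moves a class of configuration mistakes from runtime to compile time.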

Full reference: pubsub-source.md

PubSub Message Sink

Sink functionality for publishing messages to Google Cloud Pub/Sub topics with reliable delivery and checkpoint synchronization.

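// PubSubSink entry point. The required calls withSerializationSchema,
// withProjectName and withTopicName are chained as shown in Basic Usage.
public static SerializationSchemaBuilder newBuilder();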

public static class PubSubSinkBuilder<IN> {
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
    public PubSubSink<IN> build();
}

Full reference: pubsub-sink.md

Testing Support

Emulator support for local development and testing scenarios without requiring actual Google Cloud Pub/Sub infrastructure.

public final class EmulatorCredentials extends OAuth2Credentials {
    public static EmulatorCredentials getInstance();
}

public final class EmulatorCredentialsProvider implements CredentialsProvider {
    public static EmulatorCredentialsProvider create();
}

Full reference: emulator-testing.md

Core Interfaces

PubSubDeserializationSchema

Interface for custom deserialization with access to full PubSub message metadata.

public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    boolean isEndOfStream(T nextElement);
    T deserialize(PubsubMessage message) throws Exception;
    default void deserialize(PubsubMessage message, Collector<T> out) throws Exception;
}
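The interface above lists two `deserialize` variants: one returning a single record and one writing to a collector. The stdlib-only sketch below shows how such a pair can fit together. It is an illustration, not the connector's code: `FakeMessage` stands in for `PubsubMessage`, `Consumer<T>` stands in for Flink's `Collector<T>`, checked exceptions are omitted, and the null-skipping default is an assumption made for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for PubsubMessage: payload bytes only.
final class FakeMessage {
    final byte[] data;

    FakeMessage(byte[] data) {
        this.data = data;
    }
}

// Reduced, hypothetical mirror of the interface shape listed above.
interface SketchDeserializationSchema<T> {
    T deserialize(FakeMessage message);

    boolean isEndOfStream(T nextElement);

    // Assumed default for this sketch: emit the single-record result unless it is null.
    default void deserialize(FakeMessage message, Consumer<T> out) {
        T value = deserialize(message);
        if (value != null) {
            out.accept(value);
        }
    }
}

// Decodes UTF-8 payloads and drops empty ones by returning null.
final class NonEmptyStringSchema implements SketchDeserializationSchema<String> {
    @Override
    public String deserialize(FakeMessage message) {
        String text = new String(message.data, StandardCharsets.UTF_8);
        return text.isEmpty() ? null : text;
    }

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // an unbounded stream never signals its end
    }
}

class DeserializationDemo {
    public static void main(String[] args) {
        NonEmptyStringSchema schema = new NonEmptyStringSchema();
        List<String> out = new ArrayList<>();
        schema.deserialize(new FakeMessage("hello".getBytes(StandardCharsets.UTF_8)), out::add);
        schema.deserialize(new FakeMessage(new byte[0]), out::add); // filtered out
        System.out.println(out); // prints [hello]
    }
}
```

An implementation that needs to emit several records per message, or none, would override the collector variant instead of relying on the default.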

PubSubSubscriberFactory

Factory interface for creating custom Pub/Sub subscribers with specialized configurations.

public interface PubSubSubscriberFactory extends Serializable {
    PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}

Checkpointing Support Classes

Core classes for managing checkpoint-based message acknowledgment.

public interface Acknowledger<AcknowledgeId> {
    void acknowledge(List<AcknowledgeId> ids);
}

public interface PubSubSubscriber extends Acknowledger<String> {
    List<ReceivedMessage> pull();
    void close() throws Exception;
}

public class AcknowledgeIdsForCheckpoint<AcknowledgeId> implements Serializable {
    AcknowledgeIdsForCheckpoint(long checkpointId, List<AcknowledgeId> acknowledgeIds);
    public long getCheckpointId();
    public void setCheckpointId(long checkpointId);
    public List<AcknowledgeId> getAcknowledgeIds();
    public void setAcknowledgeIds(List<AcknowledgeId> acknowledgeIds);
}

public class AcknowledgeOnCheckpoint<ACKID extends Serializable> 
        implements ListCheckpointed<AcknowledgeIdsForCheckpoint<ACKID>>, CheckpointListener {
    public AcknowledgeOnCheckpoint(Acknowledger<ACKID> acknowledger);
    public void addAcknowledgeId(ACKID acknowledgeId);
    public void acknowledgeIdsUpToCheckpoint(long checkpointId);
}

Important Notes

  • Checkpointing Required: PubSubSource requires Flink checkpointing to be enabled for exactly-once guarantees
  • Parallel Processing: Both source and sink support parallel execution across multiple Flink subtasks
  • Rate Limiting: Default rate limit is 100,000 messages per second per parallel source instance
  • Credentials: If not explicitly provided, credentials are loaded automatically from the environment using Google Application Default Credentials (for example, the GOOGLE_APPLICATION_CREDENTIALS variable)
  • Error Handling: A failed publish in the sink causes the job to fail during checkpointing
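Because the rate limit applies per parallel source instance, the job-wide ceiling scales with source parallelism. The arithmetic can be sketched as follows. `RateLimitMath` and its methods are hypothetical helpers written for this illustration, and the parallelism and target values are example inputs.

```java
// Hypothetical helper for reasoning about per-subtask rate limits.
class RateLimitMath {
    // Default per-instance limit quoted in the notes above.
    static final int DEFAULT_LIMIT_PER_SUBTASK = 100_000;

    // Upper bound on messages per second across all parallel source instances.
    static long aggregateCeiling(int perSubtaskLimit, int parallelism) {
        return (long) perSubtaskLimit * parallelism;
    }

    // Per-instance limit that keeps the whole source under a job-wide target (rounded down).
    static int perSubtaskLimit(long jobWideTarget, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return Math.toIntExact(jobWideTarget / parallelism);
    }

    public static void main(String[] args) {
        // With the default limit and 4 parallel source instances:
        System.out.println(aggregateCeiling(DEFAULT_LIMIT_PER_SUBTASK, 4)); // prints 400000
        // To cap the whole job at 10,000 messages per second with 4 instances:
        System.out.println(perSubtaskLimit(10_000L, 4)); // prints 2500
    }
}
```

The second value is the kind of number that would be passed to `withMessageRateLimit` on the source builder when a downstream system imposes a job-wide budget.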