
tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12

Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications


PubSub Source

The PubSubSource provides exactly-once message consumption from Google Cloud Pub/Sub subscriptions with automatic acknowledgment management through Flink's checkpointing mechanism.

Capabilities

Source Creation

Create a PubSubSource using the builder pattern with required project name, subscription name, and deserialization schema.

/**
 * Creates a new builder for PubSubSource configuration
 * @return DeserializationSchemaBuilder instance to start configuration
 */
public static DeserializationSchemaBuilder newBuilder();

public static class DeserializationSchemaBuilder {
    /**
     * Set standard Flink DeserializationSchema (extracts only message data)
     * @param deserializationSchema Schema for deserializing message payload
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);
    
    /**
     * Set PubSub-specific deserialization schema (provides access to full message)
     * @param deserializationSchema Schema with access to PubSub message metadata
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema);
}

Builder Configuration

Configure project name, subscription name, and optional parameters.

public interface ProjectNameBuilder<OUT> {
    /**
     * Set the GCP project name containing the subscription
     * @param projectName Google Cloud project name
     * @return SubscriptionNameBuilder for next configuration step
     */
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}

public interface SubscriptionNameBuilder<OUT> {
    /**
     * Set the Pub/Sub subscription name to consume from
     * @param subscriptionName Pub/Sub subscription name
     * @return PubSubSourceBuilder for optional configuration
     */
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}

public static class PubSubSourceBuilder<OUT> {
    /**
     * Set custom GCP credentials (optional, defaults to environment credentials)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
    
    /**
     * Set custom subscriber factory for advanced configuration (optional)
     * @param pubSubSubscriberFactory Custom factory for creating subscribers
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory pubSubSubscriberFactory);
    
    /**
     * Configure default subscriber factory with specific parameters (optional)
     * @param maxMessagesPerPull Number of messages pulled per request (default: 100)
     * @param perRequestTimeout Timeout per pull request (default: 15 seconds)
     * @param retries Number of retries for failed requests (default: 3)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);
    
    /**
     * Set message rate limit per parallel source instance (optional)
     * @param messagePerSecondRateLimit Messages per second limit (default: 100000)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
    
    /**
     * Build the configured PubSubSource instance
     * @return Configured PubSubSource ready for use
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing (the checkpointing requirement is enforced later, when the source is opened at runtime)
     */
    public PubSubSource<OUT> build() throws IOException;
}
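
The builder chain above is staged: each step returns a different type, so the compiler rejects a call to build() before the project and subscription names are set. The following is a minimal plain-Java sketch of that idea, not the connector's code. The names StagedBuilderSketch, ProjectStep, SubscriptionStep, and OptionalStep are hypothetical, and build() returns a descriptive String instead of a real source so the example runs without Flink on the classpath.

```java
/** Hypothetical illustration of a staged builder; not part of the connector. */
public class StagedBuilderSketch {
    public interface ProjectStep {
        SubscriptionStep withProjectName(String projectName);
    }

    public interface SubscriptionStep {
        OptionalStep withSubscriptionName(String subscriptionName);
    }

    public interface OptionalStep {
        OptionalStep withMessageRateLimit(int messagePerSecondRateLimit);
        String build();
    }

    /** Entry point: only the first required step is reachable from here. */
    public static ProjectStep newBuilder() {
        return new Impl();
    }

    /** One object implements every stage; the interfaces control what is visible when. */
    private static final class Impl implements ProjectStep, SubscriptionStep, OptionalStep {
        private String project;
        private String subscription;
        private int rateLimit = 100000; // same default as documented for withMessageRateLimit

        public SubscriptionStep withProjectName(String projectName) {
            this.project = projectName;
            return this;
        }

        public OptionalStep withSubscriptionName(String subscriptionName) {
            this.subscription = subscriptionName;
            return this;
        }

        public OptionalStep withMessageRateLimit(int messagePerSecondRateLimit) {
            this.rateLimit = messagePerSecondRateLimit;
            return this;
        }

        public String build() {
            return "projects/" + project + "/subscriptions/" + subscription + " @ " + rateLimit + "/s";
        }
    }
}
```

With this shape, StagedBuilderSketch.newBuilder().build() does not compile, because ProjectStep exposes only withProjectName.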

Usage Examples

Basic String Message Consumer

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Required for exactly-once guarantees

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .build();

DataStream<String> stream = env.addSource(source);
stream.print();

env.execute("Basic PubSub Consumer");

JSON Message Consumer with Custom Deserialization

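import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import com.google.pubsub.v1.PubsubMessage;
import com.fasterxml.jackson.databind.ObjectMapper;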

public class JsonUserDeserializer implements PubSubDeserializationSchema<User> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public User deserialize(PubsubMessage message) throws Exception {
        String json = message.getData().toStringUtf8();
        return objectMapper.readValue(json, User.class);
    }
    
    @Override
    public boolean isEndOfStream(User nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<User> getProducedType() {
        return TypeInformation.of(User.class);
    }
}

// Usage
PubSubSource<User> userSource = PubSubSource.newBuilder()
    .withDeserializationSchema(new JsonUserDeserializer())
    .withProjectName("my-project")
    .withSubscriptionName("user-events")
    .build();

Advanced Configuration with Rate Limiting

import java.time.Duration;

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("high-volume-subscription")
    .withPubSubSubscriberFactory(
        500,                    // maxMessagesPerPull
        Duration.ofSeconds(30), // perRequestTimeout
        5                       // retries
    )
    .withMessageRateLimit(10000) // 10K messages per second per subtask
    .build();

Using Custom Credentials

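import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;

// Close the key file once the credentials have been read
Credentials credentials;
try (InputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}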

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("my-subscription")
    .withCredentials(credentials)
    .build();

Message Acknowledgment

The PubSubSource automatically manages message acknowledgment through Flink's checkpointing mechanism:

  1. Message Reception: Messages are pulled from the subscription and added to pending acknowledgments
  2. Processing: Messages are deserialized and emitted to the Flink stream
  3. Checkpointing: When a checkpoint completes successfully, all messages received before that checkpoint are acknowledged
  4. Failure Recovery: If a checkpoint fails, messages remain unacknowledged and will be redelivered by Pub/Sub

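This is how the source provides its exactly-once processing guarantee: a message is acknowledged only after the checkpoint that covers it has completed, so a failure before that point leads to redelivery rather than message loss.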
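
The four steps above can be modeled in a few lines of plain Java. This is a simplified sketch under stated assumptions, not the connector's implementation: the class AckOnCheckpointSketch and its method names are hypothetical, ack IDs are plain strings, and "acknowledging" just moves an ID into a list instead of calling Pub/Sub.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical, simplified model of acknowledge-on-checkpoint. */
public class AckOnCheckpointSketch {
    /** Ack IDs frozen under the checkpoint that covers them. */
    private static final class Snapshot {
        final long checkpointId;
        final List<String> ackIds;

        Snapshot(long checkpointId, List<String> ackIds) {
            this.checkpointId = checkpointId;
            this.ackIds = ackIds;
        }
    }

    private List<String> sinceLastSnapshot = new ArrayList<>();
    private final Deque<Snapshot> pending = new ArrayDeque<>();
    private final List<String> acknowledged = new ArrayList<>();

    /** Steps 1 and 2: a message was pulled and emitted; remember its ack ID. */
    public void onMessage(String ackId) {
        sinceLastSnapshot.add(ackId);
    }

    /** A checkpoint is taken: group the ack IDs received so far under its ID. */
    public void snapshotState(long checkpointId) {
        pending.addLast(new Snapshot(checkpointId, sinceLastSnapshot));
        sinceLastSnapshot = new ArrayList<>();
    }

    /** Step 3: the checkpoint completed; acknowledge everything it and older snapshots cover. */
    public void notifyCheckpointComplete(long checkpointId) {
        while (!pending.isEmpty() && pending.peekFirst().checkpointId <= checkpointId) {
            acknowledged.addAll(pending.pollFirst().ackIds);
        }
    }

    public List<String> acknowledged() {
        return acknowledged;
    }

    public static void main(String[] args) {
        AckOnCheckpointSketch source = new AckOnCheckpointSketch();
        source.onMessage("a");
        source.onMessage("b");
        source.snapshotState(1);
        source.onMessage("c"); // arrives after checkpoint 1 was taken
        source.notifyCheckpointComplete(1);
        System.out.println(source.acknowledged()); // prints [a, b]; c waits for the next checkpoint
    }
}
```

Step 4 corresponds to notifyCheckpointComplete never being called for a snapshot: its ack IDs are never acknowledged, so Pub/Sub redelivers those messages.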

Important Requirements

  • Checkpointing: Flink checkpointing MUST be enabled. The source will throw IllegalArgumentException if checkpointing is disabled
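  • Checkpoint Interval: The checkpoint interval should be much shorter than the subscription's acknowledgment deadline; otherwise Pub/Sub redelivers messages before a checkpoint can acknowledge them. The deadline is configurable per subscription from 10 to 600 seconds and defaults to 10 seconds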
  • Parallel Processing: Each parallel subtask creates its own subscriber and manages its own acknowledgments
  • Rate Limiting: Rate limits are applied per parallel subtask, not globally
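
Two of these requirements come down to simple arithmetic, sketched below. The class SourceSizingSketch and both helper methods are hypothetical. The safetyFactor parameter is an assumption used to express "much shorter"; the connector does not define such a value.

```java
import java.time.Duration;

/** Hypothetical sizing helpers; not part of the connector. */
public class SourceSizingSketch {
    /** Rate limits apply per subtask, so the job-wide ceiling scales with parallelism. */
    public static long aggregateRateLimit(int perSubtaskLimit, int parallelism) {
        return (long) perSubtaskLimit * parallelism;
    }

    /** True when the checkpoint interval, multiplied by a chosen safety factor, still fits in the ack deadline. */
    public static boolean intervalFitsDeadline(
            Duration checkpointInterval, Duration ackDeadline, int safetyFactor) {
        return checkpointInterval.multipliedBy(safetyFactor).compareTo(ackDeadline) <= 0;
    }

    public static void main(String[] args) {
        // withMessageRateLimit(10000) at parallelism 4 allows up to 40000 messages per second in total
        System.out.println(aggregateRateLimit(10000, 4));

        // A 30 second interval against a subscription configured with a 600 second deadline
        System.out.println(intervalFitsDeadline(Duration.ofSeconds(30), Duration.ofSeconds(600), 5));
    }
}
```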

Error Handling

  • Connection Failures: Automatic retry with exponential backoff through the subscriber factory
  • Deserialization Errors: Exceptions during deserialization will cause the job to fail
  • Acknowledgment Failures: Failed acknowledgments during checkpoint completion will cause checkpoint failure
  • Timeout Handling: Pull request timeouts are handled gracefully with configurable retry logic
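
For readers unfamiliar with the pattern, bounded retry with exponential backoff looks roughly like the sketch below. This is a generic illustration, not the connector's retry logic: RetrySketch, backoff, and callWithRetries are hypothetical names, and the base delay and cap are arbitrary parameters chosen by the caller.

```java
import java.time.Duration;
import java.util.concurrent.Callable;

/** Hypothetical generic retry helper; not the connector's implementation. */
public class RetrySketch {
    /** Delay before the next attempt: base, 2x base, 4x base, ... limited by cap. */
    public static Duration backoff(int attempt, Duration base, Duration cap) {
        long factor = 1L << Math.min(attempt, 20);
        Duration delay = base.multipliedBy(factor);
        return delay.compareTo(cap) > 0 ? cap : delay;
    }

    /** Runs the call once, then up to `retries` more times, sleeping between failed attempts. */
    public static <T> T callWithRetries(Callable<T> call, int retries, Duration base, Duration cap)
            throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        Exception last = null;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                last = e;
                if (attempt < retries) {
                    Thread.sleep(backoff(attempt, base, cap).toMillis());
                }
            }
        }
        throw last; // every attempt failed; surface the final error
    }
}
```

Once the attempts are exhausted the last exception propagates to the caller, which mirrors the behavior described above where an unrecoverable error fails the job or the checkpoint.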

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12
