
tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11

Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities


Message Consumption (Source)

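The PubSubSource consumes messages from a Google Cloud Pub/Sub subscription and integrates with Flink's checkpointing mechanism. Messages are acknowledged only after a checkpoint completes, which gives an at-least-once guarantee: Pub/Sub may redeliver a message (for example after a failure or when the acknowledgement deadline expires), so downstream processing should tolerate duplicates.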

Capabilities

PubSubSource Builder

Creates a new PubSubSource using a staged builder. Each required step (deserialization schema, project name, subscription name) returns the type for the next step, so build() is reachable only after all three have been set.

/**
 * Creates a new builder for PubSubSource
 * @return DeserializationSchemaBuilder for setting deserialization schema
 */
public static DeserializationSchemaBuilder newBuilder();

Deserialization Schema Configuration

Configure how messages are deserialized from Pub/Sub.

public static class DeserializationSchemaBuilder {
    /**
     * Set standard Flink DeserializationSchema for message data only
     * @param deserializationSchema Schema for deserializing message data
     * @return ProjectNameBuilder for next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
        DeserializationSchema<OUT> deserializationSchema);
    
    /**
     * Set PubSub-specific deserialization schema with metadata access
     * @param deserializationSchema Schema with access to full PubsubMessage
     * @return ProjectNameBuilder for next configuration step  
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(
        PubSubDeserializationSchema<OUT> deserializationSchema);
}

Project and Subscription Configuration

Configure the GCP project and Pub/Sub subscription.

public interface ProjectNameBuilder<OUT> {
    /**
     * Set the GCP project name containing the subscription
     * @param projectName GCP project name
     * @return SubscriptionNameBuilder for next configuration step
     */
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}

public interface SubscriptionNameBuilder<OUT> {
    /**
     * Set the Pub/Sub subscription name to consume from
     * @param subscriptionName Subscription name
     * @return PubSubSourceBuilder for additional configuration
     */
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}
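The staged-builder idea behind these interfaces can be sketched in plain Java. The following is a simplified, hypothetical model and not the connector's code: StagedBuilderSketch, SourceConfig, and the stage interface names are made up for illustration, and the schema is reduced to a string.

```java
import java.util.Objects;

/** Hypothetical, simplified model of a staged builder; not the connector's classes. */
final class StagedBuilderSketch {
    private StagedBuilderSketch() {}

    /** Immutable result standing in for a configured source. */
    static final class SourceConfig {
        final String schemaName;
        final String projectName;
        final String subscriptionName;
        final int rateLimit;

        SourceConfig(String schemaName, String projectName, String subscriptionName, int rateLimit) {
            this.schemaName = schemaName;
            this.projectName = projectName;
            this.subscriptionName = subscriptionName;
            this.rateLimit = rateLimit;
        }
    }

    /** Stage reached after the schema is set: only the project name can follow. */
    interface ProjectStage {
        SubscriptionStage withProjectName(String projectName);
    }

    /** Stage reached after the project name is set: only the subscription name can follow. */
    interface SubscriptionStage {
        OptionalStage withSubscriptionName(String subscriptionName);
    }

    /** Final stage: optional settings and build() become available. */
    interface OptionalStage {
        OptionalStage withMessageRateLimit(int messagesPerSecond);
        SourceConfig build();
    }

    /** Entry point: setting the schema is the first required step. */
    static ProjectStage withSchema(String schemaName) {
        return new Builder(schemaName);
    }

    /** One object implements every stage; callers only see the current stage's interface. */
    private static final class Builder implements ProjectStage, SubscriptionStage, OptionalStage {
        private final String schemaName;
        private String projectName;
        private String subscriptionName;
        private int rateLimit = 100_000; // same default as documented for the rate limit

        Builder(String schemaName) {
            this.schemaName = Objects.requireNonNull(schemaName, "schemaName");
        }

        @Override
        public SubscriptionStage withProjectName(String projectName) {
            this.projectName = Objects.requireNonNull(projectName, "projectName");
            return this;
        }

        @Override
        public OptionalStage withSubscriptionName(String subscriptionName) {
            this.subscriptionName = Objects.requireNonNull(subscriptionName, "subscriptionName");
            return this;
        }

        @Override
        public OptionalStage withMessageRateLimit(int messagesPerSecond) {
            if (messagesPerSecond <= 0) {
                throw new IllegalArgumentException("rate limit must be positive");
            }
            this.rateLimit = messagesPerSecond;
            return this;
        }

        @Override
        public SourceConfig build() {
            return new SourceConfig(schemaName, projectName, subscriptionName, rateLimit);
        }
    }
}
```

With this shape, a call chain that skips withProjectName or withSubscriptionName does not compile, because build() is declared only on the final stage.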

Source Builder Configuration

Main builder class for configuring optional parameters.

public static class PubSubSourceBuilder<OUT> {
    /**
     * Set authentication credentials (optional - uses default credentials if not set)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
    
    /**
     * Set custom subscriber factory for advanced configuration
     * @param pubSubSubscriberFactory Custom factory implementation
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
        PubSubSubscriberFactory pubSubSubscriberFactory);
    
    /**
     * Configure default subscriber factory with custom parameters
     * @param maxMessagesPerPull Maximum messages per pull request (default: 100)
     * @param perRequestTimeout Timeout per request (default: 15 seconds)  
     * @param retries Number of retries on failure (default: 3)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(
        int maxMessagesPerPull, Duration perRequestTimeout, int retries);
    
    /**
     * Set message rate limit per parallel instance (default: 100000 messages/second)
     * @param messagePerSecondRateLimit Rate limit per parallel subtask
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
    
    /**
     * Build the configured PubSubSource
     * @return Configured PubSubSource instance
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing
     */
    public PubSubSource<OUT> build() throws IOException;
}
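To make the retries parameter more concrete, here is a minimal plain-Java sketch of a bounded retry loop. It assumes that retries counts additional attempts after the first failure (so 3 retries allow up to 4 calls); that interpretation, the RetrySketch class, and the callWithRetries helper are illustrative assumptions, not the connector's implementation.

```java
import java.util.function.Supplier;

/** Hypothetical helper illustrating a bounded retry loop; not part of the connector. */
final class RetrySketch {
    private RetrySketch() {}

    /**
     * Runs the call and retries it on failure.
     * Assumption: "retries" means extra attempts after the first failed one.
     */
    static <T> T callWithRetries(Supplier<T> call, int retries) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        int retriesRemaining = retries;
        while (true) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                if (retriesRemaining == 0) {
                    throw e; // retry budget exhausted: surface the last failure
                }
                retriesRemaining--;
            }
        }
    }
}
```

A production retry policy would usually also wait between attempts and retry only on transient error types; the sketch leaves both out to keep the control flow visible.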

Core Source Methods

Key methods of the PubSubSource class.

public class PubSubSource<OUT> extends RichSourceFunction<OUT> 
    implements ResultTypeQueryable<OUT>, ParallelSourceFunction<OUT>, 
               CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<String>> {
    
    /**
     * Get type information for elements produced by this source
     * @return TypeInformation for output elements
     */
    public TypeInformation<OUT> getProducedType();
    
    /**
     * Called when checkpoint completes - acknowledges messages
     * @param checkpointId Completed checkpoint ID
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception;
    
    /**
     * Cancel the source function
     */
    public void cancel();
}
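The link between notifyCheckpointComplete and acknowledgement can be modelled with a small plain-Java sketch. It is a simplified illustration under stated assumptions, not the source's actual bookkeeping: CheckpointAckSketch and its method names are hypothetical, and acknowledging is reduced to appending IDs to a list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of "acknowledge on checkpoint completion"; not the connector's code. */
final class CheckpointAckSketch {
    // Ack IDs of messages emitted since the last checkpoint snapshot.
    private final List<String> currentBatch = new ArrayList<>();
    // Ack IDs captured per checkpoint, waiting for that checkpoint to complete.
    private final TreeMap<Long, List<String>> pendingByCheckpoint = new TreeMap<>();
    // Stand-in for the acknowledge calls that would go to Pub/Sub.
    private final List<String> acknowledged = new ArrayList<>();

    /** Record a message that was emitted downstream but not yet acknowledged. */
    void onMessageEmitted(String ackId) {
        currentBatch.add(ackId);
    }

    /** On snapshot, tie all IDs emitted so far to this checkpoint. */
    void onCheckpointSnapshot(long checkpointId) {
        pendingByCheckpoint.put(checkpointId, new ArrayList<>(currentBatch));
        currentBatch.clear();
    }

    /** On completion, acknowledge everything captured by this or any earlier checkpoint. */
    void onCheckpointComplete(long checkpointId) {
        Map<Long, List<String>> completed = pendingByCheckpoint.headMap(checkpointId, true);
        for (List<String> ackIds : completed.values()) {
            acknowledged.addAll(ackIds);
        }
        completed.clear(); // headMap is a view, so this removes the entries from the TreeMap
    }

    List<String> acknowledged() {
        return acknowledged;
    }

    int pendingCheckpoints() {
        return pendingByCheckpoint.size();
    }
}
```

Because nothing is acknowledged before the checkpoint completes, a failure between emitting a message and completing the checkpoint leaves the message unacknowledged, and Pub/Sub delivers it again.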

Usage Examples

Basic String Consumption

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000); // Required: messages are acknowledged when a checkpoint completes (interval in ms)

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .build();

env.addSource(source)
    .print();

env.execute("Basic Pub/Sub Consumer");

JSON Message Consumption with Custom Schema

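import java.io.IOException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

// MyObject stands for your own POJO type whose fields match the JSON payload
public class JsonDeserializationSchema extends AbstractDeserializationSchema<MyObject> {
    // One mapper per schema instance; it is reused for every message
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public MyObject deserialize(byte[] message) throws IOException {
        // Parse the raw Pub/Sub message payload as JSON
        return mapper.readValue(message, MyObject.class);
    }
}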

PubSubSource<MyObject> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new JsonDeserializationSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("json-messages")
    .build();

Advanced Configuration with Custom Credentials

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.time.Duration;

// Load service account credentials; try-with-resources closes the key file
Credentials credentials;
try (FileInputStream keyFile = new FileInputStream("path/to/service-account.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyFile);
}

PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .withCredentials(credentials)
    .withPubSubSubscriberFactory(200, Duration.ofSeconds(30), 5) // 200 messages per pull, 30 s request timeout, 5 retries
    .withMessageRateLimit(50000) // Limit to 50k messages/second per subtask
    .build();

Message Consumption with Metadata Access

import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Map;

public class MessageWithMetadata {
    public String data;
    public String messageId;
    public long publishTime;
    public Map<String, String> attributes;
}

public class MetadataDeserializationSchema implements PubSubDeserializationSchema<MessageWithMetadata> {
    @Override
    public MessageWithMetadata deserialize(PubsubMessage message) throws Exception {
        MessageWithMetadata result = new MessageWithMetadata();
        result.data = message.getData().toStringUtf8();
        result.messageId = message.getMessageId();
        result.publishTime = message.getPublishTime().getSeconds(); // epoch seconds; the sub-second part is dropped
        result.attributes = message.getAttributesMap();
        return result;
    }
    
    @Override
    public boolean isEndOfStream(MessageWithMetadata nextElement) {
        return false;
    }
    
    @Override
    public TypeInformation<MessageWithMetadata> getProducedType() {
        return TypeInformation.of(MessageWithMetadata.class);
    }
}

PubSubSource<MessageWithMetadata> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new MetadataDeserializationSchema())
    .withProjectName("my-gcp-project")
    .withSubscriptionName("my-subscription")
    .build();
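The metadata example above stores the publish time as whole epoch seconds. The protobuf Timestamp returned by getPublishTime() also exposes getNanos(), so millisecond precision can be kept if needed. The following minimal sketch shows the conversion with java.time only; PublishTimeSketch and toEpochMillis are hypothetical names, and the two arguments stand for the values of getSeconds() and getNanos().

```java
import java.time.Instant;

/** Hypothetical helper: converts a (seconds, nanos) timestamp pair to epoch milliseconds. */
final class PublishTimeSketch {
    private PublishTimeSketch() {}

    /**
     * @param seconds whole seconds since the Unix epoch (as from Timestamp.getSeconds())
     * @param nanos   nanosecond fraction of the second (as from Timestamp.getNanos())
     * @return milliseconds since the Unix epoch, truncated to millisecond precision
     */
    static long toEpochMillis(long seconds, int nanos) {
        return Instant.ofEpochSecond(seconds, nanos).toEpochMilli();
    }
}
```

Inside a deserialization schema this could be called as PublishTimeSketch.toEpochMillis(ts.getSeconds(), ts.getNanos()) with ts = message.getPublishTime().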

Important Notes

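  • Checkpointing Required: PubSubSource requires checkpointing to be enabled. The source throws an IllegalArgumentException on startup if checkpointing is disabled. Keep the checkpoint interval well below the subscription's acknowledgement deadline, otherwise messages are redelivered before they can be acknowledged.

  • Acknowledgment Strategy: Messages are acknowledged only after a checkpoint completes successfully. A message that is not acknowledged in time is redelivered by Pub/Sub, so the guarantee is at-least-once and duplicates are possible.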

  • Rate Limiting: The rate limit applies per parallel subtask. Maximum total throughput = rate limit × parallelism.

  • Retries: Failed pull requests are automatically retried according to the configured retry policy.

  • Thread Safety: The source is designed to work safely in Flink's parallel execution environment.
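The rate-limiting note above is simple arithmetic, sketched here in plain Java. RateLimitSketch and both method names are hypothetical helpers for sizing the value passed to withMessageRateLimit; they are not part of the connector.

```java
/** Hypothetical sizing helpers for the per-subtask rate limit; not part of the connector. */
final class RateLimitSketch {
    private RateLimitSketch() {}

    /** Upper bound on job-wide throughput: per-subtask limit times source parallelism. */
    static long totalMessagesPerSecond(int perSubtaskLimit, int parallelism) {
        return (long) perSubtaskLimit * parallelism;
    }

    /**
     * Per-subtask limit that keeps the job at or below a target total.
     * Rounds down so the combined limit never exceeds the target.
     */
    static int perSubtaskLimitFor(long targetTotalPerSecond, int parallelism) {
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive");
        }
        return Math.toIntExact(targetTotalPerSecond / parallelism);
    }
}
```

For example, a limit of 50000 with a source parallelism of 4 allows up to 200000 messages per second across the job.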
