tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11

Apache Flink connector for Google Cloud Pub/Sub that provides streaming data integration capabilities

Message Publishing (Sink)

The PubSubSink publishes messages to Google Cloud Pub/Sub topics with at-least-once delivery semantics (when checkpointing is enabled), automatic batching, and automatic retries.

Capabilities

PubSubSink Builder

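Creates a new PubSubSink using the builder pattern. The builder enforces required parameters through type-safe, staged interfaces: the serialization schema, the project name, and the topic name must be set, in that order, before build() becomes available.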

/**
 * Creates a new builder for PubSubSink
 * @return SerializationSchemaBuilder for setting serialization schema
 */
public static SerializationSchemaBuilder newBuilder();

Serialization Schema Configuration

Configure how messages are serialized for Pub/Sub.

public static class SerializationSchemaBuilder {
    /**
     * Set serialization schema for converting objects to byte arrays
     * @param serializationSchema Schema for serializing message data
     * @return ProjectNameBuilder for next configuration step
     */
    public <IN> ProjectNameBuilder<IN> withSerializationSchema(
        SerializationSchema<IN> serializationSchema);
}

Project and Topic Configuration

Configure the GCP project and Pub/Sub topic.

public interface ProjectNameBuilder<IN> {
    /**
     * Set the GCP project name containing the topic
     * @param projectName GCP project name
     * @return TopicNameBuilder for next configuration step
     */
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    /**
     * Set the Pub/Sub topic name to publish to
     * @param topicName Topic name
     * @return PubSubSinkBuilder for additional configuration
     */
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}

Sink Builder Configuration

Main builder class for configuring optional parameters.

public static class PubSubSinkBuilder<IN> {
    /**
     * Set authentication credentials (optional - uses default credentials if not set)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
    
    /**
     * Set custom hostname/port for Pub/Sub emulator (testing only)
     * @param hostAndPort Host and port combination ("hostname:1234")
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
    
    /**
     * Build the configured PubSubSink
     * @return Configured PubSubSink instance
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing
     */
    public PubSubSink<IN> build() throws IOException;
}
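The staged interfaces above can be illustrated without any Flink or Google Cloud dependencies. The following is a minimal sketch of the same "step builder" technique; every name in it (`StagedBuilderDemo`, `SinkConfig`, `SchemaStep`, `ProjectStep`, `TopicStep`, `FinalStep`) is hypothetical and only mirrors the shape of the PubSubSink builder, it is not the connector's code.

```java
import java.util.function.Function;

final class StagedBuilderDemo {

    /** Hypothetical immutable result, standing in for a configured sink. */
    static final class SinkConfig<IN> {
        final Function<IN, byte[]> serializer;
        final String projectName;
        final String topicName;
        final String hostAndPort; // optional: null means "use the real service"

        private SinkConfig(Function<IN, byte[]> serializer, String projectName,
                           String topicName, String hostAndPort) {
            this.serializer = serializer;
            this.projectName = projectName;
            this.topicName = topicName;
            this.hostAndPort = hostAndPort;
        }
    }

    /** Step 1: the only available call is supplying a serializer. */
    static final class SchemaStep {
        <IN> ProjectStep<IN> withSerializer(Function<IN, byte[]> serializer) {
            // Each stage exposes only the next required setter.
            return projectName -> topicName -> new FinalStep<>(serializer, projectName, topicName);
        }
    }

    /** Step 2: the project name is required before the topic name. */
    interface ProjectStep<IN> {
        TopicStep<IN> withProjectName(String projectName);
    }

    /** Step 3: the topic name is required before build() is reachable. */
    interface TopicStep<IN> {
        FinalStep<IN> withTopicName(String topicName);
    }

    /** Final step: optional settings plus build(). */
    static final class FinalStep<IN> {
        private final Function<IN, byte[]> serializer;
        private final String projectName;
        private final String topicName;
        private String hostAndPort;

        FinalStep(Function<IN, byte[]> serializer, String projectName, String topicName) {
            this.serializer = serializer;
            this.projectName = projectName;
            this.topicName = topicName;
        }

        FinalStep<IN> withHostAndPortForEmulator(String hostAndPort) {
            this.hostAndPort = hostAndPort;
            return this;
        }

        SinkConfig<IN> build() {
            return new SinkConfig<>(serializer, projectName, topicName, hostAndPort);
        }
    }

    static SchemaStep newBuilder() {
        return new SchemaStep();
    }
}
```

With this shape, `newBuilder().withSerializer(...).build()` does not compile, because `ProjectStep` has no `build()` method; missing required settings become compile-time errors instead of runtime checks.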

Core Sink Methods

Key methods of the PubSubSink class.

public class PubSubSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
    
    /**
     * Serialize the message and hand it to the Pub/Sub Publisher; publishing is asynchronous, so this does not wait for the acknowledgement
     * @param message Message to publish
     * @param context Sink context (provides processing time, etc.)
     */
    public void invoke(IN message, SinkFunction.Context context);
    
    /**
     * Called during checkpointing: flushes buffered messages and waits until every pending publish request has completed
     * @param context Checkpoint context
     * @throws Exception If publishing fails
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception;
    
    /**
     * Initialize state; called once per subtask (the sink keeps no checkpointed state of its own)
     * @param context Initialization context
     */
    public void initializeState(FunctionInitializationContext context);
}

Usage Examples

Basic String Publishing

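import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

public class BasicPubSubProducer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // At-least-once delivery relies on checkpoints: the sink flushes on every checkpoint
        env.enableCheckpointing(10_000);

        // build() throws IOException if default credentials cannot be obtained
        PubSubSink<String> sink = PubSubSink.newBuilder()
            .withSerializationSchema(new SimpleStringSchema())
            .withProjectName("my-gcp-project")
            .withTopicName("my-topic")
            .build();

        env.fromElements("Hello", "World", "Pub/Sub")
            .addSink(sink);

        env.execute("Basic Pub/Sub Producer");
    }
}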

JSON Object Publishing

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

// Serializes any POJO to JSON bytes using Flink's shaded copy of Jackson
public class JsonSerializationSchema<T> implements SerializationSchema<T> {
    // Created lazily in each parallel instance instead of being shipped with the job
    private transient ObjectMapper mapper;

    @Override
    public byte[] serialize(T element) {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        try {
            return mapper.writeValueAsBytes(element);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize object", e);
        }
    }
}

// Public fields and the implicit no-arg constructor let Jackson serialize this type
public class MyEvent {
    public String eventType;
    public long timestamp;
    public String userId;
    // ... other fields
}

PubSubSink<MyEvent> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new JsonSerializationSchema<MyEvent>())
    .withProjectName("my-gcp-project")
    .withTopicName("events")
    .build();

DataStream<MyEvent> events = env.addSource(/* some source */);
events.addSink(sink);

Advanced Configuration with Custom Credentials

import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;

// Load service account credentials; fromStream throws IOException if the file is unreadable or invalid
Credentials credentials;
try (InputStream in = new FileInputStream("path/to/service-account.json")) {
    credentials = ServiceAccountCredentials.fromStream(in);
}

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .withCredentials(credentials)
    .build();

env.addSource(/* some source */)
    .addSink(sink);

Using with Pub/Sub Emulator for Testing

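// Start the Pub/Sub emulator first:
// gcloud beta emulators pubsub start --host-port=localhost:8085
// The emulator starts empty, so create "test-topic" in project "test-project" before running the job.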

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("test-project")
    .withTopicName("test-topic")
    .withHostAndPortForEmulator("localhost:8085")
    .build();

env.fromElements("test-message-1", "test-message-2")
    .addSink(sink);

Publishing with Custom Message Attributes

PubSubSink only sets the message data (the bytes produced by the SerializationSchema) and cannot attach message attributes. For use cases that need attributes, use the Pub/Sub client library directly within a custom sink function:

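import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.util.concurrent.TimeUnit;

// MyEventWithAttributes is your own event type exposing getData(), getEventType() and getSource()
public class CustomPubSubSink extends RichSinkFunction<MyEventWithAttributes> {
    private transient Publisher publisher;

    @Override
    public void open(Configuration parameters) throws Exception {
        // One Publisher per parallel sink instance, using Application Default Credentials
        publisher = Publisher.newBuilder(TopicName.of("project", "topic")).build();
    }

    @Override
    public void invoke(MyEventWithAttributes event, Context context) throws Exception {
        PubsubMessage message = PubsubMessage.newBuilder()
            .setData(ByteString.copyFromUtf8(event.getData()))
            .putAttributes("eventType", event.getEventType())
            .putAttributes("source", event.getSource())
            .build();

        // publish() is asynchronous and returns an ApiFuture<String> with the message ID.
        // This sketch ignores it and, unlike PubSubSink, does not flush or check results on
        // checkpoints, so it gives no at-least-once guarantee.
        publisher.publish(message);
    }

    @Override
    public void close() throws Exception {
        if (publisher != null) {
            // shutdown() sends any batched messages; wait for them to finish
            publisher.shutdown();
            publisher.awaitTermination(1, TimeUnit.MINUTES);
        }
    }
}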

Error Handling and Reliability

At-Least-Once Delivery

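The PubSubSink provides at-least-once delivery guarantees, provided checkpointing is enabled. After a failure, Flink restores the job from the last completed checkpoint and replays the records emitted since then, so some messages may be published more than once, but none are lost.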
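The replay behaviour can be illustrated with a small, self-contained simulation in plain Java (no Flink or Pub/Sub classes; `AtLeastOnceDemo` and its parameters are hypothetical). It assumes the worst case for duplicates: every record sent between the last checkpoint and the failure was acknowledged by Pub/Sub before the job went down.

```java
import java.util.ArrayList;
import java.util.List;

final class AtLeastOnceDemo {

    /**
     * Simulates one failure and recovery.
     * @param records      input records, in source order
     * @param checkpointAt source offset stored in the last completed checkpoint
     * @param failAt       offset at which the job fails
     * @return every message the topic received, in publish order
     */
    static List<String> run(List<String> records, int checkpointAt, int failAt) {
        if (checkpointAt < 0 || checkpointAt > failAt || failAt > records.size()) {
            throw new IllegalArgumentException("need 0 <= checkpointAt <= failAt <= size");
        }
        List<String> topic = new ArrayList<>();
        // First attempt: everything before the failure reaches the topic.
        for (int i = 0; i < failAt; i++) {
            topic.add(records.get(i));
        }
        // Recovery: the source rewinds to the checkpointed offset, so the records
        // in [checkpointAt, failAt) are published a second time.
        for (int i = checkpointAt; i < records.size(); i++) {
            topic.add(records.get(i));
        }
        return topic;
    }
}
```

For inputs `a` to `e`, a checkpoint at offset 2 and a failure at offset 4, the topic receives `a b c d c d e`: `c` and `d` are duplicated, but every record arrives at least once. Consumers that cannot tolerate duplicates therefore need to deduplicate on their side.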

Automatic Retries

The sink relies on the Google Cloud Pub/Sub client's default retry policy for failed publish operations:

  • Retry limits (timeouts and attempts) come from the client library's default publish retry settings
  • Exponential backoff with jitter
  • PubSubSink's builder does not expose these settings; changing them requires configuring the Publisher yourself in a custom sink
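To make "exponential backoff with jitter" concrete, here is a small sketch that computes retry delays. The parameters used when discussing it (for example a 100 ms initial delay, multiplier 1.3, 60 s cap) are hypothetical illustration values, not necessarily the Pub/Sub client's defaults, and "full jitter" (a uniformly random delay between zero and the current backoff) is one common jitter strategy, not necessarily the one the client library uses. `BackoffDemo` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class BackoffDemo {

    /** Upper bound of the delay before retry n (0-based): initial * multiplier^n, capped at max. */
    static long backoffCapMillis(int attempt, long initialMillis, double multiplier, long maxMillis) {
        double raw = initialMillis * Math.pow(multiplier, attempt);
        return (long) Math.min(raw, (double) maxMillis);
    }

    /** Full jitter: for each retry, pick a delay uniformly in [0, cap]. */
    static List<Long> jitteredDelays(int attempts, long initialMillis, double multiplier,
                                     long maxMillis, Random random) {
        List<Long> delays = new ArrayList<>();
        for (int n = 0; n < attempts; n++) {
            long cap = backoffCapMillis(n, initialMillis, multiplier, maxMillis);
            // nextDouble() is in [0, 1), so after truncation the delay lies in [0, cap].
            delays.add((long) (random.nextDouble() * (cap + 1)));
        }
        return delays;
    }
}
```

The cap grows geometrically until it hits the maximum, and the random component spreads out retries from many clients that failed at the same moment, so they do not all hit the service again in lockstep.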

Checkpoint Integration

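The sink integrates with Flink's checkpointing mechanism (checkpointing must be enabled, for example with env.enableCheckpointing(...)):

  • All outstanding publish requests are completed before the checkpoint completes
  • Failed publish operations fail the checkpoint
  • Because nothing is left in flight at a completed checkpoint, recovery only has to replay records emitted after it; this is what makes the guarantee at-least-once rather than exactly-once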
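The flush-on-checkpoint pattern described above can be sketched with plain `CompletableFuture`s: count pending publishes, remember the first failure, and in `snapshotState()` wait until nothing is in flight before rethrowing any failure. This is a simplified model under stated assumptions, not the connector's actual code: the asynchronous publish call is simulated by a caller-supplied function, and `FlushingSinkSketch` and `publishAsync` are hypothetical names.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

final class FlushingSinkSketch {
    // Hypothetical async publish call: returns a future that yields a message ID.
    private final Function<String, CompletableFuture<String>> publishAsync;
    private final AtomicInteger pending = new AtomicInteger();
    private final AtomicReference<Throwable> firstFailure = new AtomicReference<>();

    FlushingSinkSketch(Function<String, CompletableFuture<String>> publishAsync) {
        this.publishAsync = publishAsync;
    }

    /** Like invoke(): hand the message off and return without waiting. */
    void invoke(String message) {
        pending.incrementAndGet();
        publishAsync.apply(message).whenComplete((messageId, error) -> {
            if (error != null) {
                firstFailure.compareAndSet(null, error);
            }
            // Decrement and notify under the lock so snapshotState() cannot miss the wake-up.
            synchronized (this) {
                pending.decrementAndGet();
                notifyAll();
            }
        });
    }

    /** Like snapshotState(): block until nothing is in flight, then surface any failure. */
    synchronized void snapshotState() throws Exception {
        while (pending.get() > 0) {
            wait();
        }
        Throwable failure = firstFailure.get();
        if (failure != null) {
            throw new Exception("Publishing failed; failing the checkpoint", failure);
        }
    }
}
```

Blocking inside `snapshotState()` is what ties durability to the checkpoint: the checkpoint cannot complete while any message is unacknowledged, and a single failed publish aborts it so that Flink falls back to the previous checkpoint.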

Back-pressure Handling

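The sink handles back-pressure as follows:

  • Publishing is asynchronous: invoke() hands each message to the Pub/Sub Publisher and returns without waiting for the acknowledgement
  • During a checkpoint, the sink blocks until every outstanding publish request has completed, which stalls the task and propagates back-pressure upstream through Flink's normal mechanisms
  • Flow control for in-flight messages comes from the Publisher's default settings, which PubSubSink does not expose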
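Bounding the number of in-flight messages is the core idea behind publisher flow control. The sketch below shows that technique with a `java.util.concurrent.Semaphore`: callers block once `maxOutstanding` messages are unacknowledged, which is how a blocked sink turns into back-pressure upstream. `BoundedPublisher` and its transport function are hypothetical; this is not how PubSubSink or the Pub/Sub client is implemented internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

final class BoundedPublisher {
    private final Semaphore permits;
    // Hypothetical async transport: sends bytes, completes with a message ID.
    private final Function<byte[], CompletableFuture<String>> sendAsync;

    BoundedPublisher(int maxOutstanding, Function<byte[], CompletableFuture<String>> sendAsync) {
        this.permits = new Semaphore(maxOutstanding);
        this.sendAsync = sendAsync;
    }

    /** Blocks while maxOutstanding messages are still in flight. */
    CompletableFuture<String> publish(byte[] data) throws InterruptedException {
        permits.acquire();
        CompletableFuture<String> result;
        try {
            result = sendAsync.apply(data);
        } catch (RuntimeException e) {
            permits.release(); // the send never started, so return the permit
            throw e;
        }
        // Release the permit when the send finishes, successfully or not.
        return result.whenComplete((id, error) -> permits.release());
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

Memory use is then bounded by `maxOutstanding` times the largest message, at the cost of stalling the caller when the service is slow.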

Performance Considerations

Batching

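The underlying Google Cloud Pub/Sub Publisher batches messages using the client library's default batching settings, trading a small amount of latency for throughput. PubSubSink does not expose these settings, and it flushes any partially filled batch at every checkpoint.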
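A count-or-bytes batching policy, plus an explicit flush like the one the sink performs at each checkpoint, can be sketched as follows. `BatchingSketch` and its thresholds are hypothetical illustrations; the real Publisher also sends a batch after a delay threshold elapses, which this sketch omits for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class BatchingSketch {
    private final int maxMessages;
    private final long maxBytes;
    private final Consumer<List<byte[]>> sendBatch; // hypothetical: one request per batch
    private final List<byte[]> buffer = new ArrayList<>();
    private long bufferedBytes;

    BatchingSketch(int maxMessages, long maxBytes, Consumer<List<byte[]>> sendBatch) {
        this.maxMessages = maxMessages;
        this.maxBytes = maxBytes;
        this.sendBatch = sendBatch;
    }

    /** Buffers a message and sends the batch once either threshold is reached. */
    void add(byte[] message) {
        buffer.add(message);
        bufferedBytes += message.length;
        if (buffer.size() >= maxMessages || bufferedBytes >= maxBytes) {
            flush();
        }
    }

    /** Sends whatever is buffered, e.g. before a checkpoint is allowed to complete. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sendBatch.accept(new ArrayList<>(buffer));
        buffer.clear();
        bufferedBytes = 0;
    }
}
```

Larger thresholds mean fewer, bigger requests (higher throughput) but longer waits for the last messages of a batch; the checkpoint-time flush guarantees that a partially filled batch never outlives a completed checkpoint.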

Publisher Settings

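PubSubSink does not expose the Publisher's batching or retry settings. For high-throughput scenarios, consider a custom sink, built like the attribute example above, that configures the Publisher explicitly through Publisher.Builder (for example setBatchingSettings and setRetrySettings).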

Resource Management

  • Each sink subtask creates its own Publisher instance
  • Each Publisher is shut down when its sink subtask is closed (in close())
  • gRPC channels are managed automatically by the Google Cloud client library

Important Notes

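  • Message Ordering: Pub/Sub does not guarantee message ordering by default. Ordered delivery requires ordering keys, which PubSubSink does not set; use a custom sink built on the Pub/Sub client library if ordering is required.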

  • Message Size Limits: Pub/Sub has a maximum message size limit (10 MB). Ensure your serialized messages are within this limit.

  • Topic Creation: Topics must exist before publishing. The sink does not create topics automatically.

  • Authentication: Uses Google Cloud Application Default Credentials if no explicit credentials are provided; build() throws IOException if none can be found. Ensure proper service account configuration in production environments.
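To fail fast on oversized payloads rather than at publish time, the message size limit noted above could be enforced by wrapping serialization with a size check. The sketch below is a hypothetical helper (`MessageSizeGuard`), and the `MAX_MESSAGE_BYTES` value of 10,000,000 is an assumption for illustration; check the Pub/Sub quota documentation for the exact byte limit and whether attributes and request overhead count toward it.

```java
import java.util.function.Function;

final class MessageSizeGuard {
    /** Assumed limit for illustration only; verify against the Pub/Sub quotas documentation. */
    static final int MAX_MESSAGE_BYTES = 10_000_000;

    /** Wraps a serializer so that oversized payloads are rejected before publishing. */
    static <T> Function<T, byte[]> guarded(Function<T, byte[]> serializer, int maxBytes) {
        return element -> {
            byte[] bytes = serializer.apply(element);
            if (bytes.length > maxBytes) {
                throw new IllegalArgumentException(
                    "Serialized message is " + bytes.length + " bytes, limit is " + maxBytes);
            }
            return bytes;
        };
    }
}
```

In a Flink job the same check would sit inside a SerializationSchema's serialize() method, so that an oversized record fails with a clear message at the point where it is produced.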

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-11
