CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12

Apache Flink connector for Google Cloud Pub/Sub with exactly-once processing guarantees for real-time stream processing applications

Pending
Overview
Eval results
Files

pubsub-sink.mddocs/

PubSub Sink

The PubSubSink provides reliable message publishing to Google Cloud Pub/Sub topics with at-least-once delivery guarantees and checkpoint synchronization to ensure message delivery before checkpoint completion.

Capabilities

Sink Creation

Create a PubSubSink using the builder pattern with required project name, topic name, and serialization schema.

/**
 * Creates a new builder for PubSubSink configuration
 * @return SerializationSchemaBuilder instance to start configuration
 */
public static SerializationSchemaBuilder newBuilder();

public static class SerializationSchemaBuilder {
    /**
     * Set serialization schema for converting objects to PubSub message payloads
     * @param serializationSchema Schema for serializing objects to byte arrays
     * @return ProjectNameBuilder for next configuration step
     */
    public <IN> ProjectNameBuilder<IN> withSerializationSchema(SerializationSchema<IN> serializationSchema);
}

Builder Configuration

Configure project name, topic name, and optional parameters.

public interface ProjectNameBuilder<IN> {
    /**
     * Set the GCP project name containing the topic
     * @param projectName Google Cloud project name
     * @return TopicNameBuilder for next configuration step
     */
    TopicNameBuilder<IN> withProjectName(String projectName);
}

public interface TopicNameBuilder<IN> {
    /**
     * Set the Pub/Sub topic name to publish to
     * @param topicName Pub/Sub topic name
     * @return PubSubSinkBuilder for optional configuration
     */
    PubSubSinkBuilder<IN> withTopicName(String topicName);
}

public static class PubSubSinkBuilder<IN> {
    /**
     * Set custom GCP credentials (optional, defaults to environment credentials)
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
    
    /**
     * Set emulator host and port for testing (optional, for emulator use only)
     * @param hostAndPort Host and port combination (e.g., "localhost:8085")
     * @return Current builder instance
     */
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
    
    /**
     * Build the configured PubSubSink instance
     * @return Configured PubSubSink ready for use
     * @throws IOException If credentials cannot be obtained
     * @throws IllegalArgumentException If required fields are missing or topic does not exist
     */
    public PubSubSink<IN> build() throws IOException;
}

Usage Examples

Basic String Message Publisher

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Create input stream
DataStream<String> inputStream = env.fromElements(
    "Hello PubSub",
    "Message 1",
    "Message 2"
);

// Create and configure sink
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-gcp-project")
    .withTopicName("my-topic")
    .build();

// Add sink to stream
inputStream.addSink(sink);

env.execute("Basic PubSub Producer");

JSON Object Publisher

import org.apache.flink.api.common.serialization.SerializationSchema;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonUserSerializer implements SerializationSchema<User> {
    private transient ObjectMapper objectMapper;
    
    @Override
    public void open(SerializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }
    
    @Override
    public byte[] serialize(User user) {
        try {
            return objectMapper.writeValueAsBytes(user);
        } catch (Exception e) {
            throw new RuntimeException("Failed to serialize user", e);
        }
    }
}

// Usage
DataStream<User> userStream = // ... create user stream

PubSubSink<User> userSink = PubSubSink.newBuilder()
    .withSerializationSchema(new JsonUserSerializer())
    .withProjectName("my-project")
    .withTopicName("user-events")
    .build();

userStream.addSink(userSink);

Publisher with Custom Credentials

import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;

Credentials credentials = ServiceAccountCredentials.fromStream(
    new FileInputStream("path/to/service-account-key.json")
);

PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withTopicName("my-topic")
    .withCredentials(credentials)
    .build();

Streaming Data Pipeline

import org.apache.flink.streaming.api.datastream.DataStream;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(30000);

// Source: Read from one PubSub subscription
PubSubSource<String> source = PubSubSource.newBuilder()
    .withDeserializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withSubscriptionName("input-subscription")
    .build();

// Transform: Process messages
DataStream<String> processedStream = env.addSource(source)
    .map(message -> "Processed: " + message.toUpperCase())
    .filter(message -> message.length() > 10);

// Sink: Publish to another PubSub topic
PubSubSink<String> sink = PubSubSink.newBuilder()
    .withSerializationSchema(new SimpleStringSchema())
    .withProjectName("my-project")
    .withTopicName("output-topic")
    .build();

processedStream.addSink(sink);

env.execute("PubSub Processing Pipeline");

Message Publishing Behavior

Delivery Guarantees

The PubSubSink provides at-least-once delivery guarantees:

  1. Asynchronous Publishing: Messages are published asynchronously to PubSub
  2. Checkpoint Synchronization: Before checkpoint completion, all outstanding publish requests must complete successfully
  3. Failure Handling: If any publish operation fails, the checkpoint fails and the job restarts
  4. Retry Logic: Built-in retry mechanisms handle transient failures

Checkpoint Integration

The sink integrates with Flink's checkpointing mechanism to ensure reliable delivery:

@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // Flush all buffered messages
    publisher.publishAllOutstanding();
    
    // Wait for all pending publish operations to complete
    waitForFuturesToComplete();
    
    // If any publish operation failed, throw exception to fail checkpoint
    if (exceptionAtomicReference.get() != null) {
        throw exceptionAtomicReference.get();
    }
}

Message Format

Published messages contain:

  • Data: Serialized message payload as bytes
  • Message ID: Unique identifier assigned by PubSub
  • Publish Time: Timestamp when message was published
  • Attributes: Empty (custom attributes not currently supported)

Error Handling

Publishing Errors

  • Serialization Errors: Exceptions during serialization cause immediate job failure
  • Network Errors: Handled by Google Cloud client library with automatic retry
  • Authentication Errors: Cause job failure with clear error messages
  • Topic Not Found: Causes job failure - topic must exist before starting job

Retry Configuration

The sink uses Google Cloud Pub/Sub client's default retry settings:

  • Maximum Attempts: Configurable through client library
  • Exponential Backoff: Automatic delay increases between retries
  • Total Timeout: Maximum time to spend on retries

Monitoring and Metrics

  • Pending Futures: Number of outstanding publish operations
  • Publish Failures: Count of failed publish attempts
  • Throughput: Messages published per second

Performance Considerations

Batching

The Google Cloud Pub/Sub client automatically batches messages for efficiency:

  • Batch Size: Multiple messages sent in single request
  • Batch Delay: Maximum time to wait before sending partial batch
  • Memory Usage: Batched messages consume memory until published

Parallelism

  • Parallel Sinks: Each parallel subtask creates its own publisher
  • Independent Publishing: Subtasks publish independently without coordination
  • Scaling: Increase parallelism to improve throughput

Resource Management

  • Connection Pooling: Managed by Google Cloud client library
  • Memory Management: Outstanding publish requests consume memory
  • CPU Usage: Serialization and network I/O are CPU-intensive operations

Install with Tessl CLI

npx tessl i tessl/maven-org-apache-flink--flink-connector-gcp-pubsub-2-12

docs

emulator-testing.md

index.md

pubsub-sink.md

pubsub-source.md

tile.json