
# Apache Flink GCP Pub/Sub Connector

The Apache Flink connector for Google Cloud Pub/Sub lets streaming jobs consume messages from Pub/Sub subscriptions and publish messages to Pub/Sub topics. It provides a source and a sink for real-time stream processing. The source ties message acknowledgment to Flink's checkpointing mechanism: a message is acknowledged only after a checkpoint covering it completes, which gives at-least-once delivery.

## Package Information

- **Package Name**: flink-connector-gcp-pubsub_2.12
- **Package Type**: Maven
- **Group ID**: org.apache.flink
- **Language**: Java
- **Installation**:

```xml
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-gcp-pubsub_2.12</artifactId>
  <version>1.14.6</version>
</dependency>
```

## Core Imports

```java
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.util.Collector;
```

For credentials management:

```java
import com.google.auth.Credentials;
import com.google.cloud.pubsub.v1.SubscriptionAdminSettings;
```

For Google Cloud Pub/Sub types:

```java
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.List;
import java.time.Duration;
```

For emulator testing:

```java
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
```

## Basic Usage

### Consuming Messages (Source)

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class PubSubConsumerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpointing is mandatory: PubSubSource fails on startup without it,
        // and messages are acknowledged to Pub/Sub only when a checkpoint completes
        env.enableCheckpointing(30000); // checkpoint every 30 seconds

        // Create PubSubSource (schema, project and subscription are required, in this order)
        PubSubSource<String> pubsubSource = PubSubSource.newBuilder()
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .build();

        // Add source to stream
        DataStream<String> stream = env.addSource(pubsubSource);
        stream.print();

        env.execute("PubSub Consumer");
    }
}
```

### Publishing Messages (Sink)

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

public class PubSubProducerJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Create data stream
        DataStream<String> inputStream = env.fromElements("Hello", "World", "PubSub");

        // Create PubSubSink (schema, project and topic are required, in this order)
        PubSubSink<String> pubsubSink = PubSubSink.newBuilder()
                .withSerializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")
                .withTopicName("my-topic")
                .build();

        // Add sink to stream
        inputStream.addSink(pubsubSink);

        env.execute("PubSub Producer");
    }
}
```

## Architecture

The Apache Flink GCP Pub/Sub connector is built around several key components:

- **PubSubSource**: Source function that pulls messages from a subscription and acknowledges them only after a checkpoint completes (at-least-once delivery)
- **PubSubSink**: Sink function for publishing messages with at-least-once delivery semantics
- **Builder Pattern**: Fluent, staged builders for sources and sinks; the required settings come first, followed by optional parameters and `build()`
- **Checkpointing Integration**: Message acknowledgment tied automatically to Flink checkpoint completion
- **Rate Limiting**: Configurable message consumption rate limit per parallel subtask
- **Emulator Support**: Built-in support for the Google Pub/Sub emulator in testing scenarios

The connector keeps Flink and Pub/Sub consistent through Flink's distributed checkpointing: the source acknowledges a message only after a checkpoint that includes it has completed. If the job fails before then, Pub/Sub redelivers the unacknowledged message once its acknowledgment deadline expires, so messages are not lost, although some may be processed more than once.
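To make the acknowledgment flow concrete, here is a minimal, JDK-only model of the bookkeeping described above. It is a hypothetical sketch, not the connector's `AcknowledgeOnCheckpoint` class: the class name `AckOnCheckpointModel` and its methods are invented for illustration, and the `acknowledged` list stands in for the acknowledge request sent to Pub/Sub.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical, simplified model of acknowledge-on-checkpoint bookkeeping (not the connector's class). */
class AckOnCheckpointModel {
    // Ack IDs of messages emitted since the last snapshot
    private List<String> currentIds = new ArrayList<>();
    // Ack IDs frozen at each snapshot, keyed by checkpoint ID, awaiting completion
    private final TreeMap<Long, List<String>> idsPerCheckpoint = new TreeMap<>();
    // Stands in for the acknowledge request that would be sent to Pub/Sub
    private final List<String> acknowledged = new ArrayList<>();

    /** Records a pulled message; it is NOT acknowledged yet. */
    void onMessagePulled(String ackId) {
        currentIds.add(ackId);
    }

    /** Flink snapshots state for a checkpoint: freeze the IDs seen so far. */
    void onSnapshot(long checkpointId) {
        idsPerCheckpoint.put(checkpointId, currentIds);
        currentIds = new ArrayList<>();
    }

    /** Checkpoint completed: acknowledge IDs from this and all earlier checkpoints. */
    void onCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
                idsPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (it.hasNext()) {
            acknowledged.addAll(it.next().getValue());
            it.remove(); // also removes the entry from idsPerCheckpoint
        }
    }

    List<String> acknowledged() {
        return acknowledged;
    }

    /** Messages Pub/Sub would redeliver if the job failed now. */
    int unacknowledgedCount() {
        int count = currentIds.size();
        for (List<String> ids : idsPerCheckpoint.values()) {
            count += ids.size();
        }
        return count;
    }
}
```

IDs pulled after a snapshot belong to the next checkpoint, so a failure between `onSnapshot(1)` and `onCheckpointComplete(1)` leaves every ID unacknowledged and therefore eligible for redelivery.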

## Capabilities

### PubSub Message Source

Source functionality for consuming messages from Google Cloud Pub/Sub subscriptions. Messages are acknowledged when the checkpoint that covers them completes (at-least-once delivery), and consumption can be rate-limited per parallel subtask.

```java { .api }
// Required steps, in order: withDeserializationSchema(...), withProjectName(...), withSubscriptionName(...)
public static DeserializationSchemaBuilder newBuilder();

public static class PubSubSourceBuilder<OUT> {
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory factory);
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);
    public PubSubSource<OUT> build();
}
```

[PubSub Source](./pubsub-source.md)
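The optional source settings can be combined after the required steps. In this sketch, the class `TunedSourceFactory`, the project and subscription names, and all numeric values are hypothetical; the three-argument `withPubSubSubscriberFactory` call follows the signature listed above (`maxMessagesPerPull`, `perRequestTimeout`, `retries`). Credentials are passed in explicitly so that `build()` does not have to look up defaults.

```java
import com.google.auth.Credentials;
import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

/** Hypothetical helper that builds a source with explicit pull and rate-limit settings. */
class TunedSourceFactory {
    static PubSubSource<String> tunedSource(Credentials credentials) throws IOException {
        return PubSubSource.newBuilder()
                // Required steps, in this order
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")       // hypothetical project
                .withSubscriptionName("my-subscription") // hypothetical subscription
                // Optional settings
                .withCredentials(credentials)
                // Up to 100 messages per pull, 15 s timeout per request, 3 retries
                .withPubSubSubscriberFactory(100, Duration.ofSeconds(15), 3)
                // Cap each parallel source subtask at 1,000 messages per second
                .withMessageRateLimit(1000)
                .build();
    }
}
```

Because `withMessageRateLimit` applies per parallel subtask, the job-wide ceiling is the limit multiplied by the source parallelism: with a parallelism of 4, the settings above allow up to 4,000 messages per second in total.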

### PubSub Message Sink

Sink functionality for publishing messages to Google Cloud Pub/Sub topics. The sink buffers outgoing messages briefly and flushes them before each checkpoint; the checkpoint succeeds only once the buffered messages have been delivered to Pub/Sub.

```java { .api }
// Required steps, in order: withSerializationSchema(...), withProjectName(...), withTopicName(...)
public static SerializationSchemaBuilder newBuilder();

public static class PubSubSinkBuilder<IN> {
    public PubSubSinkBuilder<IN> withCredentials(Credentials credentials);
    public PubSubSinkBuilder<IN> withHostAndPortForEmulator(String hostAndPort);
    public PubSubSink<IN> build();
}
```

[PubSub Sink](./pubsub-sink.md)
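Any Flink `SerializationSchema` can feed the sink, including a lambda, because `serialize` is that interface's only abstract method. The sketch below assumes a hypothetical `SensorReading` event type, a hypothetical topic `sensor-readings`, and an illustrative CSV payload format; it also shows passing credentials through `withCredentials`.

```java
import com.google.auth.Credentials;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;

/** Hypothetical helper that publishes sensor readings as CSV text. */
class SensorSinkFactory {
    /** Hypothetical event type used only for this example. */
    static class SensorReading {
        final String sensorId;
        final double value;

        SensorReading(String sensorId, double value) {
            this.sensorId = sensorId;
            this.value = value;
        }
    }

    /** Encodes a reading as a UTF-8 payload of the form "sensorId,value". */
    static final SerializationSchema<SensorReading> CSV_SCHEMA =
            reading -> (reading.sensorId + "," + reading.value).getBytes(StandardCharsets.UTF_8);

    static PubSubSink<SensorReading> sensorSink(Credentials credentials) throws IOException {
        return PubSubSink.newBuilder()
                .withSerializationSchema(CSV_SCHEMA)
                .withProjectName("my-gcp-project") // hypothetical project
                .withTopicName("sensor-readings")  // hypothetical topic
                .withCredentials(credentials)
                .build();
    }
}
```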

### Testing Support

Emulator support for local development and testing without requiring real Google Cloud Pub/Sub infrastructure.

```java { .api }
public final class EmulatorCredentials extends OAuth2Credentials {
    public static EmulatorCredentials getInstance();
}

public final class EmulatorCredentialsProvider implements CredentialsProvider {
    public static EmulatorCredentialsProvider create();
}
```

[Emulator Testing](./emulator-testing.md)
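For local tests, both the source and the sink can be pointed at the Pub/Sub emulator. This sketch follows the pattern of the emulator example in the Flink documentation; the class `EmulatorPipelineFactory`, the endpoint `localhost:8085` (the emulator's usual default), and the project, subscription and topic names are hypothetical, and the topic and subscription must already exist on the emulator. The `PubSubSubscriberFactoryForEmulator` argument order (host and port, project, subscription, retries, timeout, max messages per pull) is taken from that documentation example.

```java
import java.io.IOException;
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSink;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;

/** Hypothetical helper wiring a source and sink to a local Pub/Sub emulator. */
class EmulatorPipelineFactory {
    static final String HOST_AND_PORT = "localhost:8085"; // hypothetical emulator endpoint
    static final String PROJECT = "my-fake-project";      // emulator projects need not exist in GCP

    static PubSubSource<String> emulatorSource() throws IOException {
        return PubSubSource.newBuilder()
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectName(PROJECT)
                .withSubscriptionName("test-subscription")
                // The emulator does not check credentials; avoid looking up real ones
                .withCredentials(EmulatorCredentials.getInstance())
                // Route pulls to the emulator: 10 retries, 15 s timeout, 100 messages per pull
                .withPubSubSubscriberFactory(new PubSubSubscriberFactoryForEmulator(
                        HOST_AND_PORT, PROJECT, "test-subscription", 10, Duration.ofSeconds(15), 100))
                .build();
    }

    static PubSubSink<String> emulatorSink() throws IOException {
        return PubSubSink.newBuilder()
                .withSerializationSchema(new SimpleStringSchema())
                .withProjectName(PROJECT)
                .withTopicName("test-topic")
                .withCredentials(EmulatorCredentials.getInstance())
                // Publish to the emulator instead of the real Pub/Sub endpoint
                .withHostAndPortForEmulator(HOST_AND_PORT)
                .build();
    }
}
```

Passing `EmulatorCredentials.getInstance()` explicitly means the builders do not depend on Application Default Credentials being available on the test machine.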

## Core Interfaces

### PubSubDeserializationSchema

Interface for custom deserialization with access to the full Pub/Sub message, including its attributes and message ID.

```java { .api }
public interface PubSubDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
    default void open(DeserializationSchema.InitializationContext context) throws Exception;
    boolean isEndOfStream(T nextElement);
    T deserialize(PubsubMessage message) throws Exception;
    default void deserialize(PubsubMessage message, Collector<T> out) throws Exception;
}
```
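A custom `PubSubDeserializationSchema` is useful when message attributes matter, not just the payload. This sketch assumes the publisher sets a hypothetical `region` attribute; the class `RegionTaggedSchema` and its `region|payload` output format are invented for illustration.

```java
import com.google.pubsub.v1.PubsubMessage;
import java.nio.charset.StandardCharsets;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

/** Hypothetical schema emitting "<region>|<payload>" for each Pub/Sub message. */
class RegionTaggedSchema implements PubSubDeserializationSchema<String> {
    // Hypothetical attribute name set by the publisher
    static final String REGION_ATTRIBUTE = "region";

    @Override
    public boolean isEndOfStream(String nextElement) {
        return false; // a Pub/Sub subscription is an unbounded stream
    }

    @Override
    public String deserialize(PubsubMessage message) {
        String payload = message.getData().toString(StandardCharsets.UTF_8);
        // Fall back to "unknown" when the attribute is missing
        String region = message.getAttributesOrDefault(REGION_ATTRIBUTE, "unknown");
        return region + "|" + payload;
    }

    @Override
    public TypeInformation<String> getProducedType() {
        return Types.STRING;
    }
}
```

A schema like this is handed to the source builder's `withDeserializationSchema(...)` step in place of a plain `DeserializationSchema`.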

### PubSubSubscriberFactory

Factory interface for creating the `PubSubSubscriber` that the source uses to pull and acknowledge messages. A custom factory can be supplied through `withPubSubSubscriberFactory(...)`.

```java { .api }
public interface PubSubSubscriberFactory extends Serializable {
    PubSubSubscriber getSubscriber(Credentials credentials) throws IOException;
}
```
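For unit tests that should not touch the network, a custom factory can hand the source a stub subscriber. The sketch below is hypothetical: it implements `PubSubSubscriberFactory` and `PubSubSubscriber` exactly as they are listed in this document (if the connector version in use declares further abstract methods, those would need implementing too). The class names, ack IDs and message IDs are invented; the factory keeps its payloads in an `ArrayList` because the factory itself must be `Serializable`.

```java
import com.google.auth.Credentials;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.ReceivedMessage;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriber;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubSubscriberFactory;

/** Hypothetical test double: serves a fixed batch once, then returns empty pulls. */
class InMemorySubscriberFactory implements PubSubSubscriberFactory {
    private final ArrayList<String> payloads; // serializable, as the factory must be

    InMemorySubscriberFactory(List<String> payloads) {
        this.payloads = new ArrayList<>(payloads);
    }

    @Override
    public InMemorySubscriber getSubscriber(Credentials credentials) {
        return new InMemorySubscriber(payloads); // credentials are ignored by the stub
    }

    static class InMemorySubscriber implements PubSubSubscriber {
        private final List<ReceivedMessage> pending = new ArrayList<>();
        final List<String> acknowledgedIds = new ArrayList<>();

        InMemorySubscriber(List<String> payloads) {
            for (int i = 0; i < payloads.size(); i++) {
                pending.add(ReceivedMessage.newBuilder()
                        .setAckId("ack-" + i)
                        .setMessage(PubsubMessage.newBuilder()
                                .setMessageId("msg-" + i)
                                .setData(ByteString.copyFromUtf8(payloads.get(i))))
                        .build());
            }
        }

        @Override
        public List<ReceivedMessage> pull() {
            // Hand out everything on the first pull, nothing afterwards
            List<ReceivedMessage> batch = new ArrayList<>(pending);
            pending.clear();
            return batch;
        }

        @Override
        public void acknowledge(List<String> ackIds) {
            acknowledgedIds.addAll(ackIds); // record instead of calling Pub/Sub
        }

        @Override
        public void close() {
            // nothing to release
        }
    }
}
```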

194

195

### Checkpointing Support Classes

196

197

Core classes for managing checkpoint-based message acknowledgment.

198

199

```java { .api }

200

public interface Acknowledger<AcknowledgeId> {

201

void acknowledge(List<AcknowledgeId> ids);

202

}

203

204

public interface PubSubSubscriber extends Acknowledger<String> {

205

List<ReceivedMessage> pull();

206

void close() throws Exception;

207

}

208

209

public class AcknowledgeIdsForCheckpoint<AcknowledgeId> implements Serializable {

210

AcknowledgeIdsForCheckpoint(long checkpointId, List<AcknowledgeId> acknowledgeIds);

211

public long getCheckpointId();

212

public void setCheckpointId(long checkpointId);

213

public List<AcknowledgeId> getAcknowledgeIds();

214

public void setAcknowledgeIds(List<AcknowledgeId> acknowledgeIds);

215

}

216

217

public class AcknowledgeOnCheckpoint<ACKID extends Serializable>

218

implements ListCheckpointed<AcknowledgeIdsForCheckpoint<ACKID>>, CheckpointListener {

219

public AcknowledgeOnCheckpoint(Acknowledger<ACKID> acknowledger);

220

public void addAcknowledgeId(ACKID acknowledgeId);

221

public void acknowledgeIdsUpToCheckpoint(long checkpointId);

222

}

223

```

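## Important Notes

- **Checkpointing Required**: PubSubSource fails at startup if Flink checkpointing is not enabled, because messages are acknowledged only when a checkpoint completes. Keep the checkpoint interval well below the subscription's acknowledgment deadline; otherwise Pub/Sub redelivers messages that are still waiting for a checkpoint, and they are processed again
- **Parallel Processing**: Both source and sink support parallel execution across multiple Flink subtasks
- **Rate Limiting**: The default rate limit is 100,000 messages per second per parallel source instance, so the job-wide limit grows with source parallelism
- **Credentials**: If not explicitly provided, credentials are loaded automatically from the environment (Google Application Default Credentials)
- **Error Handling**: A failed publish in the sink causes the job to fail during checkpointing, when the sink flushes its pending messages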
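When Application Default Credentials are not suitable, for example when a job must run as a specific service account, credentials can be loaded explicitly and passed to `withCredentials(...)` on either builder. This sketch uses the Google Auth Library's `GoogleCredentials.fromStream`; the class `ExplicitCredentials` and any key-file path are hypothetical.

```java
import com.google.auth.oauth2.GoogleCredentials;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collections;

/** Hypothetical helper for loading service-account credentials from a key file. */
class ExplicitCredentials {
    // OAuth scope for the Pub/Sub API
    static final String PUBSUB_SCOPE = "https://www.googleapis.com/auth/pubsub";

    /** Reads a service-account JSON key and scopes it to Pub/Sub; throws IOException if unreadable. */
    static GoogleCredentials fromKeyFile(String path) throws IOException {
        try (InputStream in = new FileInputStream(path)) {
            return GoogleCredentials.fromStream(in)
                    .createScoped(Collections.singletonList(PUBSUB_SCOPE));
        }
    }
}
```

`GoogleCredentials` is a subclass of `com.google.auth.Credentials`, so the result can be passed straight to a builder, e.g. `.withCredentials(ExplicitCredentials.fromKeyFile("/secrets/pubsub-key.json"))` with a hypothetical path.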