# PubSub Source

The PubSubSource consumes messages from Google Cloud Pub/Sub subscriptions and acknowledges them automatically through Flink's checkpointing mechanism, providing at-least-once delivery.

## Capabilities

### Source Creation

Create a PubSubSource with the builder pattern, supplying the required deserialization schema, project name, and subscription name.

```java { .api }
/**
 * Creates a new builder for PubSubSource configuration.
 * @return DeserializationSchemaBuilder instance to start configuration
 */
public static DeserializationSchemaBuilder newBuilder();

public static class DeserializationSchemaBuilder {
    /**
     * Set a standard Flink DeserializationSchema (receives only the message data).
     * @param deserializationSchema Schema for deserializing the message payload
     * @return ProjectNameBuilder for the next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(DeserializationSchema<OUT> deserializationSchema);

    /**
     * Set a PubSub-specific deserialization schema (receives the full PubsubMessage).
     * @param deserializationSchema Schema with access to the PubSub message and its metadata
     * @return ProjectNameBuilder for the next configuration step
     */
    public <OUT> ProjectNameBuilder<OUT> withDeserializationSchema(PubSubDeserializationSchema<OUT> deserializationSchema);
}
```
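
Because each stage returns a different builder type, the compiler rejects a `build()` call made before the schema, project, and subscription are set. The stdlib-only sketch below imitates that staged (step) builder pattern. `StagedSourceBuilder`, `SourceConfig`, and the stage interfaces are hypothetical names for illustration, not the connector's classes, and a plain `Function<String, T>` stands in for the deserialization schema.

```java
import java.util.function.Function;

/** Hypothetical illustration of a staged builder; not part of the Flink connector. */
public final class StagedSourceBuilder {

    /** Immutable result of the builder (stands in for PubSubSource). */
    public record SourceConfig<T>(Function<String, T> deserializer, String project, String subscription) {}

    // Each stage exposes only the next required setter.
    public interface ProjectStage<T> { SubscriptionStage<T> withProjectName(String project); }
    public interface SubscriptionStage<T> { FinalStage<T> withSubscriptionName(String subscription); }

    /** Final stage: the only type that offers build(). */
    public static final class FinalStage<T> {
        private final Function<String, T> deserializer;
        private final String project;
        private final String subscription;

        private FinalStage(Function<String, T> deserializer, String project, String subscription) {
            this.deserializer = deserializer;
            this.project = project;
            this.subscription = subscription;
        }

        public SourceConfig<T> build() {
            return new SourceConfig<>(deserializer, project, subscription);
        }
    }

    /** Entry point, mirroring newBuilder().withDeserializationSchema(...). */
    public static <T> ProjectStage<T> withDeserializer(Function<String, T> deserializer) {
        // Nested lambdas capture each required value as the chain progresses.
        return project -> subscription -> new FinalStage<>(deserializer, project, subscription);
    }

    public static void main(String[] args) {
        SourceConfig<Integer> config = withDeserializer(Integer::parseInt)
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .build();
        System.out.println(config.project() + "/" + config.subscription()); // prints "my-gcp-project/my-subscription"
        // withDeserializer(Integer::parseInt).build(); // does not compile: ProjectStage has no build()
    }
}
```

The same guarantee holds for the real builder: only `PubSubSourceBuilder`, reached after `withSubscriptionName`, exposes the optional settings and `build()`.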

### Builder Configuration

Configure the project name, subscription name, and optional parameters.

```java { .api }
public interface ProjectNameBuilder<OUT> {
    /**
     * Set the GCP project name containing the subscription.
     * @param projectName Google Cloud project name
     * @return SubscriptionNameBuilder for the next configuration step
     */
    SubscriptionNameBuilder<OUT> withProjectName(String projectName);
}

public interface SubscriptionNameBuilder<OUT> {
    /**
     * Set the Pub/Sub subscription name to consume from.
     * @param subscriptionName Pub/Sub subscription name
     * @return PubSubSourceBuilder for optional configuration
     */
    PubSubSourceBuilder<OUT> withSubscriptionName(String subscriptionName);
}

public static class PubSubSourceBuilder<OUT> {
    /**
     * Set custom GCP credentials (optional; defaults to the credentials found in the environment).
     * @param credentials Google Cloud credentials
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withCredentials(Credentials credentials);

    /**
     * Set a custom subscriber factory for advanced configuration (optional).
     * @param pubSubSubscriberFactory Custom factory for creating subscribers
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(PubSubSubscriberFactory pubSubSubscriberFactory);

    /**
     * Configure the default subscriber factory with specific parameters (optional).
     * @param maxMessagesPerPull Number of messages pulled per request (default: 100)
     * @param perRequestTimeout Timeout per pull request (default: 15 seconds)
     * @param retries Number of retries for failed requests (default: 3)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withPubSubSubscriberFactory(int maxMessagesPerPull, Duration perRequestTimeout, int retries);

    /**
     * Set the message rate limit per parallel source instance (optional).
     * @param messagePerSecondRateLimit Messages per second limit (default: 100000)
     * @return Current builder instance
     */
    public PubSubSourceBuilder<OUT> withMessageRateLimit(int messagePerSecondRateLimit);

    /**
     * Build the configured PubSubSource instance.
     * Checkpointing is not checked here: the source verifies it when it is opened at runtime
     * and throws IllegalArgumentException if checkpointing is disabled.
     * @return Configured PubSubSource ready for use
     * @throws IOException If credentials cannot be obtained
     */
    public PubSubSource<OUT> build() throws IOException;
}
```
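
The builder takes the project and subscription names separately, while Pub/Sub identifies a subscription by its fully qualified resource name, `projects/{project}/subscriptions/{subscription}`. The hypothetical stdlib-only helper below sketches that composition (the connector presumably does the equivalent internally through the Google client library; `SubscriptionPath` is not a real API). It also rejects a subscription argument containing `/`, since the examples in this document pass the bare name.

```java
/** Hypothetical helper; not part of the Flink connector or the Google client library. */
public final class SubscriptionPath {

    private SubscriptionPath() {}

    /** Composes the fully qualified Pub/Sub subscription resource name. */
    public static String of(String projectName, String subscriptionName) {
        if (projectName == null || projectName.isEmpty()) {
            throw new IllegalArgumentException("projectName must not be empty");
        }
        if (subscriptionName == null || subscriptionName.isEmpty()) {
            throw new IllegalArgumentException("subscriptionName must not be empty");
        }
        // A '/' suggests an already-qualified path was passed where a bare name is expected.
        if (subscriptionName.contains("/")) {
            throw new IllegalArgumentException("expected a bare subscription name, got: " + subscriptionName);
        }
        return "projects/" + projectName + "/subscriptions/" + subscriptionName;
    }

    public static void main(String[] args) {
        // prints "projects/my-gcp-project/subscriptions/my-subscription"
        System.out.println(of("my-gcp-project", "my-subscription"));
    }
}
```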

## Usage Examples

### Basic String Message Consumer

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

public class BasicPubSubConsumer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Required: the source fails at startup without checkpointing, and messages are
        // acknowledged when checkpoints complete. Keep this 30 s interval well below the
        // subscription's acknowledgment deadline.
        env.enableCheckpointing(30000);

        PubSubSource<String> source = PubSubSource.newBuilder()
                .withDeserializationSchema(new SimpleStringSchema())
                .withProjectName("my-gcp-project")
                .withSubscriptionName("my-subscription")
                .build();

        DataStream<String> stream = env.addSource(source);
        stream.print();

        env.execute("Basic PubSub Consumer");
    }
}
```

### JSON Message Consumer with Custom Deserialization

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.pubsub.v1.PubsubMessage;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;
import org.apache.flink.streaming.connectors.gcp.pubsub.common.PubSubDeserializationSchema;

// User is your own POJO (public no-arg constructor plus fields or getters/setters)
public class JsonUserDeserializer implements PubSubDeserializationSchema<User> {
    // Created in open() so each parallel instance initializes its own mapper
    private transient ObjectMapper objectMapper;

    @Override
    public void open(DeserializationSchema.InitializationContext context) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public User deserialize(PubsubMessage message) throws Exception {
        // getData() returns the payload as a ByteString
        String json = message.getData().toStringUtf8();
        return objectMapper.readValue(json, User.class);
    }

    @Override
    public boolean isEndOfStream(User nextElement) {
        return false; // the subscription is an unbounded stream
    }

    @Override
    public TypeInformation<User> getProducedType() {
        return TypeInformation.of(User.class);
    }
}

// Usage
PubSubSource<User> userSource = PubSubSource.newBuilder()
        .withDeserializationSchema(new JsonUserDeserializer())
        .withProjectName("my-project")
        .withSubscriptionName("user-events")
        .build();
```

### Advanced Configuration with Rate Limiting

```java
import java.time.Duration;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withSubscriptionName("high-volume-subscription")
        .withPubSubSubscriberFactory(
                500,                    // maxMessagesPerPull
                Duration.ofSeconds(30), // perRequestTimeout
                5)                      // retries
        .withMessageRateLimit(10000)    // 10K messages per second per subtask
        .build();
```
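
Because `withMessageRateLimit` applies to each parallel subtask, the job-wide ceiling scales with the source's parallelism: with the 10,000 msg/s setting above and a parallelism of 4, the source as a whole may consume up to 40,000 msg/s. The hypothetical helper below (stdlib only, not a connector API) does that arithmetic in both directions, including deriving a per-subtask value from a desired job-wide cap.

```java
/** Hypothetical helper for sizing the per-subtask rate limit; not a connector API. */
public final class RateLimitMath {

    private RateLimitMath() {}

    /** Upper bound on messages per second across all subtasks. */
    public static long aggregateLimit(int perSubtaskLimit, int parallelism) {
        requirePositive(perSubtaskLimit, "perSubtaskLimit");
        requirePositive(parallelism, "parallelism");
        return (long) perSubtaskLimit * parallelism;
    }

    /** Largest per-subtask limit whose aggregate does not exceed the job-wide cap. */
    public static int perSubtaskLimit(long jobWideLimit, int parallelism) {
        requirePositive(jobWideLimit, "jobWideLimit");
        requirePositive(parallelism, "parallelism");
        long perSubtask = jobWideLimit / parallelism; // round down to stay under the cap
        if (perSubtask < 1) {
            throw new IllegalArgumentException("cap too low for parallelism " + parallelism);
        }
        return (int) Math.min(perSubtask, Integer.MAX_VALUE);
    }

    private static void requirePositive(long value, String name) {
        if (value <= 0) {
            throw new IllegalArgumentException(name + " must be positive");
        }
    }

    public static void main(String[] args) {
        System.out.println(aggregateLimit(10_000, 4));  // prints 40000
        System.out.println(perSubtaskLimit(25_000, 4)); // prints 6250
    }
}
```

If the job is later rescaled, the per-subtask value has to be recomputed to keep the same job-wide cap.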

### Using Custom Credentials

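```java
import com.google.auth.Credentials;
import com.google.auth.oauth2.ServiceAccountCredentials;
import java.io.FileInputStream;
import java.io.InputStream;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.gcp.pubsub.PubSubSource;

// Load a service account key; try-with-resources closes the file afterwards
Credentials credentials;
try (InputStream keyStream = new FileInputStream("path/to/service-account-key.json")) {
    credentials = ServiceAccountCredentials.fromStream(keyStream);
}

PubSubSource<String> source = PubSubSource.newBuilder()
        .withDeserializationSchema(new SimpleStringSchema())
        .withProjectName("my-project")
        .withSubscriptionName("my-subscription")
        .withCredentials(credentials)
        .build();
```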

## Message Acknowledgment

The PubSubSource manages message acknowledgment automatically through Flink's checkpointing mechanism:

1. **Message Reception**: Messages are pulled from the subscription and added to the pending acknowledgments.
2. **Processing**: Messages are deserialized and emitted to the Flink stream.
3. **Checkpointing**: When a checkpoint completes successfully, all messages received before that checkpoint are acknowledged.
4. **Failure Recovery**: If a checkpoint does not complete (or the job fails before one does), the affected messages stay unacknowledged, and Pub/Sub redelivers them once their acknowledgment deadline expires.

Because messages are acknowledged only after a successful checkpoint, none are lost on failure, but some may be delivered and processed more than once: this is an at-least-once guarantee. Duplicates also occur when a message's acknowledgment deadline expires before the next checkpoint completes, so downstream operators and sinks should tolerate them (for example, by deduplicating on the Pub/Sub message ID).
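
The bookkeeping behind steps 1 to 3 can be modeled in a few lines. The sketch below is a simplified, stdlib-only illustration; the class and method names are hypothetical (loosely following Flink's snapshot and checkpoint-complete callbacks) and it is not the connector's implementation. IDs received since the last snapshot are pending, a snapshot tags them with its checkpoint ID, and completing checkpoint N acknowledges everything tagged with N or earlier, so IDs held by a checkpoint that never completed are acknowledged with the next one that does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Simplified, hypothetical model of acknowledge-on-checkpoint bookkeeping. */
public final class AckOnCheckpointModel {

    private final List<String> pending = new ArrayList<>();                    // received, not yet snapshotted
    private final TreeMap<Long, List<String>> byCheckpoint = new TreeMap<>();  // snapshotted, awaiting completion
    private final List<String> acknowledged = new ArrayList<>();               // stands in for ack calls to Pub/Sub

    /** Step 1: a pulled message is recorded as pending. */
    public void onMessage(String messageId) {
        pending.add(messageId);
    }

    /** Checkpoint snapshot: the pending IDs become part of this checkpoint. */
    public void snapshot(long checkpointId) {
        byCheckpoint.put(checkpointId, new ArrayList<>(pending));
        pending.clear();
    }

    /** Step 3: acknowledge every ID from this and all earlier checkpoints. */
    public void checkpointComplete(long checkpointId) {
        Map<Long, List<String>> done = byCheckpoint.headMap(checkpointId, true);
        done.values().forEach(acknowledged::addAll);
        done.clear(); // headMap is a view, so this also removes the entries from byCheckpoint
    }

    public List<String> acknowledged() {
        return List.copyOf(acknowledged);
    }

    public static void main(String[] args) {
        AckOnCheckpointModel model = new AckOnCheckpointModel();
        model.onMessage("m1");
        model.snapshot(1);          // checkpoint 1 holds m1 but never completes
        model.onMessage("m2");
        model.snapshot(2);          // checkpoint 2 holds m2
        model.onMessage("m3");      // still pending: arrived after checkpoint 2's snapshot
        model.checkpointComplete(2);
        System.out.println(model.acknowledged()); // prints [m1, m2]
    }
}
```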

## Important Requirements

- **Checkpointing**: Flink checkpointing MUST be enabled. The source throws an IllegalArgumentException at startup (when it is opened) if checkpointing is disabled
- **Checkpoint Interval**: The checkpoint interval should be much shorter than the subscription's acknowledgment deadline (Pub/Sub defaults to 10 seconds and allows at most 600 seconds); otherwise messages are redelivered before they are acknowledged
- **Parallel Processing**: Each parallel subtask creates its own subscriber and manages its own acknowledgments
- **Rate Limiting**: Rate limits are applied per parallel subtask, not globally
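
A pre-flight check along these lines can catch a risky checkpoint interval before the job is submitted. This is a hypothetical stdlib-only sketch: "much shorter" is expressed as an assumed safety factor (the interval must be at most one third of the deadline), which is a placeholder to tune rather than a documented threshold, and the deadline value would come from your subscription's configuration.

```java
import java.time.Duration;

/** Hypothetical pre-flight check; not part of the connector. */
public final class CheckpointIntervalCheck {

    // Assumed safety factor: the interval must be at most 1/3 of the ack deadline.
    private static final int SAFETY_FACTOR = 3;

    private CheckpointIntervalCheck() {}

    public static boolean isSafe(Duration checkpointInterval, Duration ackDeadline) {
        if (checkpointInterval.isNegative() || checkpointInterval.isZero()) {
            throw new IllegalArgumentException("checkpoint interval must be positive");
        }
        // Compare interval * factor <= deadline to avoid integer-division rounding.
        return checkpointInterval.multipliedBy(SAFETY_FACTOR).compareTo(ackDeadline) <= 0;
    }

    public static void main(String[] args) {
        Duration defaultDeadline = Duration.ofSeconds(10); // Pub/Sub's default ack deadline
        System.out.println(isSafe(Duration.ofSeconds(30), defaultDeadline));           // prints false
        System.out.println(isSafe(Duration.ofSeconds(30), Duration.ofSeconds(600)));   // prints true
        System.out.println(isSafe(Duration.ofSeconds(3), defaultDeadline));            // prints true
    }
}
```

With the 30-second interval from the basic example, the check fails against the default 10-second deadline, which signals that either the subscription's deadline should be raised or the interval shortened.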

## Error Handling

- **Connection Failures**: Automatic retry with exponential backoff through the subscriber factory
- **Deserialization Errors**: Exceptions thrown during deserialization cause the job to fail
- **Acknowledgment Failures**: Acknowledgments are sent only after a checkpoint has completed, so a failed acknowledgment does not undo that checkpoint; the affected messages are redelivered by Pub/Sub once their acknowledgment deadline expires
- **Timeout Handling**: Pull request timeouts are retried according to the `perRequestTimeout` and `retries` settings of `withPubSubSubscriberFactory`
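
To make the interaction between `perRequestTimeout` and `retries` concrete, here is a generic, hypothetical sketch of a bounded retry loop around a blocking pull: each attempt gets its own timeout, the caller gives up after the first attempt plus `retries` further ones, and the pause between attempts doubles. It uses only JDK classes, with a caller-supplied `Supplier` in place of a real Pub/Sub pull, so it illustrates the idea rather than the connector's internals; whether the connector counts the first attempt in `retries`, and how it spaces attempts, is an assumption here.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

/** Hypothetical bounded-retry wrapper; not the connector's implementation. */
public final class BoundedRetry {

    private BoundedRetry() {}

    /** Runs pull up to 1 + retries times, bounding each attempt by perRequestTimeout. */
    public static <T> T pullWithRetries(Supplier<T> pull, Duration perRequestTimeout,
                                        int retries, Duration initialBackoff) throws Exception {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must be >= 0");
        }
        Exception last = null;
        Duration backoff = initialBackoff;
        for (int attempt = 0; attempt <= retries; attempt++) {
            try {
                // Note: a timed-out attempt keeps running in the background; CompletableFuture cannot interrupt it.
                return CompletableFuture.supplyAsync(pull)
                        .get(perRequestTimeout.toMillis(), TimeUnit.MILLISECONDS);
            } catch (TimeoutException | ExecutionException e) {
                last = e; // timed out, or the pull threw; retry if attempts remain
            }
            if (attempt < retries) {
                Thread.sleep(backoff.toMillis());
                backoff = backoff.multipliedBy(2); // exponential backoff between attempts
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice, then succeeds, so at least 2 retries are needed.
        String result = pullWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure " + calls[0]);
            }
            return "batch";
        }, Duration.ofSeconds(1), 3, Duration.ofMillis(10));
        System.out.println(result + " after " + calls[0] + " attempts"); // prints "batch after 3 attempts"
    }
}
```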