
# Stream Creation

Core functionality for creating Kafka-backed Spark streams and RDDs with configurable location strategies and consumer strategies. Provides both streaming (DStream) and batch (RDD) interfaces for consuming Kafka data.

## Capabilities

### Direct Stream Creation (Scala)

Creates a DStream where each Kafka topic/partition corresponds to an RDD partition, enabling efficient parallel processing.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**

- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - How to schedule consumers (use `LocationStrategies.PreferConsistent` in most cases)
- `consumerStrategy`: ConsumerStrategy[K, V] - How to create and configure consumers (use `ConsumerStrategies.Subscribe` in most cases)
- Returns: InputDStream[ConsumerRecord[K, V]] - Stream of Kafka consumer records

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val ssc = new StreamingContext(spark.sparkContext, Seconds(5))

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "my-group",
  "auto.offset.reset" -> "latest"
)

val topics = Array("topic1", "topic2")

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// The stream produces ConsumerRecord objects
stream.foreachRDD { rdd =>
  rdd.foreach { record: ConsumerRecord[String, String] =>
    println(s"Topic: ${record.topic}, Partition: ${record.partition}, " +
      s"Offset: ${record.offset}, Key: ${record.key}, Value: ${record.value}")
  }
}
```

### Direct Stream Creation with Per-Partition Config (Scala)

Creates a DStream with custom per-partition configuration for rate limiting and other settings.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**

- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - How to schedule consumers
- `consumerStrategy`: ConsumerStrategy[K, V] - How to create and configure consumers
- `perPartitionConfig`: PerPartitionConfig - Per-partition configuration settings
- Returns: InputDStream[ConsumerRecord[K, V]] - Stream of Kafka consumer records

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    // Different rates for different partitions
    if (topicPartition.topic() == "high-volume-topic") 1000 else 500
  }

  override def minRatePerPartition(topicPartition: TopicPartition): Long = 10
}

val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  new CustomPerPartitionConfig()
)
```

### Direct Stream Creation (Java)

Java API for creating direct streams from Kafka.

```scala { .api }
def createDirectStream[K, V](
  jssc: JavaStreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): JavaInputDStream[ConsumerRecord[K, V]]

def createDirectStream[K, V](
  jssc: JavaStreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): JavaInputDStream[ConsumerRecord[K, V]]
```

**Usage Example (Java):**

```java
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.*;

JavaStreamingContext jssc =
    new JavaStreamingContext(new JavaSparkContext(spark.sparkContext()), Durations.seconds(5));

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "my-group");
kafkaParams.put("auto.offset.reset", "latest");

Collection<String> topics = Arrays.asList("topic1", "topic2");

JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );
```

### RDD Creation (Scala)

Creates a batch-oriented RDD over a specified set of Kafka offset ranges, supporting exactly-once semantics.

```scala { .api }
def createRDD[K, V](
  sc: SparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): RDD[ConsumerRecord[K, V]]
```

**Parameters:**

- `sc`: SparkContext - The Spark context
- `kafkaParams`: java.util.Map[String, Object] - Kafka configuration parameters (must include "bootstrap.servers")
- `offsetRanges`: Array[OffsetRange] - Offset ranges defining the Kafka data for this RDD
- `locationStrategy`: LocationStrategy - How to schedule consumers
- Returns: RDD[ConsumerRecord[K, V]] - RDD of Kafka consumer records

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = new java.util.HashMap[String, Object]()
kafkaParams.put("bootstrap.servers", "localhost:9092")
kafkaParams.put("key.deserializer", classOf[StringDeserializer])
kafkaParams.put("value.deserializer", classOf[StringDeserializer])
kafkaParams.put("group.id", "batch-group")

// Define specific offset ranges to consume
// (fromOffset is inclusive, untilOffset is exclusive)
val offsetRanges = Array(
  OffsetRange("topic1", 0, 0, 1000),   // partition 0, offsets 0-999
  OffsetRange("topic1", 1, 500, 1500), // partition 1, offsets 500-1499
  OffsetRange("topic2", 0, 0, 2000)    // topic2 partition 0, offsets 0-1999
)

val rdd = KafkaUtils.createRDD[String, String](
  spark.sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent
)

// Process the RDD
rdd.foreach { record =>
  println(s"Consumed: ${record.key} -> ${record.value}")
}
```

### RDD Creation (Java)

Java API for creating batch-oriented RDDs from Kafka.

```scala { .api }
def createRDD[K, V](
  jsc: JavaSparkContext,
  kafkaParams: java.util.Map[String, Object],
  offsetRanges: Array[OffsetRange],
  locationStrategy: LocationStrategy
): JavaRDD[ConsumerRecord[K, V]]
```

**Usage Example (Java):**

```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;

JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

OffsetRange[] offsetRanges = {
  OffsetRange.create("topic1", 0, 0L, 1000L),
  OffsetRange.create("topic1", 1, 500L, 1500L)
};

// kafkaParams: the same consumer configuration map as in the streaming example above
JavaRDD<ConsumerRecord<String, String>> rdd =
    KafkaUtils.createRDD(
        jsc,
        kafkaParams,
        offsetRanges,
        LocationStrategies.PreferConsistent()
    );
```

## Rate Control

The direct stream supports automatic rate limiting through Spark's backpressure mechanism:

- Set `spark.streaming.backpressure.enabled=true` to enable backpressure
- Set `spark.streaming.kafka.maxRatePerPartition` to limit messages per second per partition
- Set `spark.streaming.kafka.minRatePerPartition` to set a minimum processing rate
- Use a custom `PerPartitionConfig` for fine-grained per-partition control
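
These properties are set on the `SparkConf` before the streaming context is created. A minimal sketch (the application name and the numeric rate values are illustrative placeholders, not recommendations):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Enable backpressure and bound the per-partition consumption rate.
val conf = new SparkConf()
  .setAppName("kafka-rate-control") // placeholder name
  .set("spark.streaming.backpressure.enabled", "true")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000") // records/sec per partition
  .set("spark.streaming.kafka.minRatePerPartition", "10")

val ssc = new StreamingContext(conf, Seconds(5))
```

With backpressure enabled, these static limits act as upper and lower bounds while Spark adjusts the actual rate to match observed batch processing times.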

## Error Handling

The stream creation handles several Kafka-specific edge cases:

- **KAFKA-3370 Workaround**: Handles `NoOffsetForPartitionException` when `auto.offset.reset=none`
- **Parameter Validation**: Automatically fixes problematic Kafka parameters for executors
- **Consumer Caching**: Manages consumer lifecycle and caching for performance
- **Offset Management**: Ensures proper offset handling for exactly-once semantics
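
For the last point, the usual pattern is to read each batch's offset ranges from its RDD and commit them back to Kafka only after the batch has been processed. A sketch, assuming the `stream` created in the examples above:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Each Kafka-backed RDD carries the offset ranges it was built from
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch ...

  // Commit the offsets back to Kafka once processing has succeeded
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

Committing after processing gives at-least-once delivery; exactly-once additionally requires storing results and offsets in a single transactional sink.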