# Stream Creation

The stream creation capability provides methods for creating Kafka DStreams that support exactly-once semantics and high-throughput real-time processing. These DStreams automatically handle the consumer lifecycle, offset tracking, and integration with Spark's rate-limiting mechanisms.

## Core Functions

### createDirectStream (Basic)

Creates a DStream where each Kafka topic/partition corresponds to an RDD partition with default rate limiting configuration.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V]
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**

- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - Consumer placement strategy (use `LocationStrategies.PreferConsistent` in most cases)
- `consumerStrategy`: ConsumerStrategy[K, V] - Consumer configuration strategy (use `ConsumerStrategies.Subscribe` in most cases)

**Returns:** `InputDStream[ConsumerRecord[K, V]]` - Direct Kafka input stream

### createDirectStream (With Configuration)

Creates a DStream with custom per-partition configuration for advanced rate limiting and performance tuning.

```scala { .api }
def createDirectStream[K, V](
  ssc: StreamingContext,
  locationStrategy: LocationStrategy,
  consumerStrategy: ConsumerStrategy[K, V],
  perPartitionConfig: PerPartitionConfig
): InputDStream[ConsumerRecord[K, V]]
```

**Parameters:**

- `ssc`: StreamingContext - The Spark Streaming context
- `locationStrategy`: LocationStrategy - Consumer placement strategy
- `consumerStrategy`: ConsumerStrategy[K, V] - Consumer configuration strategy
- `perPartitionConfig`: PerPartitionConfig - Custom per-partition configuration

**Returns:** `InputDStream[ConsumerRecord[K, V]]` - Direct Kafka input stream with custom per-partition configuration

## Java API

### createDirectStream (Java - Basic)

Java version of the basic stream creation method.

```java { .api }
public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
    JavaStreamingContext jssc,
    LocationStrategy locationStrategy,
    ConsumerStrategy<K, V> consumerStrategy
)
```

### createDirectStream (Java - With Configuration)

Java version with custom per-partition configuration.

```java { .api }
public static <K, V> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream(
    JavaStreamingContext jssc,
    LocationStrategy locationStrategy,
    ConsumerStrategy<K, V> consumerStrategy,
    PerPartitionConfig perPartitionConfig
)
```

## Usage Examples

### Basic Stream Creation

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-group",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val topics = Array("orders", "payments", "users")
val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
)

// Process the stream
stream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreach { record =>
      println(s"Topic: ${record.topic}, Key: ${record.key}, Value: ${record.value}")
    }
  }
}
```

### Stream with Custom Rate Limiting

```scala
import org.apache.spark.streaming.kafka010._
import org.apache.kafka.common.TopicPartition

// Custom per-partition configuration
class CustomPerPartitionConfig extends PerPartitionConfig {
  def maxRatePerPartition(topicPartition: TopicPartition): Long = {
    topicPartition.topic() match {
      case "high-volume-topic" => 1000 // Higher rate for the high-volume topic
      case _ => 500 // Default rate
    }
  }
}

val customConfig = new CustomPerPartitionConfig()

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](topics, kafkaParams),
  customConfig
)
```

### Java Stream Creation

```java
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "spark-streaming-group");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);

Collection<String> topics = Arrays.asList("topic1", "topic2");
JavaInputDStream<ConsumerRecord<String, String>> stream =
    KafkaUtils.createDirectStream(
        javaStreamingContext,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
    );

stream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        System.out.println("Key: " + record.key() + ", Value: " + record.value());
    });
});
```

## Configuration Notes

### Required Kafka Parameters

- `bootstrap.servers`: Kafka broker addresses (required)
- `key.deserializer`: Key deserializer class (required)
- `value.deserializer`: Value deserializer class (required)
- `group.id`: Consumer group ID (recommended)

### Recommended Kafka Parameters

- `enable.auto.commit`: Set to `false` for manual offset management
- `auto.offset.reset`: Set to `"latest"` or `"earliest"` based on requirements

### Rate Limiting

The `spark.streaming.kafka.maxRatePerPartition` configuration controls the maximum number of records per second read from each partition. Set it to 0 for an unlimited rate.
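The default `PerPartitionConfig` reads this setting from the Spark configuration, so it can be applied globally without a custom class. A minimal sketch (application name, rate, and batch interval are illustrative values):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Cap each Kafka partition at 1000 records per second (illustrative value).
// With a 5-second batch interval, each partition then contributes at most
// 5000 records per batch.
val conf = new SparkConf()
  .setAppName("kafka-rate-limited")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")

val streamingContext = new StreamingContext(conf, Seconds(5))
```

A custom `PerPartitionConfig` (as shown earlier) takes precedence over this global setting only for the stream it is passed to.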

## Important Notes

- All stream creation methods are marked `@Experimental` in Spark 2.4.8
- The direct approach provides exactly-once semantics when combined with proper offset management
- Consumer instances are automatically managed and cached for performance
- Both Scala and Java APIs are supported, with appropriate type conversions
- DStreams created with these methods implement both the `HasOffsetRanges` and `CanCommitOffsets` interfaces
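Because the returned DStream implements `HasOffsetRanges` and `CanCommitOffsets`, offsets can be committed back to Kafka after each batch is processed. A sketch of that pattern, assuming `stream` was created with `createDirectStream` as above:

```scala
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // Offset ranges are only available on the RDD produced directly by the
  // stream, so capture them before any shuffle or repartition.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch's records here ...

  // Commit asynchronously once the batch's output has been written.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
```

Committing after the output is written is what gives the at-least-once/exactly-once guarantees mentioned above; committing earlier risks data loss on failure.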