# Direct Streaming

Direct streaming creates input streams that directly query Kafka brokers without using any receiver. This approach provides exactly-once semantics, lower latency, and better throughput compared to receiver-based streaming.

## Capabilities

### Basic Direct Stream Creation

Creates a direct stream with automatic offset management.

```scala { .api }
/**
 * Create an input stream that directly pulls messages from Kafka brokers
 * without using any receiver. This stream can guarantee that each message
 * from Kafka is included in transformations exactly once.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka configuration parameters. Requires "metadata.broker.list"
 *   or "bootstrap.servers" to be set with Kafka broker(s) specified in
 *   host1:port1,host2:port2 form. If not starting from a checkpoint,
 *   "auto.offset.reset" may be set to "largest" or "smallest"
 * @param topics Names of the topics to consume
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag
](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  topics: Set[String]
): InputDStream[(K, V)]
```

**Usage Example:**

```scala
import org.apache.spark.streaming.kafka._
import kafka.serializer.StringDecoder

val kafkaParams = Map[String, String](
  "metadata.broker.list" -> "localhost:9092",
  "auto.offset.reset" -> "largest"
)
val topics = Set("user-events", "purchase-events")

val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  streamingContext, kafkaParams, topics
)

stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach { case (key, value) =>
    println(s"Key: $key, Value: $value")
  }
  // Commit offsets to an external store if needed
  offsetRanges.foreach(println)
}
```
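The `offsetRanges` printed above are the raw material for custom offset tracking. As a minimal, Spark-free sketch of that bookkeeping (the `TrackedRange` case class and the in-memory `committed` store are hypothetical stand-ins, not part of the Spark or Kafka APIs), "committing" a batch means recording each range's `untilOffset` per topic/partition:

```scala
// Hypothetical model of the fields an OffsetRange exposes.
case class TrackedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// In-memory stand-in for an external offset store (e.g. a database table).
var committed = Map.empty[(String, Int), Long]

def commitBatch(ranges: Seq[TrackedRange]): Unit =
  // Record the exclusive end offset of each range; the next batch resumes there.
  ranges.foreach { r =>
    committed += ((r.topic, r.partition) -> r.untilOffset)
  }

val batch = Seq(
  TrackedRange("user-events", 0, 0L, 500L),
  TrackedRange("user-events", 1, 0L, 750L)
)
commitBatch(batch)
// committed now maps ("user-events", 0) -> 500 and ("user-events", 1) -> 750
```

In a real job the store update and the output write would share one transaction, so a replayed batch cannot double-commit.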

### Advanced Direct Stream with Custom Message Handler

Creates a direct stream with explicit starting offsets and custom message transformation.

```scala { .api }
/**
 * Create an input stream that directly pulls messages from Kafka brokers
 * with explicit starting offsets and a custom message handler.
 *
 * @param ssc StreamingContext object
 * @param kafkaParams Kafka configuration parameters
 * @param fromOffsets Per-topic/partition Kafka offsets defining the inclusive starting point
 * @param messageHandler Function for translating each message and metadata into the desired type
 * @tparam K type of Kafka message key
 * @tparam V type of Kafka message value
 * @tparam KD type of Kafka message key decoder
 * @tparam VD type of Kafka message value decoder
 * @tparam R type returned by messageHandler
 * @return DStream of R
 */
def createDirectStream[
  K: ClassTag,
  V: ClassTag,
  KD <: Decoder[K]: ClassTag,
  VD <: Decoder[V]: ClassTag,
  R: ClassTag
](
  ssc: StreamingContext,
  kafkaParams: Map[String, String],
  fromOffsets: Map[TopicAndPartition, Long],
  messageHandler: MessageAndMetadata[K, V] => R
): InputDStream[R]
```

**Usage Example:**

```scala
import kafka.common.TopicAndPartition
import kafka.message.MessageAndMetadata
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.kafka._

val fromOffsets = Map(
  TopicAndPartition("events", 0) -> 1000L,
  TopicAndPartition("events", 1) -> 2000L
)

val messageHandler = (mmd: MessageAndMetadata[String, String]) => {
  s"${mmd.topic}:${mmd.partition}:${mmd.offset} -> ${mmd.key()}:${mmd.message()}"
}

val customStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](
  streamingContext, kafkaParams, fromOffsets, messageHandler
)

customStream.print()
```
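When resuming a job, `fromOffsets` is typically rebuilt from wherever offsets were last committed. A plain-Scala sketch of that translation (the stored-row shape is hypothetical, and the `TopicAndPartition` case class below is a stand-in for `kafka.common.TopicAndPartition` so the sketch runs without Kafka on the classpath):

```scala
// Stand-in for kafka.common.TopicAndPartition.
case class TopicAndPartition(topic: String, partition: Int)

// Rows as they might come back from an external offset store:
// (topic, partition, last committed untilOffset).
val storedRows = Seq(("events", 0, 1000L), ("events", 1, 2000L))

// untilOffset is exclusive, so the stored value is exactly the next offset to read.
val fromOffsets: Map[TopicAndPartition, Long] =
  storedRows.map { case (t, p, o) => TopicAndPartition(t, p) -> o }.toMap
```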

### Java Direct Stream API

Java-friendly API for direct stream creation.

```java { .api }
/**
 * Create an input stream that directly pulls messages from Kafka brokers
 * without using any receiver (Java API).
 *
 * @param jssc JavaStreamingContext object
 * @param keyClass Class of the keys in the Kafka records
 * @param valueClass Class of the values in the Kafka records
 * @param keyDecoderClass Class of the key decoder
 * @param valueDecoderClass Class of the value decoder
 * @param kafkaParams Kafka configuration parameters
 * @param topics Names of the topics to consume
 * @param <K> type of Kafka message key
 * @param <V> type of Kafka message value
 * @param <KD> type of Kafka message key decoder
 * @param <VD> type of Kafka message value decoder
 * @return DStream of (Kafka message key, Kafka message value)
 */
public static <K, V, KD extends Decoder<K>, VD extends Decoder<V>>
JavaPairInputDStream<K, V> createDirectStream(
    JavaStreamingContext jssc,
    Class<K> keyClass,
    Class<V> valueClass,
    Class<KD> keyDecoderClass,
    Class<VD> valueDecoderClass,
    Map<String, String> kafkaParams,
    Set<String> topics
)
```

**Java Usage Example:**

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.kafka.HasOffsetRanges;
import org.apache.spark.streaming.kafka.KafkaUtils;
import org.apache.spark.streaming.kafka.OffsetRange;

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");

Set<String> topics = Collections.singleton("my-topic");

JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
    jssc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParams,
    topics
);

stream.foreachRDD(rdd -> {
    OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
    rdd.foreach(record -> {
        System.out.println("Key: " + record._1 + ", Value: " + record._2);
    });
});
```

## Key Features

### Exactly-Once Semantics

- **No receivers**: the stream queries Kafka brokers directly
- **Manual offset tracking**: offsets are tracked by the stream itself, not by ZooKeeper
- **Failure recovery**: enable checkpointing in the StreamingContext to recover from driver failures
- **Idempotent output**: ensure output operations are idempotent for end-to-end exactly-once semantics
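The idempotent-output point is the crux of end-to-end exactly-once: if a batch is replayed after a failure, writing it again must not change the result. A Spark-free sketch using upsert-by-key semantics (the in-memory `store` is a hypothetical stand-in for an external sink):

```scala
// Keyed upsert: writing the same record twice leaves the store unchanged.
var store = Map.empty[String, String]

def write(batch: Seq[(String, String)]): Unit =
  batch.foreach { case (k, v) => store += (k -> v) }

val batch = Seq("user-1" -> "clicked", "user-2" -> "purchased")
write(batch)
val afterFirst = store
write(batch) // replay the same batch, simulating recovery after a failure
assert(store == afterFirst) // idempotent: the replay changed nothing
```

An append-only sink, by contrast, would duplicate the batch on replay and degrade the guarantee to at-least-once.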

### Rate Limiting

Configure maximum messages per partition per second using `spark.streaming.kafka.maxRatePerPartition`:

```scala
val conf = new SparkConf()
  .setAppName("KafkaDirectStream")
  .set("spark.streaming.kafka.maxRatePerPartition", "1000")
```
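With this setting, the upper bound on records per batch works out to `maxRatePerPartition × number of partitions × batch interval in seconds`. A quick worked example (the partition count and batch interval below are hypothetical):

```scala
val maxRatePerPartition = 1000 // records/sec/partition, from the setting above
val partitions = 4             // total Kafka partitions across subscribed topics (assumed)
val batchIntervalSec = 2       // StreamingContext batch interval in seconds (assumed)

// Largest batch the direct stream will pull under this rate limit.
val maxRecordsPerBatch = maxRatePerPartition * partitions * batchIntervalSec
// 1000 * 4 * 2 = 8000 records per batch, at most
```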

### Offset Access

Access offset information from generated RDDs:

```scala
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  offsetRanges.foreach { offsetRange =>
    println(s"${offsetRange.topic} ${offsetRange.partition} " +
      s"${offsetRange.fromOffset} ${offsetRange.untilOffset}")
  }
}
```
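Because `fromOffset` is inclusive and `untilOffset` exclusive, each range's record count is simply `untilOffset - fromOffset`. A plain-Scala sketch (the `BatchRange` case class is hypothetical and only mirrors the fields `OffsetRange` exposes):

```scala
// Mirrors the fields of an OffsetRange for illustration.
case class BatchRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

val ranges = Seq(BatchRange("events", 0, 100L, 350L), BatchRange("events", 1, 40L, 90L))

// Records in this batch, per partition and in total.
val perPartition = ranges.map(r => (r.topic, r.partition) -> (r.untilOffset - r.fromOffset)).toMap
val total = perPartition.values.sum
// perPartition: (events, 0) -> 250, (events, 1) -> 50; total = 300
```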

## Configuration Parameters

### Required Parameters

```scala
Map(
  "metadata.broker.list" -> "host1:port1,host2:port2" // Kafka brokers (NOT ZooKeeper)
)
```

### Optional Parameters

```scala
Map(
  "auto.offset.reset" -> "largest", // or "smallest"
  "group.id" -> "my-consumer-group",
  "enable.auto.commit" -> "false"
)
```

## Error Handling

- **SparkException**: Thrown for connectivity issues, invalid offsets, or configuration problems

- **Offset validation**: Automatic validation that requested offsets are available on Kafka brokers

- **Leader discovery**: Automatic discovery of partition leaders with fallback handling