# Queuing Strategies

Queuing strategies manage backpressure: they control how much data a stream buffers in its internal queue and determine when backpressure is applied.

## Capabilities

### CountQueuingStrategy Class

A queuing strategy that counts the number of chunks in the queue, regardless of their individual sizes.

```typescript { .api }
/**
 * A queuing strategy that counts the number of chunks
 */
class CountQueuingStrategy implements QueuingStrategy<any> {
  constructor(options: { highWaterMark: number });

  /** Returns the high water mark provided to the constructor */
  readonly highWaterMark: number;

  /** Measures the size of chunk by always returning 1 */
  readonly size: (chunk: any) => 1;
}
```
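
As a quick check of the two members declared above, this minimal sketch reads them directly. It assumes a runtime that exposes `CountQueuingStrategy` as a global (recent browsers, Node.js 18+); with the polyfill, import it from "web-streams-polyfill" as in the usage examples.

```typescript
// Assumption: CountQueuingStrategy is available as a global; with the
// polyfill, import it from "web-streams-polyfill" instead.
const countStrategy = new CountQueuingStrategy({ highWaterMark: 5 });

// highWaterMark echoes the constructor option
const countLimit: number = countStrategy.highWaterMark;

// size() ignores its argument and reports every chunk as 1
const countSizes: number[] = [
  countStrategy.size("a short string"),
  countStrategy.size(new Uint8Array(4096)),
  countStrategy.size({ nested: { object: true } }),
];

console.log(countLimit, countSizes); // 5 [ 1, 1, 1 ]
```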

**Usage Examples:**

```typescript
import { ReadableStream, WritableStream, TransformStream, CountQueuingStrategy } from "web-streams-polyfill";

// Readable stream that buffers up to 5 chunks
const stream = new ReadableStream({
  start(controller) {
    controller.enqueue("chunk 1");
    controller.enqueue("chunk 2");
    controller.enqueue("chunk 3");
    // desiredSize is now 2; it reaches 0 (backpressure) once 5 chunks are queued
  }
}, new CountQueuingStrategy({ highWaterMark: 5 }));

// Writable stream with count-based backpressure
const writable = new WritableStream({
  write(chunk) {
    console.log("Processing:", chunk);
    return new Promise(resolve => setTimeout(resolve, 100));
  }
}, new CountQueuingStrategy({ highWaterMark: 3 }));

// Transform stream with different strategies for each side
const transform = new TransformStream({
  transform(chunk, controller) {
    // Split each input chunk into multiple output chunks
    const parts = chunk.toString().split(' ');
    parts.forEach(part => controller.enqueue(part));
  }
},
  new CountQueuingStrategy({ highWaterMark: 2 }), // writable side
  new CountQueuingStrategy({ highWaterMark: 10 }) // readable side
);
```
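
To make the count-based accounting visible, the following sketch records `controller.desiredSize` after each enqueue. It assumes the global `ReadableStream` and `CountQueuingStrategy` (recent browsers, Node.js 18+) rather than the polyfill import, and the names `observed` and `counted` are illustrative only.

```typescript
// Assumption: global stream classes; with the polyfill, import them from
// "web-streams-polyfill" instead.
const observed: (number | null)[] = [];

const counted = new ReadableStream<string>({
  start(controller) {
    observed.push(controller.desiredSize); // queue is still empty
    for (let i = 1; i <= 4; i++) {
      controller.enqueue(`chunk ${i}`);
      // desiredSize = highWaterMark - number of queued chunks
      observed.push(controller.desiredSize);
    }
  }
}, new CountQueuingStrategy({ highWaterMark: 3 }));

console.log(observed); // [ 3, 2, 1, 0, -1 ]
```

A value of 0 or less means the queue is full. `enqueue()` still accepts chunks at that point, so it is up to the underlying source to stop producing.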

### ByteLengthQueuingStrategy Class

A queuing strategy that measures the size of chunks by their byte length, useful for streams dealing with binary data or text where the size of individual chunks matters.

```typescript { .api }
/**
 * A queuing strategy that counts the number of bytes in each chunk
 */
class ByteLengthQueuingStrategy implements QueuingStrategy<ArrayBufferView> {
  constructor(options: { highWaterMark: number });

  /** Returns the high water mark provided to the constructor */
  readonly highWaterMark: number;

  /** Measures the size of chunk by returning the value of its byteLength property */
  readonly size: (chunk: ArrayBufferView) => number;
}
```
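
The sketch below calls `size()` on a few typed arrays to show that the result is the view's `byteLength`, not its element count or the size of its backing buffer. It assumes the global `ByteLengthQueuingStrategy` (recent browsers, Node.js 18+); the variable names are illustrative.

```typescript
// Assumption: ByteLengthQueuingStrategy is available as a global; with the
// polyfill, import it from "web-streams-polyfill" instead.
const byteStrategy = new ByteLengthQueuingStrategy({ highWaterMark: 16 * 1024 });

const wholeArray = new Uint8Array(1024);
// A 512-byte window into the same buffer, starting at byte 256
const partialView = new Uint8Array(wholeArray.buffer, 256, 512);
// 8 elements of 8 bytes each
const floats = new Float64Array(8);

const byteSizes: number[] = [
  byteStrategy.size(wholeArray),
  byteStrategy.size(partialView),
  byteStrategy.size(floats),
];

console.log(byteSizes); // [ 1024, 512, 64 ]
```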

**Usage Examples:**

```typescript
import { ReadableStream, WritableStream, TransformStream, ByteLengthQueuingStrategy } from "web-streams-polyfill";

// Readable stream that buffers up to 1MB of data
const byteStream = new ReadableStream({
  start(controller) {
    const chunk1 = new Uint8Array(512 * 1024); // 512KB
    const chunk2 = new Uint8Array(256 * 1024); // 256KB

    controller.enqueue(chunk1);
    controller.enqueue(chunk2);
    // 768KB queued so far; backpressure is signaled once 1MB is queued
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }));

// File upload stream with byte-based backpressure
const uploadStream = new WritableStream({
  async write(chunk) {
    // Upload chunk to server
    await fetch('/upload', {
      method: 'POST',
      body: chunk
    });
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 64 * 1024 })); // 64KB buffer

// Transform stream that processes binary data
const compressionTransform = new TransformStream({
  transform(chunk, controller) {
    // Simulate 50% compression by keeping only the first half of the input bytes
    const half = Math.floor(chunk.byteLength / 2);
    // Respect byteOffset: the chunk may be a view into a larger buffer
    const compressed = new Uint8Array(chunk.buffer, chunk.byteOffset, half).slice();
    controller.enqueue(compressed);
  }
},
  new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }), // 1MB input buffer
  new ByteLengthQueuingStrategy({ highWaterMark: 512 * 1024 }) // 512KB output buffer
);
```
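
For the byte-based case, the remaining room is measured in bytes rather than chunks. This sketch, again assuming global stream classes instead of the polyfill import, records `controller.desiredSize` as chunks of different sizes are queued against a 1MB high water mark.

```typescript
// Assumption: global stream classes; with the polyfill, import them from
// "web-streams-polyfill" instead.
const remainingBytes: (number | null)[] = [];

const measured = new ReadableStream<Uint8Array>({
  start(controller) {
    controller.enqueue(new Uint8Array(512 * 1024)); // 512KB queued
    remainingBytes.push(controller.desiredSize);

    controller.enqueue(new Uint8Array(256 * 1024)); // 768KB queued
    remainingBytes.push(controller.desiredSize);

    controller.enqueue(new Uint8Array(512 * 1024)); // 1280KB queued, over the limit
    remainingBytes.push(controller.desiredSize);
  }
}, new ByteLengthQueuingStrategy({ highWaterMark: 1024 * 1024 }));

console.log(remainingBytes); // [ 524288, 262144, -262144 ]
```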

### Custom Queuing Strategies

You can create a custom queuing strategy by providing any object that implements the `QueuingStrategy` interface: an optional `highWaterMark` and an optional `size()` function.

```typescript { .api }
interface QueuingStrategy<T = any> {
  /** The high water mark value */
  highWaterMark?: number;

  /** Function that computes and returns the finite non-negative size of the given chunk value */
  size?: (chunk: T) => number;
}
```

**Usage Examples:**

```typescript
import { ReadableStream, WritableStream } from "web-streams-polyfill";

// Custom strategy that measures string length
const stringLengthStrategy = {
  highWaterMark: 1000, // Buffer up to 1000 characters
  size(chunk: string) {
    return chunk.length;
  }
};

const textStream = new ReadableStream({
  start(controller) {
    controller.enqueue("Hello"); // 5 characters
    controller.enqueue("World"); // 5 characters
    controller.enqueue("!"); // 1 character
    // Total: 11 characters (well under 1000 limit)
  }
}, stringLengthStrategy);

// Custom strategy for objects based on property count
const objectComplexityStrategy = {
  highWaterMark: 100, // Buffer objects with up to 100 total properties
  size(chunk: object) {
    return Object.keys(chunk).length;
  }
};

const objectStream = new WritableStream({
  write(chunk) {
    console.log(`Processing object with ${Object.keys(chunk).length} properties`);
  }
}, objectComplexityStrategy);

// Custom strategy with priority-based sizing
interface PriorityItem {
  data: any;
  priority: 'low' | 'medium' | 'high';
}

const priorityStrategy = {
  highWaterMark: 50,
  size(chunk: PriorityItem) {
    switch (chunk.priority) {
      case 'high': return 10; // High priority items take more buffer space
      case 'medium': return 5;
      case 'low': return 1;
      default: return 1;
    }
  }
};

const priorityStream = new ReadableStream({
  start(controller) {
    controller.enqueue({ data: "urgent", priority: 'high' as const });
    controller.enqueue({ data: "normal", priority: 'medium' as const });
    controller.enqueue({ data: "background", priority: 'low' as const });
  }
}, priorityStrategy);
```
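
`chunk.length` counts UTF-16 code units, which can differ from the number of bytes a string occupies once encoded. As one possible variation, the sketch below sizes string chunks by their UTF-8 byte length using the standard `TextEncoder`. The name `utf8ByteStrategy` is hypothetical and not part of the library, and the global stream classes are assumed in place of the polyfill import.

```typescript
// Assumption: global ReadableStream and TextEncoder (browsers, Node.js 18+).
const encoder = new TextEncoder();

// Hypothetical custom strategy: size each string by its UTF-8 byte length
const utf8ByteStrategy = {
  highWaterMark: 16, // buffer up to 16 encoded bytes
  size(chunk: string): number {
    return encoder.encode(chunk).byteLength;
  }
};

const utf8Remaining: (number | null)[] = [];

const utf8Stream = new ReadableStream<string>({
  start(controller) {
    controller.enqueue("hello"); // 5 bytes
    utf8Remaining.push(controller.desiredSize);

    controller.enqueue("h\u00e9llo"); // 6 bytes: the accented letter takes 2
    utf8Remaining.push(controller.desiredSize);

    controller.enqueue("\u65e5\u672c\u8a9e"); // 9 bytes: 3 characters of 3 bytes each
    utf8Remaining.push(controller.desiredSize);
  }
}, utf8ByteStrategy);

console.log(utf8Remaining); // [ 11, 5, -4 ]
```

Encoding every chunk just to measure it costs extra work per enqueue, so this trade-off only makes sense when the byte count is what actually needs bounding.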

## Queuing Strategy Types

```typescript { .api }
interface QueuingStrategyInit {
  /** The high water mark value, which must be a non-negative number */
  highWaterMark: number;
}

type QueuingStrategySizeCallback<T = any> = (chunk: T) => number;
```
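
The non-negative requirement on `highWaterMark` is enforced when a stream is constructed: the Streams Standard specifies a `RangeError` for negative or `NaN` values. The sketch below probes a few values with a hypothetical helper, `rejectsHighWaterMark`, which only reports whether construction threw. It assumes the global `ReadableStream` rather than the polyfill import.

```typescript
// Hypothetical helper (not part of the library): returns true if constructing
// a stream with the given high water mark throws.
function rejectsHighWaterMark(value: number): boolean {
  try {
    new ReadableStream({}, { highWaterMark: value });
    return false;
  } catch {
    return true;
  }
}

const rejected = {
  negative: rejectsHighWaterMark(-1),
  notANumber: rejectsHighWaterMark(NaN),
  zero: rejectsHighWaterMark(0),
  fractional: rejectsHighWaterMark(2.5),
  infinite: rejectsHighWaterMark(Infinity),
};

console.log(rejected);
// { negative: true, notANumber: true, zero: false, fractional: false, infinite: false }
```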

## Backpressure Management

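Queuing strategies work together with stream controllers and writers to manage backpressure. A writer's `desiredSize` is the high water mark minus the total size of the chunks still in the queue: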

```typescript
import { WritableStream, CountQueuingStrategy } from "web-streams-polyfill";

const slowProcessor = new WritableStream({
  async write(chunk) {
    // Simulate slow processing
    await new Promise(resolve => setTimeout(resolve, 1000));
    console.log("Processed:", chunk);
  }
}, new CountQueuingStrategy({ highWaterMark: 2 }));

const writer = slowProcessor.getWriter();

async function writeWithBackpressure() {
  try {
    // The writes are not awaited one by one: an awaited write() resolves only
    // after the sink has processed the chunk, so the queue would be empty again.
    const write1 = writer.write("chunk 1");
    console.log("Desired size:", writer.desiredSize); // 1 (2 - 1)

    const write2 = writer.write("chunk 2");
    console.log("Desired size:", writer.desiredSize); // 0 (2 - 2)

    const write3 = writer.write("chunk 3");
    console.log("Desired size:", writer.desiredSize); // -1 (2 - 3, backpressure!)

    // writer.ready resolves once desiredSize is positive again
    await writer.ready;

    // Wait for the sink to finish all three chunks
    await Promise.all([write1, write2, write3]);
  } finally {
    writer.releaseLock();
  }
}

writeWithBackpressure();
```
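
A producer that respects backpressure typically waits on `writer.ready` before each write. The sketch below shows one way to do that. `writeAll` is a hypothetical helper, not part of the library, and the global stream classes are assumed in place of the polyfill import.

```typescript
// Hypothetical helper: writes every chunk, pausing whenever the queue is full.
async function writeAll(stream: WritableStream<string>, chunks: string[]): Promise<void> {
  const writer = stream.getWriter();
  try {
    for (const chunk of chunks) {
      // Resolves immediately while desiredSize > 0, otherwise waits for room
      await writer.ready;
      // Not awaited, so the next chunk can be queued while this one is processed.
      // A failed write also rejects writer.ready and writer.close(), so the
      // error still surfaces below.
      writer.write(chunk).catch(() => {});
    }
    // Resolves after all queued chunks have been written
    await writer.close();
  } finally {
    writer.releaseLock();
  }
}

// Usage: a sink that records what it receives
const received: string[] = [];
const recorder = new WritableStream<string>({
  write(chunk) {
    received.push(chunk);
  }
}, new CountQueuingStrategy({ highWaterMark: 2 }));

const finished = writeAll(recorder, ["a", "b", "c", "d"]);
finished.then(() => console.log(received));
```

Awaiting `ready` instead of each `write()` keeps up to `highWaterMark` chunks queued, so the sink never sits idle waiting for the producer.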

## Default Strategies

When no strategy is provided, streams use default strategies:

```typescript
import { ReadableStream, WritableStream, CountQueuingStrategy } from "web-streams-polyfill";

// Placeholder underlying source and sink; any implementation works here
const source = {};
const sink = {};

// These are equivalent:
const stream1 = new ReadableStream(source);
const stream2 = new ReadableStream(source, new CountQueuingStrategy({ highWaterMark: 1 }));

// For writable streams:
const writable1 = new WritableStream(sink);
const writable2 = new WritableStream(sink, new CountQueuingStrategy({ highWaterMark: 1 }));

// For byte streams, the default high water mark is 0:
const byteStream1 = new ReadableStream({ type: 'bytes' });
const byteStream2 = new ReadableStream({ type: 'bytes' }, { highWaterMark: 0 });
```
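
These defaults can be observed through `desiredSize` on a freshly created stream, as in the sketch below. It assumes the global stream classes (recent browsers, Node.js 18+) rather than the polyfill import, and the `defaults` record is illustrative only.

```typescript
// Assumption: global stream classes; with the polyfill, import them from
// "web-streams-polyfill" instead.
const defaults: Record<string, number | null> = {};

// Default readable stream: high water mark of 1, nothing queued yet
new ReadableStream({
  start(controller) {
    defaults.readable = controller.desiredSize;
  }
});

// Default writable stream: high water mark of 1, nothing queued yet
defaults.writable = new WritableStream().getWriter().desiredSize;

// Readable byte stream: high water mark of 0
new ReadableStream({
  type: "bytes",
  start(controller) {
    defaults.bytes = controller.desiredSize;
  }
});

console.log(defaults); // { readable: 1, writable: 1, bytes: 0 }
```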