# Stream Collection

The stream collector converts Node.js `Readable` streams and Web API `ReadableStream`s into byte arrays (`Uint8Array`), which is how HTTP response bodies are read in the Smithy ecosystem.

## Capabilities

### Stream Collector Function

The main utility function. It reads a supported stream to completion and returns its contents as a single `Uint8Array`.

```typescript { .api }
/**
 * Converts a stream to a byte array.
 * Supports both Node.js Readable streams and Web API ReadableStreams.
 */
const streamCollector: StreamCollector;

/**
 * Type definition for the stream collector function
 */
type StreamCollector = (stream: Readable | ReadableStream) => Promise<Uint8Array>;
```

**Usage Examples:**

```typescript
import { NodeHttpHandler, streamCollector } from "@smithy/node-http-handler";
import { HttpRequest } from "@smithy/protocol-http";
import { Readable } from "stream";
import { createReadStream } from "fs";

// Collect from Node.js Readable stream
const fileStream = createReadStream("data.txt");
const fileBytes = await streamCollector(fileStream);
console.log(fileBytes); // Uint8Array containing file contents

// Collect from string-based stream
const textStream = Readable.from(["Hello", " ", "World"]);
const textBytes = await streamCollector(textStream);
const text = new TextDecoder().decode(textBytes);
console.log(text); // "Hello World"

// Collect from Web API ReadableStream (Node.js 18+)
const fetchResponse = await fetch("https://api.example.com/data");
if (fetchResponse.body) {
  const responseBytes = await streamCollector(fetchResponse.body);
  const responseText = new TextDecoder().decode(responseBytes);
  console.log(responseText);
}

// Collect from HTTP response stream
const handler = new NodeHttpHandler();
const request = new HttpRequest({
  method: "GET",
  hostname: "api.example.com",
  path: "/large-file"
});

const { response } = await handler.handle(request);
if (response.body) {
  const bodyBytes = await streamCollector(response.body);
  console.log(`Downloaded ${bodyBytes.length} bytes`);
}
```

### Collector Class

Internal writable stream implementation used for collecting data chunks from Node.js streams.

```typescript { .api }
/**
 * Internal writable stream for collecting data chunks.
 * Extends Node.js Writable stream.
 */
class Collector extends Writable {
  /**
   * Array of collected buffer chunks
   */
  bufferedBytes: Buffer[];

  /**
   * Create a new collector instance - inherits Writable constructor
   * @param options - WritableOptions for the underlying Writable stream
   */
  constructor(options?: WritableOptions);

  /**
   * Stream write implementation
   * @param chunk - Data chunk to write
   * @param encoding - Character encoding (if applicable)
   * @param callback - Completion callback
   */
  _write(chunk: Buffer, encoding: string, callback: (err?: Error) => void): void;
}
```

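The way a chunk-collecting `Writable` and a promise-returning collector function fit together for Node.js streams can be pictured roughly as follows. This is a minimal sketch built only on Node's `stream` module, not the package's actual source; `SketchCollector` and `collectNodeStream` are hypothetical names introduced for illustration.

```typescript
import { Readable, Writable } from "stream";

// Hypothetical stand-in for the package's Collector: stores every chunk it receives.
class SketchCollector extends Writable {
  public readonly chunks: Buffer[] = [];

  _write(chunk: Buffer, _encoding: BufferEncoding, callback: (err?: Error | null) => void): void {
    this.chunks.push(chunk);
    callback(); // signal that this chunk has been handled
  }
}

// Hypothetical helper: resolves with all bytes once the source has ended.
function collectNodeStream(source: Readable): Promise<Uint8Array> {
  return new Promise((resolve, reject) => {
    const collector = new SketchCollector();
    source.on("error", (err) => {
      // pipe() does not forward source errors, so end the destination manually.
      collector.end();
      reject(err);
    });
    collector.on("error", reject);
    collector.on("finish", () => {
      // Join the buffered chunks into one contiguous byte array.
      resolve(new Uint8Array(Buffer.concat(collector.chunks)));
    });
    source.pipe(collector);
  });
}
```

Calling `collectNodeStream(Readable.from(["Hello", " ", "World"]))` in this sketch resolves with the UTF-8 bytes of the three strings joined together, because a `Writable` in its default mode converts string chunks to `Buffer`s before `_write` sees them.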
## Stream Type Handling

The stream collector accepts either stream type and detects which one it was given:

### Node.js Readable Streams

```typescript
import { Readable } from "stream";

// Traditional Node.js streams
const nodeStream = new Readable({
  read() {
    this.push("chunk 1");
    this.push("chunk 2");
    this.push(null); // End stream
  }
});

const bytes = await streamCollector(nodeStream);
```

### Web API ReadableStreams

```typescript
// Web API streams (Node.js 18+)
const webStream = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode("Hello"));
    controller.enqueue(new TextEncoder().encode(" World"));
    controller.close();
  }
});

const bytes = await streamCollector(webStream);
```

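One way to tell the two stream types apart and to drain the Web variant is sketched below. How the package itself distinguishes them is not shown in this document, so the duck-typing check on `getReader` is an assumption, and `isWebReadableStream` and `collectWebStream` are hypothetical names.

```typescript
// Hypothetical check: Web streams expose getReader(), Node.js Readable streams do not.
function isWebReadableStream(stream: unknown): stream is ReadableStream<Uint8Array> {
  return typeof (stream as { getReader?: unknown } | null)?.getReader === "function";
}

// Hypothetical helper: read every Uint8Array chunk, then copy them into one array.
async function collectWebStream(stream: ReadableStream<Uint8Array>): Promise<Uint8Array> {
  const reader = stream.getReader();
  const chunks: Uint8Array[] = [];
  let length = 0;

  while (true) {
    const result = await reader.read();
    if (result.done) break;
    chunks.push(result.value);
    length += result.value.byteLength;
  }

  // Allocate once, then copy each chunk at its running offset.
  const collected = new Uint8Array(length);
  let offset = 0;
  for (const chunk of chunks) {
    collected.set(chunk, offset);
    offset += chunk.byteLength;
  }
  return collected;
}
```

A dispatcher built on these two pieces would call `collectWebStream` when `isWebReadableStream` returns `true` and fall back to a `pipe`-based collector otherwise.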
## Error Handling

If the source stream fails while it is being collected, the promise returned by `streamCollector` rejects with that error, so failures can be handled with `try`/`catch`:

```typescript
import { NodeHttpHandler, streamCollector } from "@smithy/node-http-handler";
import { HttpRequest } from "@smithy/protocol-http";
import { Readable } from "stream";

// Handle stream errors
const errorStream = new Readable({
  read() {
    // Simulate an error; destroy() emits "error" on the stream
    this.destroy(new Error("Stream processing failed"));
  }
});

try {
  const bytes = await streamCollector(errorStream);
} catch (error: any) {
  console.error("Stream collection failed:", error.message);
}

// Handle network stream errors
const handler = new NodeHttpHandler();
const request = new HttpRequest({
  method: "GET",
  hostname: "api.example.com",
  path: "/large-file"
});

try {
  const { response } = await handler.handle(request);
  const bodyBytes = await streamCollector(response.body);
} catch (error: any) {
  if (error.code === "ECONNRESET") {
    console.error("Connection was reset during stream collection");
  } else if (error.code === "ETIMEDOUT") {
    console.error("Stream collection timed out");
  } else {
    console.error("Stream collection error:", error.message);
  }
}
```

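Under TypeScript's `strict` settings a caught value is typed `unknown` unless it is annotated, so checks such as `error.code` need either an `any` annotation or explicit narrowing. The sketch below shows the narrowing approach for the error codes used above; `describeStreamError` is a hypothetical helper, not part of the package.

```typescript
// Hypothetical helper: turn any caught value into a log message without assuming its type.
function describeStreamError(error: unknown): string {
  // Read `code` only when the value is a non-null object that has such a property.
  const code =
    typeof error === "object" && error !== null && "code" in error
      ? (error as { code?: unknown }).code
      : undefined;

  if (code === "ECONNRESET") return "Connection was reset during stream collection";
  if (code === "ETIMEDOUT") return "Stream collection timed out";

  // Fall back to the message for Error instances, or a string form for anything else.
  const message = error instanceof Error ? error.message : String(error);
  return `Stream collection error: ${message}`;
}
```

Inside a `catch (error)` block this can be used as `console.error(describeStreamError(error))`.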
## Types

```typescript { .api }
// From Node.js stream module
interface Readable {
  pipe<T extends Writable>(destination: T): T;
  on(event: string, listener: Function): this;
  // ... other Readable properties and methods
}

// From stream/web module (Node.js 16.5.0+)
interface ReadableStream<R = any> {
  getReader(): ReadableStreamDefaultReader<R>;
  // ... other ReadableStream properties and methods
}

// From Node.js stream module
abstract class Writable {
  abstract _write(chunk: any, encoding: string, callback: (error?: Error | null) => void): void;
  end(): void;
  on(event: string, listener: Function): this;
  // ... other Writable properties and methods
}

// From Node.js stream module
interface WritableOptions {
  highWaterMark?: number;
  objectMode?: boolean;
  // ... other Writable options
}
```

The stream collector handles both legacy Node.js streams and modern Web API streams through a single function. Because the result is one `Uint8Array`, the whole body is held in memory once collection finishes, so it is best suited to HTTP response bodies that fit comfortably in memory.