or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ajax-operations.mdcombination-operators.mdcore-types.mderror-handling.mdfetch-operations.mdfiltering-operators.mdindex.mdobservable-creation.mdschedulers.mdsubjects.mdtesting-utilities.mdtransformation-operators.mdwebsocket-operations.md

fetch-operations.mddocs/

0

# Fetch Operations

1

2

Modern fetch-based HTTP requests with full observable integration, streaming support, and comprehensive error handling for web APIs.

3

4

## Capabilities

5

6

### fromFetch

7

8

Create observables from fetch requests with full streaming and cancellation support.

9

10

```typescript { .api }

11

/**

12

* Create observable from fetch request with streaming and cancellation support

13

* @param input - Request URL or Request object

14

* @param initWithSelector - Fetch init options with optional response selector

15

* @returns Observable emitting Response or selected response data

16

*/

17

function fromFetch<T>(

18

input: string | Request,

19

initWithSelector?: RequestInit & {

20

selector?: (response: Response) => ObservableInput<T>;

21

}

22

): Observable<T extends never ? Response : T>;

23

```

24

25

**Usage Examples:**

26

27

```typescript

28

import { fromFetch } from "rxjs/fetch";

29

import { switchMap, catchError } from "rxjs/operators";

30

import { of } from "rxjs";

31

32

// Simple GET request

33

fromFetch('/api/users').pipe(

34

switchMap(response => {

35

if (response.ok) {

36

return response.json();

37

} else {

38

throw new Error(`HTTP ${response.status}: ${response.statusText}`);

39

}

40

}),

41

catchError(err => {

42

console.error('Request failed:', err);

43

return of({ users: [], error: 'Failed to load users' });

44

})

45

).subscribe(data => console.log('Users:', data));

46

47

// POST request with JSON body

48

const postData = { name: 'Alice', email: 'alice@example.com' };

49

50

fromFetch('/api/users', {

51

method: 'POST',

52

headers: {

53

'Content-Type': 'application/json',

54

},

55

body: JSON.stringify(postData)

56

}).pipe(

57

switchMap(response => {

58

if (response.ok) {

59

return response.json();

60

} else {

61

throw new Error(`Failed to create user: ${response.status}`);

62

}

63

})

64

).subscribe(

65

user => console.log('Created user:', user),

66

err => console.error('Error:', err)

67

);

68

69

// Request with automatic JSON parsing using selector

70

fromFetch('/api/data', {

71

selector: response => response.json()

72

}).subscribe(

73

data => console.log('Data:', data),

74

err => console.error('Error:', err)

75

);

76

77

// Request with custom headers and timeout

78

fromFetch('/api/secure-data', {

79

method: 'GET',

80

headers: {

81

'Authorization': 'Bearer ' + token,

82

'Accept': 'application/json'

83

},

84

signal: AbortSignal.timeout(5000) // 5 second timeout

85

}).pipe(

86

switchMap(response => {

87

if (response.status === 401) {

88

throw new Error('Unauthorized - token may be expired');

89

}

90

if (!response.ok) {

91

throw new Error(`HTTP ${response.status}`);

92

}

93

return response.json();

94

})

95

).subscribe(

96

data => console.log('Secure data:', data),

97

err => console.error('Request error:', err)

98

);

99

```

100

101

### Advanced Fetch Patterns

102

103

**Streaming Response Bodies:**

104

105

```typescript

106

import { fromFetch } from "rxjs/fetch";

107

import { switchMap, tap } from "rxjs/operators";

108

109

// Stream large response as text chunks

110

fromFetch('/api/large-dataset').pipe(

111

switchMap(response => {

112

if (!response.ok) {

113

throw new Error(`HTTP ${response.status}`);

114

}

115

116

// Get readable stream

117

const reader = response.body?.getReader();

118

if (!reader) {

119

throw new Error('Response body not readable');

120

}

121

122

return new Observable(subscriber => {

123

const pump = () => {

124

reader.read().then(({ done, value }) => {

125

if (done) {

126

subscriber.complete();

127

return;

128

}

129

130

// Emit chunk as Uint8Array

131

subscriber.next(value);

132

pump();

133

}).catch(err => subscriber.error(err));

134

};

135

136

pump();

137

138

// Cleanup

139

return () => reader.cancel();

140

});

141

}),

142

tap(chunk => console.log('Received chunk:', chunk.length, 'bytes'))

143

).subscribe(

144

chunk => {

145

// Process each chunk

146

const text = new TextDecoder().decode(chunk);

147

console.log('Chunk text:', text);

148

},

149

err => console.error('Stream error:', err),

150

() => console.log('Stream complete')

151

);

152

```

153

154

**Request Cancellation with AbortController:**

155

156

```typescript

157

import { fromFetch } from "rxjs/fetch";

158

import { takeUntil, switchMap } from "rxjs/operators";

159

import { Subject, timer } from "rxjs";

160

161

const cancelSubject = new Subject<void>();

162

163

// Request that can be cancelled

164

fromFetch('/api/slow-endpoint', {

165

signal: new AbortController().signal

166

}).pipe(

167

takeUntil(cancelSubject), // Cancel when cancelSubject emits

168

switchMap(response => response.json())

169

).subscribe(

170

data => console.log('Data:', data),

171

err => {

172

if (err.name === 'AbortError') {

173

console.log('Request was cancelled');

174

} else {

175

console.error('Request error:', err);

176

}

177

}

178

);

179

180

// Cancel the request after 3 seconds

181

timer(3000).subscribe(() => {

182

console.log('Cancelling request...');

183

cancelSubject.next();

184

});

185

```

186

187

**Retry with Exponential Backoff:**

188

189

```typescript

190

import { fromFetch } from "rxjs/fetch";

191

import { retryWhen, delay, scan, switchMap } from "rxjs/operators";

192

import { throwError, timer } from "rxjs";

193

194

fromFetch('/api/unreliable-endpoint').pipe(

195

switchMap(response => {

196

if (!response.ok) {

197

throw new Error(`HTTP ${response.status}`);

198

}

199

return response.json();

200

}),

201

retryWhen(errors =>

202

errors.pipe(

203

scan((retryCount, err) => {

204

console.log(`Attempt ${retryCount + 1} failed:`, err.message);

205

206

// Stop retrying after 3 attempts

207

if (retryCount >= 2) {

208

throw err;

209

}

210

return retryCount + 1;

211

}, 0),

212

// Exponential backoff: 1s, 2s, 4s

213

switchMap(retryCount => timer(Math.pow(2, retryCount) * 1000))

214

)

215

)

216

).subscribe(

217

data => console.log('Success:', data),

218

err => console.error('Final error after retries:', err)

219

);

220

```

221

222

**File Upload with Progress:**

223

224

```typescript

225

import { fromFetch } from "rxjs/fetch";

226

import { switchMap } from "rxjs/operators";

227

228

function uploadFile(file: File, url: string) {

229

const formData = new FormData();

230

formData.append('file', file);

231

232

return fromFetch(url, {

233

method: 'POST',

234

body: formData,

235

// Note: Don't set Content-Type header for FormData

236

// Browser will set it automatically with boundary

237

}).pipe(

238

switchMap(response => {

239

if (!response.ok) {

240

throw new Error(`Upload failed: ${response.status}`);

241

}

242

return response.json();

243

})

244

);

245

}

246

247

// Usage

248

const fileInput = document.querySelector('input[type="file"]') as HTMLInputElement;

249

const file = fileInput.files?.[0];

250

251

if (file) {

252

uploadFile(file, '/api/upload').subscribe(

253

result => console.log('Upload successful:', result),

254

err => console.error('Upload error:', err)

255

);

256

}

257

```

258

259

**Parallel Requests with Error Handling:**

260

261

```typescript

262

import { fromFetch } from "rxjs/fetch";

263

import { forkJoin, of } from "rxjs";

264

import { switchMap, catchError } from "rxjs/operators";

265

266

// Fetch multiple resources in parallel

267

const requests = [

268

'/api/users',

269

'/api/posts',

270

'/api/comments'

271

].map(url =>

272

fromFetch(url).pipe(

273

switchMap(response => {

274

if (!response.ok) {

275

throw new Error(`Failed to fetch ${url}: ${response.status}`);

276

}

277

return response.json();

278

}),

279

catchError(err => {

280

console.error(`Error fetching ${url}:`, err);

281

return of(null); // Return null for failed requests

282

})

283

)

284

);

285

286

forkJoin(requests).subscribe(

287

([users, posts, comments]) => {

288

console.log('Users:', users);

289

console.log('Posts:', posts);

290

console.log('Comments:', comments);

291

292

// Handle cases where some requests failed (null values)

293

if (users) {

294

// Process users

295

}

296

if (posts) {

297

// Process posts

298

}

299

}

300

);

301

```

302

303

**Request Deduplication:**

304

305

```typescript

306

import { fromFetch } from "rxjs/fetch";

307

import { shareReplay, switchMap } from "rxjs/operators";

308

import { BehaviorSubject } from "rxjs";

309

310

// Cache and deduplicate identical requests

311

const requestCache = new Map<string, Observable<any>>();

312

313

function cachedFetch(url: string, ttl: number = 60000) {

314

if (requestCache.has(url)) {

315

return requestCache.get(url)!;

316

}

317

318

const request$ = fromFetch(url).pipe(

319

switchMap(response => {

320

if (!response.ok) {

321

throw new Error(`HTTP ${response.status}`);

322

}

323

return response.json();

324

}),

325

shareReplay({ bufferSize: 1, refCount: true })

326

);

327

328

requestCache.set(url, request$);

329

330

// Clear cache after TTL

331

timer(ttl).subscribe(() => {

332

requestCache.delete(url);

333

});

334

335

return request$;

336

}

337

338

// Multiple calls to same URL will share the same request

339

cachedFetch('/api/config').subscribe(config => console.log('Config 1:', config));

340

cachedFetch('/api/config').subscribe(config => console.log('Config 2:', config));

341

// Only one HTTP request is made

342

```

343

344

## Integration with Other RxJS Features

345

346

**Combining with WebSocket for Real-time Updates:**

347

348

```typescript

349

import { fromFetch } from "rxjs/fetch";

350

import { webSocket } from "rxjs/webSocket";

351

import { merge, switchMap } from "rxjs/operators";

352

353

// Initial data from REST API

354

const initialData$ = fromFetch('/api/data').pipe(

355

switchMap(response => response.json())

356

);

357

358

// Real-time updates via WebSocket

359

const updates$ = webSocket('ws://localhost:8080/updates');

360

361

// Combine initial data with real-time updates

362

merge(initialData$, updates$).subscribe(

363

data => console.log('Data update:', data)

364

);

365

```

366

367

**Request/Response Middleware Pattern:**

368

369

```typescript

370

import { fromFetch } from "rxjs/fetch";

371

import { switchMap, tap, finalize } from "rxjs/operators";

372

373

// Request interceptor

374

function withAuth(request: RequestInit = {}): RequestInit {

375

return {

376

...request,

377

headers: {

378

...request.headers,

379

'Authorization': `Bearer ${getAuthToken()}`

380

}

381

};

382

}

383

384

// Response interceptor

385

function withLogging<T>(source: Observable<T>): Observable<T> {

386

return source.pipe(

387

tap(response => console.log('Response received:', response)),

388

finalize(() => console.log('Request completed'))

389

);

390

}

391

392

// Usage with middleware

393

function apiCall(url: string, options?: RequestInit) {

394

return fromFetch(url, withAuth(options)).pipe(

395

switchMap(response => {

396

if (response.status === 401) {

397

// Handle auth refresh

398

return refreshToken().pipe(

399

switchMap(() => fromFetch(url, withAuth(options)))

400

);

401

}

402

if (!response.ok) {

403

throw new Error(`HTTP ${response.status}`);

404

}

405

return response.json();

406

})

407

);

408

}

409

410

// Apply logging middleware

411

withLogging(apiCall('/api/protected-data')).subscribe(

412

data => console.log('Protected data:', data),

413

err => console.error('Error:', err)

414

);

415

```

416

417

## Types

418

419

```typescript { .api }

420

interface RequestInit {

421

method?: string;

422

headers?: HeadersInit;

423

body?: BodyInit | null;

424

mode?: RequestMode;

425

credentials?: RequestCredentials;

426

cache?: RequestCache;

427

redirect?: RequestRedirect;

428

referrer?: string;

429

referrerPolicy?: ReferrerPolicy;

430

integrity?: string;

431

keepalive?: boolean;

432

signal?: AbortSignal | null;

433

window?: any;

434

}

435

436

interface Response {

437

readonly headers: Headers;

438

readonly ok: boolean;

439

readonly redirected: boolean;

440

readonly status: number;

441

readonly statusText: string;

442

readonly type: ResponseType;

443

readonly url: string;

444

readonly body: ReadableStream<Uint8Array> | null;

445

readonly bodyUsed: boolean;

446

447

arrayBuffer(): Promise<ArrayBuffer>;

448

blob(): Promise<Blob>;

449

formData(): Promise<FormData>;

450

json(): Promise<any>;

451

text(): Promise<string>;

452

clone(): Response;

453

}

454

455

type ObservableInput<T> = Observable<T> | Promise<T> | Iterable<T>;

456

type OperatorFunction<T, R> = (source: Observable<T>) => Observable<R>;

457

```