or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

ajax-operations.md combination-operators.md core-types.md error-handling.md fetch-operations.md filtering-operators.md index.md observable-creation.md schedulers.md subjects.md testing-utilities.md transformation-operators.md websocket-operations.md

subjects.mddocs/

0

# Subjects

1

2

Special observables that can act as both observer and observable, enabling multicasting patterns where multiple subscribers receive the same values.

3

4

## Capabilities

5

6

### Subject

7

8

Basic subject that multicasts values to multiple subscribers.

9

10

```typescript { .api }

11

/**

12

* Subject acts as both Observable and Observer, enabling multicasting

13

*/

14

class Subject<T> extends Observable<T> implements Observer<T> {

15

/**

16

* Whether the subject has been closed/completed

17

*/

18

readonly closed: boolean;

19

20

/**

21

* Whether the subject has terminated with an error

22

*/

23

readonly hasError: boolean;

24

25

/**

26

* Whether the subject has stopped (errored, completed, or unsubscribed) and will no longer emit

27

*/

28

readonly isStopped: boolean;

29

30

/**

31

* List of observers currently subscribed to the subject

32

*/

33

readonly observers: Observer<T>[];

34

35

/**

36

* Emit a value to all subscribers

37

* @param value - Value to emit

38

*/

39

next(value: T): void;

40

41

/**

42

* Emit an error to all subscribers and terminate the subject (no complete notification is sent)

43

* @param err - Error to emit

44

*/

45

error(err: any): void;

46

47

/**

48

* Complete the subject, notifying all subscribers

49

*/

50

complete(): void;

51

52

/**

53

* Unsubscribe all observers and clean up resources

54

*/

55

unsubscribe(): void;

56

57

/**

58

* Create observable that shares this subject's notifications

59

* @returns Observable instance

60

*/

61

asObservable(): Observable<T>;

62

}

63

```

64

65

**Usage Examples:**

66

67

```typescript

68

import { Subject } from "rxjs";

69

70

const subject = new Subject<string>();

71

72

// Multiple subscribers

73

subject.subscribe(value => console.log('Observer A:', value));

74

subject.subscribe(value => console.log('Observer B:', value));

75

76

// Emit values - both observers receive them

77

subject.next('Hello');

78

subject.next('World');

79

subject.complete();

80

81

// New subscriber after completion receives no values (only the complete notification)

82

subject.subscribe(value => console.log('Observer C:', value)); // No output

83

```

84

85

### BehaviorSubject

86

87

Subject that stores the current value and emits it to new subscribers immediately.

88

89

```typescript { .api }

90

/**

91

* Subject that holds a current value and emits it immediately to new subscribers

92

*/

93

class BehaviorSubject<T> extends Subject<T> {

94

/**

95

* Create BehaviorSubject with initial value

96

* @param initialValue - Initial value to store and emit

97

*/

98

constructor(initialValue: T);

99

100

/**

101

* Current value held by the subject

102

*/

103

readonly value: T;

104

105

/**

106

* Get current value (synchronous)

107

* @returns Current stored value

108

*/

109

getValue(): T;

110

}

111

```

112

113

**Usage Examples:**

114

115

```typescript

116

import { BehaviorSubject } from "rxjs";

117

118

// Create with initial value

119

const behaviorSubject = new BehaviorSubject<number>(0);

120

121

// New subscriber immediately gets current value (0)

122

behaviorSubject.subscribe(val => console.log('Subscriber A:', val));

123

124

// Emit new values

125

behaviorSubject.next(1);

126

behaviorSubject.next(2);

127

128

// New subscriber gets current value (2) immediately

129

behaviorSubject.subscribe(val => console.log('Subscriber B:', val));

130

131

// Access current value synchronously

132

console.log('Current value:', behaviorSubject.value); // 2

133

console.log('Current value (method):', behaviorSubject.getValue()); // 2

134

```

135

136

### ReplaySubject

137

138

Subject that stores recent values and replays them to new subscribers.

139

140

```typescript { .api }

141

/**

142

* Subject that replays recent values to new subscribers

143

*/

144

class ReplaySubject<T> extends Subject<T> {

145

/**

146

* Create ReplaySubject with buffer configuration

147

* @param bufferSize - Number of values to buffer (default: Infinity)

148

* @param windowTime - Time in ms to keep values (default: Infinity)

149

* @param timestampProvider - Custom timestamp provider

150

*/

151

constructor(

152

bufferSize?: number,

153

windowTime?: number,

154

timestampProvider?: TimestampProvider

155

);

156

}

157

158

interface TimestampProvider {

159

now(): number;

160

}

161

```

162

163

**Usage Examples:**

164

165

```typescript

166

import { ReplaySubject } from "rxjs";

167

168

// Replay last 3 values

169

const replaySubject = new ReplaySubject<number>(3);

170

171

// Emit some values

172

replaySubject.next(1);

173

replaySubject.next(2);

174

replaySubject.next(3);

175

replaySubject.next(4);

176

177

// New subscriber gets last 3 values (2, 3, 4)

178

replaySubject.subscribe(val => console.log('Subscriber A:', val));

179

180

// Time-based replay (last 1 second)

181

const timeReplay = new ReplaySubject<string>(Infinity, 1000);

182

timeReplay.next('old');

183

setTimeout(() => {

184

timeReplay.next('recent');

185

// 'old' is now outside the 1000 ms window, so a new subscriber is replayed only 'recent'

186

timeReplay.subscribe(val => console.log('Time subscriber:', val));

187

}, 1100);

188

```

189

190

### AsyncSubject

191

192

Subject that emits only its last value, and only once it completes.

193

194

```typescript { .api }

195

/**

196

* Subject that only emits the last value when the sequence completes

197

*/

198

class AsyncSubject<T> extends Subject<T> {

199

/**

200

* Create AsyncSubject

201

*/

202

constructor();

203

}

204

```

205

206

**Usage Examples:**

207

208

```typescript

209

import { AsyncSubject } from "rxjs";

210

211

const asyncSubject = new AsyncSubject<number>();

212

213

// Subscribers don't receive values until completion

214

asyncSubject.subscribe(val => console.log('Subscriber A:', val));

215

216

asyncSubject.next(1);

217

asyncSubject.next(2);

218

asyncSubject.next(3); // Only this value will be emitted

219

220

asyncSubject.subscribe(val => console.log('Subscriber B:', val));

221

222

// Complete to emit the last value (3) to all subscribers

223

asyncSubject.complete();

224

225

// New subscriber after completion gets the last value

226

asyncSubject.subscribe(val => console.log('Subscriber C:', val)); // 3

227

```

228

229

## Advanced Patterns

230

231

### Subject as Event Bus

232

233

```typescript

234

import { Subject, filter } from "rxjs";

235

236

interface AppEvent {

237

type: string;

238

payload: any;

239

}

240

241

const eventBus = new Subject<AppEvent>();

242

243

// Subscribe to specific event types

244

eventBus.pipe(

245

filter(event => event.type === 'user-login')

246

).subscribe(event => {

247

console.log('User logged in:', event.payload);

248

});

249

250

eventBus.pipe(

251

filter(event => event.type === 'user-logout')

252

).subscribe(event => {

253

console.log('User logged out');

254

});

255

256

// Emit events

257

eventBus.next({ type: 'user-login', payload: { userId: 123 } });

258

eventBus.next({ type: 'user-logout', payload: null });

259

```

260

261

### State Management with BehaviorSubject

262

263

```typescript

264

import { BehaviorSubject, map } from "rxjs";

265

266

interface AppState {

267

user: { id: number; name: string } | null;

268

loading: boolean;

269

}

270

271

class StateService {

272

private state$ = new BehaviorSubject<AppState>({

273

user: null,

274

loading: false

275

});

276

277

// Expose read-only state

278

readonly state = this.state$.asObservable();

279

280

// Specific selectors

281

readonly user$ = this.state$.pipe(map(state => state.user));

282

readonly loading$ = this.state$.pipe(map(state => state.loading));

283

284

setUser(user: { id: number; name: string }) {

285

this.state$.next({

286

...this.state$.value,

287

user

288

});

289

}

290

291

setLoading(loading: boolean) {

292

this.state$.next({

293

...this.state$.value,

294

loading

295

});

296

}

297

}

298

```

299

300

## Types

301

302

```typescript { .api }

303

interface Observer<T> {

304

next: (value: T) => void;

305

error: (err: any) => void;

306

complete: () => void;

307

}

308

309

interface SubjectLike<T> extends Observer<T> {

310

asObservable(): Observable<T>;

311

}

312

313

interface TimestampProvider {

314

now(): number;

315

}

316

```