0
# Topics and Service Discovery
1
2
Topics provide a service discovery mechanism enabling data exchange between sequences, external systems, and clients through named data streams.
3
4
## Capabilities
5
6
### Topic Management
7
8
Create, list, and delete topics for data exchange.
9
10
```typescript { .api }
11
/**
12
* Creates a new topic for data exchange (via HostClient)
13
* @param id - Topic identifier/name
14
* @param contentType - MIME type for topic data (e.g., "application/json", "text/plain")
15
* @returns Promise resolving to topic creation result
16
*/
17
createTopic(id: string, contentType: string): Promise<{ topicName: string }>;
18
19
/**
20
* Lists all available topics (via HostClient)
21
* @returns Promise resolving to array of topic information
22
*/
23
getTopics(): Promise<STHRestAPI.GetTopicsResponse>;
24
25
/**
26
* Deletes a topic (via HostClient)
27
* @param id - Topic identifier to delete
28
* @returns Promise resolving to deletion confirmation
29
*/
30
deleteTopic(id: string): Promise<{ message: string }>;
31
```
32
33
**Usage Examples:**
34
35
```typescript
36
import { HostClient } from "@scramjet/api-client";
37
38
const host = new HostClient("http://localhost:8000/api/v1");
39
40
// Create topics for different data types
41
await host.createTopic("sensor-data", "application/json");
42
await host.createTopic("log-stream", "text/plain");
43
await host.createTopic("binary-data", "application/octet-stream");
44
45
// List all topics
46
const topics = await host.getTopics();
47
topics.forEach(topic => {
48
console.log(`Topic: ${topic.id} (${topic.contentType})`);
49
});
50
51
// Clean up topic when no longer needed
52
await host.deleteTopic("sensor-data");
53
```
54
55
### Data Publishing
56
57
Send data to topics for consumption by subscribers.
58
59
```typescript { .api }
60
/**
61
* Sends data to a named topic (via HostClient)
62
* @param topic - Topic name to send data to
63
* @param stream - Data stream to publish
64
* @param requestInit - Optional request configuration
65
* @param contentType - Content type override (defaults to "application/x-ndjson")
66
* @param end - Whether to signal end of stream to subscribers
67
* @returns Promise resolving to publish result
68
*/
69
sendTopic<T>(
70
topic: string,
71
stream: Parameters<HttpClient["sendStream"]>[1],
72
requestInit?: RequestInit,
73
contentType?: string,
74
end?: boolean
75
): Promise<T>;
76
77
/**
78
* Convenience alias for sendTopic
79
*/
80
readonly sendNamedData: typeof sendTopic;
81
```
82
83
**Usage Examples:**
84
85
```typescript
86
import { Readable } from "stream";
87
88
// Send JSON data to a topic
89
const sensorData = [
90
{ timestamp: Date.now(), temperature: 23.5, humidity: 60 },
91
{ timestamp: Date.now() + 1000, temperature: 23.7, humidity: 59 }
92
];
93
94
const jsonStream = Readable.from(
95
sensorData.map(data => JSON.stringify(data) + '\n')
96
);
97
98
await host.sendTopic("sensor-readings", jsonStream, {}, "application/json");
99
100
// Send text data
101
const logEntries = Readable.from([
102
"INFO: Application started\n",
103
"DEBUG: Configuration loaded\n",
104
"INFO: Server listening on port 3000\n"
105
]);
106
107
await host.sendTopic("application-logs", logEntries, {}, "text/plain");
108
109
// Send binary data
110
const binaryData = fs.createReadStream("./data.bin");
111
await host.sendTopic("file-upload", binaryData, {}, "application/octet-stream");
112
```
113
114
### Data Subscription
115
116
Subscribe to topics to receive published data.
117
118
```typescript { .api }
119
/**
120
* Gets data stream from a named topic (via HostClient)
121
* @param topic - Topic name to subscribe to
122
* @param requestInit - Optional request configuration
123
* @param contentType - Expected content type (defaults to "application/x-ndjson")
124
* @returns Promise resolving to readable stream of topic data
125
*/
126
getTopic(
127
topic: string,
128
requestInit?: RequestInit,
129
contentType?: string
130
): ReturnType<HttpClient["getStream"]>;
131
132
/**
133
* Convenience alias for getTopic
134
*/
135
readonly getNamedData: typeof getTopic;
136
```
137
138
**Usage Examples:**
139
140
```typescript
141
// Subscribe to JSON data stream
142
const sensorStream = await host.getTopic("sensor-readings", {}, "application/json");
143
144
sensorStream.on('data', (chunk) => {
145
const readings = chunk.toString().split('\n').filter(Boolean);
146
readings.forEach(line => {
147
const data = JSON.parse(line);
148
console.log(`Temperature: ${data.temperature}°C, Humidity: ${data.humidity}%`);
149
});
150
});
151
152
// Subscribe to log stream
153
const logStream = await host.getTopic("application-logs", {}, "text/plain");
154
logStream.pipe(process.stdout); // Direct pipe to console
155
156
// Handle connection errors
157
sensorStream.on('error', (error) => {
158
console.error('Topic subscription error:', error.message);
159
// Implement reconnection logic
160
});
161
162
sensorStream.on('end', () => {
163
console.log('Topic stream ended');
164
});
165
```
166
167
### Manager-Level Topics
168
169
Access topics across multiple hubs through ManagerClient.
170
171
```typescript { .api }
172
/**
173
* Sends data to a named topic across the hub network (via ManagerClient)
174
* @param topic - Topic name
175
* @param stream - Data stream to send
176
* @param requestInit - Optional request configuration
177
* @param contentType - Content type
178
* @param end - Whether to signal end of stream
179
* @returns Promise resolving to send result
180
*/
181
sendNamedData<T>(
182
topic: string,
183
stream: Parameters<HttpClient["sendStream"]>[1],
184
requestInit?: RequestInit,
185
contentType?: string,
186
end?: boolean
187
): Promise<T>;
188
189
/**
190
* Gets data stream from a named topic across the hub network (via ManagerClient)
191
* @param topic - Topic name
192
* @param requestInit - Optional request configuration
193
* @returns Promise resolving to aggregated topic data stream
194
*/
195
getNamedData(topic: string, requestInit?: RequestInit): Promise<Readable>;
196
```
197
198
## Response Types
199
200
```typescript { .api }
201
interface STHRestAPI {
202
GetTopicsResponse: Array<{
203
id: string;
204
contentType: string;
205
created: string;
206
subscribers?: number;
207
publishers?: number;
208
[key: string]: any;
209
}>;
210
}
211
```
212
213
## Common Patterns
214
215
### Publisher-Subscriber Pattern
216
217
```typescript
218
class DataPublisher {
219
constructor(private host: HostClient, private topicId: string) {}
220
221
async initialize(contentType: string) {
222
await this.host.createTopic(this.topicId, contentType);
223
}
224
225
async publish(data: any[]) {
226
const dataStream = Readable.from(
227
data.map(item => JSON.stringify(item) + '\n')
228
);
229
230
await this.host.sendTopic(this.topicId, dataStream, {}, "application/json");
231
}
232
233
async cleanup() {
234
await this.host.deleteTopic(this.topicId);
235
}
236
}
237
238
class DataSubscriber {
239
constructor(private host: HostClient, private topicId: string) {}
240
241
async subscribe(onData: (data: any) => void) {
242
const stream = await this.host.getTopic(this.topicId, {}, "application/json");
243
244
stream.on('data', (chunk) => {
245
const lines = chunk.toString().split('\n').filter(Boolean);
246
lines.forEach(line => {
247
try {
248
const data = JSON.parse(line);
249
onData(data);
250
} catch (error) {
251
console.error('Failed to parse topic data:', error.message);
252
}
253
});
254
});
255
256
return stream;
257
}
258
}
259
260
// Usage
261
const publisher = new DataPublisher(host, "market-data");
262
const subscriber = new DataSubscriber(host, "market-data");
263
264
await publisher.initialize("application/json");
265
266
// Subscribe to data
267
await subscriber.subscribe((data) => {
268
console.log('Received market data:', data);
269
});
270
271
// Publish data
272
await publisher.publish([
273
{ symbol: "AAPL", price: 150.25, volume: 1000 },
274
{ symbol: "GOOGL", price: 2800.50, volume: 500 }
275
]);
276
```
277
278
### Real-time Data Pipeline
279
280
```typescript
281
class RealTimeDataPipeline {
282
constructor(
283
private host: HostClient,
284
private inputTopic: string,
285
private outputTopic: string
286
) {}
287
288
async setup() {
289
await this.host.createTopic(this.inputTopic, "application/json");
290
await this.host.createTopic(this.outputTopic, "application/json");
291
}
292
293
async startProcessing(processor: (data: any) => any) {
294
// Subscribe to input
295
const inputStream = await this.host.getTopic(this.inputTopic);
296
297
// Create output stream
298
const { Writable } = require('stream');
299
const outputBuffer: string[] = [];
300
301
const outputStream = new Writable({
302
write(chunk, encoding, callback) {
303
outputBuffer.push(chunk.toString());
304
callback();
305
}
306
});
307
308
// Process data
309
inputStream.on('data', (chunk) => {
310
const lines = chunk.toString().split('\n').filter(Boolean);
311
312
lines.forEach(line => {
313
try {
314
const inputData = JSON.parse(line);
315
const processedData = processor(inputData);
316
317
outputBuffer.push(JSON.stringify(processedData) + '\n');
318
} catch (error) {
319
console.error('Processing error:', error.message);
320
}
321
});
322
});
323
324
// Periodically flush output buffer
325
setInterval(async () => {
326
if (outputBuffer.length > 0) {
327
const batch = outputBuffer.splice(0);
328
const batchStream = Readable.from(batch);
329
330
try {
331
await this.host.sendTopic(this.outputTopic, batchStream);
332
} catch (error) {
333
console.error('Failed to send processed data:', error.message);
334
}
335
}
336
}, 1000);
337
}
338
}
339
340
// Usage
341
const pipeline = new RealTimeDataPipeline(host, "raw-events", "processed-events");
342
await pipeline.setup();
343
344
await pipeline.startProcessing((event) => ({
345
...event,
346
processed: true,
347
timestamp: Date.now()
348
}));
349
```
350
351
### Multi-Hub Topic Broadcasting
352
353
```typescript
354
class MultiHubBroadcaster {
355
constructor(private manager: ManagerClient) {}
356
357
async broadcastToAllHubs(topicId: string, data: any[]) {
358
const hubs = await this.manager.getHosts();
359
const connectedHubs = hubs.filter(h => h.status === "connected");
360
361
const results = await Promise.allSettled(
362
connectedHubs.map(async (hub) => {
363
const hostClient = this.manager.getHostClient(hub.id);
364
365
// Ensure topic exists on each hub
366
try {
367
await hostClient.createTopic(topicId, "application/json");
368
} catch (error) {
369
// Topic might already exist, ignore error
370
}
371
372
// Send data
373
const dataStream = Readable.from(
374
data.map(item => JSON.stringify(item) + '\n')
375
);
376
377
return hostClient.sendTopic(topicId, dataStream);
378
})
379
);
380
381
// Report results
382
results.forEach((result, index) => {
383
const hubId = connectedHubs[index].id;
384
if (result.status === 'fulfilled') {
385
console.log(`✓ Broadcasted to hub ${hubId}`);
386
} else {
387
console.error(`✗ Failed to broadcast to hub ${hubId}:`, result.reason.message);
388
}
389
});
390
}
391
}
392
```
393
394
## Error Handling
395
396
```typescript
397
// Handle topic subscription errors with retry logic
398
async function subscribeWithRetry(
399
host: HostClient,
400
topicId: string,
401
onData: (data: any) => void,
402
maxRetries = 3
403
) {
404
let retries = 0;
405
406
const subscribe = async () => {
407
try {
408
const stream = await host.getTopic(topicId);
409
410
stream.on('data', (chunk) => {
411
try {
412
const lines = chunk.toString().split('\n').filter(Boolean);
413
lines.forEach(line => {
414
const data = JSON.parse(line);
415
onData(data);
416
});
417
} catch (error) {
418
console.error('Data parsing error:', error.message);
419
}
420
});
421
422
stream.on('error', (error) => {
423
console.error('Topic stream error:', error.message);
424
if (retries < maxRetries) {
425
retries++;
426
console.log(`Retrying subscription (${retries}/${maxRetries})...`);
427
setTimeout(subscribe, 1000 * retries);
428
} else {
429
console.error('Max retries reached, giving up');
430
}
431
});
432
433
stream.on('end', () => {
434
console.log('Topic stream ended, attempting to reconnect...');
435
if (retries < maxRetries) {
436
retries++;
437
setTimeout(subscribe, 1000);
438
}
439
});
440
441
// Reset retry counter on successful connection
442
retries = 0;
443
444
} catch (error) {
445
console.error('Failed to subscribe to topic:', error.message);
446
if (retries < maxRetries) {
447
retries++;
448
setTimeout(subscribe, 1000 * retries);
449
}
450
}
451
};
452
453
await subscribe();
454
}
455
```