or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

host-client.md · index.md · instance-client.md · manager-client.md · sequence-client.md · topics.md

docs/topics.md

0

# Topics and Service Discovery

1

2

Topics provide a service discovery mechanism enabling data exchange between sequences, external systems, and clients through named data streams.

3

4

## Capabilities

5

6

### Topic Management

7

8

Create, list, and delete topics for data exchange.

9

10

```typescript { .api }

11

/**

12

* Creates a new topic for data exchange (via HostClient)

13

* @param id - Topic identifier/name

14

* @param contentType - MIME type for topic data (e.g., "application/json", "text/plain")

15

* @returns Promise resolving to topic creation result

16

*/

17

createTopic(id: string, contentType: string): Promise<{ topicName: string }>;

18

19

/**

20

* Lists all available topics (via HostClient)

21

* @returns Promise resolving to array of topic information

22

*/

23

getTopics(): Promise<STHRestAPI.GetTopicsResponse>;

24

25

/**

26

* Deletes a topic (via HostClient)

27

* @param id - Topic identifier to delete

28

* @returns Promise resolving to deletion confirmation

29

*/

30

deleteTopic(id: string): Promise<{ message: string }>;

31

```

32

33

**Usage Examples:**

34

35

```typescript

36

import { HostClient } from "@scramjet/api-client";

const host = new HostClient("http://localhost:8000/api/v1");

// Register one topic per payload format, one after another.
const topicContentTypes: Array<[string, string]> = [
  ["sensor-data", "application/json"],
  ["log-stream", "text/plain"],
  ["binary-data", "application/octet-stream"],
];
for (const [topicName, mimeType] of topicContentTypes) {
  await host.createTopic(topicName, mimeType);
}

// Print every topic the host reports.
const topics = await host.getTopics();
for (const topic of topics) {
  console.log(`Topic: ${topic.id} (${topic.contentType})`);
}

// Remove the topic once it is no longer needed.
await host.deleteTopic("sensor-data");

53

```

54

55

### Data Publishing

56

57

Send data to topics for consumption by subscribers.

58

59

```typescript { .api }

60

/**

61

* Sends data to a named topic (via HostClient)

62

* @param topic - Topic name to send data to

63

* @param stream - Data stream to publish

64

* @param requestInit - Optional request configuration

65

* @param contentType - Content type override (defaults to "application/x-ndjson")

66

* @param end - Whether to signal end of stream to subscribers

67

* @returns Promise resolving to publish result

68

*/

69

sendTopic<T>(

70

topic: string,

71

stream: Parameters<HttpClient["sendStream"]>[1],

72

requestInit?: RequestInit,

73

contentType?: string,

74

end?: boolean

75

): Promise<T>;

76

77

/**
 * Convenience alias for sendTopic: takes the same parameters and resolves to
 * the same result. Typed through the owning client because `sendTopic` is a
 * method, not a free-standing value that `typeof` could query.
 */
readonly sendNamedData: HostClient["sendTopic"];

81

```

82

83

**Usage Examples:**

84

85

```typescript

86

import { createReadStream } from "fs";
import { Readable } from "stream";

// `host` is the HostClient instance created in the Topic Management example.

// Send JSON data to a topic (one JSON document per line)
const sensorData = [
  { timestamp: Date.now(), temperature: 23.5, humidity: 60 },
  { timestamp: Date.now() + 1000, temperature: 23.7, humidity: 59 }
];

const jsonStream = Readable.from(
  sensorData.map(data => JSON.stringify(data) + '\n')
);

await host.sendTopic("sensor-readings", jsonStream, {}, "application/json");

// Send text data
const logEntries = Readable.from([
  "INFO: Application started\n",
  "DEBUG: Configuration loaded\n",
  "INFO: Server listening on port 3000\n"
]);

await host.sendTopic("application-logs", logEntries, {}, "text/plain");

// Send binary data straight from disk; the file is streamed, not buffered.
const binaryData = createReadStream("./data.bin");
await host.sendTopic("file-upload", binaryData, {}, "application/octet-stream");

112

```

113

114

### Data Subscription

115

116

Subscribe to topics to receive published data.

117

118

```typescript { .api }

119

/**

120

* Gets data stream from a named topic (via HostClient)

121

* @param topic - Topic name to subscribe to

122

* @param requestInit - Optional request configuration

123

* @param contentType - Expected content type (defaults to "application/x-ndjson")

124

* @returns Promise resolving to readable stream of topic data

125

*/

126

getTopic(

127

topic: string,

128

requestInit?: RequestInit,

129

contentType?: string

130

): ReturnType<HttpClient["getStream"]>;

131

132

/**
 * Convenience alias for getTopic: takes the same parameters and resolves to
 * the same stream. Typed through the owning client because `getTopic` is a
 * method, not a free-standing value that `typeof` could query.
 */
readonly getNamedData: HostClient["getTopic"];

136

```

137

138

**Usage Examples:**

139

140

```typescript

141

import { createInterface } from "readline";

// Subscribe to JSON data stream
const sensorStream = await host.getTopic("sensor-readings", {}, "application/json");

// Register lifecycle handlers before consuming any data.
sensorStream.on('error', (error) => {
  console.error('Topic subscription error:', error.message);
  // Implement reconnection logic
});

sensorStream.on('end', () => {
  console.log('Topic stream ended');
});

// Chunk boundaries do not line up with record boundaries, so let readline
// reassemble complete lines instead of splitting each chunk on '\n'.
const sensorLines = createInterface({ input: sensorStream, crlfDelay: Infinity });

sensorLines.on('line', (line) => {
  if (!line) return; // skip blank separators

  try {
    const data = JSON.parse(line);
    console.log(`Temperature: ${data.temperature}°C, Humidity: ${data.humidity}%`);
  } catch (error) {
    // One malformed record must not take the whole subscription down.
    console.error(
      'Skipping malformed sensor reading:',
      error instanceof Error ? error.message : String(error)
    );
  }
});

// Subscribe to log stream
const logStream = await host.getTopic("application-logs", {}, "text/plain");
logStream.pipe(process.stdout); // Direct pipe to console

165

```

166

167

### Manager-Level Topics

168

169

Access topics across multiple hubs through ManagerClient.

170

171

```typescript { .api }

172

/**

173

* Sends data to a named topic across the hub network (via ManagerClient)

174

* @param topic - Topic name

175

* @param stream - Data stream to send

176

* @param requestInit - Optional request configuration

177

* @param contentType - Content type

178

* @param end - Whether to signal end of stream

179

* @returns Promise resolving to send result

180

*/

181

sendNamedData<T>(

182

topic: string,

183

stream: Parameters<HttpClient["sendStream"]>[1],

184

requestInit?: RequestInit,

185

contentType?: string,

186

end?: boolean

187

): Promise<T>;

188

189

/**

190

* Gets data stream from a named topic across the hub network (via ManagerClient)

191

* @param topic - Topic name

192

* @param requestInit - Optional request configuration

193

* @returns Promise resolving to aggregated topic data stream

194

*/

195

getNamedData(topic: string, requestInit?: RequestInit): Promise<Readable>;

196

```

197

198

## Response Types

199

200

```typescript { .api }

201

/**
 * Response shapes returned by the host REST API.
 *
 * Declared as a namespace so that the dotted reference used by `getTopics()`
 * (`STHRestAPI.GetTopicsResponse`) resolves; an interface member can only be
 * reached with indexed access (`STHRestAPI["GetTopicsResponse"]`).
 */
declare namespace STHRestAPI {
  /** One entry per topic known to the host. */
  export type GetTopicsResponse = Array<{
    /** Topic identifier/name. */
    id: string;
    /** MIME type the topic was created with. */
    contentType: string;
    created: string;
    subscribers?: number;
    publishers?: number;
    // Additional fields may be present; kept open-ended as in the original listing.
    [key: string]: any;
  }>;
}

211

```

212

213

## Common Patterns

214

215

### Publisher-Subscriber Pattern

216

217

```typescript

218

/**
 * Publishes batches of JSON records to a single topic on a host.
 */
class DataPublisher {
  constructor(private host: HostClient, private topicId: string) {}

  /** Creates the topic on the host with the given content type. */
  async initialize(contentType: string) {
    await this.host.createTopic(this.topicId, contentType);
  }

  /** Serialises each item as one newline-terminated JSON line and sends the batch. */
  async publish(data: any[]) {
    const serialisedLines = data.map((record) => {
      const json = JSON.stringify(record);
      return json + '\n';
    });
    const payload = Readable.from(serialisedLines);

    await this.host.sendTopic(this.topicId, payload, {}, "application/json");
  }

  /** Deletes the topic from the host. */
  async cleanup() {
    await this.host.deleteTopic(this.topicId);
  }
}

237

238

/**
 * Subscribes to a topic and hands each newline-delimited JSON record to a callback.
 */
class DataSubscriber {
  constructor(private host: HostClient, private topicId: string) {}

  /**
   * Opens the topic stream and invokes `onData` once per complete JSON line.
   *
   * Records are reassembled across chunk boundaries: a chunk may end in the
   * middle of a line (or of a multi-byte character), so the unfinished tail is
   * kept until the rest arrives. Lines that fail to parse, or whose callback
   * throws, are logged and skipped.
   *
   * @param onData - Called with each parsed record
   * @returns The underlying topic stream, so callers can attach further handlers
   */
  async subscribe(onData: (data: any) => void) {
    const stream = await this.host.getTopic(this.topicId, {}, "application/json");

    // Streaming decoder keeps split multi-byte characters intact between chunks.
    const decoder = new TextDecoder();
    let partial = '';

    const deliver = (line: string) => {
      if (!line) return; // ignore blank lines
      try {
        const data = JSON.parse(line);
        onData(data);
      } catch (error) {
        console.error(
          'Failed to parse topic data:',
          error instanceof Error ? error.message : String(error)
        );
      }
    };

    stream.on('data', (chunk: Uint8Array | string) => {
      partial += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
      const lines = partial.split('\n');
      // The last element is whatever follows the final newline: an incomplete record.
      partial = lines.pop() ?? '';
      lines.forEach(deliver);
    });

    stream.on('end', () => {
      // A final record without a trailing newline is still a record.
      const rest = partial + decoder.decode();
      partial = '';
      deliver(rest);
    });

    return stream;
  }
}

259

260

// Usage
const marketTopic = "market-data";
const publisher = new DataPublisher(host, marketTopic);
const subscriber = new DataSubscriber(host, marketTopic);

await publisher.initialize("application/json");

// Attach the consumer before anything is published.
const logMarketData = (data: any) => {
  console.log('Received market data:', data);
};
await subscriber.subscribe(logMarketData);

// Publish two sample quotes.
const quotes = [
  { symbol: "AAPL", price: 150.25, volume: 1000 },
  { symbol: "GOOGL", price: 2800.50, volume: 500 }
];
await publisher.publish(quotes);

276

```

277

278

### Real-time Data Pipeline

279

280

```typescript

281

/**
 * Reads JSON records from one topic, transforms them, and publishes the
 * results to another topic in periodic batches.
 */
class RealTimeDataPipeline {
  constructor(
    private host: HostClient,
    private inputTopic: string,
    private outputTopic: string
  ) {}

  /** Creates the input and output topics, both as "application/json". */
  async setup() {
    await this.host.createTopic(this.inputTopic, "application/json");
    await this.host.createTopic(this.outputTopic, "application/json");
  }

  /**
   * Starts consuming the input topic and publishing processed records.
   *
   * @param processor - Maps one parsed input record to one output record
   * @param flushIntervalMs - How often buffered output is sent (default 1000 ms)
   * @returns A function that stops the periodic flush and sends what is still
   *          buffered. It is called automatically when the input ends or errors,
   *          so the timer no longer keeps the process alive forever.
   */
  async startProcessing(
    processor: (data: any) => any,
    flushIntervalMs = 1000
  ): Promise<() => void> {
    // Subscribe to input
    const inputStream = await this.host.getTopic(this.inputTopic);

    const outputBuffer: string[] = [];
    // Streaming decoder keeps split multi-byte characters intact between chunks.
    const decoder = new TextDecoder();
    let pending = '';
    let stopped = false;

    const describe = (error: unknown) =>
      error instanceof Error ? error.message : String(error);

    const handleLine = (line: string) => {
      if (!line) return; // ignore blank lines
      try {
        const inputData = JSON.parse(line);
        const processedData = processor(inputData);
        outputBuffer.push(JSON.stringify(processedData) + '\n');
      } catch (error) {
        console.error('Processing error:', describe(error));
      }
    };

    // Sends are chained so a slow send cannot be overtaken by the next batch.
    let sendChain: Promise<void> = Promise.resolve();
    const flush = (): Promise<void> => {
      sendChain = sendChain.then(async () => {
        if (outputBuffer.length === 0) return;
        const batch = outputBuffer.splice(0);
        try {
          await this.host.sendTopic(this.outputTopic, Readable.from(batch));
        } catch (error) {
          // As before, a failed batch is reported and dropped, not retried.
          console.error('Failed to send processed data:', describe(error));
        }
      });
      return sendChain;
    };

    // Periodically flush output buffer
    const timer = setInterval(() => {
      void flush();
    }, flushIntervalMs);

    const stop = () => {
      if (stopped) return;
      stopped = true;
      clearInterval(timer);
      void flush(); // send whatever is still buffered
    };

    // Process data
    inputStream.on('data', (chunk: Uint8Array | string) => {
      if (stopped) return;
      pending += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
      const lines = pending.split('\n');
      // The last element is an incomplete record; keep it for the next chunk.
      pending = lines.pop() ?? '';
      lines.forEach(handleLine);
    });

    inputStream.on('end', () => {
      if (!stopped) {
        // A final record without a trailing newline is still a record.
        handleLine(pending + decoder.decode());
        pending = '';
      }
      stop();
    });

    // NOTE(review): registering this listener means an input error is logged
    // instead of surfacing as an unhandled 'error' event — confirm that is the
    // desired policy for callers.
    inputStream.on('error', (error: unknown) => {
      console.error('Input stream error:', describe(error));
      stop();
    });

    return stop;
  }
}

339

340

// Usage
const pipeline = new RealTimeDataPipeline(host, "raw-events", "processed-events");
await pipeline.setup();

// Copies the event, marks it processed, and stamps it with the current time.
function enrich(event: any) {
  return Object.assign({}, event, { processed: true, timestamp: Date.now() });
}

await pipeline.startProcessing(enrich);

349

```

350

351

### Multi-Hub Topic Broadcasting

352

353

```typescript

354

/**
 * Sends the same batch of JSON records to a topic on every connected hub.
 */
class MultiHubBroadcaster {
  constructor(private manager: ManagerClient) {}

  /**
   * Broadcasts `data` to `topicId` on each hub whose status is "connected".
   *
   * Hubs are handled concurrently and independently: a failure on one hub is
   * reported but does not prevent delivery to the others.
   *
   * @param topicId - Topic to create (if needed) and publish to on each hub
   * @param data - Records to send; each becomes one newline-terminated JSON line
   */
  async broadcastToAllHubs(topicId: string, data: any[]) {
    const hubs = await this.manager.getHosts();
    const connectedHubs = hubs.filter(h => h.status === "connected");

    // Rejections are not guaranteed to be Error instances.
    const describe = (reason: unknown) =>
      reason instanceof Error ? reason.message : String(reason);

    // Serialise once; every hub receives its own stream over the same lines.
    const payloadLines = data.map(item => JSON.stringify(item) + '\n');

    const results = await Promise.allSettled(
      connectedHubs.map(async (hub) => {
        const hostClient = this.manager.getHostClient(hub.id);

        // Ensure topic exists on each hub
        try {
          await hostClient.createTopic(topicId, "application/json");
        } catch (error) {
          // Best effort: the topic might already exist. Keep going, but leave a
          // trace so real failures (auth, network) are not completely invisible.
          console.debug(`createTopic skipped on hub ${hub.id}:`, describe(error));
        }

        // Send data
        return hostClient.sendTopic(topicId, Readable.from(payloadLines));
      })
    );

    // Report results (allSettled preserves input order, so indexes line up)
    results.forEach((result, index) => {
      const hubId = connectedHubs[index]?.id;
      if (result.status === 'fulfilled') {
        console.log(`✓ Broadcasted to hub ${hubId}`);
      } else {
        console.error(`✗ Failed to broadcast to hub ${hubId}:`, describe(result.reason));
      }
    });
  }
}

392

```

393

394

## Error Handling

395

396

```typescript

397

/**
 * Subscribes to a topic and re-subscribes when the stream fails or ends.
 *
 * Each newline-delimited JSON record is parsed and passed to `onData`. Records
 * are reassembled across chunk boundaries, and a malformed record is logged and
 * skipped without discarding the records that follow it.
 *
 * The retry budget counts consecutive failures: it is reset every time a
 * connection is established.
 *
 * @param host - Client used to open the topic stream
 * @param topicId - Topic to subscribe to
 * @param onData - Called with each parsed record
 * @param maxRetries - Maximum consecutive reconnection attempts (default 3)
 * @returns Promise that settles after the first connection attempt; later
 *          reconnections happen in the background
 */
async function subscribeWithRetry(
  host: HostClient,
  topicId: string,
  onData: (data: any) => void,
  maxRetries = 3
) {
  let retries = 0;

  // Caught values and stream errors are not guaranteed to be Error instances.
  const describe = (error: unknown) =>
    error instanceof Error ? error.message : String(error);

  const subscribe = async (): Promise<void> => {
    try {
      const stream = await host.getTopic(topicId);

      // Streaming decoder keeps split multi-byte characters intact between chunks.
      const decoder = new TextDecoder();
      let partial = '';
      // Guards against scheduling two reconnects for one connection, should a
      // stream report both 'error' and 'end'.
      let settled = false;

      const deliver = (line: string) => {
        if (!line) return; // ignore blank lines
        try {
          const data = JSON.parse(line);
          onData(data);
        } catch (error) {
          console.error('Data parsing error:', describe(error));
        }
      };

      stream.on('data', (chunk: Uint8Array | string) => {
        partial += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
        const lines = partial.split('\n');
        // The last element is an incomplete record; keep it for the next chunk.
        partial = lines.pop() ?? '';
        lines.forEach(deliver);
      });

      stream.on('error', (error: unknown) => {
        console.error('Topic stream error:', describe(error));
        if (settled) return;
        settled = true;

        if (retries < maxRetries) {
          retries++;
          console.log(`Retrying subscription (${retries}/${maxRetries})...`);
          setTimeout(() => { void subscribe(); }, 1000 * retries);
        } else {
          console.error('Max retries reached, giving up');
        }
      });

      stream.on('end', () => {
        // A final record without a trailing newline is still a record.
        deliver(partial + decoder.decode());
        partial = '';

        if (settled) return;
        settled = true;

        if (retries < maxRetries) {
          console.log('Topic stream ended, attempting to reconnect...');
          retries++;
          setTimeout(() => { void subscribe(); }, 1000);
        } else {
          console.log('Topic stream ended; max retries reached, giving up');
        }
      });

      // Reset retry counter on successful connection
      retries = 0;

    } catch (error) {
      console.error('Failed to subscribe to topic:', describe(error));
      if (retries < maxRetries) {
        retries++;
        setTimeout(() => { void subscribe(); }, 1000 * retries);
      }
    }
  };

  await subscribe();
}

455

```