API Client for use with Scramjet Transform Hub providing typed interfaces for managing sequences, instances, and Hub operations
—
Topics provide a service discovery mechanism enabling data exchange between sequences, external systems, and clients through named data streams.
Create, list, and delete topics for data exchange.
/**
* Creates a new topic for data exchange (via HostClient)
* @param id - Topic identifier/name
* @param contentType - MIME type for topic data (e.g., "application/json", "text/plain")
* @returns Promise resolving to topic creation result
*/
createTopic(id: string, contentType: string): Promise<{ topicName: string }>;
/**
* Lists all available topics (via HostClient)
* @returns Promise resolving to array of topic information
*/
getTopics(): Promise<STHRestAPI.GetTopicsResponse>;
/**
* Deletes a topic (via HostClient)
* @param id - Topic identifier to delete
* @returns Promise resolving to deletion confirmation
*/
deleteTopic(id: string): Promise<{ message: string }>;
Usage Examples:
import { HostClient } from "@scramjet/api-client";
const host = new HostClient("http://localhost:8000/api/v1");
// Create topics for different data types
await host.createTopic("sensor-data", "application/json");
await host.createTopic("log-stream", "text/plain");
await host.createTopic("binary-data", "application/octet-stream");
// List all topics
const topics = await host.getTopics();
topics.forEach(topic => {
console.log(`Topic: ${topic.id} (${topic.contentType})`);
});
// Clean up topic when no longer needed
await host.deleteTopic("sensor-data");
Send data to topics for consumption by subscribers.
/**
* Sends data to a named topic (via HostClient)
* @param topic - Topic name to send data to
* @param stream - Data stream to publish
* @param requestInit - Optional request configuration
* @param contentType - Content type override (defaults to "application/x-ndjson")
* @param end - Whether to signal end of stream to subscribers
* @returns Promise resolving to publish result
*/
sendTopic<T>(
topic: string,
stream: Parameters<HttpClient["sendStream"]>[1],
requestInit?: RequestInit,
contentType?: string,
end?: boolean
): Promise<T>;
/**
* Convenience alias for sendTopic
*/
readonly sendNamedData: typeof sendTopic;
Usage Examples:
import { Readable } from "stream";
// Send JSON data to a topic
const sensorData = [
{ timestamp: Date.now(), temperature: 23.5, humidity: 60 },
{ timestamp: Date.now() + 1000, temperature: 23.7, humidity: 59 }
];
const jsonStream = Readable.from(
sensorData.map(data => JSON.stringify(data) + '\n')
);
await host.sendTopic("sensor-readings", jsonStream, {}, "application/json");
// Send text data
const logEntries = Readable.from([
"INFO: Application started\n",
"DEBUG: Configuration loaded\n",
"INFO: Server listening on port 3000\n"
]);
await host.sendTopic("application-logs", logEntries, {}, "text/plain");
// Send binary data
const binaryData = fs.createReadStream("./data.bin");
await host.sendTopic("file-upload", binaryData, {}, "application/octet-stream");
Subscribe to topics to receive published data.
/**
* Gets data stream from a named topic (via HostClient)
* @param topic - Topic name to subscribe to
* @param requestInit - Optional request configuration
* @param contentType - Expected content type (defaults to "application/x-ndjson")
* @returns Promise resolving to readable stream of topic data
*/
getTopic(
topic: string,
requestInit?: RequestInit,
contentType?: string
): ReturnType<HttpClient["getStream"]>;
/**
* Convenience alias for getTopic
*/
readonly getNamedData: typeof getTopic;
Usage Examples:
// Subscribe to JSON data stream
const sensorStream = await host.getTopic("sensor-readings", {}, "application/json");
sensorStream.on('data', (chunk) => {
const readings = chunk.toString().split('\n').filter(Boolean);
readings.forEach(line => {
const data = JSON.parse(line);
console.log(`Temperature: ${data.temperature}°C, Humidity: ${data.humidity}%`);
});
});
// Subscribe to log stream
const logStream = await host.getTopic("application-logs", {}, "text/plain");
logStream.pipe(process.stdout); // Direct pipe to console
// Handle connection errors
sensorStream.on('error', (error) => {
console.error('Topic subscription error:', error.message);
// Implement reconnection logic
});
sensorStream.on('end', () => {
console.log('Topic stream ended');
});
Access topics across multiple hubs through ManagerClient.
/**
* Sends data to a named topic across the hub network (via ManagerClient)
* @param topic - Topic name
* @param stream - Data stream to send
* @param requestInit - Optional request configuration
* @param contentType - Content type
* @param end - Whether to signal end of stream
* @returns Promise resolving to send result
*/
sendNamedData<T>(
topic: string,
stream: Parameters<HttpClient["sendStream"]>[1],
requestInit?: RequestInit,
contentType?: string,
end?: boolean
): Promise<T>;
/**
* Gets data stream from a named topic across the hub network (via ManagerClient)
* @param topic - Topic name
* @param requestInit - Optional request configuration
* @returns Promise resolving to aggregated topic data stream
*/
getNamedData(topic: string, requestInit?: RequestInit): Promise<Readable>;interface STHRestAPI {
GetTopicsResponse: Array<{
id: string;
contentType: string;
created: string;
subscribers?: number;
publishers?: number;
[key: string]: any;
}>;
}class DataPublisher {
/**
 * @param host - Client used for every topic call this publisher makes
 * @param topicId - Name of the topic this publisher creates, writes to and deletes
 */
constructor(
  private host: HostClient,
  private topicId: string
) {}
/**
 * Creates this publisher's topic on the hub.
 * @param contentType - MIME type the topic is created with
 */
async initialize(contentType: string) {
  const { host, topicId } = this;
  await host.createTopic(topicId, contentType);
}
/**
 * Serializes every item as one JSON line and sends the batch to the topic.
 * @param data - Items to publish; each one is passed through JSON.stringify
 */
async publish(data: any[]) {
  const toLine = (item: any) => JSON.stringify(item) + '\n';
  const payload = Readable.from(data.map(toLine));
  await this.host.sendTopic(this.topicId, payload, {}, "application/json");
}
/** Deletes this publisher's topic from the hub. */
async cleanup() {
  const { host, topicId } = this;
  await host.deleteTopic(topicId);
}
}
/**
 * Subscribes to a topic carrying newline-delimited JSON and hands each parsed
 * record to a callback.
 */
class DataSubscriber {
  /**
   * @param host - Client used to open the topic stream
   * @param topicId - Name of the topic to read from
   */
  constructor(private host: HostClient, private topicId: string) {}

  /**
   * Opens the topic stream and calls `onData` once per complete JSON line.
   *
   * Stream chunks do not respect line boundaries, so the tail of each chunk is
   * held back until its terminating newline arrives (or the stream ends).
   * A line that fails to parse, or a callback that throws, is logged and
   * skipped; later lines are still delivered.
   *
   * @param onData - Invoked with each parsed record
   * @returns The underlying stream, so the caller can attach further listeners
   */
  async subscribe(onData: (data: any) => void) {
    const stream = await this.host.getTopic(this.topicId, {}, "application/json");
    // Streaming decode keeps a multi-byte character that is split across two
    // chunks intact instead of turning it into replacement characters.
    const decoder = new TextDecoder();
    let pending = "";

    const deliver = (line: string) => {
      if (!line) return;
      try {
        const data = JSON.parse(line);
        onData(data);
      } catch (error) {
        console.error(
          'Failed to parse topic data:',
          error instanceof Error ? error.message : String(error)
        );
      }
    };

    stream.on('data', (chunk) => {
      pending += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
      const lines = pending.split('\n');
      // The last element is either "" (chunk ended on a newline) or an
      // unfinished record that must wait for the next chunk.
      pending = lines.pop() ?? "";
      lines.forEach(deliver);
    });

    stream.on('end', () => {
      // Deliver a final record that was not newline-terminated.
      pending += decoder.decode();
      const rest = pending;
      pending = "";
      deliver(rest);
    });

    return stream;
  }
}
// Usage
const publisher = new DataPublisher(host, "market-data");
const subscriber = new DataSubscriber(host, "market-data");
await publisher.initialize("application/json");
// Subscribe to data
await subscriber.subscribe((data) => {
console.log('Received market data:', data);
});
// Publish data
await publisher.publish([
{ symbol: "AAPL", price: 150.25, volume: 1000 },
{ symbol: "GOOGL", price: 2800.50, volume: 500 }
]);class RealTimeDataPipeline {
/**
 * @param host - Client used for all topic operations of the pipeline
 * @param inputTopic - Topic the pipeline reads records from
 * @param outputTopic - Topic the pipeline publishes processed records to
 */
constructor(private host: HostClient, private inputTopic: string, private outputTopic: string) {}
/**
 * Creates the input topic and then the output topic, both as "application/json".
 * The calls run one after the other; if the first rejects the second is not attempted.
 */
async setup() {
  for (const topic of [this.inputTopic, this.outputTopic]) {
    await this.host.createTopic(topic, "application/json");
  }
}
/**
 * Reads JSON lines from the input topic, transforms each record with
 * `processor`, and publishes the results to the output topic in batches.
 *
 * The returned promise resolves as soon as the input subscription is open and
 * the handlers are attached; processing then continues in the background.
 * Batches are sent once per second, and a final batch is sent when the input
 * stream ends. The flush timer is cleared when the input stream ends or closes.
 *
 * @param processor - Maps one parsed input record to the value to publish
 */
async startProcessing(processor: (data: any) => any) {
  const FLUSH_INTERVAL_MS = 1000;

  // Subscribe to input
  const inputStream = await this.host.getTopic(this.inputTopic);

  // Processed lines waiting for the next flush.
  const outputBuffer: string[] = [];
  // Streaming decode + carry-over: chunks do not respect line or character
  // boundaries, so the unfinished tail is kept until the next chunk.
  const decoder = new TextDecoder();
  let pending = "";

  const describe = (error: unknown) =>
    error instanceof Error ? error.message : String(error);

  const processLine = (line: string) => {
    if (!line) return;
    try {
      const inputData = JSON.parse(line);
      const processedData = processor(inputData);
      outputBuffer.push(JSON.stringify(processedData) + '\n');
    } catch (error) {
      console.error('Processing error:', describe(error));
    }
  };

  const flush = async () => {
    if (outputBuffer.length === 0) return;
    // splice(0) empties the buffer, so lines arriving during the send go to the next batch.
    const batch = outputBuffer.splice(0);
    try {
      await this.host.sendTopic(this.outputTopic, Readable.from(batch));
    } catch (error) {
      console.error('Failed to send processed data:', describe(error));
    }
  };

  // Process data
  inputStream.on('data', (chunk) => {
    pending += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
    const lines = pending.split('\n');
    pending = lines.pop() ?? "";
    lines.forEach(processLine);
  });

  // Periodically flush output buffer
  const timer = setInterval(() => {
    void flush();
  }, FLUSH_INTERVAL_MS);

  inputStream.once('end', () => {
    clearInterval(timer);
    // Handle a last record that was not newline-terminated, then send what is left.
    pending += decoder.decode();
    const rest = pending;
    pending = "";
    processLine(rest);
    void flush();
  });

  // 'close' also follows a failed stream, so the timer never outlives the input.
  inputStream.once('close', () => {
    clearInterval(timer);
  });
}
}
// Usage
const pipeline = new RealTimeDataPipeline(host, "raw-events", "processed-events");
await pipeline.setup();
await pipeline.startProcessing((event) => ({
...event,
processed: true,
timestamp: Date.now()
}));class MultiHubBroadcaster {
/**
 * @param manager - Client used to list hubs and obtain a per-hub client
 */
constructor(
  private manager: ManagerClient
) {}
/**
 * Sends the same JSON-lines payload to `topicId` on every hub whose status is
 * "connected", then logs one success or failure line per hub.
 *
 * Hubs are handled concurrently and independently: a failure on one hub is
 * reported and does not stop delivery to the others.
 *
 * @param topicId - Topic to create (best effort) and publish to on each hub
 * @param data - Items to send; each is serialized as one JSON line
 */
async broadcastToAllHubs(topicId: string, data: any[]) {
  const hubs = await this.manager.getHosts();
  const connectedHubs = hubs.filter(h => h.status === "connected");

  const results = await Promise.allSettled(
    connectedHubs.map(async (hub) => {
      const hostClient = this.manager.getHostClient(hub.id);

      // Ensure topic exists on each hub
      try {
        await hostClient.createTopic(topicId, "application/json");
      } catch {
        // Topic might already exist, ignore error
      }

      // Send data (a stream can be consumed only once, so build one per hub)
      const dataStream = Readable.from(
        data.map(item => JSON.stringify(item) + '\n')
      );
      return hostClient.sendTopic(topicId, dataStream);
    })
  );

  // Report results; allSettled keeps the order of connectedHubs.
  results.forEach((result, index) => {
    const hubId = connectedHubs[index]?.id;
    if (result.status === 'fulfilled') {
      console.log(`✓ Broadcasted to hub ${hubId}`);
    } else {
      // A rejection reason is not guaranteed to be an Error (or even defined).
      const reason: unknown = result.reason;
      const detail = reason instanceof Error ? reason.message : String(reason);
      console.error(`✗ Failed to broadcast to hub ${hubId}:`, detail);
    }
  });
}
}// Handle topic subscription errors with retry logic
async function subscribeWithRetry(
host: HostClient,
topicId: string,
onData: (data: any) => void,
maxRetries = 3
) {
let retries = 0;
// One subscription attempt: opens the topic stream, wires its handlers, and
// schedules the next attempt itself when the stream fails or ends.
const subscribe = async () => {
  const describe = (error: unknown) =>
    error instanceof Error ? error.message : String(error);

  try {
    const stream = await host.getTopic(topicId);

    // Chunks do not respect line or character boundaries: decode in streaming
    // mode and hold the unfinished tail back until the next chunk arrives.
    const decoder = new TextDecoder();
    let pending = "";
    // A single stream may emit both 'error' and 'end'; only the first of them
    // may schedule a reconnect, otherwise two subscriptions would run at once.
    let retryScheduled = false;

    const deliver = (line: string) => {
      if (!line) return;
      // Guard each line separately so one bad record does not drop the rest.
      try {
        const data = JSON.parse(line);
        onData(data);
      } catch (error) {
        console.error('Data parsing error:', describe(error));
      }
    };

    stream.on('data', (chunk) => {
      pending += typeof chunk === 'string' ? chunk : decoder.decode(chunk, { stream: true });
      const lines = pending.split('\n');
      pending = lines.pop() ?? "";
      lines.forEach(deliver);
    });

    stream.on('error', (error) => {
      console.error('Topic stream error:', describe(error));
      if (retryScheduled) return;
      if (retries < maxRetries) {
        retries++;
        retryScheduled = true;
        console.log(`Retrying subscription (${retries}/${maxRetries})...`);
        // Back off linearly with the number of consecutive failures.
        setTimeout(subscribe, 1000 * retries);
      } else {
        console.error('Max retries reached, giving up');
      }
    });

    stream.on('end', () => {
      // Deliver a final record that was not newline-terminated.
      pending += decoder.decode();
      const rest = pending;
      pending = "";
      deliver(rest);

      console.log('Topic stream ended, attempting to reconnect...');
      if (!retryScheduled && retries < maxRetries) {
        retries++;
        retryScheduled = true;
        setTimeout(subscribe, 1000);
      }
    });

    // Reset retry counter on successful connection
    retries = 0;
  } catch (error) {
    console.error('Failed to subscribe to topic:', describe(error));
    if (retries < maxRetries) {
      retries++;
      setTimeout(subscribe, 1000 * retries);
    }
  }
};
await subscribe();
}
Install with Tessl CLI
npx tessl i tessl/npm-scramjet--api-client