CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-scramjet--api-client

API Client for use with Scramjet Transform Hub providing typed interfaces for managing sequences, instances, and Hub operations

Pending
Overview
Eval results
Files

topics.mddocs/

Topics and Service Discovery

Topics provide a service discovery mechanism enabling data exchange between sequences, external systems, and clients through named data streams.

Capabilities

Topic Management

Create, list, and delete topics for data exchange.

/**
 * Creates a new topic for data exchange (via HostClient)
 * @param id - Topic identifier/name
 * @param contentType - MIME type for topic data (e.g., "application/json", "text/plain")
 * @returns Promise resolving to topic creation result
 */
createTopic(id: string, contentType: string): Promise<{ topicName: string }>;

/**
 * Lists all available topics (via HostClient)
 * @returns Promise resolving to array of topic information
 */
getTopics(): Promise<STHRestAPI.GetTopicsResponse>;

/**
 * Deletes a topic (via HostClient)
 * @param id - Topic identifier to delete
 * @returns Promise resolving to deletion confirmation
 */
deleteTopic(id: string): Promise<{ message: string }>;

Usage Examples:

import { HostClient } from "@scramjet/api-client";

const host = new HostClient("http://localhost:8000/api/v1");

// Create topics for different data types
await host.createTopic("sensor-data", "application/json");
await host.createTopic("log-stream", "text/plain");
await host.createTopic("binary-data", "application/octet-stream");

// List all topics
const topics = await host.getTopics();
topics.forEach(topic => {
  console.log(`Topic: ${topic.id} (${topic.contentType})`);
});

// Clean up topic when no longer needed
await host.deleteTopic("sensor-data");

Data Publishing

Send data to topics for consumption by subscribers.

/**
 * Sends data to a named topic (via HostClient)
 * @param topic - Topic name to send data to
 * @param stream - Data stream to publish
 * @param requestInit - Optional request configuration
 * @param contentType - Content type override (defaults to "application/x-ndjson")
 * @param end - Whether to signal end of stream to subscribers
 * @returns Promise resolving to publish result
 */
sendTopic<T>(
  topic: string,
  stream: Parameters<HttpClient["sendStream"]>[1],
  requestInit?: RequestInit,
  contentType?: string,
  end?: boolean
): Promise<T>;

/**
 * Convenience alias for sendTopic
 */
readonly sendNamedData: typeof sendTopic;

Usage Examples:

import { Readable } from "stream";

// Send JSON data to a topic
const sensorData = [
  { timestamp: Date.now(), temperature: 23.5, humidity: 60 },
  { timestamp: Date.now() + 1000, temperature: 23.7, humidity: 59 }
];

const jsonStream = Readable.from(
  sensorData.map(data => JSON.stringify(data) + '\n')
);

await host.sendTopic("sensor-readings", jsonStream, {}, "application/json");

// Send text data
const logEntries = Readable.from([
  "INFO: Application started\n",
  "DEBUG: Configuration loaded\n",
  "INFO: Server listening on port 3000\n"
]);

await host.sendTopic("application-logs", logEntries, {}, "text/plain");

// Send binary data
const binaryData = fs.createReadStream("./data.bin");
await host.sendTopic("file-upload", binaryData, {}, "application/octet-stream");

Data Subscription

Subscribe to topics to receive published data.

/**
 * Gets data stream from a named topic (via HostClient)
 * @param topic - Topic name to subscribe to
 * @param requestInit - Optional request configuration
 * @param contentType - Expected content type (defaults to "application/x-ndjson")
 * @returns Promise resolving to readable stream of topic data
 */
getTopic(
  topic: string,
  requestInit?: RequestInit,
  contentType?: string
): ReturnType<HttpClient["getStream"]>;

/**
 * Convenience alias for getTopic
 */
readonly getNamedData: typeof getTopic;

Usage Examples:

// Subscribe to JSON data stream
const sensorStream = await host.getTopic("sensor-readings", {}, "application/json");

sensorStream.on('data', (chunk) => {
  const readings = chunk.toString().split('\n').filter(Boolean);
  readings.forEach(line => {
    const data = JSON.parse(line);
    console.log(`Temperature: ${data.temperature}°C, Humidity: ${data.humidity}%`);
  });
});

// Subscribe to log stream
const logStream = await host.getTopic("application-logs", {}, "text/plain");
logStream.pipe(process.stdout); // Direct pipe to console

// Handle connection errors
sensorStream.on('error', (error) => {
  console.error('Topic subscription error:', error.message);
  // Implement reconnection logic
});

sensorStream.on('end', () => {
  console.log('Topic stream ended');
});

Manager-Level Topics

Access topics across multiple hubs through ManagerClient.

/**
 * Sends data to a named topic across the hub network (via ManagerClient)
 * @param topic - Topic name
 * @param stream - Data stream to send
 * @param requestInit - Optional request configuration
 * @param contentType - Content type
 * @param end - Whether to signal end of stream
 * @returns Promise resolving to send result
 */
sendNamedData<T>(
  topic: string,
  stream: Parameters<HttpClient["sendStream"]>[1],
  requestInit?: RequestInit,
  contentType?: string,
  end?: boolean
): Promise<T>;

/**
 * Gets data stream from a named topic across the hub network (via ManagerClient)
 * @param topic - Topic name
 * @param requestInit - Optional request configuration
 * @returns Promise resolving to aggregated topic data stream
 */
getNamedData(topic: string, requestInit?: RequestInit): Promise<Readable>;

Response Types

interface STHRestAPI {
  GetTopicsResponse: Array<{
    id: string;
    contentType: string;
    created: string;
    subscribers?: number;
    publishers?: number;
    [key: string]: any;
  }>;
}

Common Patterns

Publisher-Subscriber Pattern

class DataPublisher {
  constructor(private host: HostClient, private topicId: string) {}
  
  async initialize(contentType: string) {
    await this.host.createTopic(this.topicId, contentType);
  }
  
  async publish(data: any[]) {
    const dataStream = Readable.from(
      data.map(item => JSON.stringify(item) + '\n')
    );
    
    await this.host.sendTopic(this.topicId, dataStream, {}, "application/json");
  }
  
  async cleanup() {
    await this.host.deleteTopic(this.topicId);
  }
}

class DataSubscriber {
  constructor(private host: HostClient, private topicId: string) {}
  
  async subscribe(onData: (data: any) => void) {
    const stream = await this.host.getTopic(this.topicId, {}, "application/json");
    
    stream.on('data', (chunk) => {
      const lines = chunk.toString().split('\n').filter(Boolean);
      lines.forEach(line => {
        try {
          const data = JSON.parse(line);
          onData(data);
        } catch (error) {
          console.error('Failed to parse topic data:', error.message);
        }
      });
    });
    
    return stream;
  }
}

// Usage
const publisher = new DataPublisher(host, "market-data");
const subscriber = new DataSubscriber(host, "market-data");

await publisher.initialize("application/json");

// Subscribe to data
await subscriber.subscribe((data) => {
  console.log('Received market data:', data);
});

// Publish data
await publisher.publish([
  { symbol: "AAPL", price: 150.25, volume: 1000 },
  { symbol: "GOOGL", price: 2800.50, volume: 500 }
]);

Real-time Data Pipeline

class RealTimeDataPipeline {
  constructor(
    private host: HostClient,
    private inputTopic: string,
    private outputTopic: string
  ) {}
  
  async setup() {
    await this.host.createTopic(this.inputTopic, "application/json");
    await this.host.createTopic(this.outputTopic, "application/json");
  }
  
  async startProcessing(processor: (data: any) => any) {
    // Subscribe to input
    const inputStream = await this.host.getTopic(this.inputTopic);
    
    // Create output stream
    const { Writable } = require('stream');
    const outputBuffer: string[] = [];
    
    const outputStream = new Writable({
      write(chunk, encoding, callback) {
        outputBuffer.push(chunk.toString());
        callback();
      }
    });
    
    // Process data
    inputStream.on('data', (chunk) => {
      const lines = chunk.toString().split('\n').filter(Boolean);
      
      lines.forEach(line => {
        try {
          const inputData = JSON.parse(line);
          const processedData = processor(inputData);
          
          outputBuffer.push(JSON.stringify(processedData) + '\n');
        } catch (error) {
          console.error('Processing error:', error.message);
        }
      });
    });
    
    // Periodically flush output buffer
    setInterval(async () => {
      if (outputBuffer.length > 0) {
        const batch = outputBuffer.splice(0);
        const batchStream = Readable.from(batch);
        
        try {
          await this.host.sendTopic(this.outputTopic, batchStream);
        } catch (error) {
          console.error('Failed to send processed data:', error.message);
        }
      }
    }, 1000);
  }
}

// Usage
const pipeline = new RealTimeDataPipeline(host, "raw-events", "processed-events");
await pipeline.setup();

await pipeline.startProcessing((event) => ({
  ...event,
  processed: true,
  timestamp: Date.now()
}));

Multi-Hub Topic Broadcasting

class MultiHubBroadcaster {
  constructor(private manager: ManagerClient) {}
  
  async broadcastToAllHubs(topicId: string, data: any[]) {
    const hubs = await this.manager.getHosts();
    const connectedHubs = hubs.filter(h => h.status === "connected");
    
    const results = await Promise.allSettled(
      connectedHubs.map(async (hub) => {
        const hostClient = this.manager.getHostClient(hub.id);
        
        // Ensure topic exists on each hub
        try {
          await hostClient.createTopic(topicId, "application/json");
        } catch (error) {
          // Topic might already exist, ignore error
        }
        
        // Send data
        const dataStream = Readable.from(
          data.map(item => JSON.stringify(item) + '\n')
        );
        
        return hostClient.sendTopic(topicId, dataStream);
      })
    );
    
    // Report results
    results.forEach((result, index) => {
      const hubId = connectedHubs[index].id;
      if (result.status === 'fulfilled') {
        console.log(`✓ Broadcasted to hub ${hubId}`);
      } else {
        console.error(`✗ Failed to broadcast to hub ${hubId}:`, result.reason.message);
      }
    });
  }
}

Error Handling

// Handle topic subscription errors with retry logic
async function subscribeWithRetry(
  host: HostClient,
  topicId: string,
  onData: (data: any) => void,
  maxRetries = 3
) {
  let retries = 0;
  
  const subscribe = async () => {
    try {
      const stream = await host.getTopic(topicId);
      
      stream.on('data', (chunk) => {
        try {
          const lines = chunk.toString().split('\n').filter(Boolean);
          lines.forEach(line => {
            const data = JSON.parse(line);
            onData(data);
          });
        } catch (error) {
          console.error('Data parsing error:', error.message);
        }
      });
      
      stream.on('error', (error) => {
        console.error('Topic stream error:', error.message);
        if (retries < maxRetries) {
          retries++;
          console.log(`Retrying subscription (${retries}/${maxRetries})...`);
          setTimeout(subscribe, 1000 * retries);
        } else {
          console.error('Max retries reached, giving up');
        }
      });
      
      stream.on('end', () => {
        console.log('Topic stream ended, attempting to reconnect...');
        if (retries < maxRetries) {
          retries++;
          setTimeout(subscribe, 1000);
        }
      });
      
      // Reset retry counter on successful connection
      retries = 0;
      
    } catch (error) {
      console.error('Failed to subscribe to topic:', error.message);
      if (retries < maxRetries) {
        retries++;
        setTimeout(subscribe, 1000 * retries);
      }
    }
  };
  
  await subscribe();
}

Install with Tessl CLI

npx tessl i tessl/npm-scramjet--api-client

docs

host-client.md

index.md

instance-client.md

manager-client.md

sequence-client.md

topics.md

tile.json