CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-pipedream--types

TypeScript types for Pipedream components (sources and actions)

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

service-types.mddocs/

Service Types

Type definitions for Pipedream's built-in services including database storage and HTTP response handling.

Capabilities

Database Service

Persistent key-value storage service for maintaining state between component executions.

/**
 * Database service for persistent key-value storage
 */
interface DatabaseService {
  /** Service type identifier */
  type: "$.service.db";
  
  /**
   * Retrieve a value by key
   * @param key - The key to retrieve
   * @returns The stored value or undefined if not found
   */
  get: (key: string) => any;
  
  /**
   * Store a value by key  
   * @param key - The key to store under
   * @param value - The value to store (must be JSON serializable)
   */
  set: (key: string, value: any) => void;
}

Usage Examples:

import { PipedreamComponent, DatabaseService } from "@pipedream/types";

// Basic state management
const stateComponent: PipedreamComponent = {
  name: "State Management Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 }
    },
    db: "$.service.db"
  },
  async run(event) {
    // Get current counter value
    const counter = this.db.get("counter") || 0;
    
    // Increment counter
    const newCounter = counter + 1;
    this.db.set("counter", newCounter);
    
    // Store execution metadata
    this.db.set("lastExecution", {
      timestamp: Date.now(),
      counter: newCounter,
      eventType: event.type || "timer"
    });
    
    console.log(`Execution #${newCounter}`);
    
    this.$emit({
      executionNumber: newCounter,
      timestamp: Date.now()
    });
  }
};

// Complex state with nested data
const complexStateComponent: PipedreamComponent = {
  name: "Complex State Component", 
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 600 }
    },
    db: "$.service.db"
  },
  async run(event) {
    // Get complex state object
    const appState = this.db.get("appState") || {
      users: {},
      metrics: {
        totalProcessed: 0,
        errorCount: 0,
        lastSuccess: null
      },
      config: {
        batchSize: 100,
        retryAttempts: 3
      }
    };
    
    try {
      // Process new users
      const newUsers = await fetchNewUsers();
      
      newUsers.forEach(user => {
        appState.users[user.id] = {
          ...user,
          processedAt: Date.now(),
          status: "processed"
        };
        appState.metrics.totalProcessed++;
      });
      
      appState.metrics.lastSuccess = Date.now();
      
      // Save updated state
      this.db.set("appState", appState);
      
      // Emit summary
      this.$emit({
        processedCount: newUsers.length,
        totalUsers: Object.keys(appState.users).length,
        totalProcessed: appState.metrics.totalProcessed
      });
      
    } catch (error) {
      appState.metrics.errorCount++;
      this.db.set("appState", appState);
      throw error;
    }
  }
};

Database Patterns

Pagination State Management

const paginationComponent: PipedreamComponent = {
  name: "Pagination Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 900 }
    },
    db: "$.service.db"
  },
  async run(event) {
    let pageToken = this.db.get("nextPageToken");
    let processedCount = 0;
    
    do {
      const response = await fetchPage(pageToken);
      
      // Process items from this page
      response.items.forEach(item => {
        this.$emit(item, {
          id: item.id,
          summary: `Item: ${item.name}`
        });
        processedCount++;
      });
      
      pageToken = response.nextPageToken;
      
      // Update pagination state
      this.db.set("nextPageToken", pageToken);
      this.db.set("lastPageProcessed", {
        timestamp: Date.now(),
        itemCount: response.items.length,
        hasMore: !!pageToken
      });
      
    } while (pageToken);
    
    console.log(`Processed ${processedCount} items total`);
  }
};

Deduplication Tracking

const dedupeComponent: PipedreamComponent = {
  name: "Deduplication Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http"
    },
    db: "$.service.db"
  },
  async run(event) {
    const seenIds = this.db.get("seenIds") || new Set();
    const items = event.body.items || [];
    const newItems = [];
    
    items.forEach(item => {
      if (!seenIds.has(item.id)) {
        seenIds.add(item.id);
        newItems.push(item);
      }
    });
    
    // Keep only recent IDs (prevent unbounded growth)
    if (seenIds.size > 10000) {
      const idsArray = Array.from(seenIds);
      const recentIds = new Set(idsArray.slice(-5000));
      this.db.set("seenIds", recentIds);
    } else {
      this.db.set("seenIds", seenIds);
    }
    
    // Emit only new items
    newItems.forEach(item => {
      this.$emit(item, {
        id: item.id,
        summary: `New item: ${item.name}`
      });
    });
    
    console.log(`${newItems.length} new items out of ${items.length} total`);
  }
};

Configuration Management

const configurableComponent: PipedreamComponent = {
  name: "Configurable Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 }
    },
    resetConfig: {
      type: "boolean",
      label: "Reset Configuration",
      description: "Reset to default configuration",
      optional: true,
      default: false
    },
    db: "$.service.db"
  },
  async run(event) {
    // Default configuration
    const defaultConfig = {
      batchSize: 50,
      retryAttempts: 3,
      timeoutMs: 30000,
      enableLogging: true,
      filters: {
        minPriority: 1,
        excludeTypes: []
      }
    };
    
    // Get or reset configuration
    let config = this.resetConfig ? null : this.db.get("config");
    if (!config) {
      config = defaultConfig;
      this.db.set("config", config);
      console.log("Using default configuration");
    }
    
    // Use configuration in processing
    const items = await fetchItems({
      limit: config.batchSize,
      timeout: config.timeoutMs
    });
    
    const filteredItems = items.filter(item => 
      item.priority >= config.filters.minPriority &&
      !config.filters.excludeTypes.includes(item.type)
    );
    
    if (config.enableLogging) {
      console.log(`Processed ${filteredItems.length}/${items.length} items`);
    }
    
    filteredItems.forEach(item => {
      this.$emit(item, {
        id: item.id,
        summary: `${item.type}: ${item.name}`
      });
    });
  }
};

HTTP Response Service

Service for sending HTTP responses in HTTP interface components.

/**
 * Method for sending HTTP responses
 */
interface HttpRespondMethod {
  /**
   * Send an HTTP response
   * @param response - Response configuration
   */
  (response: {
    /** HTTP status code */
    status: number;
    /** Response headers (optional) */
    headers?: Record<string, string>;
    /** Response body (optional) */
    body?: string | object | Buffer;
  }): void;
}

Usage Examples:

import { PipedreamComponent, HttpRespondMethod } from "@pipedream/types";

// Basic HTTP responses
const httpResponseComponent: PipedreamComponent = {
  name: "HTTP Response Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http",
      customResponse: true
    }
  },
  async run(event) {
    try {
      // Process the request
      const result = await processData(event.body);
      
      // Send JSON response
      this.http.respond({
        status: 200,
        headers: {
          "Content-Type": "application/json",
          "X-Processing-Time": `${Date.now() - event.timestamp}ms`
        },
        body: {
          success: true,
          data: result,
          timestamp: new Date().toISOString()
        }
      });
      
    } catch (error) {
      // Send error response  
      this.http.respond({
        status: 500,
        headers: {
          "Content-Type": "application/json"
        },
        body: {
          success: false,
          error: error.message,
          code: error.code || "PROCESSING_ERROR"
        }
      });
    }
  }
};

// Different response types
const diverseResponseComponent: PipedreamComponent = {
  name: "Diverse Response Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http", 
      customResponse: true
    }
  },
  async run(event) {
    const { path, method, query } = event;
    
    // Route different endpoints
    if (path === "/health") {
      this.http.respond({
        status: 200,
        headers: { "Content-Type": "text/plain" },
        body: "OK"
      });
      
    } else if (path === "/api/data" && method === "GET") {
      const data = await fetchData(query);
      this.http.respond({
        status: 200,
        headers: { 
          "Content-Type": "application/json",
          "Cache-Control": "max-age=300"
        },
        body: data
      });
      
    } else if (path === "/webhook" && method === "POST") {
      // Process webhook
      await processWebhook(event.body);
      this.http.respond({
        status: 202,
        headers: { "Content-Type": "application/json" },
        body: { received: true, id: generateId() }
      });
      
    } else if (path === "/download") {
      // Binary response
      const fileData = await generateFile();
      this.http.respond({
        status: 200,
        headers: {
          "Content-Type": "application/octet-stream",
          "Content-Disposition": "attachment; filename=data.csv"
        },
        body: fileData
      });
      
    } else {
      // Not found
      this.http.respond({
        status: 404,
        headers: { "Content-Type": "application/json" },
        body: { error: "Endpoint not found" }
      });
    }
  }
};

Service Combinations

Components often use multiple services together:

const multiServiceComponent: PipedreamComponent = {
  name: "Multi-Service Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http",
      customResponse: true
    },
    db: "$.service.db"
  },
  async run(event) {
    // Use database to track request counts
    const requestCount = this.db.get("requestCount") || 0;
    this.db.set("requestCount", requestCount + 1);
    
    // Rate limiting using database
    const windowStart = Math.floor(Date.now() / 60000) * 60000; // 1-minute window
    const windowKey = `requests:${windowStart}`;
    const windowCount = this.db.get(windowKey) || 0;
    
    if (windowCount >= 100) {
      this.http.respond({
        status: 429,
        headers: {
          "Content-Type": "application/json",
          "Retry-After": "60"
        },
        body: { error: "Rate limit exceeded" }
      });
      return;
    }
    
    // Update window count
    this.db.set(windowKey, windowCount + 1);
    
    // Process request
    const result = await processRequest(event.body);
    
    // Send response
    this.http.respond({
      status: 200,
      headers: { 
        "Content-Type": "application/json",
        "X-Request-Count": `${requestCount + 1}`,
        "X-Window-Count": `${windowCount + 1}`
      },
      body: { 
        success: true, 
        data: result,
        stats: {
          totalRequests: requestCount + 1,
          windowRequests: windowCount + 1
        }
      }
    });
    
    // Emit for analytics
    this.$emit({
      type: "api_request",
      path: event.path,
      method: event.method,
      responseStatus: 200,
      requestCount: requestCount + 1
    });
  }
};

Install with Tessl CLI

npx tessl i tessl/npm-pipedream--types

docs

component-context.md

component-structure.md

event-system.md

index.md

interface-types.md

prop-types.md

service-types.md

tile.json