CtrlK
BlogDocsLog inGet started
Tessl Logo

tessl/npm-pipedream--types

TypeScript types for Pipedream components (sources and actions)

Pending

Quality

Pending

Does it follow best practices?

Impact

Pending

No eval scenarios have been run

Overview
Eval results
Files

event-system.mddocs/

Event System

Types for event emission, metadata, and deduplication strategies used throughout the Pipedream component lifecycle.

Capabilities

Event Emission

Core interface for emitting events from components.

/**
 * Callable interface describing the function components use to emit events.
 *
 * NOTE(review): presumably this is the type of `this.$emit` as used in the
 * examples in this document — confirm against the package's context types.
 */
interface EmitMethod {
  /**
   * Emit a single event, optionally accompanied by metadata.
   * @param event - The event payload to emit; typed `any`, so no shape is enforced by this signature
   * @param metadata - Optional {@link EventMetadata} (`id`, `summary`, `ts`) describing the event
   * @returns Nothing; the signature is declared as `void`
   */
  (event: any, metadata?: EventMetadata): void;
}

Usage Examples:

import { PipedreamComponent, EmitMethod } from "@pipedream/types";

/**
 * Example component demonstrating three ways of calling `this.$emit`:
 * a bare payload, a payload with metadata, and one emit per fetched item.
 */
const eventEmitterComponent: PipedreamComponent = {
  name: "Event Emitter Component",
  version: "1.0.0",
  props: {
    // Timer interface prop with a default interval of 60 seconds.
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 60 },
    },
  },
  async run(event) {
    // 1) Payload only: no metadata argument is passed.
    this.$emit({ message: "Hello World" });

    // 2) Payload plus explicit metadata (id, summary, emission time).
    const processedPayload = { data: "processed data", count: 42 };
    const processedMeta = {
      id: "unique-event-id",
      summary: "Data processed successfully",
      ts: Date.now(),
    };
    this.$emit(processedPayload, processedMeta);

    // 3) One event per fetched item; summaries are numbered starting at 1.
    const fetched = await fetchItems();
    for (const [position, entry] of fetched.entries()) {
      const entryMeta = {
        id: entry.id,
        summary: `Item ${position + 1}: ${entry.name}`,
        ts: Date.now(),
      };
      this.$emit(entry, entryMeta);
    }
  },
};

Event Metadata

Metadata interface for providing additional information about emitted events.

/**
 * Optional descriptive fields that can accompany an emitted event.
 *
 * Every property is optional, so an empty object is a valid value of this type.
 */
interface EventMetadata {
  /** Identifier used for deduplication; may be a string or a number (optional) */
  id?: string | number;
  /** Human-readable summary intended for UI display (optional) */
  summary?: string;
  /** Event timestamp expressed in milliseconds (optional) */
  ts?: number;
}

Usage Examples:

// NOTE: these fragments use `this.$emit`, so they are meant to be read as
// excerpts from inside a component method such as `run`.

// ID taken from a single payload field (`userId`); `ts` is read from the clock at emit time
this.$emit(userUpdate, {
  id: userUpdate.userId,
  summary: `User ${userUpdate.name} updated`,
  ts: Date.now()
});

// Compound ID built from two payload fields; `ts` comes from the payload's own `updatedAt`
this.$emit(orderItem, {
  id: `${orderItem.orderId}-${orderItem.itemId}`,
  summary: `Order ${orderItem.orderId} item ${orderItem.itemName}`,
  ts: orderItem.updatedAt
});

// Timestamp generated at emit time via Date.now() rather than read from the payload
this.$emit(notification, {
  id: notification.id,
  summary: `Notification: ${notification.type}`,
  ts: Date.now()
});

Deduplication Strategies

Strategies for handling duplicate events based on their IDs.

/**
 * Event deduplication strategy options.
 *
 * The examples in this document select a strategy through a component's
 * `dedupe` property and supply the compared value as `id` in the emit metadata.
 *
 * NOTE(review): the one-line summaries below are approximations. In particular,
 * upstream Pipedream documentation appears to describe `"last"` as caching the
 * ID of the last emitted event and emitting only events that come after it,
 * which is not the same as "overwrites previous" — confirm against the
 * Pipedream component API reference before relying on these descriptions.
 */
type DedupeStrategy = 
  | "unique"    // Only emit events with unique IDs
  | "greatest"  // Only emit event if ID is greater than previous
  | "last";     // Only emit the most recent event (overwrites previous)

Usage Examples:

// "unique" strategy example: every fetched item is emitted with its own ID,
// leaving it to the dedupe setting to drop IDs that were already seen.
const uniqueComponent: PipedreamComponent = {
  name: "Unique Events Component",
  version: "1.0.0",
  dedupe: "unique",
  props: { /* props */ },
  async run(event) {
    const freshItems = await fetchNewItems();
    for (const entry of freshItems) {
      // The item's own ID is the deduplication key.
      const entryMeta = {
        id: entry.id,
        summary: `New item: ${entry.name}`,
      };
      this.$emit(entry, entryMeta);
    }
  },
};

// Greatest deduplication - only emit if ID is higher.
//
// The component keeps a "lastId" checkpoint in `this.db` so each run asks only
// for records newer than the highest sequence number it has already emitted.
const incrementalComponent: PipedreamComponent = {
  name: "Incremental Events Component",
  version: "1.0.0",
  dedupe: "greatest",
  props: {
    // `run` reads and writes `this.db`, so the db service must be declared
    // here (same declaration the Batch Processor example in this document uses).
    db: "$.service.db"
    /* other props */
  },
  async run(event) {
    // `??` (not `||`) so only a missing checkpoint falls back to 0.
    const lastId = this.db.get("lastId") ?? 0;
    const records = await fetchRecordsSince(lastId);

    let highestId = lastId;
    for (const record of records) {
      this.$emit(record, {
        id: record.sequenceNumber, // Only higher sequence numbers emitted
        summary: `Record ${record.sequenceNumber}`
      });

      // Track the high-water mark independently of the order records arrive in.
      if (record.sequenceNumber > highestId) {
        highestId = record.sequenceNumber;
      }
    }

    // Persist the checkpoint; without this every run would refetch from 0.
    if (highestId !== lastId) {
      this.db.set("lastId", highestId);
    }
  }
};

// "last" strategy example: a single status snapshot is fetched and emitted,
// keyed by the entity it belongs to.
const latestComponent: PipedreamComponent = {
  name: "Latest Events Component",
  version: "1.0.0",
  dedupe: "last",
  props: { /* props */ },
  async run(event) {
    const current = await fetchCurrentStatus();

    // Entity ID is the dedupe key; the timestamp comes from the status itself.
    const statusMeta = {
      id: current.entityId,
      summary: `Status: ${current.value}`,
      ts: current.timestamp,
    };
    this.$emit(current, statusMeta);
  },
};

Event Processing Patterns

Batch Event Processing

Processing and emitting multiple events in a single run:

/**
 * Example: process a batch of items per run and remember how far it got.
 *
 * Reads the "lastProcessedId" checkpoint from `this.db`, processes and emits
 * every item fetched since that checkpoint, then stores the highest item ID seen.
 */
const batchProcessor: PipedreamComponent = {
  name: "Batch Processor",
  version: "1.0.0",
  dedupe: "unique",
  props: {
    // Timer interface prop with a default interval of 300 seconds.
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 },
    },
    db: "$.service.db",
  },
  async run(event) {
    const checkpoint = this.db.get("lastProcessedId") || 0;
    const pending = await fetchItemsSince(checkpoint);

    console.log(`Processing ${pending.length} new items`);

    let highWaterMark = checkpoint;
    for (const entry of pending) {
      // Items are handled one at a time, in the order they were fetched.
      const result = await processItem(entry);

      const resultMeta = {
        id: entry.id,
        summary: `Processed: ${entry.name}`,
        ts: Date.now(),
      };
      this.$emit(result, resultMeta);

      highWaterMark = Math.max(highWaterMark, entry.id);
    }

    // Only write the checkpoint when it actually advanced.
    if (highWaterMark > checkpoint) {
      this.db.set("lastProcessedId", highWaterMark);
    }
  },
};

Event Filtering

Selectively emitting events based on conditions:

/**
 * Example: emit only the notifications from an incoming HTTP event whose
 * priority is at least the configured `minPriority`; log the ones skipped.
 */
const filteringComponent: PipedreamComponent = {
  name: "Filtering Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http"
    },
    minPriority: {
      type: "integer",
      label: "Minimum Priority",
      description: "Only emit events with this priority or higher",
      default: 1,
      min: 1,
      max: 5
    }
  },
  async run(event) {
    // Tolerate events with no `body`, and bodies whose `notifications` field
    // is missing or not an array, instead of throwing a TypeError.
    const candidate = event?.body?.notifications;
    const notifications = Array.isArray(candidate) ? candidate : [];

    for (const notification of notifications) {
      // Only emit high-priority notifications
      if (notification.priority >= this.minPriority) {
        this.$emit(notification, {
          id: notification.id,
          summary: `Priority ${notification.priority}: ${notification.message}`,
          ts: notification.timestamp
        });
      } else {
        console.log(`Skipping low-priority notification: ${notification.id}`);
      }
    }
  }
};

Event Transformation

Transforming data before emission:

/**
 * Example: fetch raw records, normalize them into a uniform shape, and emit
 * one event per normalized record.
 */
const transformingComponent: PipedreamComponent = {
  name: "Transforming Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 600 }
    }
  },
  async run(event) {
    const rawData = await fetchRawData();

    const transformedData = rawData.map(item => {
      // Drop missing name parts before joining; interpolating them directly
      // would put the literal text "undefined"/"null" into the display name.
      const nameParts = [item.first_name, item.last_name].filter(
        part => part != null && part !== ""
      );

      return {
        // Transform and normalize data
        id: item.external_id,
        name: item.display_name?.trim(),
        email: item.email_address?.toLowerCase(),
        status: item.is_active ? 'active' : 'inactive',
        lastUpdated: new Date(item.updated_timestamp).toISOString(),
        // Add computed fields
        displayName: nameParts.join(" ").trim(),
        domain: item.email_address?.split('@')[1]
      };
    });

    transformedData.forEach(item => {
      this.$emit(item, {
        id: item.id,
        summary: `User: ${item.displayName} (${item.status})`,
        ts: Date.now()
      });
    });
  }
};

Event Aggregation

Combining multiple data points into summary events:

/**
 * Example: compute hourly statistics and emit them as one summary event whose
 * ID identifies the hour bucket the event belongs to.
 */
const aggregatingComponent: PipedreamComponent = {
  name: "Aggregating Component",
  version: "1.0.0",
  dedupe: "last",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 3600 } // Hourly
    },
    db: "$.service.db"
  },
  async run(event) {
    const MS_PER_HOUR = 3600000;
    const hourlyStats = await calculateHourlyStats();

    // Read the clock once so the payload timestamp, the hour-bucket ID and the
    // metadata `ts` always describe the same instant.
    const now = Date.now();

    // Emit aggregated statistics
    this.$emit({
      timestamp: now,
      period: "1hour",
      metrics: {
        totalUsers: hourlyStats.userCount,
        newSignups: hourlyStats.newUsers,
        activeUsers: hourlyStats.activeUsers,
        totalRevenue: hourlyStats.revenue,
        errorRate: hourlyStats.errorRate
      },
      trends: {
        userGrowth: hourlyStats.userGrowthPercent,
        revenueGrowth: hourlyStats.revenueGrowthPercent
      }
    }, {
      id: `stats-${Math.floor(now / MS_PER_HOUR)}`, // Hour-based ID
      summary: `Hourly Stats: ${hourlyStats.activeUsers} active users`,
      ts: now
    });
  }
};

Error Handling in Events

Event Emission with Error Handling

/**
 * Example: process items one by one, emitting a success event for each item
 * that processes cleanly and an error event for each one that throws, so a
 * single failure does not abort the rest of the batch.
 */
const robustComponent: PipedreamComponent = {
  name: "Robust Component",
  version: "1.0.0",
  props: { /* props */ },
  async run(event) {
    const items = await fetchItems();

    for (const item of items) {
      try {
        const processed = await processItem(item);

        this.$emit(processed, {
          id: item.id,
          summary: `Successfully processed: ${item.name}`,
          ts: Date.now()
        });

      } catch (error: unknown) {
        // Anything can be thrown (strings, null, plain objects), so narrow
        // before reading `message`/`code` instead of assuming an Error.
        const details =
          typeof error === "object" && error !== null
            ? (error as { message?: unknown; code?: unknown })
            : undefined;
        const errorMessage =
          typeof details?.message === "string" ? details.message : String(error);
        const errorCode = details?.code;

        // Emit error event
        this.$emit({
          error: true,
          originalItem: item,
          errorMessage,
          errorCode
        }, {
          id: `error-${item.id}`,
          summary: `Error processing ${item.name}: ${errorMessage}`,
          ts: Date.now()
        });

        console.error(`Failed to process item ${item.id}:`, error);
      }
    }
  }
};

Install with Tessl CLI

npx tessl i tessl/npm-pipedream--types

docs

component-context.md

component-structure.md

event-system.md

index.md

interface-types.md

prop-types.md

service-types.md

tile.json