TypeScript types for Pipedream components (sources and actions)
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Types for event emission, metadata, and deduplication strategies used throughout the Pipedream component lifecycle.
Core interface for emitting events from components.
/**
 * Call signature of the function a component uses to emit events.
 * The usage examples that follow invoke it as `this.$emit(...)`.
 */
interface EmitMethod {
  /**
   * Emit an event with optional metadata.
   * @param event - The event data to emit
   * @param metadata - Optional metadata for the event (see `EventMetadata`)
   */
  (event: any, metadata?: EventMetadata): void;
}
Usage Examples:
import { PipedreamComponent, EmitMethod } from "@pipedream/types";
// Component with a timer prop (default interval: 60 seconds) whose `run`
// demonstrates three emission patterns: payload only, payload with metadata,
// and one emission per fetched item.
const eventEmitterComponent: PipedreamComponent = {
  name: "Event Emitter Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 60 }
    }
  },
  async run(event) {
    // Simple event emission (no metadata)
    this.$emit({ message: "Hello World" });

    // Event with metadata
    this.$emit(
      { data: "processed data", count: 42 },
      {
        id: "unique-event-id",
        summary: "Data processed successfully",
        ts: Date.now()
      }
    );

    // Multiple events: one emission per item, numbered from 1 in the summary
    const items = await fetchItems();
    items.forEach((item, index) => {
      this.$emit(item, {
        id: item.id,
        summary: `Item ${index + 1}: ${item.name}`,
        ts: Date.now()
      });
    });
  }
};
Metadata interface for providing additional information about emitted events.
/**
 * Metadata for emitted events, passed as the optional second argument
 * when emitting. Every field may be omitted.
 */
interface EventMetadata {
  /** Unique identifier for deduplication (optional) */
  id?: string | number;
  /** Human-readable summary for UI display (optional) */
  summary?: string;
  /** Event timestamp in milliseconds (optional) */
  ts?: number;
}
Usage Examples:
// Event with ID for deduplication
this.$emit(userUpdate, {
  id: userUpdate.userId,
  summary: `User ${userUpdate.name} updated`,
  ts: Date.now()
});

// Event with compound ID (order ID and item ID joined with "-")
this.$emit(orderItem, {
  id: `${orderItem.orderId}-${orderItem.itemId}`,
  summary: `Order ${orderItem.orderId} item ${orderItem.itemName}`,
  ts: orderItem.updatedAt
});

// Event timestamped with the current time at emission
this.$emit(notification, {
  id: notification.id,
  summary: `Notification: ${notification.type}`,
  ts: Date.now()
});
Strategies for handling duplicate events based on their IDs.
/**
 * Event deduplication strategy options.
 * Selected through a component's `dedupe` property, as the usage examples show.
 *
 * NOTE(review): the one-line description of "last" should be confirmed
 * against Pipedream's component API documentation before relying on it.
 */
type DedupeStrategy =
  | "unique" // Only emit events with unique IDs
  | "greatest" // Only emit event if ID is greater than previous
  | "last"; // Only emit the most recent event (overwrites previous)
Usage Examples:
// "unique" strategy - an item is emitted only when its ID is new
const uniqueComponent: PipedreamComponent = {
  name: "Unique Events Component",
  version: "1.0.0",
  dedupe: "unique",
  props: { /* props */ },
  async run(event) {
    const fetched = await fetchNewItems();
    for (const entry of fetched) {
      // The ID is what deduplication keys on, so only new IDs get through
      const summary = `New item: ${entry.name}`;
      this.$emit(entry, { id: entry.id, summary });
    }
  }
};
// Greatest deduplication - only emit if ID is higher
const incrementalComponent: PipedreamComponent = {
  name: "Incremental Events Component",
  version: "1.0.0",
  dedupe: "greatest",
  props: {
    // `run` reads and writes `this.db`, so the data store has to be declared
    db: "$.service.db"
    /* other props */
  },
  async run(event) {
    // Checkpoint from the previous run; 0 when nothing has been stored yet
    const lastId = this.db.get("lastId") || 0;
    const records = await fetchRecordsSince(lastId);

    let highestId = lastId;
    records.forEach(record => {
      this.$emit(record, {
        id: record.sequenceNumber, // Only higher sequence numbers emitted
        summary: `Record ${record.sequenceNumber}`
      });
      highestId = Math.max(highestId, record.sequenceNumber);
    });

    // Persist the checkpoint so the next run only fetches newer records
    if (highestId > lastId) {
      this.db.set("lastId", highestId);
    }
  }
};
// Last deduplication - only keep most recent
// Emits a single status event per run, identified by the status's entity ID.
const latestComponent: PipedreamComponent = {
  name: "Latest Events Component",
  version: "1.0.0",
  dedupe: "last",
  props: { /* props */ },
  async run(event) {
    const status = await fetchCurrentStatus();
    this.$emit(status, {
      id: status.entityId, // Only latest status per entity
      summary: `Status: ${status.value}`,
      ts: status.timestamp
    });
  }
};
Processing and emitting multiple events in a single run:
// Timer-triggered batch example (default interval: 300 seconds). Each run
// fetches the items after the stored checkpoint, processes and emits them
// one at a time, then advances the checkpoint to the highest item ID seen.
const batchProcessor: PipedreamComponent = {
  name: "Batch Processor",
  version: "1.0.0",
  dedupe: "unique",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 }
    },
    db: "$.service.db"
  },
  async run(event) {
    // Checkpoint from the previous run; 0 when nothing has been stored yet
    const lastProcessed = this.db.get("lastProcessedId") || 0;
    const newItems = await fetchItemsSince(lastProcessed);
    console.log(`Processing ${newItems.length} new items`);

    let maxId = lastProcessed;
    for (const item of newItems) {
      // Process each item
      const processed = await processItem(item);

      // Emit processed item
      this.$emit(processed, {
        id: item.id,
        summary: `Processed: ${item.name}`,
        ts: Date.now()
      });
      maxId = Math.max(maxId, item.id);
    }

    // Update last processed ID (only when this run moved past the checkpoint)
    if (maxId > lastProcessed) {
      this.db.set("lastProcessedId", maxId);
    }
  }
};
Selectively emitting events based on conditions:
// HTTP-triggered example that emits only the notifications whose priority
// is at or above the configurable `minPriority` prop (1-5, default 1).
const filteringComponent: PipedreamComponent = {
  name: "Filtering Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http"
    },
    minPriority: {
      type: "integer",
      label: "Minimum Priority",
      description: "Only emit events with this priority or higher",
      default: 1,
      min: 1,
      max: 5
    }
  },
  async run(event) {
    // `?.` guards against a request that arrives without a body; a missing
    // or empty notifications field falls back to an empty list.
    const notifications = event.body?.notifications || [];
    notifications.forEach(notification => {
      // Only emit notifications at or above the configured minimum priority
      if (notification.priority >= this.minPriority) {
        this.$emit(notification, {
          id: notification.id,
          summary: `Priority ${notification.priority}: ${notification.message}`,
          ts: notification.timestamp
        });
      } else {
        console.log(`Skipping low-priority notification: ${notification.id}`);
      }
    });
  }
};
Transforming data before emission:
// Timer-triggered example (default interval: 600 seconds) that maps raw
// records into a normalized shape and emits one event per mapped record.
const transformingComponent: PipedreamComponent = {
  name: "Transforming Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 600 }
    }
  },
  async run(event) {
    const rawData = await fetchRawData();
    const transformedData = rawData.map(item => ({
      // Transform and normalize data
      id: item.external_id,
      name: item.display_name?.trim(),
      email: item.email_address?.toLowerCase(),
      status: item.is_active ? 'active' : 'inactive',
      lastUpdated: new Date(item.updated_timestamp).toISOString(),
      // Add computed fields
      // Missing name parts are dropped so the result never contains the
      // literal text "undefined".
      displayName: [item.first_name, item.last_name]
        .filter(Boolean)
        .join(' ')
        .trim(),
      domain: item.email_address?.split('@')[1]
    }));

    transformedData.forEach(item => {
      this.$emit(item, {
        id: item.id,
        summary: `User: ${item.displayName} (${item.status})`,
        ts: Date.now()
      });
    });
  }
};
Combining multiple data points into summary events:
// Timer-triggered example (default interval: one hour) that emits a single
// summary event per run, identified by the hour in which it was produced.
const aggregatingComponent: PipedreamComponent = {
  name: "Aggregating Component",
  version: "1.0.0",
  dedupe: "last",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 3600 } // Hourly
    },
    db: "$.service.db"
  },
  async run(event) {
    const hourlyStats = await calculateHourlyStats();

    // Read the clock once so the payload timestamp, the hour-based ID and
    // the metadata timestamp can never disagree across an hour boundary.
    const now = Date.now();

    // Emit aggregated statistics
    this.$emit({
      timestamp: now,
      period: "1hour",
      metrics: {
        totalUsers: hourlyStats.userCount,
        newSignups: hourlyStats.newUsers,
        activeUsers: hourlyStats.activeUsers,
        totalRevenue: hourlyStats.revenue,
        errorRate: hourlyStats.errorRate
      },
      trends: {
        userGrowth: hourlyStats.userGrowthPercent,
        revenueGrowth: hourlyStats.revenueGrowthPercent
      }
    }, {
      id: `stats-${Math.floor(now / 3600000)}`, // Hour-based ID
      summary: `Hourly Stats: ${hourlyStats.activeUsers} active users`,
      ts: now
    });
  }
};

// Error handling - a failing item produces an error event instead of
// aborting the rest of the batch.
const robustComponent: PipedreamComponent = {
  name: "Robust Component",
  version: "1.0.0",
  props: { /* props */ },
  async run(event) {
    const items = await fetchItems();
    for (const item of items) {
      try {
        const processed = await processItem(item);
        this.$emit(processed, {
          id: item.id,
          summary: `Successfully processed: ${item.name}`,
          ts: Date.now()
        });
      } catch (error) {
        // A caught value is `unknown` under strict TypeScript and may not be
        // an Error at all, so narrow it before reading `message` / `code`.
        const details: { message?: unknown; code?: unknown } =
          typeof error === "object" && error !== null
            ? (error as { message?: unknown; code?: unknown })
            : {};
        const errorMessage =
          typeof details.message === "string" ? details.message : String(error);

        // Emit error event
        this.$emit({
          error: true,
          originalItem: item,
          errorMessage,
          errorCode: details.code
        }, {
          id: `error-${item.id}`,
          summary: `Error processing ${item.name}: ${errorMessage}`,
          ts: Date.now()
        });
        console.error(`Failed to process item ${item.id}:`, error);
      }
    }
  }
};
Install with Tessl CLI
npx tessl i tessl/npm-pipedream--types