0
# Event System
1
2
Types for event emission, metadata, and deduplication strategies used throughout the Pipedream component lifecycle.
3
4
## Capabilities
5
6
### Event Emission
7
8
Core interface for emitting events from components.
9
10
```typescript { .api }
11
/**
12
* Method for emitting events from components
13
*/
14
interface EmitMethod {
15
/**
16
* Emit an event with optional metadata
17
* @param event - The event data to emit
18
* @param metadata - Optional metadata for the event
19
*/
20
(event: any, metadata?: EventMetadata): void;
21
}
22
```
23
24
**Usage Examples:**
25
26
```typescript
27
import { PipedreamComponent, EmitMethod } from "@pipedream/types";
28
29
const eventEmitterComponent: PipedreamComponent = {
30
name: "Event Emitter Component",
31
version: "1.0.0",
32
props: {
33
timer: {
34
type: "$.interface.timer",
35
default: { intervalSeconds: 60 }
36
}
37
},
38
async run(event) {
39
// Simple event emission
40
this.$emit({ message: "Hello World" });
41
42
// Event with metadata
43
this.$emit(
44
{ data: "processed data", count: 42 },
45
{
46
id: "unique-event-id",
47
summary: "Data processed successfully",
48
ts: Date.now()
49
}
50
);
51
52
// Multiple events
53
const items = await fetchItems();
54
items.forEach((item, index) => {
55
this.$emit(item, {
56
id: item.id,
57
summary: `Item ${index + 1}: ${item.name}`,
58
ts: Date.now()
59
});
60
});
61
}
62
};
63
```
64
65
### Event Metadata
66
67
Metadata interface for providing additional information about emitted events.
68
69
```typescript { .api }
70
/**
71
* Metadata for emitted events
72
*/
73
interface EventMetadata {
74
/** Unique identifier for deduplication (optional) */
75
id?: string | number;
76
/** Human-readable summary for UI display (optional) */
77
summary?: string;
78
/** Event timestamp in milliseconds (optional) */
79
ts?: number;
80
}
81
```
82
83
**Usage Examples:**
84
85
```typescript
86
// Event with ID for deduplication
87
this.$emit(userUpdate, {
88
id: userUpdate.userId,
89
summary: `User ${userUpdate.name} updated`,
90
ts: Date.now()
91
});
92
93
// Event with compound ID
94
this.$emit(orderItem, {
95
id: `${orderItem.orderId}-${orderItem.itemId}`,
96
summary: `Order ${orderItem.orderId} item ${orderItem.itemName}`,
97
ts: orderItem.updatedAt
98
});
99
100
// Event with auto-generated timestamp
101
this.$emit(notification, {
102
id: notification.id,
103
summary: `Notification: ${notification.type}`,
104
ts: Date.now()
105
});
106
```
107
108
### Deduplication Strategies
109
110
Strategies for handling duplicate events based on their IDs.
111
112
```typescript { .api }
113
/**
114
* Event deduplication strategy options
115
*/
116
type DedupeStrategy =
117
| "unique" // Only emit events with unique IDs
118
| "greatest" // Only emit event if ID is greater than previous
119
| "last"; // Only emit the most recent event (overwrites previous)
120
```
121
122
**Usage Examples:**
123
124
```typescript
125
// Unique deduplication - only new IDs are emitted
126
const uniqueComponent: PipedreamComponent = {
127
name: "Unique Events Component",
128
version: "1.0.0",
129
dedupe: "unique",
130
props: { /* props */ },
131
async run(event) {
132
const items = await fetchNewItems();
133
items.forEach(item => {
134
this.$emit(item, {
135
id: item.id, // Only items with new IDs will be emitted
136
summary: `New item: ${item.name}`
137
});
138
});
139
}
140
};
141
142
// Greatest deduplication - only emit if ID is higher
143
const incrementalComponent: PipedreamComponent = {
144
name: "Incremental Events Component",
145
version: "1.0.0",
146
dedupe: "greatest",
147
props: { /* props */ },
148
async run(event) {
149
const records = await fetchRecordsSince(this.db.get("lastId") || 0);
150
records.forEach(record => {
151
this.$emit(record, {
152
id: record.sequenceNumber, // Only higher sequence numbers emitted
153
summary: `Record ${record.sequenceNumber}`
154
});
155
});
156
}
157
};
158
159
// Last deduplication - only keep most recent
160
const latestComponent: PipedreamComponent = {
161
name: "Latest Events Component",
162
version: "1.0.0",
163
dedupe: "last",
164
props: { /* props */ },
165
async run(event) {
166
const status = await fetchCurrentStatus();
167
this.$emit(status, {
168
id: status.entityId, // Only latest status per entity
169
summary: `Status: ${status.value}`,
170
ts: status.timestamp
171
});
172
}
173
};
174
```
175
176
## Event Processing Patterns
177
178
### Batch Event Processing
179
180
Processing and emitting multiple events in a single run:
181
182
```typescript
183
const batchProcessor: PipedreamComponent = {
184
name: "Batch Processor",
185
version: "1.0.0",
186
dedupe: "unique",
187
props: {
188
timer: {
189
type: "$.interface.timer",
190
default: { intervalSeconds: 300 }
191
},
192
db: "$.service.db"
193
},
194
async run(event) {
195
const lastProcessed = this.db.get("lastProcessedId") || 0;
196
const newItems = await fetchItemsSince(lastProcessed);
197
198
console.log(`Processing ${newItems.length} new items`);
199
200
let maxId = lastProcessed;
201
for (const item of newItems) {
202
// Process each item
203
const processed = await processItem(item);
204
205
// Emit processed item
206
this.$emit(processed, {
207
id: item.id,
208
summary: `Processed: ${item.name}`,
209
ts: Date.now()
210
});
211
212
maxId = Math.max(maxId, item.id);
213
}
214
215
// Update last processed ID
216
if (maxId > lastProcessed) {
217
this.db.set("lastProcessedId", maxId);
218
}
219
}
220
};
221
```
222
223
### Event Filtering
224
225
Selectively emitting events based on conditions:
226
227
```typescript
228
const filteringComponent: PipedreamComponent = {
229
name: "Filtering Component",
230
version: "1.0.0",
231
props: {
232
http: {
233
type: "$.interface.http"
234
},
235
minPriority: {
236
type: "integer",
237
label: "Minimum Priority",
238
description: "Only emit events with this priority or higher",
239
default: 1,
240
min: 1,
241
max: 5
242
}
243
},
244
async run(event) {
245
const notifications = event.body.notifications || [];
246
247
notifications.forEach(notification => {
248
// Only emit high-priority notifications
249
if (notification.priority >= this.minPriority) {
250
this.$emit(notification, {
251
id: notification.id,
252
summary: `Priority ${notification.priority}: ${notification.message}`,
253
ts: notification.timestamp
254
});
255
} else {
256
console.log(`Skipping low-priority notification: ${notification.id}`);
257
}
258
});
259
}
260
};
261
```
262
263
### Event Transformation
264
265
Transforming data before emission:
266
267
```typescript
268
const transformingComponent: PipedreamComponent = {
269
name: "Transforming Component",
270
version: "1.0.0",
271
props: {
272
timer: {
273
type: "$.interface.timer",
274
default: { intervalSeconds: 600 }
275
}
276
},
277
async run(event) {
278
const rawData = await fetchRawData();
279
280
const transformedData = rawData.map(item => ({
281
// Transform and normalize data
282
id: item.external_id,
283
name: item.display_name?.trim(),
284
email: item.email_address?.toLowerCase(),
285
status: item.is_active ? 'active' : 'inactive',
286
lastUpdated: new Date(item.updated_timestamp).toISOString(),
287
// Add computed fields
288
displayName: `${item.first_name} ${item.last_name}`.trim(),
289
domain: item.email_address?.split('@')[1]
290
}));
291
292
transformedData.forEach(item => {
293
this.$emit(item, {
294
id: item.id,
295
summary: `User: ${item.displayName} (${item.status})`,
296
ts: Date.now()
297
});
298
});
299
}
300
};
301
```
302
303
### Event Aggregation
304
305
Combining multiple data points into summary events:
306
307
```typescript
308
const aggregatingComponent: PipedreamComponent = {
309
name: "Aggregating Component",
310
version: "1.0.0",
311
dedupe: "last",
312
props: {
313
timer: {
314
type: "$.interface.timer",
315
default: { intervalSeconds: 3600 } // Hourly
316
},
317
db: "$.service.db"
318
},
319
async run(event) {
320
const hourlyStats = await calculateHourlyStats();
321
322
// Emit aggregated statistics
323
this.$emit({
324
timestamp: Date.now(),
325
period: "1hour",
326
metrics: {
327
totalUsers: hourlyStats.userCount,
328
newSignups: hourlyStats.newUsers,
329
activeUsers: hourlyStats.activeUsers,
330
totalRevenue: hourlyStats.revenue,
331
errorRate: hourlyStats.errorRate
332
},
333
trends: {
334
userGrowth: hourlyStats.userGrowthPercent,
335
revenueGrowth: hourlyStats.revenueGrowthPercent
336
}
337
}, {
338
id: `stats-${Math.floor(Date.now() / 3600000)}`, // Hour-based ID
339
summary: `Hourly Stats: ${hourlyStats.activeUsers} active users`,
340
ts: Date.now()
341
});
342
}
343
};
344
```
345
346
## Error Handling in Events
347
348
### Event Emission with Error Handling
349
350
```typescript
351
const robustComponent: PipedreamComponent = {
352
name: "Robust Component",
353
version: "1.0.0",
354
props: { /* props */ },
355
async run(event) {
356
const items = await fetchItems();
357
358
for (const item of items) {
359
try {
360
const processed = await processItem(item);
361
362
this.$emit(processed, {
363
id: item.id,
364
summary: `Successfully processed: ${item.name}`,
365
ts: Date.now()
366
});
367
368
} catch (error) {
369
// Emit error event
370
this.$emit({
371
error: true,
372
originalItem: item,
373
errorMessage: error.message,
374
errorCode: error.code
375
}, {
376
id: `error-${item.id}`,
377
summary: `Error processing ${item.name}: ${error.message}`,
378
ts: Date.now()
379
});
380
381
console.error(`Failed to process item ${item.id}:`, error);
382
}
383
}
384
}
385
};
386
```