or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

component-context.md component-structure.md event-system.md index.md interface-types.md prop-types.md service-types.md

docs/event-system.md

0

# Event System

1

2

Types for event emission, metadata, and deduplication strategies used throughout the Pipedream component lifecycle.

3

4

## Capabilities

5

6

### Event Emission

7

8

Core interface for emitting events from components.

9

10

```typescript { .api }

11

/**

12

* Method for emitting events from components

13

*/

14

interface EmitMethod {

15

/**

16

* Emit an event with optional metadata

17

* @param event - The event data to emit

18

* @param metadata - Optional metadata for the event

19

*/

20

(event: any, metadata?: EventMetadata): void;

21

}

22

```

23

24

**Usage Examples:**

25

26

```typescript

27

import { PipedreamComponent, EmitMethod } from "@pipedream/types";

28

29

const eventEmitterComponent: PipedreamComponent = {

30

name: "Event Emitter Component",

31

version: "1.0.0",

32

props: {

33

timer: {

34

type: "$.interface.timer",

35

default: { intervalSeconds: 60 }

36

}

37

},

38

async run(event) {

39

// Simple event emission

40

this.$emit({ message: "Hello World" });

41

42

// Event with metadata

43

this.$emit(

44

{ data: "processed data", count: 42 },

45

{

46

id: "unique-event-id",

47

summary: "Data processed successfully",

48

ts: Date.now()

49

}

50

);

51

52

// Multiple events

53

const items = await fetchItems();

54

items.forEach((item, index) => {

55

this.$emit(item, {

56

id: item.id,

57

summary: `Item ${index + 1}: ${item.name}`,

58

ts: Date.now()

59

});

60

});

61

}

62

};

63

```

64

65

### Event Metadata

66

67

Metadata interface for providing additional information about emitted events.

68

69

```typescript { .api }

70

/**

71

* Metadata for emitted events

72

*/

73

interface EventMetadata {

74

/** Unique identifier for deduplication (optional) */

75

id?: string | number;

76

/** Human-readable summary for UI display (optional) */

77

summary?: string;

78

/** Event timestamp as epoch milliseconds (optional) */

79

ts?: number;

80

}

81

```

82

83

**Usage Examples:**

84

85

```typescript

86

// Event with ID for deduplication

87

this.$emit(userUpdate, {

88

id: userUpdate.userId,

89

summary: `User ${userUpdate.name} updated`,

90

ts: Date.now()

91

});

92

93

// Event with compound ID

94

this.$emit(orderItem, {

95

id: `${orderItem.orderId}-${orderItem.itemId}`,

96

summary: `Order ${orderItem.orderId} item ${orderItem.itemName}`,

97

ts: orderItem.updatedAt

98

});

99

100

// Event stamped with the current time

101

this.$emit(notification, {

102

id: notification.id,

103

summary: `Notification: ${notification.type}`,

104

ts: Date.now()

105

});

106

```

107

108

### Deduplication Strategies

109

110

Strategies for handling duplicate events based on their IDs.

111

112

```typescript { .api }

113

/**

114

* Event deduplication strategy options

115

*/

116

type DedupeStrategy =

117

| "unique" // Only emit events with unique IDs

118

| "greatest" // Only emit an event if its numeric ID is greater than the largest ID emitted so far

119

| "last"; // Cache the ID of the last emitted event; only events after the matching ID are emitted (all events if no ID matches)

120

```

121

122

**Usage Examples:**

123

124

```typescript

125

// Unique deduplication - only new IDs are emitted

126

const uniqueComponent: PipedreamComponent = {

127

name: "Unique Events Component",

128

version: "1.0.0",

129

dedupe: "unique",

130

props: { /* props */ },

131

async run(event) {

132

const items = await fetchNewItems();

133

items.forEach(item => {

134

this.$emit(item, {

135

id: item.id, // Only items with new IDs will be emitted

136

summary: `New item: ${item.name}`

137

});

138

});

139

}

140

};

141

142

// Greatest deduplication - only emit if ID is higher

143

const incrementalComponent: PipedreamComponent = {

144

name: "Incremental Events Component",

145

version: "1.0.0",

146

dedupe: "greatest",

147

props: { /* props */ },

148

async run(event) {

149

const records = await fetchRecordsSince(this.db.get("lastId") || 0);

150

records.forEach(record => {

151

this.$emit(record, {

152

id: record.sequenceNumber, // Only higher sequence numbers emitted

153

summary: `Record ${record.sequenceNumber}`

154

});

155

});

156

}

157

};

158

159

// Last deduplication - only emit events that come after the last emitted ID

160

const latestComponent: PipedreamComponent = {

161

name: "Latest Events Component",

162

version: "1.0.0",

163

dedupe: "last",

164

props: { /* props */ },

165

async run(event) {

166

const status = await fetchCurrentStatus();

167

this.$emit(status, {

168

id: status.entityId, // Compared against the ID of the last emitted event

169

summary: `Status: ${status.value}`,

170

ts: status.timestamp

171

});

172

}

173

};

174

```

175

176

## Event Processing Patterns

177

178

### Batch Event Processing

179

180

Processing and emitting multiple events in a single run:

181

182

```typescript

183

const batchProcessor: PipedreamComponent = {

184

name: "Batch Processor",

185

version: "1.0.0",

186

dedupe: "unique",

187

props: {

188

timer: {

189

type: "$.interface.timer",

190

default: { intervalSeconds: 300 }

191

},

192

db: "$.service.db"

193

},

194

async run(event) {

195

const lastProcessed = this.db.get("lastProcessedId") || 0;

196

const newItems = await fetchItemsSince(lastProcessed);

197

198

console.log(`Processing ${newItems.length} new items`);

199

200

let maxId = lastProcessed;

201

for (const item of newItems) {

202

// Process each item

203

const processed = await processItem(item);

204

205

// Emit processed item

206

this.$emit(processed, {

207

id: item.id,

208

summary: `Processed: ${item.name}`,

209

ts: Date.now()

210

});

211

212

maxId = Math.max(maxId, item.id);

213

}

214

215

// Update last processed ID

216

if (maxId > lastProcessed) {

217

this.db.set("lastProcessedId", maxId);

218

}

219

}

220

};

221

```

222

223

### Event Filtering

224

225

Selectively emitting events based on conditions:

226

227

```typescript

228

const filteringComponent: PipedreamComponent = {

229

name: "Filtering Component",

230

version: "1.0.0",

231

props: {

232

http: {

233

type: "$.interface.http"

234

},

235

minPriority: {

236

type: "integer",

237

label: "Minimum Priority",

238

description: "Only emit events with this priority or higher",

239

default: 1,

240

min: 1,

241

max: 5

242

}

243

},

244

async run(event) {

245

const notifications = event.body.notifications || [];

246

247

notifications.forEach(notification => {

248

// Only emit high-priority notifications

249

if (notification.priority >= this.minPriority) {

250

this.$emit(notification, {

251

id: notification.id,

252

summary: `Priority ${notification.priority}: ${notification.message}`,

253

ts: notification.timestamp

254

});

255

} else {

256

console.log(`Skipping low-priority notification: ${notification.id}`);

257

}

258

});

259

}

260

};

261

```

262

263

### Event Transformation

264

265

Transforming data before emission:

266

267

```typescript

268

const transformingComponent: PipedreamComponent = {

269

name: "Transforming Component",

270

version: "1.0.0",

271

props: {

272

timer: {

273

type: "$.interface.timer",

274

default: { intervalSeconds: 600 }

275

}

276

},

277

async run(event) {

278

const rawData = await fetchRawData();

279

280

const transformedData = rawData.map(item => ({

281

// Transform and normalize data

282

id: item.external_id,

283

name: item.display_name?.trim(),

284

email: item.email_address?.toLowerCase(),

285

status: item.is_active ? 'active' : 'inactive',

286

lastUpdated: new Date(item.updated_timestamp).toISOString(),

287

// Add computed fields

288

displayName: `${item.first_name} ${item.last_name}`.trim(),

289

domain: item.email_address?.split('@')[1]

290

}));

291

292

transformedData.forEach(item => {

293

this.$emit(item, {

294

id: item.id,

295

summary: `User: ${item.displayName} (${item.status})`,

296

ts: Date.now()

297

});

298

});

299

}

300

};

301

```

302

303

### Event Aggregation

304

305

Combining multiple data points into summary events:

306

307

```typescript

308

const aggregatingComponent: PipedreamComponent = {

309

name: "Aggregating Component",

310

version: "1.0.0",

311

dedupe: "last",

312

props: {

313

timer: {

314

type: "$.interface.timer",

315

default: { intervalSeconds: 3600 } // Hourly

316

},

317

db: "$.service.db"

318

},

319

async run(event) {

320

const hourlyStats = await calculateHourlyStats();

321

322

// Emit aggregated statistics

323

this.$emit({

324

timestamp: Date.now(),

325

period: "1hour",

326

metrics: {

327

totalUsers: hourlyStats.userCount,

328

newSignups: hourlyStats.newUsers,

329

activeUsers: hourlyStats.activeUsers,

330

totalRevenue: hourlyStats.revenue,

331

errorRate: hourlyStats.errorRate

332

},

333

trends: {

334

userGrowth: hourlyStats.userGrowthPercent,

335

revenueGrowth: hourlyStats.revenueGrowthPercent

336

}

337

}, {

338

id: `stats-${Math.floor(Date.now() / 3600000)}`, // Hour-based ID

339

summary: `Hourly Stats: ${hourlyStats.activeUsers} active users`,

340

ts: Date.now()

341

});

342

}

343

};

344

```

345

346

## Error Handling in Events

347

348

### Event Emission with Error Handling

349

350

```typescript

351

const robustComponent: PipedreamComponent = {

352

name: "Robust Component",

353

version: "1.0.0",

354

props: { /* props */ },

355

async run(event) {

356

const items = await fetchItems();

357

358

for (const item of items) {

359

try {

360

const processed = await processItem(item);

361

362

this.$emit(processed, {

363

id: item.id,

364

summary: `Successfully processed: ${item.name}`,

365

ts: Date.now()

366

});

367

368

} catch (error) {

369

// Emit error event

370

this.$emit({

371

error: true,

372

originalItem: item,

373

errorMessage: error.message,

374

errorCode: error.code

375

}, {

376

id: `error-${item.id}`,

377

summary: `Error processing ${item.name}: ${error.message}`,

378

ts: Date.now()

379

});

380

381

console.error(`Failed to process item ${item.id}:`, error);

382

}

383

}

384

}

385

};

386

```