# Service Types

Type definitions for Pipedream's built-in services, including database storage and HTTP response handling.

## Capabilities

### Database Service

Persistent key-value storage service for maintaining state between component executions.

```typescript { .api }
/**
 * Database service for persistent key-value storage
 */
interface DatabaseService {
  /** Service type identifier */
  type: "$.service.db";

  /**
   * Retrieve a value by key
   * @param key - The key to retrieve
   * @returns The stored value or undefined if not found
   */
  get: (key: string) => any;

  /**
   * Store a value by key
   * @param key - The key to store under
   * @param value - The value to store (must be JSON serializable, so convert
   *   values such as `Set` or `Map` to arrays or plain objects first)
   */
  set: (key: string, value: any) => void;
}
```

**Usage Examples:**

```typescript
import { PipedreamComponent, DatabaseService } from "@pipedream/types";

// Basic state management: counts executions and records metadata about the latest one
const stateComponent: PipedreamComponent = {
  name: "State Management Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 }
    },
    db: "$.service.db"
  },
  /**
   * Increments the persisted "counter", stores a "lastExecution" record and
   * emits the new execution number.
   * @param event - The triggering event; its `type` is recorded when present
   */
  async run(event) {
    // Get current counter value (0 when nothing has been stored yet)
    const counter = this.db.get("counter") || 0;

    // Increment counter
    const newCounter = counter + 1;
    this.db.set("counter", newCounter);

    // Read the clock once so the stored record and the emitted event agree
    const timestamp = Date.now();

    // Store execution metadata
    this.db.set("lastExecution", {
      timestamp,
      counter: newCounter,
      eventType: event.type || "timer"
    });

    console.log(`Execution #${newCounter}`);

    this.$emit({
      executionNumber: newCounter,
      timestamp
    });
  }
};
73
74
// Complex state with nested data
75
const complexStateComponent: PipedreamComponent = {
76
name: "Complex State Component",
77
version: "1.0.0",
78
props: {
79
timer: {
80
type: "$.interface.timer",
81
default: { intervalSeconds: 600 }
82
},
83
db: "$.service.db"
84
},
85
async run(event) {
86
// Get complex state object
87
const appState = this.db.get("appState") || {
88
users: {},
89
metrics: {
90
totalProcessed: 0,
91
errorCount: 0,
92
lastSuccess: null
93
},
94
config: {
95
batchSize: 100,
96
retryAttempts: 3
97
}
98
};
99
100
try {
101
// Process new users
102
const newUsers = await fetchNewUsers();
103
104
newUsers.forEach(user => {
105
appState.users[user.id] = {
106
...user,
107
processedAt: Date.now(),
108
status: "processed"
109
};
110
appState.metrics.totalProcessed++;
111
});
112
113
appState.metrics.lastSuccess = Date.now();
114
115
// Save updated state
116
this.db.set("appState", appState);
117
118
// Emit summary
119
this.$emit({
120
processedCount: newUsers.length,
121
totalUsers: Object.keys(appState.users).length,
122
totalProcessed: appState.metrics.totalProcessed
123
});
124
125
} catch (error) {
126
appState.metrics.errorCount++;
127
this.db.set("appState", appState);
128
throw error;
129
}
130
}
131
};
```

### Database Patterns

#### Pagination State Management

```typescript
const paginationComponent: PipedreamComponent = {
  name: "Pagination Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 900 }
    },
    db: "$.service.db"
  },
  /**
   * Walks pages returned by `fetchPage()`, starting from the token saved under
   * "nextPageToken", emitting every item and saving progress after each page.
   */
  async run(event) {
    let pageToken = this.db.get("nextPageToken");
    let processedCount = 0;

    do {
      const response = await fetchPage(pageToken);
      // Treat a page without an `items` array as an empty page
      const items = response.items || [];

      // Process items from this page
      for (const item of items) {
        this.$emit(item, {
          id: item.id,
          summary: `Item: ${item.name}`
        });
        processedCount++;
      }

      pageToken = response.nextPageToken;

      // Update pagination state after every page so progress is saved
      // even if a later page fails
      this.db.set("nextPageToken", pageToken);
      this.db.set("lastPageProcessed", {
        timestamp: Date.now(),
        itemCount: items.length,
        hasMore: !!pageToken
      });

    } while (pageToken);

    console.log(`Processed ${processedCount} items total`);
  }
};
180
```
181
182
#### Deduplication Tracking
183
184
```typescript
185
const dedupeComponent: PipedreamComponent = {
  name: "Deduplication Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http"
    },
    db: "$.service.db"
  },
  /**
   * Emits only the items in `event.body.items` whose `id` has not been seen in
   * an earlier run, and saves the updated list of seen IDs.
   */
  async run(event) {
    // Limits for the saved ID history (prevent unbounded growth)
    const maxTrackedIds = 10000;
    const retainedIds = 5000;

    // Stored values must be JSON serializable and a Set is not, so the IDs are
    // saved as an array and rebuilt into a Set for lookups during this run.
    const storedIds = this.db.get("seenIds");
    const seenIds = new Set(Array.isArray(storedIds) ? storedIds : []);
    const items = (event.body && event.body.items) || [];
    const newItems = [];

    for (const item of items) {
      if (!seenIds.has(item.id)) {
        seenIds.add(item.id);
        newItems.push(item);
      }
    }

    // Keep only recent IDs: a Set iterates in insertion order, so the tail of
    // the array holds the most recently added IDs
    let idsToStore = Array.from(seenIds);
    if (idsToStore.length > maxTrackedIds) {
      idsToStore = idsToStore.slice(-retainedIds);
    }
    this.db.set("seenIds", idsToStore);

    // Emit only new items
    for (const item of newItems) {
      this.$emit(item, {
        id: item.id,
        summary: `New item: ${item.name}`
      });
    }

    console.log(`${newItems.length} new items out of ${items.length} total`);
  }
};
226
```
227
228
#### Configuration Management
229
230
```typescript
231
const configurableComponent: PipedreamComponent = {
  name: "Configurable Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 }
    },
    resetConfig: {
      type: "boolean",
      label: "Reset Configuration",
      description: "Reset to default configuration",
      optional: true,
      default: false
    },
    db: "$.service.db"
  },
  /**
   * Loads the configuration saved under "config" (or the defaults when none is
   * saved or `resetConfig` is set), fetches items with it, and emits the items
   * that pass the configured filters.
   */
  async run(event) {
    // Default configuration
    const defaultConfig = {
      batchSize: 50,
      retryAttempts: 3,
      timeoutMs: 30000,
      enableLogging: true,
      filters: {
        minPriority: 1,
        excludeTypes: []
      }
    };

    // Get or reset configuration
    const storedConfig = this.resetConfig ? null : this.db.get("config");
    let config;
    if (storedConfig) {
      // Fill settings missing from the saved value with the defaults, so a
      // partially saved configuration cannot break the filtering below
      config = {
        ...defaultConfig,
        ...storedConfig,
        filters: { ...defaultConfig.filters, ...storedConfig.filters }
      };
    } else {
      config = defaultConfig;
      this.db.set("config", config);
      console.log("Using default configuration");
    }

    // Use configuration in processing
    const items = await fetchItems({
      limit: config.batchSize,
      timeout: config.timeoutMs
    });

    const filteredItems = items.filter(item =>
      item.priority >= config.filters.minPriority &&
      !config.filters.excludeTypes.includes(item.type)
    );

    if (config.enableLogging) {
      console.log(`Processed ${filteredItems.length}/${items.length} items`);
    }

    for (const item of filteredItems) {
      this.$emit(item, {
        id: item.id,
        summary: `${item.type}: ${item.name}`
      });
    }
  }
};
292
```
293
294
### HTTP Response Service
295
296
Service for sending HTTP responses in HTTP interface components.
297
298
```typescript { .api }
299
/**
 * Method for sending HTTP responses
 */
interface HttpRespondMethod {
  /**
   * Send an HTTP response
   * @param response - Response configuration
   */
  (response: {
    /** HTTP status code */
    status: number;
    /** Response headers (optional) */
    headers?: Record<string, string>;
    /** Response body (optional) */
    body?: string | object | Buffer;
  }): void;
}
```

**Usage Examples:**

```typescript
import { PipedreamComponent, HttpRespondMethod } from "@pipedream/types";

// Basic HTTP responses
const httpResponseComponent: PipedreamComponent = {
  name: "HTTP Response Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http",
      customResponse: true
    }
  },
  /**
   * Passes the request body to `processData()` and answers with a JSON success
   * payload, or with a JSON error payload (status 500) when processing throws.
   */
  async run(event) {
    // Use the handler's own start time when the event carries no numeric
    // `timestamp`, so the timing header never reads "NaNms"
    const handlerStart = Date.now();
    const startedAt =
      typeof event.timestamp === "number" ? event.timestamp : handlerStart;

    try {
      // Process the request
      const result = await processData(event.body);

      // Send JSON response
      this.http.respond({
        status: 200,
        headers: {
          "Content-Type": "application/json",
          "X-Processing-Time": `${Date.now() - startedAt}ms`
        },
        body: {
          success: true,
          data: result,
          timestamp: new Date().toISOString()
        }
      });

    } catch (error) {
      // Anything can be thrown, so only read `message` from real Error objects
      const message = error instanceof Error ? error.message : String(error);
      const code =
        (error !== null &&
          typeof error === "object" &&
          (error as { code?: unknown }).code) ||
        "PROCESSING_ERROR";

      // Send error response
      this.http.respond({
        status: 500,
        headers: {
          "Content-Type": "application/json"
        },
        body: {
          success: false,
          error: message,
          code
        }
      });
    }
  }
};

// Different response types
const diverseResponseComponent: PipedreamComponent = {
  name: "Diverse Response Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http",
      customResponse: true
    }
  },
  /**
   * Routes the request by `path` and `method` and answers with plain text,
   * JSON, or a binary download; unmatched requests get a 404 JSON response.
   */
  async run(event) {
    const { path, method, query } = event;

    // Route different endpoints
    if (path === "/health") {
      this.http.respond({
        status: 200,
        headers: { "Content-Type": "text/plain" },
        body: "OK"
      });

    } else if (path === "/api/data" && method === "GET") {
      const data = await fetchData(query);
      this.http.respond({
        status: 200,
        headers: {
          "Content-Type": "application/json",
          "Cache-Control": "max-age=300"
        },
        body: data
      });

    } else if (path === "/webhook" && method === "POST") {
      // Process webhook
      await processWebhook(event.body);
      this.http.respond({
        status: 202,
        headers: { "Content-Type": "application/json" },
        body: { received: true, id: generateId() }
      });

    } else if (path === "/download") {
      // Binary response
      const fileData = await generateFile();
      this.http.respond({
        status: 200,
        headers: {
          "Content-Type": "application/octet-stream",
          "Content-Disposition": "attachment; filename=data.csv"
        },
        body: fileData
      });

    } else {
      // Not found
      this.http.respond({
        status: 404,
        headers: { "Content-Type": "application/json" },
        body: { error: "Endpoint not found" }
      });
    }
  }
};
432
```
433
434
### Service Combinations
435
436
Components often use multiple services together:
437
438
```typescript
439
const multiServiceComponent: PipedreamComponent = {
  name: "Multi-Service Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http",
      customResponse: true
    },
    db: "$.service.db"
  },
  /**
   * Counts every request, rejects requests beyond 100 per one-minute window
   * with a 429 response, and otherwise processes the body, responds with the
   * result plus request statistics, and emits an analytics event.
   */
  async run(event) {
    // Use database to track request counts
    const requestCount = this.db.get("requestCount") || 0;
    this.db.set("requestCount", requestCount + 1);

    // Rate limiting using database. One record is reused for the current
    // window, so the store does not gain a new key every minute.
    const windowStart = Math.floor(Date.now() / 60000) * 60000; // 1-minute window
    const storedWindow = this.db.get("rateLimitWindow");
    const windowCount =
      storedWindow && storedWindow.windowStart === windowStart
        ? storedWindow.count
        : 0; // no record yet, or the record belongs to an earlier window

    if (windowCount >= 100) {
      this.http.respond({
        status: 429,
        headers: {
          "Content-Type": "application/json",
          "Retry-After": "60"
        },
        body: { error: "Rate limit exceeded" }
      });
      return;
    }

    // Update window count
    this.db.set("rateLimitWindow", { windowStart, count: windowCount + 1 });

    // Process request
    const result = await processRequest(event.body);

    // Send response
    this.http.respond({
      status: 200,
      headers: {
        "Content-Type": "application/json",
        "X-Request-Count": `${requestCount + 1}`,
        "X-Window-Count": `${windowCount + 1}`
      },
      body: {
        success: true,
        data: result,
        stats: {
          totalRequests: requestCount + 1,
          windowRequests: windowCount + 1
        }
      }
    });

    // Emit for analytics
    this.$emit({
      type: "api_request",
      path: event.path,
      method: event.method,
      responseStatus: 200,
      requestCount: requestCount + 1
    });
  }
};
505
```