TypeScript types for Pipedream components (sources and actions)
—
Quality
Pending
Does it follow best practices?
Impact
Pending
No eval scenarios have been run
Type definitions for Pipedream's built-in services including database storage and HTTP response handling.
Persistent key-value storage service for maintaining state between component executions.
/**
* Database service for persistent key-value storage.
*
* A component requests this service by declaring a prop whose value is the
* string "$.service.db"; the usage examples in this document then reach it
* as `this.db`.
*/
interface DatabaseService {
/** Service type identifier; always the literal "$.service.db" */
type: "$.service.db";
/**
* Retrieve a value by key
* @param key - The key to retrieve
* @returns The stored value or undefined if not found. The return type is
* `any`, so the compiler does not check the shape of what comes back.
*/
get: (key: string) => any;
/**
* Store a value by key
* @param key - The key to store under
* @param value - The value to store (must be JSON serializable). The `any`
* parameter type does not enforce this, so a Set or Map compiles fine —
* NOTE(review): presumably such values do not round-trip; confirm.
*/
set: (key: string, value: any) => void;
}
Usage Examples:
import { PipedreamComponent, DatabaseService } from "@pipedream/types";
// Basic state management
const stateComponent: PipedreamComponent = {
  name: "State Management Component",
  version: "1.0.0",
  props: {
    timer: {
      type: "$.interface.timer",
      default: { intervalSeconds: 300 },
    },
    db: "$.service.db",
  },
  /**
   * Counts executions in the database, records metadata about the latest
   * one, and emits the new execution number.
   *
   * @param event - Triggering event; its `type` is stored, falling back to
   * "timer" when it is missing or falsy.
   */
  async run(event) {
    const { db } = this;

    // A missing counter means this is the first execution
    const previousRuns = db.get("counter") || 0;
    const executionNumber = previousRuns + 1;
    db.set("counter", executionNumber);

    // Keep a snapshot of the most recent execution next to the counter
    const lastExecution = {
      timestamp: Date.now(),
      counter: executionNumber,
      eventType: event.type || "timer",
    };
    db.set("lastExecution", lastExecution);

    console.log(`Execution #${executionNumber}`);

    this.$emit({ executionNumber, timestamp: Date.now() });
  },
};
// Complex state with nested data
const complexStateComponent: PipedreamComponent = {
name: "Complex State Component",
version: "1.0.0",
props: {
timer: {
type: "$.interface.timer",
default: { intervalSeconds: 600 }
},
db: "$.service.db"
},
/**
 * Merges newly fetched users into the persisted "appState" object, updates
 * its metrics and emits a summary of the run.
 *
 * If anything inside the try block fails, the error counter is incremented,
 * the state is saved, and the original error is rethrown.
 */
async run(event) {
  // Shape used when nothing has been stored under "appState" yet
  const initialState = {
    users: {},
    metrics: { totalProcessed: 0, errorCount: 0, lastSuccess: null },
    config: { batchSize: 100, retryAttempts: 3 }
  };
  const state = this.db.get("appState") || initialState;

  try {
    const fetchedUsers = await fetchNewUsers();

    // Record every fetched user under its id and count it as processed
    fetchedUsers.forEach((fetchedUser) => {
      state.users[fetchedUser.id] = {
        ...fetchedUser,
        processedAt: Date.now(),
        status: "processed"
      };
      state.metrics.totalProcessed++;
    });
    state.metrics.lastSuccess = Date.now();

    // Persist first, then report
    this.db.set("appState", state);

    const summary = {
      processedCount: fetchedUsers.length,
      totalUsers: Object.keys(state.users).length,
      totalProcessed: state.metrics.totalProcessed
    };
    this.$emit(summary);
  } catch (failure) {
    // Users merged before the failure stay in the saved state
    state.metrics.errorCount++;
    this.db.set("appState", state);
    throw failure;
  }
}
};
const paginationComponent: PipedreamComponent = {
name: "Pagination Component",
version: "1.0.0",
props: {
timer: {
type: "$.interface.timer",
default: { intervalSeconds: 900 }
},
db: "$.service.db"
},
/**
 * Fetches pages starting from the stored "nextPageToken", emits every item,
 * and saves the pagination state after each page.
 */
async run(event) {
  let emittedTotal = 0;
  let cursor = this.db.get("nextPageToken");

  // One page is always fetched, even when no token has been stored
  while (true) {
    const page = await fetchPage(cursor);

    page.items.forEach((entry) => {
      this.$emit(entry, {
        id: entry.id,
        summary: `Item: ${entry.name}`
      });
      emittedTotal += 1;
    });

    // Save the token after every page; the next run reads it back as its
    // starting point
    cursor = page.nextPageToken;
    this.db.set("nextPageToken", cursor);
    this.db.set("lastPageProcessed", {
      timestamp: Date.now(),
      itemCount: page.items.length,
      hasMore: Boolean(cursor)
    });

    // A falsy token means the last page has been reached
    if (!cursor) {
      break;
    }
  }

  console.log(`Processed ${emittedTotal} items total`);
}
};
const dedupeComponent: PipedreamComponent = {
name: "Deduplication Component",
version: "1.0.0",
props: {
http: {
type: "$.interface.http"
},
db: "$.service.db"
},
/**
 * Emits only the items of the incoming request whose id has not been seen
 * by an earlier execution.
 *
 * Seen ids are persisted under "seenIds" as a plain array. Stored values
 * must be JSON serializable and a Set is not, so the Set used for lookups is
 * rebuilt from that array on every run instead of being stored directly.
 *
 * @param event - HTTP event; `event.body.items` is the list to deduplicate.
 * A missing body or missing `items` is treated as an empty list.
 */
async run(event) {
  // Anything that is not an array (first run, or a value written by an
  // older version that stored a Set) starts from an empty history
  const storedIds = this.db.get("seenIds");
  const seenIds = new Set(Array.isArray(storedIds) ? storedIds : []);

  const items = (event.body && event.body.items) || [];
  const newItems = [];
  items.forEach(item => {
    if (!seenIds.has(item.id)) {
      seenIds.add(item.id);
      newItems.push(item);
    }
  });

  // Keep only recent IDs (prevent unbounded growth). A Set iterates in
  // insertion order, so the tail of the array holds the newest ids.
  const allIds = Array.from(seenIds);
  this.db.set("seenIds", allIds.length > 10000 ? allIds.slice(-5000) : allIds);

  // Emit only new items
  newItems.forEach(item => {
    this.$emit(item, {
      id: item.id,
      summary: `New item: ${item.name}`
    });
  });
  console.log(`${newItems.length} new items out of ${items.length} total`);
}
};
const configurableComponent: PipedreamComponent = {
name: "Configurable Component",
version: "1.0.0",
props: {
timer: {
type: "$.interface.timer",
default: { intervalSeconds: 300 }
},
resetConfig: {
type: "boolean",
label: "Reset Configuration",
description: "Reset to default configuration",
optional: true,
default: false
},
db: "$.service.db"
},
/**
 * Loads the stored configuration (or installs the defaults), fetches a
 * batch of items with it, and emits those that pass the configured filters.
 *
 * The defaults are written back to the database when the `resetConfig`
 * prop is truthy or when nothing usable is stored under "config".
 */
async run(event) {
  const defaults = {
    batchSize: 50,
    retryAttempts: 3,
    timeoutMs: 30000,
    enableLogging: true,
    filters: {
      minPriority: 1,
      excludeTypes: []
    }
  };

  // A reset skips the lookup entirely, which forces the defaults below
  const storedConfig = this.resetConfig ? null : this.db.get("config");
  const config = storedConfig || defaults;
  if (!storedConfig) {
    this.db.set("config", defaults);
    console.log("Using default configuration");
  }

  const items = await fetchItems({
    limit: config.batchSize,
    timeout: config.timeoutMs
  });

  // An item passes when its priority reaches the minimum and its type is
  // not on the exclusion list
  const passesFilters = (candidate) => {
    if (!(candidate.priority >= config.filters.minPriority)) {
      return false;
    }
    return !config.filters.excludeTypes.includes(candidate.type);
  };
  const filteredItems = items.filter(passesFilters);

  if (config.enableLogging) {
    console.log(`Processed ${filteredItems.length}/${items.length} items`);
  }

  filteredItems.forEach((selected) => {
    const metadata = {
      id: selected.id,
      summary: `${selected.type}: ${selected.name}`
    };
    this.$emit(selected, metadata);
  });
}
};
Service for sending HTTP responses in HTTP interface components.
/**
* Method for sending HTTP responses.
*
* The usage examples in this document call it as `this.http.respond(...)` on
* an "$.interface.http" prop declared with `customResponse: true`.
*/
interface HttpRespondMethod {
/**
* Send an HTTP response
* @param response - Response configuration; only `status` is required
*/
(response: {
/** HTTP status code */
status: number;
/** Response headers (optional); every header value is a string */
headers?: Record<string, string>;
/** Response body (optional): a string, an object, or a Buffer */
body?: string | object | Buffer;
}): void;
}
Usage Examples:
import { PipedreamComponent, HttpRespondMethod } from "@pipedream/types";
// Basic HTTP responses
const httpResponseComponent: PipedreamComponent = {
  name: "HTTP Response Component",
  version: "1.0.0",
  props: {
    http: {
      type: "$.interface.http",
      customResponse: true
    }
  },
  /**
   * Processes the request body and answers with JSON: status 200 with the
   * result on success, status 500 with an error description on failure.
   *
   * @param event - HTTP event; `event.body` is handed to `processData`.
   * NOTE(review): the X-Processing-Time header assumes `event.timestamp` is
   * a millisecond epoch value — confirm; if it is absent the header reads
   * "NaNms".
   */
  async run(event) {
    try {
      // Process the request
      const result = await processData(event.body);
      // Send JSON response
      this.http.respond({
        status: 200,
        headers: {
          "Content-Type": "application/json",
          "X-Processing-Time": `${Date.now() - event.timestamp}ms`
        },
        body: {
          success: true,
          data: result,
          timestamp: new Date().toISOString()
        }
      });
    } catch (error: unknown) {
      // Anything can be thrown, not only Error objects. Reading `.message`
      // straight off the value would crash on null/undefined (leaving the
      // request unanswered) and give an undefined message for strings.
      const details: { message?: unknown; code?: unknown } =
        typeof error === "object" && error !== null ? error : {};
      const message =
        typeof details.message === "string" ? details.message : String(error);

      // Send error response
      this.http.respond({
        status: 500,
        headers: {
          "Content-Type": "application/json"
        },
        body: {
          success: false,
          error: message,
          code: details.code || "PROCESSING_ERROR"
        }
      });
    }
  }
};
// Different response types
const diverseResponseComponent: PipedreamComponent = {
name: "Diverse Response Component",
version: "1.0.0",
props: {
http: {
type: "$.interface.http",
customResponse: true
}
},
/**
 * Routes the request by path and HTTP method and sends exactly one response
 * per matched route; anything unmatched gets a 404.
 *
 * @param event - HTTP event providing `path`, `method`, `query` and `body`.
 */
async run(event) {
  const { path, method, query } = event;

  // Plain-text liveness probe
  if (path === "/health") {
    this.http.respond({
      status: 200,
      headers: { "Content-Type": "text/plain" },
      body: "OK"
    });
    return;
  }

  // JSON read endpoint; the response is cacheable for 300 seconds
  if (path === "/api/data" && method === "GET") {
    const payload = await fetchData(query);
    this.http.respond({
      status: 200,
      headers: {
        "Content-Type": "application/json",
        "Cache-Control": "max-age=300"
      },
      body: payload
    });
    return;
  }

  // Webhook intake: the acknowledgement goes out once processing finishes
  if (path === "/webhook" && method === "POST") {
    await processWebhook(event.body);
    this.http.respond({
      status: 202,
      headers: { "Content-Type": "application/json" },
      body: { received: true, id: generateId() }
    });
    return;
  }

  // Binary download served as an attachment
  if (path === "/download") {
    const attachment = await generateFile();
    this.http.respond({
      status: 200,
      headers: {
        "Content-Type": "application/octet-stream",
        "Content-Disposition": "attachment; filename=data.csv"
      },
      body: attachment
    });
    return;
  }

  // No route matched
  this.http.respond({
    status: 404,
    headers: { "Content-Type": "application/json" },
    body: { error: "Endpoint not found" }
  });
}
};
Components often use multiple services together:
const multiServiceComponent: PipedreamComponent = {
name: "Multi-Service Component",
version: "1.0.0",
props: {
http: {
type: "$.interface.http",
customResponse: true
},
db: "$.service.db"
},
/**
 * Handles an HTTP request with a fixed-window rate limit (100 requests per
 * one-minute window), answers with the processing result and emits an
 * analytics event.
 *
 * The current window is kept under a single "rateLimitWindow" key as
 * `{ start, count }`. The database service shown here offers only get/set,
 * so writing a separate key per window would leave one stale key behind
 * every minute with no way to remove it.
 *
 * @param event - HTTP event; `event.body` is processed, and `event.path`
 * and `event.method` are included in the analytics event.
 */
async run(event) {
  const WINDOW_MS = 60000; // 1-minute window
  const MAX_REQUESTS_PER_WINDOW = 100;

  // Use database to track request counts (rejected requests count too)
  const requestCount = this.db.get("requestCount") || 0;
  this.db.set("requestCount", requestCount + 1);

  // Rate limiting using database: a stored window that does not match the
  // current one has expired, so its count is ignored
  const windowStart = Math.floor(Date.now() / WINDOW_MS) * WINDOW_MS;
  const storedWindow = this.db.get("rateLimitWindow");
  const windowCount =
    storedWindow && storedWindow.start === windowStart
      ? storedWindow.count || 0
      : 0;

  if (windowCount >= MAX_REQUESTS_PER_WINDOW) {
    this.http.respond({
      status: 429,
      headers: {
        "Content-Type": "application/json",
        "Retry-After": "60"
      },
      body: { error: "Rate limit exceeded" }
    });
    return;
  }

  // Update window count (overwrites the previous window in place)
  this.db.set("rateLimitWindow", { start: windowStart, count: windowCount + 1 });

  // Process request
  const result = await processRequest(event.body);

  // Send response
  this.http.respond({
    status: 200,
    headers: {
      "Content-Type": "application/json",
      "X-Request-Count": `${requestCount + 1}`,
      "X-Window-Count": `${windowCount + 1}`
    },
    body: {
      success: true,
      data: result,
      stats: {
        totalRequests: requestCount + 1,
        windowRequests: windowCount + 1
      }
    }
  });

  // Emit for analytics
  this.$emit({
    type: "api_request",
    path: event.path,
    method: event.method,
    responseStatus: 200,
    requestCount: requestCount + 1
  });
}
};
Install with Tessl CLI
npx tessl i tessl/npm-pipedream--types