or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

Files

docs

component-context.mdcomponent-structure.mdevent-system.mdindex.mdinterface-types.mdprop-types.mdservice-types.md

service-types.mddocs/

0

# Service Types

1

2

Type definitions for Pipedream's built-in services including database storage and HTTP response handling.

3

4

## Capabilities

5

6

### Database Service

7

8

Persistent key-value storage service for maintaining state between component executions.

9

10

```typescript { .api }

11

/**

12

* Database service for persistent key-value storage

13

*/

14

interface DatabaseService {

15

/** Service type identifier */

16

type: "$.service.db";

17

18

/**

19

* Retrieve a value by key

20

* @param key - The key to retrieve

21

* @returns The stored value or undefined if not found

22

*/

23

get: (key: string) => any;

24

25

/**

26

* Store a value by key

27

* @param key - The key to store under

28

* @param value - The value to store (must be JSON serializable)

29

*/

30

set: (key: string, value: any) => void;

31

}

32

```

33

34

**Usage Examples:**

35

36

```typescript

37

import { PipedreamComponent, DatabaseService } from "@pipedream/types";

38

39

// Basic state management

40

const stateComponent: PipedreamComponent = {

41

name: "State Management Component",

42

version: "1.0.0",

43

props: {

44

timer: {

45

type: "$.interface.timer",

46

default: { intervalSeconds: 300 }

47

},

48

db: "$.service.db"

49

},

50

async run(event) {

51

// Get current counter value

52

const counter = this.db.get("counter") || 0;

53

54

// Increment counter

55

const newCounter = counter + 1;

56

this.db.set("counter", newCounter);

57

58

// Store execution metadata

59

this.db.set("lastExecution", {

60

timestamp: Date.now(),

61

counter: newCounter,

62

eventType: event.type || "timer"

63

});

64

65

console.log(`Execution #${newCounter}`);

66

67

this.$emit({

68

executionNumber: newCounter,

69

timestamp: Date.now()

70

});

71

}

72

};

73

74

// Complex state with nested data

75

const complexStateComponent: PipedreamComponent = {

76

name: "Complex State Component",

77

version: "1.0.0",

78

props: {

79

timer: {

80

type: "$.interface.timer",

81

default: { intervalSeconds: 600 }

82

},

83

db: "$.service.db"

84

},

85

async run(event) {

86

// Get complex state object

87

const appState = this.db.get("appState") || {

88

users: {},

89

metrics: {

90

totalProcessed: 0,

91

errorCount: 0,

92

lastSuccess: null

93

},

94

config: {

95

batchSize: 100,

96

retryAttempts: 3

97

}

98

};

99

100

try {

101

// Process new users

102

const newUsers = await fetchNewUsers();

103

104

newUsers.forEach(user => {

105

appState.users[user.id] = {

106

...user,

107

processedAt: Date.now(),

108

status: "processed"

109

};

110

appState.metrics.totalProcessed++;

111

});

112

113

appState.metrics.lastSuccess = Date.now();

114

115

// Save updated state

116

this.db.set("appState", appState);

117

118

// Emit summary

119

this.$emit({

120

processedCount: newUsers.length,

121

totalUsers: Object.keys(appState.users).length,

122

totalProcessed: appState.metrics.totalProcessed

123

});

124

125

} catch (error) {

126

appState.metrics.errorCount++;

127

this.db.set("appState", appState);

128

throw error;

129

}

130

}

131

};

132

```

133

134

### Database Patterns

135

136

#### Pagination State Management

137

138

```typescript

139

const paginationComponent: PipedreamComponent = {

140

name: "Pagination Component",

141

version: "1.0.0",

142

props: {

143

timer: {

144

type: "$.interface.timer",

145

default: { intervalSeconds: 900 }

146

},

147

db: "$.service.db"

148

},

149

async run(event) {

150

let pageToken = this.db.get("nextPageToken");

151

let processedCount = 0;

152

153

do {

154

const response = await fetchPage(pageToken);

155

156

// Process items from this page

157

response.items.forEach(item => {

158

this.$emit(item, {

159

id: item.id,

160

summary: `Item: ${item.name}`

161

});

162

processedCount++;

163

});

164

165

pageToken = response.nextPageToken;

166

167

// Update pagination state

168

this.db.set("nextPageToken", pageToken);

169

this.db.set("lastPageProcessed", {

170

timestamp: Date.now(),

171

itemCount: response.items.length,

172

hasMore: !!pageToken

173

});

174

175

} while (pageToken);

176

177

console.log(`Processed ${processedCount} items total`);

178

}

179

};

180

```

181

182

#### Deduplication Tracking

183

184

```typescript

185

const dedupeComponent: PipedreamComponent = {

186

name: "Deduplication Component",

187

version: "1.0.0",

188

props: {

189

http: {

190

type: "$.interface.http"

191

},

192

db: "$.service.db"

193

},

194

async run(event) {

195

const seenIds = this.db.get("seenIds") || new Set();

196

const items = event.body.items || [];

197

const newItems = [];

198

199

items.forEach(item => {

200

if (!seenIds.has(item.id)) {

201

seenIds.add(item.id);

202

newItems.push(item);

203

}

204

});

205

206

// Keep only recent IDs (prevent unbounded growth)

207

if (seenIds.size > 10000) {

208

const idsArray = Array.from(seenIds);

209

const recentIds = new Set(idsArray.slice(-5000));

210

this.db.set("seenIds", recentIds);

211

} else {

212

this.db.set("seenIds", seenIds);

213

}

214

215

// Emit only new items

216

newItems.forEach(item => {

217

this.$emit(item, {

218

id: item.id,

219

summary: `New item: ${item.name}`

220

});

221

});

222

223

console.log(`${newItems.length} new items out of ${items.length} total`);

224

}

225

};

226

```

227

228

#### Configuration Management

229

230

```typescript

231

const configurableComponent: PipedreamComponent = {

232

name: "Configurable Component",

233

version: "1.0.0",

234

props: {

235

timer: {

236

type: "$.interface.timer",

237

default: { intervalSeconds: 300 }

238

},

239

resetConfig: {

240

type: "boolean",

241

label: "Reset Configuration",

242

description: "Reset to default configuration",

243

optional: true,

244

default: false

245

},

246

db: "$.service.db"

247

},

248

async run(event) {

249

// Default configuration

250

const defaultConfig = {

251

batchSize: 50,

252

retryAttempts: 3,

253

timeoutMs: 30000,

254

enableLogging: true,

255

filters: {

256

minPriority: 1,

257

excludeTypes: []

258

}

259

};

260

261

// Get or reset configuration

262

let config = this.resetConfig ? null : this.db.get("config");

263

if (!config) {

264

config = defaultConfig;

265

this.db.set("config", config);

266

console.log("Using default configuration");

267

}

268

269

// Use configuration in processing

270

const items = await fetchItems({

271

limit: config.batchSize,

272

timeout: config.timeoutMs

273

});

274

275

const filteredItems = items.filter(item =>

276

item.priority >= config.filters.minPriority &&

277

!config.filters.excludeTypes.includes(item.type)

278

);

279

280

if (config.enableLogging) {

281

console.log(`Processed ${filteredItems.length}/${items.length} items`);

282

}

283

284

filteredItems.forEach(item => {

285

this.$emit(item, {

286

id: item.id,

287

summary: `${item.type}: ${item.name}`

288

});

289

});

290

}

291

};

292

```

293

294

### HTTP Response Service

295

296

Service for sending HTTP responses in HTTP interface components.

297

298

```typescript { .api }

299

/**

300

* Method for sending HTTP responses

301

*/

302

interface HttpRespondMethod {

303

/**

304

* Send an HTTP response

305

* @param response - Response configuration

306

*/

307

(response: {

308

/** HTTP status code */

309

status: number;

310

/** Response headers (optional) */

311

headers?: Record<string, string>;

312

/** Response body (optional) */

313

body?: string | object | Buffer;

314

}): void;

315

}

316

```

317

318

**Usage Examples:**

319

320

```typescript

321

import { PipedreamComponent, HttpRespondMethod } from "@pipedream/types";

322

323

// Basic HTTP responses

324

const httpResponseComponent: PipedreamComponent = {

325

name: "HTTP Response Component",

326

version: "1.0.0",

327

props: {

328

http: {

329

type: "$.interface.http",

330

customResponse: true

331

}

332

},

333

async run(event) {

334

try {

335

// Process the request

336

const result = await processData(event.body);

337

338

// Send JSON response

339

this.http.respond({

340

status: 200,

341

headers: {

342

"Content-Type": "application/json",

343

"X-Processing-Time": `${Date.now() - event.timestamp}ms`

344

},

345

body: {

346

success: true,

347

data: result,

348

timestamp: new Date().toISOString()

349

}

350

});

351

352

} catch (error) {

353

// Send error response

354

this.http.respond({

355

status: 500,

356

headers: {

357

"Content-Type": "application/json"

358

},

359

body: {

360

success: false,

361

error: error.message,

362

code: error.code || "PROCESSING_ERROR"

363

}

364

});

365

}

366

}

367

};

368

369

// Different response types

370

const diverseResponseComponent: PipedreamComponent = {

371

name: "Diverse Response Component",

372

version: "1.0.0",

373

props: {

374

http: {

375

type: "$.interface.http",

376

customResponse: true

377

}

378

},

379

async run(event) {

380

const { path, method, query } = event;

381

382

// Route different endpoints

383

if (path === "/health") {

384

this.http.respond({

385

status: 200,

386

headers: { "Content-Type": "text/plain" },

387

body: "OK"

388

});

389

390

} else if (path === "/api/data" && method === "GET") {

391

const data = await fetchData(query);

392

this.http.respond({

393

status: 200,

394

headers: {

395

"Content-Type": "application/json",

396

"Cache-Control": "max-age=300"

397

},

398

body: data

399

});

400

401

} else if (path === "/webhook" && method === "POST") {

402

// Process webhook

403

await processWebhook(event.body);

404

this.http.respond({

405

status: 202,

406

headers: { "Content-Type": "application/json" },

407

body: { received: true, id: generateId() }

408

});

409

410

} else if (path === "/download") {

411

// Binary response

412

const fileData = await generateFile();

413

this.http.respond({

414

status: 200,

415

headers: {

416

"Content-Type": "application/octet-stream",

417

"Content-Disposition": "attachment; filename=data.csv"

418

},

419

body: fileData

420

});

421

422

} else {

423

// Not found

424

this.http.respond({

425

status: 404,

426

headers: { "Content-Type": "application/json" },

427

body: { error: "Endpoint not found" }

428

});

429

}

430

}

431

};

432

```

433

434

### Service Combinations

435

436

Components often use multiple services together:

437

438

```typescript

439

const multiServiceComponent: PipedreamComponent = {

440

name: "Multi-Service Component",

441

version: "1.0.0",

442

props: {

443

http: {

444

type: "$.interface.http",

445

customResponse: true

446

},

447

db: "$.service.db"

448

},

449

async run(event) {

450

// Use database to track request counts

451

const requestCount = this.db.get("requestCount") || 0;

452

this.db.set("requestCount", requestCount + 1);

453

454

// Rate limiting using database

455

const windowStart = Math.floor(Date.now() / 60000) * 60000; // 1-minute window

456

const windowKey = `requests:${windowStart}`;

457

const windowCount = this.db.get(windowKey) || 0;

458

459

if (windowCount >= 100) {

460

this.http.respond({

461

status: 429,

462

headers: {

463

"Content-Type": "application/json",

464

"Retry-After": "60"

465

},

466

body: { error: "Rate limit exceeded" }

467

});

468

return;

469

}

470

471

// Update window count

472

this.db.set(windowKey, windowCount + 1);

473

474

// Process request

475

const result = await processRequest(event.body);

476

477

// Send response

478

this.http.respond({

479

status: 200,

480

headers: {

481

"Content-Type": "application/json",

482

"X-Request-Count": `${requestCount + 1}`,

483

"X-Window-Count": `${windowCount + 1}`

484

},

485

body: {

486

success: true,

487

data: result,

488

stats: {

489

totalRequests: requestCount + 1,

490

windowRequests: windowCount + 1

491

}

492

}

493

});

494

495

// Emit for analytics

496

this.$emit({

497

type: "api_request",

498

path: event.path,

499

method: event.method,

500

responseStatus: 200,

501

requestCount: requestCount + 1

502

});

503

}

504

};

505

```