
tessl/npm-devrev--ts-adaas

TypeScript library for DevRev's Airdrop platform providing control protocol for data extraction and loading workflows

Workspace: tessl
Visibility: Public
Describes: npmpkg:npm/@devrev/ts-adaas@1.12.x

To install, run

npx @tessl/cli install tessl/npm-devrev--ts-adaas@1.12.0


DevRev Airdrop SDK

DevRev Airdrop SDK (@devrev/ts-adaas) is a TypeScript library for building snap-ins that integrate with DevRev's Airdrop platform. The SDK implements the control protocol for event-driven data extraction and loading workflows, so data can be synchronized in both directions between external systems and DevRev.

Package Information

  • Package Name: @devrev/ts-adaas
  • Package Type: npm
  • Language: TypeScript
  • Installation: npm install @devrev/ts-adaas

Core Imports

import {
  processTask,
  spawn,
  WorkerAdapter,
  AirdropEvent,
  ExtractorEventType,
  LoaderEventType,
  EventType,
} from '@devrev/ts-adaas';

For CommonJS:

const {
  processTask,
  spawn,
  WorkerAdapter,
  AirdropEvent,
  ExtractorEventType,
  LoaderEventType,
  EventType,
} = require('@devrev/ts-adaas');

Basic Usage

Extraction Workflow

import {
  spawn,
  processTask,
  AirdropEvent,
  ExtractorEventType,
  ExternalSyncUnit,
} from '@devrev/ts-adaas';

// Define your connector state
interface MyConnectorState {
  users: { completed: boolean };
  tasks: { completed: boolean };
}

const initialState: MyConnectorState = {
  users: { completed: false },
  tasks: { completed: false },
};

// Main handler
async function run(events: AirdropEvent[]) {
  for (const event of events) {
    await spawn<MyConnectorState>({
      event,
      initialState,
      workerPath: './worker.js',
      options: {
        timeout: 10 * 60 * 1000, // 10 minutes
        batchSize: 2000,
      },
    });
  }
}

// Worker implementation (normally placed in the separate file that workerPath points to)
processTask({
  task: async ({ adapter }) => {
    // Extract external sync units
    const syncUnits: ExternalSyncUnit[] = [
      {
        id: 'repo-1',
        name: 'My Repository',
        description: 'Repository containing tasks',
      },
    ];

    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
      external_sync_units: syncUnits,
    });
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
      error: { message: 'Lambda timeout' },
    });
  },
});

Loading Workflow

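import { processTask, LoaderEventType } from '@devrev/ts-adaas';

// Hypothetical helper, not part of the SDK: creates the item in the external
// system and resolves to the new item's external ID.
declare function createItemInExternalSystem(mapping: unknown): Promise<string>;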

processTask({
  task: async ({ adapter }) => {
    const mappers = adapter.mappers;

    // Get mapping from DevRev ID to external system
    const mapping = await mappers.getByTargetId({
      sync_unit: adapter.event.payload.event_context.sync_unit,
      target: 'don:integration:dvrv-us-1:devo/abc:ticket/123',
    });

    // Create item in external system and update mapping
    const externalId = await createItemInExternalSystem(mapping);

    await mappers.update({
      id: mapping.data.sync_mapper_record.id,
      sync_unit: adapter.event.payload.event_context.sync_unit,
      external_ids: { add: [externalId] },
      targets: { add: [] },
      status: 'operational',
    });

    await adapter.emit(LoaderEventType.DataLoadingDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(LoaderEventType.DataLoadingError, {
      error: { message: 'Lambda timeout' },
    });
  },
});

Architecture

The Airdrop SDK is built around several key components:

  • Worker Management: The spawn function creates worker threads, while processTask handles task execution within workers. The WorkerAdapter class provides the primary interface for interacting with the Airdrop platform.

  • Event-Driven Protocol: The SDK operates on an event-driven model where the Airdrop platform sends events (EventType) to snap-ins, and snap-ins respond with events (ExtractorEventType or LoaderEventType) to drive the extraction and loading pipeline.

  • Type System: Comprehensive TypeScript type definitions provide type safety throughout extraction and loading workflows, with generic support for connector-specific state via the ConnectorState type parameter.

  • State Management: Bidirectional state management with ToDevRev state for extraction and FromDevRev state for loading, automatically persisted on event emission.

  • Sync Mappers: The Mappers class manages sync mapper records that link external system entities to DevRev entities, enabling bidirectional data synchronization.

  • Repository Pattern: The repository pattern allows structured data normalization and batched uploads during extraction.

  • HTTP Client: Pre-configured Axios client with automatic retry logic for network resilience.

  • Logging: Context-aware logging system that handles main thread and worker thread logging differently.
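The event-driven protocol described above can be pictured as a lookup from the event the platform sends to the event the snap-in emits on success. The sketch below mirrors the enum members listed later in this document as plain objects so that it runs without the package installed. The pairing of each Start or Continue event with a Done event is an assumption inferred from the member names, and successReplyFor is a hypothetical helper, not an SDK function.

```typescript
// Local mirrors of enum members listed in this document. In a real snap-in,
// import EventType and ExtractorEventType from '@devrev/ts-adaas' instead.
const EventType = {
  ExtractionExternalSyncUnitsStart: 'EXTRACTION_EXTERNAL_SYNC_UNITS_START',
  ExtractionMetadataStart: 'EXTRACTION_METADATA_START',
  ExtractionDataStart: 'EXTRACTION_DATA_START',
  ExtractionDataContinue: 'EXTRACTION_DATA_CONTINUE',
  ExtractionAttachmentsStart: 'EXTRACTION_ATTACHMENTS_START',
} as const;

const ExtractorEventType = {
  ExtractionExternalSyncUnitsDone: 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE',
  ExtractionMetadataDone: 'EXTRACTION_METADATA_DONE',
  ExtractionDataDone: 'EXTRACTION_DATA_DONE',
  ExtractionAttachmentsDone: 'EXTRACTION_ATTACHMENTS_DONE',
} as const;

type OutgoingEvent = (typeof ExtractorEventType)[keyof typeof ExtractorEventType];

// ASSUMPTION: this pairing is inferred from the names, not taken from the SDK.
const successReplies = new Map<string, OutgoingEvent>([
  [EventType.ExtractionExternalSyncUnitsStart, ExtractorEventType.ExtractionExternalSyncUnitsDone],
  [EventType.ExtractionMetadataStart, ExtractorEventType.ExtractionMetadataDone],
  [EventType.ExtractionDataStart, ExtractorEventType.ExtractionDataDone],
  [EventType.ExtractionDataContinue, ExtractorEventType.ExtractionDataDone],
  [EventType.ExtractionAttachmentsStart, ExtractorEventType.ExtractionAttachmentsDone],
]);

// Hypothetical helper: returns the success reply for a known extraction event,
// or undefined for event types this sketch does not cover (for example loading).
function successReplyFor(eventType: string): OutgoingEvent | undefined {
  return successReplies.get(eventType);
}
```

Inside a worker, the value returned by such a lookup would be what gets passed to adapter.emit once the task for that phase has finished.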

Capabilities

Worker Management

Core worker lifecycle management including spawning worker threads, processing tasks with timeout handling, and interacting with the Airdrop platform through the WorkerAdapter interface.

function spawn<ConnectorState>(params: SpawnFactoryInterface<ConnectorState>): Promise<void>;

interface SpawnFactoryInterface<ConnectorState> {
  event: AirdropEvent;
  initialState: ConnectorState;
  workerPath?: string;
  options?: WorkerAdapterOptions;
  initialDomainMapping?: InitialDomainMapping;
}

function processTask<ConnectorState>(params: ProcessTaskInterface<ConnectorState>): Promise<void>;

interface ProcessTaskInterface<ConnectorState> {
  task: (params: TaskAdapterInterface<ConnectorState>) => Promise<void>;
  onTimeout: (params: TaskAdapterInterface<ConnectorState>) => Promise<void>;
}

class WorkerAdapter<ConnectorState> {
  constructor(params: WorkerAdapterInterface<ConnectorState>);

  get state(): AdapterState<ConnectorState>;
  set state(value: AdapterState<ConnectorState>);

  get event(): AirdropEvent;
  get mappers(): Mappers;

  emit(eventType: ExtractorEventType | LoaderEventType, data?: EventData): Promise<void>;
  initializeRepos(repos: RepoInterface[]): void;
  getRepo(itemType: string): Repo | undefined;
}

Full reference: worker-management.md

Extraction Types

Type definitions for extraction workflows including events, external sync units, and attachment processing.

enum EventType {
  ExtractionExternalSyncUnitsStart = 'EXTRACTION_EXTERNAL_SYNC_UNITS_START',
  ExtractionMetadataStart = 'EXTRACTION_METADATA_START',
  ExtractionDataStart = 'EXTRACTION_DATA_START',
  ExtractionDataContinue = 'EXTRACTION_DATA_CONTINUE',
  ExtractionAttachmentsStart = 'EXTRACTION_ATTACHMENTS_START',
  // ... loading and other event types
}

enum ExtractorEventType {
  ExtractionExternalSyncUnitsDone = 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE',
  ExtractionMetadataDone = 'EXTRACTION_METADATA_DONE',
  ExtractionDataDone = 'EXTRACTION_DATA_DONE',
  ExtractionAttachmentsDone = 'EXTRACTION_ATTACHMENTS_DONE',
  // ... error and progress event types
}

interface AirdropEvent {
  context: {
    secrets: { service_account_token: string };
    snap_in_version_id: string;
    snap_in_id: string;
  };
  payload: AirdropMessage;
  execution_metadata: { devrev_endpoint: string };
  input_data: InputData;
}

Full reference: types-extraction.md

Loading Types

Type definitions for loading workflows including data loading, attachment handling, and loader reports.

enum LoaderEventType {
  DataLoadingDone = 'DATA_LOADING_DONE',
  DataLoadingError = 'DATA_LOADING_ERROR',
  AttachmentLoadingDone = 'ATTACHMENT_LOADING_DONE',
  AttachmentLoadingError = 'ATTACHMENT_LOADING_ERROR',
  // ... other loader event types
}

interface ExternalSystemItem {
  id: { devrev: DonV2; external?: string };
  created_date: string;
  modified_date: string;
  data: any;
}

interface ExternalSystemItemLoadingParams<Type> {
  item: Type;
  mappers: Mappers;
  event: AirdropEvent;
}

interface ExternalSystemItemLoadingResponse {
  id?: string;
  error?: string;
  modifiedDate?: string;
  delay?: number;
}
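As a rough illustration of how the optional fields of ExternalSystemItemLoadingResponse might be used, the sketch below maps the outcome of a call to an external system onto that shape. ExternalApiResult and toLoadingResponse are hypothetical names, and treating delay as a retry hint in seconds after a rate limit is an assumption that this document does not confirm.

```typescript
// Local copy of the response shape shown above.
interface ExternalSystemItemLoadingResponse {
  id?: string;
  error?: string;
  modifiedDate?: string;
  delay?: number;
}

// Hypothetical outcome of a "create item" call against the external system.
interface ExternalApiResult {
  status: number;
  body?: { id: string; updated_at: string };
  retryAfterSeconds?: number;
}

// ASSUMPTION: `delay` signals "retry later" and is expressed in seconds.
function toLoadingResponse(result: ExternalApiResult): ExternalSystemItemLoadingResponse {
  if (result.status === 429) {
    // Rate limited: report only a delay, falling back to 30 seconds.
    return { delay: result.retryAfterSeconds ?? 30 };
  }
  if (result.status >= 200 && result.status < 300 && result.body) {
    // Success: report the external ID and its modification timestamp.
    return { id: result.body.id, modifiedDate: result.body.updated_at };
  }
  // Anything else is reported as an error message.
  return { error: `External system responded with status ${result.status}` };
}
```

Keeping this mapping in one small function means each loader only has to perform the external call and hand the result over.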

Full reference: types-loading.md

State Management

State management interfaces for maintaining extraction and loading state across worker invocations.

type AdapterState<ConnectorState> = ConnectorState & SdkState;

interface ToDevRev {
  attachmentsMetadata: {
    artifactIds: string[];
    lastProcessed: number;
    lastProcessedAttachmentsIdsList?: string[];
  };
}

interface FromDevRev {
  filesToLoad: FileToLoad[];
}
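The intersection type above means a worker sees its own connector state and the SDK-managed state on one object. The sketch below shows that idea with the MyConnectorState shape from Basic Usage. The fields of SdkState are not listed in this document, so SdkStateSketch (with optional toDevRev and fromDevRev members) is a placeholder assumption, and nextPending is a hypothetical helper.

```typescript
// Local copies of the shapes shown above; FileToLoad is not defined in this
// document, so its element type is left as unknown here.
interface ToDevRev {
  attachmentsMetadata: {
    artifactIds: string[];
    lastProcessed: number;
    lastProcessedAttachmentsIdsList?: string[];
  };
}
interface FromDevRev {
  filesToLoad: unknown[];
}

// ASSUMPTION: placeholder for SdkState, whose real fields are not shown here.
interface SdkStateSketch {
  toDevRev?: ToDevRev;
  fromDevRev?: FromDevRev;
}

// Connector state from the Basic Usage example.
interface MyConnectorState {
  users: { completed: boolean };
  tasks: { completed: boolean };
}

type AdapterState<ConnectorState> = ConnectorState & SdkStateSketch;

// Hypothetical helper: decides which entity type still needs extraction, so a
// resumed worker can skip work that an earlier invocation already finished.
function nextPending(state: AdapterState<MyConnectorState>): 'users' | 'tasks' | undefined {
  if (!state.users.completed) return 'users';
  if (!state.tasks.completed) return 'tasks';
  return undefined;
}

// State as it might look when a worker resumes after users were extracted.
const resumedState: AdapterState<MyConnectorState> = {
  users: { completed: true },
  tasks: { completed: false },
  toDevRev: { attachmentsMetadata: { artifactIds: [], lastProcessed: 0 } },
};
const pending = nextPending(resumedState); // 'tasks'
```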

Full reference: state-management.md

Sync Mappers

Sync mapper management for linking external system entities to DevRev entities during loading operations.

class Mappers {
  getByTargetId(params: MappersGetByTargetIdParams): Promise<AxiosResponse<MappersGetByTargetIdResponse>>;
  getByExternalId(params: MappersGetByExternalIdParams): Promise<AxiosResponse<MappersGetByExternalIdResponse>>;
  create(params: MappersCreateParams): Promise<AxiosResponse<MappersCreateResponse>>;
  update(params: MappersUpdateParams): Promise<AxiosResponse<MappersUpdateResponse>>;
}

Full reference: mappers.md

Repository Management

Repository interfaces for data normalization and batched uploads during extraction.

interface RepoInterface {
  itemType: string;
  normalize?: (record: object) => NormalizedItem | NormalizedAttachment;
}

interface NormalizedItem {
  id: string;
  created_date: string;
  modified_date: string;
  data: object;
}

interface NormalizedAttachment {
  url: string;
  id: string;
  file_name: string;
  parent_id: string;
  author_id?: string;
  inline?: boolean;
  grand_parent_id?: number | string;
}
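A normalize function turns a raw record from the external system into one of the shapes above. The sketch below is a minimal example under stated assumptions: ExternalTask and ExternalFile are hypothetical record shapes, the field choices are illustrative only, and the interfaces are copied locally so the block runs without the package.

```typescript
// Local copies of the interfaces shown above.
interface NormalizedItem {
  id: string;
  created_date: string;
  modified_date: string;
  data: object;
}
interface NormalizedAttachment {
  url: string;
  id: string;
  file_name: string;
  parent_id: string;
  author_id?: string;
  inline?: boolean;
  grand_parent_id?: number | string;
}
interface RepoInterface {
  itemType: string;
  normalize?: (record: object) => NormalizedItem | NormalizedAttachment;
}

// Hypothetical raw records returned by an external system.
interface ExternalTask { key: string; title: string; createdAt: string; updatedAt: string }
interface ExternalFile { fileId: string; name: string; downloadUrl: string; taskKey: string; uploadedBy?: string }

// Maps a raw task onto NormalizedItem; everything system-specific goes in `data`.
function normalizeTask(record: object): NormalizedItem {
  const task = record as ExternalTask;
  return {
    id: task.key,
    created_date: task.createdAt,
    modified_date: task.updatedAt,
    data: { title: task.title },
  };
}

// Maps a raw file onto NormalizedAttachment, linking it to its parent task.
function normalizeFile(record: object): NormalizedAttachment {
  const file = record as ExternalFile;
  return {
    url: file.downloadUrl,
    id: file.fileId,
    file_name: file.name,
    parent_id: file.taskKey,
    author_id: file.uploadedBy,
  };
}

// Repo definitions of the kind passed to adapter.initializeRepos(...).
const repos: RepoInterface[] = [
  { itemType: 'tasks', normalize: normalizeTask },
  { itemType: 'attachments', normalize: normalizeFile },
];
```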

Full reference: repository-management.md

HTTP Client

Pre-configured Axios client with automatic retry logic and error handling for network operations.

const axiosClient: AxiosInstance;
const axios: AxiosInstance;

Full reference: http-client.md

Logger Utilities

Logging utilities including context-aware logging and error serialization functions.

function serializeAxiosError(error: AxiosError): AxiosErrorResponse;
function formatAxiosError(error: AxiosError): object; // deprecated
function getPrintableState(state: Record<string, any>): PrintableState;

Full reference: logger.md

Initial Domain Mapping

Function for installing initial domain mappings when setting up snap-ins. This function creates recipe blueprints and installs domain mappings for the snap-in integration.

/**
 * Install initial domain mappings for a snap-in
 * Creates recipe blueprints and installs domain mappings to configure
 * the mapping between external system entities and DevRev entities
 * @param event - The Airdrop event
 * @param initialDomainMappingJson - Configuration for domain mappings
 * @returns Promise that resolves when installation completes
 */
function installInitialDomainMapping(
  event: AirdropEvent,
  initialDomainMappingJson: InitialDomainMapping
): Promise<void>;

interface InitialDomainMapping {
  /** Optional recipe blueprint configuration for initial setup */
  starting_recipe_blueprint?: object;
  /** Additional mapping configurations */
  additional_mappings?: object;
}

Usage Examples:

import { spawn, installInitialDomainMapping } from '@devrev/ts-adaas';

// Define initial domain mapping configuration
const initialDomainMapping = {
  starting_recipe_blueprint: {
    name: 'My Integration Recipe',
    description: 'Recipe for syncing external data',
    // ... recipe configuration
  },
  additional_mappings: {
    // ... additional mapping configuration
  },
};

// Use with spawn (event and initialState come from the snap-in handler, as in Basic Usage)
await spawn({
  event,
  initialState,
  workerPath: './worker.js',
  initialDomainMapping,
});

// Or install independently
await installInitialDomainMapping(event, initialDomainMapping);

Common Types

Worker Configuration

interface WorkerAdapterOptions {
  isLocalDevelopment?: boolean;
  timeout?: number;
  batchSize?: number;
}
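To make the batchSize option concrete, the sketch below splits a list of items into groups of at most batchSize. It assumes batchSize caps the number of items per upload, which matches the batched uploads mentioned under Architecture but is not spelled out here. toBatches is a hypothetical helper for illustration, since the SDK does its own batching internally. The timeout value follows the Basic Usage example, where it is given in milliseconds.

```typescript
// Local copy of the options interface shown above.
interface WorkerAdapterOptions {
  isLocalDevelopment?: boolean;
  timeout?: number;
  batchSize?: number;
}

// Same values as in the Basic Usage example.
const workerOptions: WorkerAdapterOptions = {
  timeout: 10 * 60 * 1000, // 10 minutes, in milliseconds
  batchSize: 2000,
};

// Hypothetical helper: splits items into consecutive groups of at most batchSize.
function toBatches<T>(items: readonly T[], batchSize: number): T[][] {
  if (!Number.isInteger(batchSize) || batchSize <= 0) {
    throw new RangeError('batchSize must be a positive integer');
  }
  const batches: T[][] = [];
  for (let start = 0; start < items.length; start += batchSize) {
    batches.push(items.slice(start, start + batchSize));
  }
  return batches;
}

// 5000 items with a batch size of 2000 yield groups of 2000, 2000 and 1000.
const sampleItems = Array.from({ length: 5000 }, (_, index) => index);
const batchSizes = toBatches(sampleItems, workerOptions.batchSize ?? 2000).map((batch) => batch.length);
```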

Error Handling

interface ErrorRecord {
  message: string;
}

Sync Mode

enum SyncMode {
  INITIAL = 'INITIAL',
  INCREMENTAL = 'INCREMENTAL',
  LOADING = 'LOADING',
}

Event Data

interface EventData {
  external_sync_units?: ExternalSyncUnit[];
  error?: ErrorRecord;
  delay?: number;
  reports?: LoaderReport[];
  processed_files?: string[];
  stats_file?: string;
}
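The error and delay fields of EventData are the ones used in the emit calls earlier in this document. The sketch below shows one way to build those payloads from a caught exception or a retry hint. toErrorData and toDelayData are hypothetical helpers, only a subset of EventData is copied locally, and the unit of delay is not stated in this document.

```typescript
// Local copies of the shapes shown above (EventData is reduced to two fields).
interface ErrorRecord {
  message: string;
}
interface EventData {
  error?: ErrorRecord;
  delay?: number;
}

// Hypothetical helper: converts anything thrown into an EventData error payload.
function toErrorData(thrown: unknown): EventData {
  const message = thrown instanceof Error ? thrown.message : String(thrown);
  return { error: { message } };
}

// Hypothetical helper: builds a payload asking the platform to retry later.
// ASSUMPTION: the unit of `delay` is whatever the platform expects.
function toDelayData(delay: number): EventData {
  return { delay };
}

// Example: turning a caught exception into the payload for an error event.
let errorPayload: EventData = {};
try {
  throw new Error('External API unavailable');
} catch (thrown) {
  errorPayload = toErrorData(thrown);
}
```

A payload built this way would be passed as the second argument to adapter.emit together with the matching error event type.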