TypeScript library for DevRev's Airdrop platform providing control protocol for data extraction and loading workflows
npx @tessl/cli install tessl/npm-devrev--ts-adaas@1.12.0DevRev Airdrop SDK (@devrev/ts-adaas) is a TypeScript library that enables developers to build snap-ins integrating with DevRev's Airdrop platform. The SDK provides a complete control protocol implementation for event-driven data extraction and loading workflows, allowing bidirectional data synchronization between external systems and DevRev.
npm install @devrev/ts-adaasimport {
processTask,
spawn,
WorkerAdapter,
AirdropEvent,
ExtractorEventType,
LoaderEventType,
EventType,
} from '@devrev/ts-adaas';For CommonJS:
const {
processTask,
spawn,
WorkerAdapter,
AirdropEvent,
ExtractorEventType,
LoaderEventType,
EventType,
} = require('@devrev/ts-adaas');import { spawn, processTask, ExtractorEventType, ExternalSyncUnit } from '@devrev/ts-adaas';
// Define your connector state
interface MyConnectorState {
users: { completed: boolean };
tasks: { completed: boolean };
}
const initialState: MyConnectorState = {
users: { completed: false },
tasks: { completed: false },
};
// Main handler
async function run(events: AirdropEvent[]) {
for (const event of events) {
await spawn<MyConnectorState>({
event,
initialState,
workerPath: './worker.js',
options: {
timeout: 10 * 60 * 1000, // 10 minutes
batchSize: 2000,
},
});
}
}
// Worker implementation
processTask({
task: async ({ adapter }) => {
// Extract external sync units
const syncUnits: ExternalSyncUnit[] = [
{
id: 'repo-1',
name: 'My Repository',
description: 'Repository containing tasks',
},
];
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
external_sync_units: syncUnits,
});
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
error: { message: 'Lambda timeout' },
});
},
});import { spawn, processTask, LoaderEventType } from '@devrev/ts-adaas';
processTask({
task: async ({ adapter }) => {
const mappers = adapter.mappers;
// Get mapping from DevRev ID to external system
const mapping = await mappers.getByTargetId({
sync_unit: adapter.event.payload.event_context.sync_unit,
target: 'don:integration:dvrv-us-1:devo/abc:ticket/123',
});
// Create item in external system and update mapping
const externalId = await createItemInExternalSystem(mapping);
await mappers.update({
id: mapping.data.sync_mapper_record.id,
sync_unit: adapter.event.payload.event_context.sync_unit,
external_ids: { add: [externalId] },
targets: { add: [] },
status: 'operational',
});
await adapter.emit(LoaderEventType.DataLoadingDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(LoaderEventType.DataLoadingError, {
error: { message: 'Lambda timeout' },
});
},
});The Airdrop SDK is built around several key components:
Worker Management: The spawn function creates worker threads, while processTask handles task execution within workers. The WorkerAdapter class provides the primary interface for interacting with the Airdrop platform.
Event-Driven Protocol: The SDK operates on an event-driven model where the Airdrop platform sends events (EventType) to snap-ins, and snap-ins respond with events (ExtractorEventType or LoaderEventType) to drive the extraction and loading pipeline.
Type System: Comprehensive TypeScript type definitions ensure type safety throughout extraction and loading workflows, with generic support for connector-specific state via ConnectorState type parameter.
State Management: Bidirectional state management with ToDevRev state for extraction and FromDevRev state for loading, automatically persisted on event emission.
Sync Mappers: The Mappers class manages sync mapper records that link external system entities to DevRev entities, enabling bidirectional data synchronization.
Repository Pattern: The repository pattern allows structured data normalization and batched uploads during extraction.
HTTP Client: Pre-configured Axios client with automatic retry logic for network resilience.
Logging: Context-aware logging system that handles main thread and worker thread logging differently.
Core worker lifecycle management including spawning worker threads, processing tasks with timeout handling, and interacting with the Airdrop platform through the WorkerAdapter interface.
function spawn<ConnectorState>(params: SpawnFactoryInterface<ConnectorState>): Promise<void>;
interface SpawnFactoryInterface<ConnectorState> {
event: AirdropEvent;
initialState: ConnectorState;
workerPath?: string;
options?: WorkerAdapterOptions;
initialDomainMapping?: InitialDomainMapping;
}
function processTask<ConnectorState>(params: ProcessTaskInterface<ConnectorState>): Promise<void>;
interface ProcessTaskInterface<ConnectorState> {
task: (params: TaskAdapterInterface<ConnectorState>) => Promise<void>;
onTimeout: (params: TaskAdapterInterface<ConnectorState>) => Promise<void>;
}
class WorkerAdapter<ConnectorState> {
constructor(params: WorkerAdapterInterface<ConnectorState>);
get state(): AdapterState<ConnectorState>;
set state(value: AdapterState<ConnectorState>);
get event(): AirdropEvent;
get mappers(): Mappers;
emit(eventType: ExtractorEventType | LoaderEventType, data?: EventData): Promise<void>;
initializeRepos(repos: RepoInterface[]): void;
getRepo(itemType: string): Repo | undefined;
}Type definitions for extraction workflows including events, external sync units, and attachment processing.
enum EventType {
ExtractionExternalSyncUnitsStart = 'EXTRACTION_EXTERNAL_SYNC_UNITS_START',
ExtractionMetadataStart = 'EXTRACTION_METADATA_START',
ExtractionDataStart = 'EXTRACTION_DATA_START',
ExtractionDataContinue = 'EXTRACTION_DATA_CONTINUE',
ExtractionAttachmentsStart = 'EXTRACTION_ATTACHMENTS_START',
// ... loading and other event types
}
enum ExtractorEventType {
ExtractionExternalSyncUnitsDone = 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE',
ExtractionMetadataDone = 'EXTRACTION_METADATA_DONE',
ExtractionDataDone = 'EXTRACTION_DATA_DONE',
ExtractionAttachmentsDone = 'EXTRACTION_ATTACHMENTS_DONE',
// ... error and progress event types
}
interface AirdropEvent {
context: {
secrets: { service_account_token: string };
snap_in_version_id: string;
snap_in_id: string;
};
payload: AirdropMessage;
execution_metadata: { devrev_endpoint: string };
input_data: InputData;
}Type definitions for loading workflows including data loading, attachment handling, and loader reports.
enum LoaderEventType {
DataLoadingDone = 'DATA_LOADING_DONE',
DataLoadingError = 'DATA_LOADING_ERROR',
AttachmentLoadingDone = 'ATTACHMENT_LOADING_DONE',
AttachmentLoadingError = 'ATTACHMENT_LOADING_ERROR',
// ... other loader event types
}
interface ExternalSystemItem {
id: { devrev: DonV2; external?: string };
created_date: string;
modified_date: string;
data: any;
}
interface ExternalSystemItemLoadingParams<Type> {
item: Type;
mappers: Mappers;
event: AirdropEvent;
}
interface ExternalSystemItemLoadingResponse {
id?: string;
error?: string;
modifiedDate?: string;
delay?: number;
}State management interfaces for maintaining extraction and loading state across worker invocations.
type AdapterState<ConnectorState> = ConnectorState & SdkState;
interface ToDevRev {
attachmentsMetadata: {
artifactIds: string[];
lastProcessed: number;
lastProcessedAttachmentsIdsList?: string[];
};
}
interface FromDevRev {
filesToLoad: FileToLoad[];
}Sync mapper management for linking external system entities to DevRev entities during loading operations.
class Mappers {
getByTargetId(params: MappersGetByTargetIdParams): Promise<AxiosResponse<MappersGetByTargetIdResponse>>;
getByExternalId(params: MappersGetByExternalIdParams): Promise<AxiosResponse<MappersGetByExternalIdResponse>>;
create(params: MappersCreateParams): Promise<AxiosResponse<MappersCreateResponse>>;
update(params: MappersUpdateParams): Promise<AxiosResponse<MappersUpdateResponse>>;
}Repository interfaces for data normalization and batched uploads during extraction.
interface RepoInterface {
itemType: string;
normalize?: (record: object) => NormalizedItem | NormalizedAttachment;
}
interface NormalizedItem {
id: string;
created_date: string;
modified_date: string;
data: object;
}
interface NormalizedAttachment {
url: string;
id: string;
file_name: string;
parent_id: string;
author_id?: string;
inline?: boolean;
grand_parent_id?: number | string;
}Pre-configured Axios client with automatic retry logic and error handling for network operations.
const axiosClient: AxiosInstance;
const axios: AxiosInstance;Logging utilities including context-aware logging and error serialization functions.
function serializeAxiosError(error: AxiosError): AxiosErrorResponse;
function formatAxiosError(error: AxiosError): object; // deprecated
function getPrintableState(state: Record<string, any>): PrintableState;Function for installing initial domain mappings when setting up snap-ins. This function creates recipe blueprints and installs domain mappings for the snap-in integration.
/**
* Install initial domain mappings for a snap-in
* Creates recipe blueprints and installs domain mappings to configure
* the mapping between external system entities and DevRev entities
* @param event - The Airdrop event
* @param initialDomainMappingJson - Configuration for domain mappings
* @returns Promise that resolves when installation completes
*/
function installInitialDomainMapping(
event: AirdropEvent,
initialDomainMappingJson: InitialDomainMapping
): Promise<void>;
interface InitialDomainMapping {
/** Optional recipe blueprint configuration for initial setup */
starting_recipe_blueprint?: object;
/** Additional mapping configurations */
additional_mappings?: object;
}Usage Examples:
import { spawn, installInitialDomainMapping } from '@devrev/ts-adaas';
// Define initial domain mapping configuration
const initialDomainMapping = {
starting_recipe_blueprint: {
name: 'My Integration Recipe',
description: 'Recipe for syncing external data',
// ... recipe configuration
},
additional_mappings: {
// ... additional mapping configuration
},
};
// Use with spawn
await spawn({
event,
initialState,
workerPath: './worker.js',
initialDomainMapping,
});
// Or install independently
await installInitialDomainMapping(event, initialDomainMapping);interface WorkerAdapterOptions {
isLocalDevelopment?: boolean;
timeout?: number;
batchSize?: number;
}interface ErrorRecord {
message: string;
}
enum SyncMode {
INITIAL = 'INITIAL',
INCREMENTAL = 'INCREMENTAL',
LOADING = 'LOADING',
}interface EventData {
external_sync_units?: ExternalSyncUnit[];
error?: ErrorRecord;
delay?: number;
reports?: LoaderReport[];
processed_files?: string[];
stats_file?: string;
}