Worker management in the Airdrop SDK handles the lifecycle of worker threads that process extraction and loading tasks. The SDK provides functions to spawn workers, process tasks within workers, and interact with the Airdrop platform.
`spawn` initializes a new worker thread and manages its lifecycle. This function should be invoked when the snap-in receives a message from the Airdrop platform. The worker script provided handles the event accordingly.
/**
* Spawns a new worker thread to process an Airdrop event.
*
* Intended to be called when the snap-in receives a message from the Airdrop
* platform; the worker script then handles the event.
*
* @typeParam ConnectorState - Type of the snap-in state supplied as `params.initialState`
* @param params - Configuration for spawning the worker: the event, the initial state,
* and optionally a worker path, adapter options, and an initial domain mapping
* @returns Promise that resolves when the worker completes
*/
function spawn<ConnectorState>(params: SpawnFactoryInterface<ConnectorState>): Promise<void>;
interface SpawnFactoryInterface<ConnectorState> {
/** The event object received from the Airdrop platform */
event: AirdropEvent;
/** Initial state of the snap-in */
initialState: ConnectorState;
/** Path to the worker file (optional, defaults to appropriate worker based on event type) */
workerPath?: string;
/** Configuration options for the worker */
options?: WorkerAdapterOptions;
/** Initial domain mapping configuration */
initialDomainMapping?: InitialDomainMapping;
}

Usage Examples:
import { spawn, AirdropEvent } from '@devrev/ts-adaas';
/** Example connector state: one completion flag for users and one for tasks. */
interface ExtractorState {
users: { completed: boolean };
tasks: { completed: boolean };
}
// Starting state handed to spawn as `initialState`: neither users nor tasks are marked completed.
const initialState: ExtractorState = {
users: { completed: false },
tasks: { completed: false },
};
/**
 * Spawns one extraction worker per incoming event, strictly one after another:
 * the next worker is only started once the previous one has completed.
 */
async function run(events: AirdropEvent[]) {
  const tenMinutesMs = 10 * 60 * 1000; // 10 minutes

  for (const incomingEvent of events) {
    // A fresh options object is built for every worker.
    const workerOptions = { timeout: tenMinutesMs, batchSize: 2000 };

    await spawn<ExtractorState>({
      event: incomingEvent,
      initialState,
      workerPath: './extraction-worker.js',
      options: workerOptions,
    });
  }
}
// Spawn worker with initial domain mapping
await spawn<ExtractorState>({
event,
initialState,
workerPath: './worker.js',
initialDomainMapping: {
starting_recipe_blueprint: { /* blueprint config */ },
additional_mappings: { /* additional mappings */ },
},
});

`processTask` retrieves the current state from the Airdrop platform and initializes a new WorkerAdapter. It then executes the code specified in the task parameter. If a timeout occurs, the function handles it by executing the onTimeout callback, ensuring the worker exits gracefully.
/**
* Processes a task within a worker thread with timeout handling.
*
* Retrieves the current state from the Airdrop platform, initializes a new
* WorkerAdapter, and executes `params.task`. If a timeout occurs, it is handled
* by executing `params.onTimeout` so that the worker exits gracefully.
*
* @typeParam ConnectorState - Connector-specific state type of the adapter passed to both callbacks
* @param params - Task configuration including main task and timeout handler
* @returns Promise that resolves when task completes or times out
*/
function processTask<ConnectorState>(params: ProcessTaskInterface<ConnectorState>): Promise<void>;
/**
* Parameters accepted by processTask: the task to run and the handler to run on timeout.
* Both callbacks receive a TaskAdapterInterface argument and return a Promise<void>.
*/
interface ProcessTaskInterface<ConnectorState> {
/** Function defining the logic associated with the given event type */
task: (params: TaskAdapterInterface<ConnectorState>) => Promise<void>;
/** Function managing timeout, including saving progress */
onTimeout: (params: TaskAdapterInterface<ConnectorState>) => Promise<void>;
}
interface TaskAdapterInterface<ConnectorState> {
/** The initialized WorkerAdapter instance */
adapter: WorkerAdapter<ConnectorState>;
}

Usage Examples:
import { processTask, ExtractorEventType, ExternalSyncUnit } from '@devrev/ts-adaas';
// Example: extracting external sync units
processTask({
  task: async ({ adapter }) => {
    // Pull the to-do lists from the external system
    const fetchedLists = await httpClient.getTodoLists();

    // Every to-do list becomes one external sync unit
    const syncUnits: ExternalSyncUnit[] = fetchedLists.map(
      ({ id, name, description, item_count }) => ({ id, name, description, item_count })
    );

    // Report completion together with the discovered units
    const donePayload = { external_sync_units: syncUnits };
    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, donePayload);
  },
  onTimeout: async ({ adapter }) => {
    // On timeout, report an error event instead of a result
    const timeoutError = {
      message: 'Failed to extract external sync units. Lambda timeout.',
    };
    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
      error: timeoutError,
    });
  },
});
// Data extraction with state management
processTask({
task: async ({ adapter }) => {
// Initialize repositories
adapter.initializeRepos([
{
itemType: 'tasks',
normalize: (task: any) => ({
id: task.id,
created_date: task.created_at,
modified_date: task.updated_at,
data: task,
}),
},
]);
// Fetch and push data
const tasks = await httpClient.getTasks();
await adapter.getRepo('tasks')?.push(tasks);
// Update state
adapter.state = {
...adapter.state,
tasks: { completed: true },
};
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Lambda timeout' },
});
},
});

`WorkerAdapter` is the primary interface for interacting with the Airdrop platform. It provides utilities to emit events, update state, and manage repositories for data uploads.
/**
* WorkerAdapter provides the interface for workers to interact with Airdrop platform:
* emitting events, reading and updating state, and managing the repositories used
* for data uploads.
* @typeParam ConnectorState - Connector-specific state type carried by the adapter state
*/
class WorkerAdapter<ConnectorState> {
/**
* @param params - The event being processed, the adapter state, and optional configuration options
*/
constructor(params: WorkerAdapterInterface<ConnectorState>);
/** Get the adapter state */
get state(): AdapterState<ConnectorState>;
/** Set the adapter state */
set state(value: AdapterState<ConnectorState>);
/** Get the Airdrop event being processed */
get event(): AirdropEvent;
/** Get the mappers instance for sync mapper operations */
get mappers(): Mappers;
/**
* Emit an event to the Airdrop platform.
* The usage notes accompanying this API state that emitting a completion event
* saves the adapter state automatically.
* @param eventType - The event type to emit
* @param data - Optional event data
* @returns Promise that resolves when event is emitted
*/
emit(eventType: ExtractorEventType | LoaderEventType, data?: EventData): Promise<void>;
/**
* Initialize repositories for data normalization and upload
* @param repos - Array of repository configurations
*/
initializeRepos(repos: RepoInterface[]): void;
/**
* Get a repository by item type
* @param itemType - The item type identifier
* @returns The repository instance or undefined if not found
*/
getRepo(itemType: string): Repo | undefined;
/**
* Load items from DevRev platform into external system
* @param params - Configuration for item types to load
* @returns Promise with loading reports and processed files
*/
loadItemTypes(params: ItemTypesToLoadParams): Promise<LoadItemTypesResponse>;
/**
* Load attachments from DevRev platform into external system
* @param params - Configuration with attachment creation function
* @returns Promise with loading reports and processed files
*/
loadAttachments(params: {
create: ExternalSystemLoadingFunction<ExternalSystemAttachment>;
}): Promise<LoadItemTypesResponse>;
/**
* Stream attachments from external system to DevRev platform
* @typeParam NewBatch - Third type argument of the optional processors;
* NOTE(review): presumably the batch type produced by custom processors - confirm
* @param params - Configuration with streaming function, optional processors, and batch size
* @returns Promise resolving when streaming completes or with error/delay information
*/
streamAttachments<NewBatch>(params: {
stream: ExternalSystemAttachmentStreamingFunction;
processors?: ExternalSystemAttachmentProcessors<ConnectorState, NormalizedAttachment[], NewBatch>;
batchSize?: number;
}): Promise<StreamAttachmentsReturnType>;
/**
* Process a single attachment for streaming to DevRev
* @param attachment - Normalized attachment metadata
* @param stream - Function to stream attachment content
* @returns Promise resolving when attachment is processed or with error/delay
*/
processAttachment(
attachment: NormalizedAttachment,
stream: ExternalSystemAttachmentStreamingFunction
): Promise<ProcessAttachmentReturnType>;
/**
* Manually save the current adapter state to the platform
* @returns Promise that resolves when state is saved
*/
postState(): Promise<void>;
/**
* Upload all initialized repositories to the platform
* @returns Promise that resolves when all repos are uploaded
*/
uploadAllRepos(): Promise<void>;
/**
* Mark the adapter as being in timeout state
* Prevents further state modifications
*/
handleTimeout(): void;
/** Get the array of uploaded artifacts */
get artifacts(): Artifact[];
/**
* Set or add to the artifacts array.
* NOTE(review): whether assignment replaces the array or appends to it is not
* visible from this declaration - confirm against the implementation.
*/
set artifacts(artifacts: Artifact[]);
/** Get the loader reports for loading operations */
get reports(): LoaderReport[];
/** Get the list of processed file IDs */
get processedFiles(): string[];
}
interface WorkerAdapterInterface<ConnectorState> {
/** The event object received from the platform */
event: AirdropEvent;
/** The adapter state */
adapterState: State<ConnectorState>;
/** Configuration options */
options?: WorkerAdapterOptions;
}

Usage Examples:
import { WorkerAdapter, ExtractorEventType } from '@devrev/ts-adaas';
// Within processTask
processTask({
task: async ({ adapter }) => {
// Access event information
const eventType = adapter.event.payload.event_type;
const syncUnit = adapter.event.payload.event_context.sync_unit;
// Access and update state
const currentState = adapter.state;
adapter.state = {
...currentState,
lastProcessed: new Date().toISOString(),
};
// Initialize and use repositories
adapter.initializeRepos([
{ itemType: 'users', normalize: normalizeUser },
{ itemType: 'tasks', normalize: normalizeTask },
]);
const usersRepo = adapter.getRepo('users');
await usersRepo?.push(userData);
// Emit events
await adapter.emit(ExtractorEventType.ExtractionDataProgress, {
progress: 50,
});
// Access mappers for loading operations
const mapping = await adapter.mappers.getByTargetId({
sync_unit: syncUnit,
target: devrevId,
});
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Configuration options for worker behavior and performance tuning.
interface WorkerAdapterOptions {
/**
* Flag for local development mode
* When true, intermediary files are stored locally
* Default: false
*/
isLocalDevelopment?: boolean;
/**
* Timeout duration for lambda function in milliseconds
* Default: 10 * 60 * 1000 (10 minutes)
* Maximum: 13 * 60 * 1000 (13 minutes)
*/
timeout?: number;
/**
* Maximum number of items to process before sending to platform
* Default: 2000
*/
batchSize?: number;
}

Usage Examples:
// Local development configuration: store intermediary files locally and use a
// shorter timeout and smaller batches than the documented defaults (10 minutes, 2000 items)
await spawn({
event,
initialState,
workerPath: './worker.js',
options: {
isLocalDevelopment: true, // Store files locally
timeout: 5 * 60 * 1000, // 5 minute timeout
batchSize: 1000, // Smaller batches for testing
},
});
// Production configuration
await spawn({
event,
initialState,
workerPath: './worker.js',
options: {
isLocalDevelopment: false,
timeout: 12 * 60 * 1000, // 12 minute timeout
batchSize: 5000, // Larger batches for efficiency
},
});

Events used for communication between the main thread and worker threads.
/**
* Event names used for communication between the main thread and worker threads.
* NOTE(review): the values coincide with the 'message', 'online', 'error' and 'exit'
* events of a Node.js worker_threads Worker - presumably used to subscribe to them; confirm.
*/
enum WorkerEvent {
WorkerMessage = 'message',
WorkerOnline = 'online',
WorkerError = 'error',
WorkerExit = 'exit',
}
/**
* Values of the `subject` field that identifies the kind of a worker message.
* NOTE(review): the emit and exit subjects have matching message interfaces in this
* document; no message interface for the log subject is visible here - confirm.
*/
enum WorkerMessageSubject {
WorkerMessageEmitted = 'emit',
WorkerMessageExit = 'exit',
WorkerMessageLog = 'log',
}
/**
* Worker message with the emit subject; its payload carries the extractor or
* loader event type concerned.
* NOTE(review): presumably posted when an event has been emitted from the worker - confirm.
*/
interface WorkerMessageEmitted {
subject: WorkerMessageSubject.WorkerMessageEmitted;
payload: {
eventType: ExtractorEventType | LoaderEventType;
};
}
/** Worker message with the exit subject; it carries no payload. */
interface WorkerMessageExit {
subject: WorkerMessageSubject.WorkerMessageExit;
}
type WorkerMessage = WorkerMessageEmitted | WorkerMessageExit;

Data structure passed to worker threads on initialization.
interface WorkerData<ConnectorState> {
event: AirdropEvent;
initialState: ConnectorState;
workerPath: string;
initialDomainMapping?: InitialDomainMapping;
options?: WorkerAdapterOptions;
}

The following interfaces are used internally by the SDK:
/**
* Internal SDK interface grouping an event, the worker handling it, and the
* resolver of a Promise<void>.
* NOTE(review): presumably the constructor parameters of the internal spawn helper,
* with `resolve` settling the promise returned by spawn - confirm.
*/
interface SpawnInterface {
/** The Airdrop event */
event: AirdropEvent;
/** The worker instance */
worker: Worker;
/** Optional worker adapter configuration */
options?: WorkerAdapterOptions;
/** Resolver function of a Promise<void> */
resolve: (value: void | PromiseLike<void>) => void;
/** NOTE(review): presumably the console captured before it is overridden - confirm */
originalConsole?: Console;
}
interface GetWorkerPathInterface {
event: AirdropEvent;
connectorWorkerPath?: string | null;
}

Always update state before emitting completion events to ensure state is persisted:
processTask({
task: async ({ adapter }) => {
// Process data
const results = await processData();
// Update state first
adapter.state = {
...adapter.state,
lastProcessed: results.lastId,
completed: true,
};
// Then emit completion event (this saves state automatically)
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
// Save partial progress on timeout
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Always provide meaningful error messages in timeout handlers:
processTask({
// Runs the task and reports the outcome: a done event on success, or an error
// event carrying the failure reason if anything in the try block throws.
task: async ({ adapter }) => {
  try {
    await performTask();
    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  } catch (error: unknown) {
    // Caught values are `unknown` under `strict`, and anything can be thrown,
    // so narrow before reading `.message`; fall back to the stringified value.
    const reason = error instanceof Error ? error.message : String(error);
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: `Task failed: ${reason}` },
    });
  }
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Lambda timeout exceeded' },
});
},
});

Adjust batch size based on data volume and processing time:
// For large items (e.g., with many attachments): lower batchSize below the documented default of 2000
await spawn({
event,
initialState,
workerPath: './worker.js',
options: {
batchSize: 500, // Smaller batches for large items
},
});
// For small items (e.g., simple records): raise batchSize above the documented default of 2000
await spawn({
event,
initialState,
workerPath: './worker.js',
options: {
batchSize: 5000, // Larger batches for efficiency
},
});