State management in the Airdrop SDK maintains extraction and loading state across worker invocations. The SDK automatically persists state when events are emitted, ensuring recovery from timeouts and enabling incremental processing.
The main state type combining connector-specific state with SDK state.
/**
* AdapterState combines connector state with SDK state
* @template ConnectorState - Your snap-in's custom state type
*/
type AdapterState<ConnectorState> = ConnectorState & SdkState;Usage Examples:
// Define your connector state
interface MyConnectorState {
users: { completed: boolean; lastProcessedId?: string };
tasks: { completed: boolean; cursor?: string };
}
// Access and update state in worker
processTask({
task: async ({ adapter }) => {
// Get current state (includes both connector and SDK state)
const state: AdapterState<MyConnectorState> = adapter.state;
// Update connector state
adapter.state = {
...state,
users: { completed: true, lastProcessedId: 'user-123' },
};
// SDK state is automatically managed
console.log('Last sync:', state.lastSyncStarted);
console.log('Last successful sync:', state.lastSuccessfulSyncStarted);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});State specific to extraction workflows (data moving to DevRev).
/**
* ToDevRev contains extraction-specific state
* Automatically managed by SDK during extraction phases
*/
interface ToDevRev {
/** Attachment metadata for tracking attachment extraction progress */
attachmentsMetadata: {
/** Array of artifact IDs containing attachment data */
artifactIds: string[];
/** Index of last processed attachment */
lastProcessed: number;
/** Optional list of processed attachment IDs */
lastProcessedAttachmentsIdsList?: string[];
};
}Usage Examples:
processTask({
task: async ({ adapter }) => {
// Access extraction state
const toDevRev = adapter.state.toDevRev;
if (toDevRev) {
// Check attachment processing progress
const artifactIds = toDevRev.attachmentsMetadata.artifactIds;
const lastProcessed = toDevRev.attachmentsMetadata.lastProcessed;
console.log(`Processed ${lastProcessed + 1} of ${artifactIds.length} artifacts`);
}
// SDK automatically updates toDevRev state during attachment extraction
await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, {
error: { message: 'Timeout' },
});
},
});State specific to loading workflows (data coming from DevRev).
/**
* FromDevRev contains loading-specific state
* Tracks files and items to be loaded to external system
*/
interface FromDevRev {
/** Array of files containing items to load */
filesToLoad: FileToLoad[];
}
interface FileToLoad {
/** File identifier */
id: string;
/** File name */
file_name: string;
/** Type of items in file */
itemType: string;
/** Total count of items in file */
count: number;
/** Line number of next item to process */
lineToProcess: number;
/** Whether file processing is completed */
completed: boolean;
}Usage Examples:
processTask({
task: async ({ adapter }) => {
// Access loading state
const fromDevRev = adapter.state.fromDevRev;
if (!fromDevRev) {
console.log('No files to load');
await adapter.emit(LoaderEventType.DataLoadingDone);
return;
}
// Process each file
for (const file of fromDevRev.filesToLoad) {
if (file.completed) {
continue;
}
console.log(
`Processing ${file.file_name}: line ${file.lineToProcess} of ${file.count}`
);
// Load items from file
const items = await readItemsFromFile(file.file_name, file.lineToProcess);
for (const item of items) {
await loadItemToExternalSystem(item);
file.lineToProcess++;
// Update state periodically
adapter.state = adapter.state;
}
// Mark file as completed
file.completed = true;
}
await adapter.emit(LoaderEventType.DataLoadingDone);
},
onTimeout: async ({ adapter }) => {
// State is automatically saved with current progress
await adapter.emit(LoaderEventType.DataLoadingError, {
error: { message: 'Timeout' },
});
},
});Internal SDK state fields automatically managed by the SDK.
/**
* SdkState contains SDK-managed state fields
* These fields are automatically updated by the SDK
*/
interface SdkState {
/** ISO 8601 timestamp when current sync started (optional) */
lastSyncStarted?: string;
/** ISO 8601 timestamp when last successful sync started (optional) */
lastSuccessfulSyncStarted?: string;
/** Extraction-specific state (optional) */
toDevRev?: ToDevRev;
/** Loading-specific state (optional) */
fromDevRev?: FromDevRev;
/** Snap-in version ID (optional) */
snapInVersionId?: string;
}Usage Examples:
processTask({
task: async ({ adapter }) => {
const state = adapter.state;
// Check if this is an incremental sync
if (state.lastSuccessfulSyncStarted) {
console.log('Last successful sync:', state.lastSuccessfulSyncStarted);
// Use lastSuccessfulSyncStarted for incremental extraction
const extractFrom = state.lastSuccessfulSyncStarted;
await extractDataSince(extractFrom);
} else {
console.log('This is an initial sync');
await extractAllData();
}
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});Configuration interface for initializing state.
/**
* StateInterface defines parameters for state initialization
*/
interface StateInterface<ConnectorState> {
/** Airdrop event */
event: AirdropEvent;
/** Initial connector state */
initialState: ConnectorState;
/** Optional initial domain mapping */
initialDomainMapping?: InitialDomainMapping;
/** Optional worker options */
options?: WorkerAdapterOptions;
}Pre-defined SDK state structures for extraction and loading.
/**
* Default SDK state for extraction workflows
*/
const extractionSdkState = {
lastSyncStarted: '',
lastSuccessfulSyncStarted: '',
snapInVersionId: '',
toDevRev: {
attachmentsMetadata: {
artifactIds: [],
lastProcessed: 0,
lastProcessedAttachmentsIdsList: [],
},
},
};
/**
* Default SDK state for loading workflows
*/
const loadingSdkState = {
snapInVersionId: '',
fromDevRev: {
filesToLoad: [],
},
};State is automatically persisted when emitting events (except for stateless event types):
processTask({
task: async ({ adapter }) => {
// Update state
adapter.state = {
...adapter.state,
users: { completed: true },
};
// State is automatically saved when emitting
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
// State is saved even on timeout
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});Update state multiple times during processing:
processTask({
task: async ({ adapter }) => {
// Process users
await processUsers();
adapter.state = {
...adapter.state,
users: { completed: true, lastId: 'user-100' },
};
// Process tasks
await processTasks();
adapter.state = {
...adapter.state,
tasks: { completed: true, cursor: 'task-cursor-abc' },
};
// Final emit saves latest state
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});interface ExtractorState {
lastProcessedTimestamp?: string;
processedCount: number;
}
processTask({
task: async ({ adapter }) => {
const state = adapter.state;
const eventContext = adapter.event.payload.event_context;
// Determine extraction start point
let extractFrom: string;
if (eventContext.mode === 'INCREMENTAL') {
// Use lastSuccessfulSyncStarted or extract_from based on reset flag
extractFrom = eventContext.reset_extract_from
? eventContext.extract_from!
: state.lastSuccessfulSyncStarted || eventContext.extract_from!;
} else {
// Initial sync
extractFrom = eventContext.extract_from || '1970-01-01T00:00:00Z';
}
// Extract data
const data = await extractDataSince(extractFrom);
// Update state with progress
adapter.state = {
...state,
lastProcessedTimestamp: data.lastTimestamp,
processedCount: data.count,
};
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});interface ChunkedState {
currentPage: number;
totalPages: number;
itemsProcessed: number;
}
processTask({
task: async ({ adapter }) => {
let state = adapter.state;
// Resume from where we left off
const startPage = state.currentPage || 1;
for (let page = startPage; page <= (state.totalPages || Infinity); page++) {
const result = await fetchPage(page);
if (!state.totalPages) {
state.totalPages = result.totalPages;
}
// Process page data
await processPageData(result.data);
// Update state after each page
adapter.state = {
...state,
currentPage: page,
itemsProcessed: (state.itemsProcessed || 0) + result.data.length,
};
state = adapter.state;
// Check if done
if (page >= result.totalPages) {
break;
}
}
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
// State already saved with currentPage progress
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});processTask({
task: async ({ adapter }) => {
const fromDevRev = adapter.state.fromDevRev;
if (!fromDevRev) {
await adapter.emit(LoaderEventType.DataLoadingDone);
return;
}
// Track overall progress
let totalItems = 0;
let processedItems = 0;
for (const file of fromDevRev.filesToLoad) {
totalItems += file.count;
processedItems += file.lineToProcess;
}
console.log(`Overall progress: ${processedItems}/${totalItems}`);
// Process files
for (const file of fromDevRev.filesToLoad) {
if (file.completed) {
continue;
}
while (file.lineToProcess < file.count) {
const item = await readItemAtLine(file.file_name, file.lineToProcess);
await loadItem(item);
file.lineToProcess++;
processedItems++;
// Update state every 100 items
if (processedItems % 100 === 0) {
adapter.state = adapter.state;
console.log(`Progress: ${processedItems}/${totalItems}`);
}
}
file.completed = true;
}
await adapter.emit(LoaderEventType.DataLoadingDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(LoaderEventType.DataLoadingError, {
error: { message: 'Timeout' },
});
},
});const initialState: MyConnectorState = {
users: { completed: false },
tasks: { completed: false },
initialized: true,
};
await spawn({
event,
initialState,
workerPath: './worker.js',
});// Define clear state structure
interface ExtractorState {
phases: {
users: PhaseState;
tasks: PhaseState;
comments: PhaseState;
};
metadata: {
startTime: string;
version: string;
};
}
interface PhaseState {
completed: boolean;
lastId?: string;
count?: number;
}processTask({
task: async ({ adapter }) => {
let state = adapter.state;
// Migrate old state format to new format
if (!state.version || state.version < '2.0') {
state = migrateState(state);
adapter.state = state;
}
// Continue processing
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});