Type definitions for extraction workflows in the Airdrop SDK. These types define the structure of events, data, and responses used during data extraction from external systems to DevRev.
Enums defining the types of events exchanged between the Airdrop platform and extractors.
/**
 * Events that the Airdrop platform delivers to an extractor.
 * The extractor inspects the received value to decide which action to run.
 */
enum EventType {
  // --- External sync units ---
  ExtractionExternalSyncUnitsStart = 'EXTRACTION_EXTERNAL_SYNC_UNITS_START',

  // --- Metadata ---
  ExtractionMetadataStart = 'EXTRACTION_METADATA_START',

  // --- Data extraction ---
  ExtractionDataStart = 'EXTRACTION_DATA_START',
  ExtractionDataContinue = 'EXTRACTION_DATA_CONTINUE',
  ExtractionDataDelete = 'EXTRACTION_DATA_DELETE',

  // --- Attachments extraction ---
  ExtractionAttachmentsStart = 'EXTRACTION_ATTACHMENTS_START',
  ExtractionAttachmentsContinue = 'EXTRACTION_ATTACHMENTS_CONTINUE',
  ExtractionAttachmentsDelete = 'EXTRACTION_ATTACHMENTS_DELETE',

  // --- Loading ---
  StartLoadingData = 'START_LOADING_DATA',
  ContinueLoadingData = 'CONTINUE_LOADING_DATA',
  StartLoadingAttachments = 'START_LOADING_ATTACHMENTS',
  ContinueLoadingAttachments = 'CONTINUE_LOADING_ATTACHMENTS',
  StartDeletingLoaderState = 'START_DELETING_LOADER_STATE',
  StartDeletingLoaderAttachmentState = 'START_DELETING_LOADER_ATTACHMENT_STATE',
}
/**
* ExtractorEventType defines events sent from extractors to Airdrop platform
* Extractors emit these events to inform the platform about progress
*/
enum ExtractorEventType {
// External Sync Units
ExtractionExternalSyncUnitsDone = 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE',
ExtractionExternalSyncUnitsError = 'EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR',
// Metadata
ExtractionMetadataDone = 'EXTRACTION_METADATA_DONE',
ExtractionMetadataError = 'EXTRACTION_METADATA_ERROR',
// Data Extraction
ExtractionDataProgress = 'EXTRACTION_DATA_PROGRESS',
ExtractionDataDelay = 'EXTRACTION_DATA_DELAY',
ExtractionDataDone = 'EXTRACTION_DATA_DONE',
ExtractionDataError = 'EXTRACTION_DATA_ERROR',
ExtractionDataDeleteDone = 'EXTRACTION_DATA_DELETE_DONE',
ExtractionDataDeleteError = 'EXTRACTION_DATA_DELETE_ERROR',
// Attachments Extraction
ExtractionAttachmentsProgress = 'EXTRACTION_ATTACHMENTS_PROGRESS',
ExtractionAttachmentsDelay = 'EXTRACTION_ATTACHMENTS_DELAY',
ExtractionAttachmentsDone = 'EXTRACTION_ATTACHMENTS_DONE',
ExtractionAttachmentsError = 'EXTRACTION_ATTACHMENTS_ERROR',
ExtractionAttachmentsDeleteDone = 'EXTRACTION_ATTACHMENTS_DELETE_DONE',
ExtractionAttachmentsDeleteError = 'EXTRACTION_ATTACHMENTS_DELETE_ERROR',
// Unknown
UnknownEventType = 'UNKNOWN_EVENT_TYPE',
}

Usage Examples:
import { processTask, EventType, ExtractorEventType } from '@devrev/ts-adaas';
processTask({
  task: async ({ adapter }) => {
    // Dispatch on the event type carried by the incoming payload.
    switch (adapter.event.payload.event_type) {
      case EventType.ExtractionDataStart:
        // First extraction pass
        await extractAllData();
        await adapter.emit(ExtractorEventType.ExtractionDataDone);
        break;
      case EventType.ExtractionDataContinue:
        // Follow-up extraction pass
        await extractMoreData();
        await adapter.emit(ExtractorEventType.ExtractionDataDone);
        break;
      default:
        // Any other event type is left untouched by this example.
        break;
    }
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout to the platform as a data-extraction error.
    const timeoutError = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: timeoutError,
    });
  },
});
// Emit progress events
await adapter.emit(ExtractorEventType.ExtractionDataProgress);
// Emit delay events for rate limiting
await adapter.emit(ExtractorEventType.ExtractionDataDelay, {
delay: 60, // Wait 60 seconds
});

The main event structure received from the Airdrop platform.
/**
 * Full event object handed to an extractor.
 * Bundles authentication/snap-in context, the event payload,
 * execution metadata and SDK input data.
 */
interface AirdropEvent {
  /** Authentication secrets and snap-in identifiers. */
  context: {
    secrets: {
      /** Token of the DevRev service account, used to authenticate with the platform. */
      service_account_token: string;
    };
    /** Snap-in version ID. */
    snap_in_version_id: string;
    /** Snap-in ID. */
    snap_in_id: string;
  };

  /** Payload carrying connection data and the event context. */
  payload: AirdropMessage;

  /** Metadata about the execution. */
  execution_metadata: {
    /** URL of the DevRev API endpoint. */
    devrev_endpoint: string;
  };

  /** Input data coming from the DevRev TypeScript SDK. */
  input_data: InputData;
}
/**
* AirdropMessage contains the event payload
*/
interface AirdropMessage {
/** Connection information for the external system */
connection_data: ConnectionData;
/** Event context with sync details */
event_context: EventContext;
/** Type of event being sent */
event_type: EventType;
/** Optional event-specific data */
event_data?: EventData;
}

Usage Examples:
processTask({
task: async ({ adapter }) => {
// Access authentication token
const token = adapter.event.context.secrets.service_account_token;
// Access DevRev endpoint
const endpoint = adapter.event.execution_metadata.devrev_endpoint;
// Access connection data
const { org_id, key } = adapter.event.payload.connection_data;
// Access event context
const eventContext = adapter.event.payload.event_context;
const syncUnit = eventContext.sync_unit;
const mode = eventContext.mode; // INITIAL or INCREMENTAL
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Detailed context information about the extraction event.
/**
 * Detailed description of the extraction request that an event belongs to.
 */
interface EventContext {
  /** URL used to emit events back to the platform. */
  callback_url: string;
  /** ID of the organization (preferred field). */
  dev_oid: string;
  /** ID of the organization. */
  dev_org_id: string;
  /** ID of the user (preferred field). */
  dev_uid: string;
  /** Airdrop event type identifier. */
  event_type_adaas: string;
  /** ID of the external sync unit (preferred field). */
  external_sync_unit_id: string;
  /** Name of the external sync unit. */
  external_sync_unit_name: string;
  /** ID of the external system (preferred field). */
  external_system_id: string;
  /** Name of the external system. */
  external_system_name: string;
  /** Type of the external system. */
  external_system_type: string;
  /** Optional RFC3339 timestamp from which to extract. */
  extract_from?: string;
  /** Identifier slug of the import. */
  import_slug: string;
  /** Optional scope of the initial sync. */
  initial_sync_scope?: InitialSyncScope;
  /** Mode of the sync: INITIAL, INCREMENTAL, or LOADING. */
  mode: string;
  /** ID of the request. */
  request_id: string;
  /** ID of the Airdrop request. */
  request_id_adaas: string;
  /** Optional flag: whether extraction should be reset from the extract_from timestamp. */
  reset_extract_from?: boolean;
  /** ID of the run (preferred field). */
  run_id: string;
  /** Version of the sequence. */
  sequence_version: string;
  /** Slug of the snap-in. */
  snap_in_slug: string;
  /** Version ID of the snap-in. */
  snap_in_version_id: string;
  /** Tier of the sync. */
  sync_tier: string;
  /** The sync unit, as a DevRev object ID. */
  sync_unit: DonV2;
  /** ID of the sync unit. */
  sync_unit_id: string;
  /** URL for worker data. */
  worker_data_url: string;
}
/**
* Initial sync scope options
*/
enum InitialSyncScope {
FULL_HISTORY = 'full-history',
TIME_SCOPED = 'time-scoped',
}

Usage Examples:
processTask({
task: async ({ adapter }) => {
const eventContext = adapter.event.payload.event_context;
// Determine sync mode
if (eventContext.mode === 'INITIAL') {
// Full initial sync
if (eventContext.initial_sync_scope === InitialSyncScope.FULL_HISTORY) {
await extractFullHistory();
} else {
// Time-scoped initial sync
const extractFrom = eventContext.extract_from;
await extractFromDate(extractFrom);
}
} else if (eventContext.mode === 'INCREMENTAL') {
// Incremental sync
const lastSyncTime = adapter.state.lastSuccessfulSyncStarted;
const extractFrom = eventContext.reset_extract_from
? eventContext.extract_from
: lastSyncTime;
await extractIncrementalData(extractFrom);
}
// Access identifiers
const syncUnit = eventContext.sync_unit;
const externalSystemId = eventContext.external_system_id;
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Information about the connection to the external system.
/**
* ConnectionData contains authentication and connection information
*/
interface ConnectionData {
/** Organization ID */
org_id: string;
/** Organization name */
org_name: string;
/** Authentication key (e.g., API key, token) */
key: string;
/** Type of authentication key */
key_type: string;
}

Usage Examples:
processTask({
task: async ({ adapter }) => {
const { key, key_type, org_id } = adapter.event.payload.connection_data;
// Use connection data to authenticate with external system
const httpClient = new ExternalSystemClient({
apiKey: key,
keyType: key_type,
orgId: org_id,
});
await httpClient.authenticate();
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Data structure for event payloads sent to and from the platform.
/**
 * Optional data that may accompany an event. Every field is optional.
 */
interface EventData {
  /** Sync units in the external system that should be processed. */
  external_sync_units?: ExternalSyncUnit[];
  /** Details of an error, when one occurred. */
  error?: ErrorRecord;
  /** Rate-limiting delay, expressed in seconds. */
  delay?: number;
  /** Reports produced by loading operations. */
  reports?: LoaderReport[];
  /** Names of the files that were processed. */
  processed_files?: string[];
  /** Name of the stats file. */
  stats_file?: string;
}
interface ErrorRecord {
/** Error message */
message: string;
}

Usage Examples:
// Report the discovered external sync units
const discoveredSyncUnits = [
  { id: 'repo-1', name: 'Main Repo', description: 'Main repository' },
  { id: 'repo-2', name: 'Test Repo', description: 'Test repository' },
];
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
  external_sync_units: discoveredSyncUnits,
});

// Report an error
const connectionFailure = { message: 'Failed to connect to external system' };
await adapter.emit(ExtractorEventType.ExtractionDataError, {
  error: connectionFailure,
});
// Emit delay for rate limiting
await adapter.emit(ExtractorEventType.ExtractionDataDelay, {
delay: 60, // Wait 60 seconds before retrying
});

Represents a unit of synchronization (e.g., repository, project, workspace).
/**
* ExternalSyncUnit defines a synchronizable unit in the external system
*/
interface ExternalSyncUnit {
/** Unique identifier for the sync unit */
id: string;
/** Display name of the sync unit */
name: string;
/** Description of the sync unit */
description: string;
/** Optional count of items in the sync unit */
item_count?: number;
/** Optional type of items in the sync unit */
item_type?: string;
}

Usage Examples:
processTask({
task: async ({ adapter }) => {
// Fetch repositories from external system
const repositories = await externalApi.getRepositories();
// Convert to external sync units
const syncUnits: ExternalSyncUnit[] = repositories.map((repo) => ({
id: repo.id,
name: repo.name,
description: repo.description || `Repository: ${repo.name}`,
item_count: repo.issue_count,
item_type: 'issues',
}));
// Emit sync units
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
external_sync_units: syncUnits,
});
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
error: { message: 'Timeout' },
});
},
});

Events sent from extractors to the Airdrop platform.
/**
 * Shape of an event that an extractor sends to the Airdrop platform.
 */
interface ExtractorEvent {
  /** The type of the emitted event. */
  event_type: string;
  /** Context taken from the original Airdrop event. */
  event_context: EventContext;
  /** Data attached to the event, if any. */
  event_data?: EventData;
  /** Metadata describing the worker, if any. */
  worker_metadata?: WorkerMetadata;
}
/**
* WorkerMetadata contains information about the worker
*/
interface WorkerMetadata {
/** Version of the Airdrop library being used */
adaas_library_version: string;
}

Types for handling attachment streaming and processing.
/**
 * Signature of a function that streams one attachment from an external system.
 * Takes the streaming params and resolves to a streaming response.
 */
type ExternalSystemAttachmentStreamingFunction = (
  params: ExternalSystemAttachmentStreamingParams,
) => Promise<ExternalSystemAttachmentStreamingResponse>;
/**
 * Arguments passed to an attachment streaming function.
 */
interface ExternalSystemAttachmentStreamingParams {
  /** The normalized attachment that should be streamed. */
  item: NormalizedAttachment;
  /** The Airdrop event providing context for the stream. */
  event: AirdropEvent;
}
/**
 * Result of an attachment streaming function. Every field is optional.
 */
interface ExternalSystemAttachmentStreamingResponse {
  /** The HTTP stream response. */
  httpStream?: AxiosResponse;
  /** The error, when streaming failed. */
  error?: ErrorRecord;
  /** Number of seconds to wait, when rate limited. */
  delay?: number;
}
/**
 * What an attachment processing function resolves to:
 * an object with an optional `delay` and/or `error`, or `undefined`.
 */
type ProcessAttachmentReturnType =
  | {
      delay?: number;
      error?: { message: string };
    }
  | undefined;
/**
 * Signature of a function that processes a single attachment.
 * It receives the attachment together with the streaming function.
 */
type ExternalProcessAttachmentFunction = (params: {
  attachment: NormalizedAttachment;
  stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;
/**
 * Signature of a function that reduces (batches) attachments:
 * it maps a `Batch` of attachments to a `NewBatch`.
 */
type ExternalSystemAttachmentReducerFunction<
  Batch,
  NewBatch,
  ConnectorState,
> = (params: {
  attachments: Batch;
  adapter: WorkerAdapter<ConnectorState>;
  batchSize?: number;
}) => NewBatch;
/**
 * Signature of a function that iterates over already-reduced attachments.
 * It receives the reduced batch, the adapter and the streaming function.
 */
type ExternalSystemAttachmentIteratorFunction<
  NewBatch,
  ConnectorState,
> = (params: {
  reducedAttachments: NewBatch;
  adapter: WorkerAdapter<ConnectorState>;
  stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;
/**
* Attachment processors configuration
*/
interface ExternalSystemAttachmentProcessors<ConnectorState, Batch, NewBatch> {
reducer: ExternalSystemAttachmentReducerFunction<Batch, NewBatch, ConnectorState>;
iterator: ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState>;
}

Usage Examples:
// Define attachment streaming function
/**
 * Streams a single attachment by issuing a GET request to `item.url`.
 *
 * Resolves to `{ httpStream }` on success, `{ delay: 60 }` when the request
 * fails with HTTP 429, and `{ error }` for any other failure. The streaming
 * params also carry `event`, which this example does not need.
 */
const streamAttachment: ExternalSystemAttachmentStreamingFunction = async ({
  item,
}) => {
  try {
    const httpStream = await axios.get(item.url, { responseType: 'stream' });
    return { httpStream };
  } catch (error: unknown) {
    // The caught value is `unknown`: narrow it before touching `.response`,
    // so a non-axios (or null) throw cannot crash inside the handler.
    if (axios.isAxiosError(error) && error.response?.status === 429) {
      return { delay: 60 }; // Rate limited, wait 60 seconds
    }
    // Anything can be thrown, so only read `.message` from real Error objects.
    const reason = error instanceof Error ? error.message : String(error);
    return { error: { message: `Failed to stream: ${reason}` } };
  }
};
// Process individual attachment
const processAttachment: ExternalProcessAttachmentFunction = async ({
attachment,
stream,
}) => {
const result = await stream({ item: attachment, event: adapter.event });
if (result.error) {
return { error: result.error };
}
if (result.delay) {
return { delay: result.delay };
}
// Handle successful stream
return undefined;
};

The following types are deprecated and should not be used in new code:
/**
 * Extraction mode values.
 *
 * @deprecated Use SyncMode instead
 */
enum ExtractionMode {
  INITIAL = 'INITIAL',
  INCREMENTAL = 'INCREMENTAL',
}
/**
 * Deprecated event-context shape; do not use it in new code.
 *
 * @deprecated
 */
interface EventContextIn {
  callback_url: string;
  dev_org: string;
  dev_org_id: string;
  dev_user: string;
  dev_user_id: string;
  external_sync_unit: string;
  external_sync_unit_id: string;
  external_sync_unit_name: string;
  external_system: string;
  external_system_type: string;
  import_slug: string;
  mode: string;
  request_id: string;
  snap_in_slug: string;
  sync_run: string;
  sync_run_id: string;
  sync_tier: string;
  sync_unit: DonV2;
  sync_unit_id: string;
  uuid: string;
  worker_data_url: string;
}
/**
 * Deprecated event-context shape; do not use it in new code.
 * Only `sync_unit` is optional.
 *
 * @deprecated
 */
interface EventContextOut {
  uuid: string;
  sync_run: string;
  sync_unit?: string;
}
/**
* @deprecated
*/
interface DomainObjectState {
name: string;
nextChunkId: number;
pages?: { pages: number[] };
lastModified: string;
isDone: boolean;
count: number;
}

// 1. External Sync Units Discovery
EventType.ExtractionExternalSyncUnitsStart → ExtractorEventType.ExtractionExternalSyncUnitsDone;
// 2. Metadata Extraction
EventType.ExtractionMetadataStart → ExtractorEventType.ExtractionMetadataDone;
// 3. Data Extraction
EventType.ExtractionDataStart → ExtractorEventType.ExtractionDataDone;
EventType.ExtractionDataContinue → ExtractorEventType.ExtractionDataDone;
// 4. Attachments Extraction
EventType.ExtractionAttachmentsStart → ExtractorEventType.ExtractionAttachmentsDone;
EventType.ExtractionAttachmentsContinue → ExtractorEventType.ExtractionAttachmentsDone;

// Error-handling pattern: emit ExtractionDataDone on success and
// ExtractionDataError on failure or timeout.
processTask({
  task: async ({ adapter }) => {
    try {
      await performExtraction();
      await adapter.emit(ExtractorEventType.ExtractionDataDone);
    } catch (error: unknown) {
      // The caught value is `unknown` and may not be an Error; narrow it so
      // the emitted message is always a string rather than `undefined`.
      const message = error instanceof Error ? error.message : String(error);
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message },
      });
    }
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout to the platform as a data-extraction error.
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Lambda timeout' },
    });
  },
});