or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

http-client.md, index.md, logger.md, mappers.md, repository-management.md, state-management.md, types-extraction.md, types-loading.md, worker-management.md
tile.json

docs/types-extraction.md

Extraction Types

Type definitions for extraction workflows in the Airdrop SDK. These types define the structure of events, data, and responses used during data extraction from external systems to DevRev.

Capabilities

Event Types

Enums defining the types of events exchanged between the Airdrop platform and extractors.

/**
 * EventType defines events sent to extractors from Airdrop platform
 * Extractors receive these events to determine what action to take
 *
 * The received value is read from `adapter.event.payload.event_type`
 * (`AirdropMessage.event_type`). Every member is string-valued, so the
 * runtime value is exactly the string literal on the right.
 */
enum EventType {
  // External Sync Units
  ExtractionExternalSyncUnitsStart = 'EXTRACTION_EXTERNAL_SYNC_UNITS_START',

  // Metadata
  ExtractionMetadataStart = 'EXTRACTION_METADATA_START',

  // Data Extraction
  // NOTE(review): *Continue presumably resumes a phase begun by the matching
  // *Start (e.g. after the extractor emitted *Progress or *Delay) — confirm.
  ExtractionDataStart = 'EXTRACTION_DATA_START',
  ExtractionDataContinue = 'EXTRACTION_DATA_CONTINUE',
  ExtractionDataDelete = 'EXTRACTION_DATA_DELETE',

  // Attachments Extraction
  ExtractionAttachmentsStart = 'EXTRACTION_ATTACHMENTS_START',
  ExtractionAttachmentsContinue = 'EXTRACTION_ATTACHMENTS_CONTINUE',
  ExtractionAttachmentsDelete = 'EXTRACTION_ATTACHMENTS_DELETE',

  // Loading Events
  // NOTE(review): these belong to the loading workflow rather than
  // extraction; presumably documented with the loading types — confirm.
  StartLoadingData = 'START_LOADING_DATA',
  ContinueLoadingData = 'CONTINUE_LOADING_DATA',
  StartLoadingAttachments = 'START_LOADING_ATTACHMENTS',
  ContinueLoadingAttachments = 'CONTINUE_LOADING_ATTACHMENTS',
  StartDeletingLoaderState = 'START_DELETING_LOADER_STATE',
  StartDeletingLoaderAttachmentState = 'START_DELETING_LOADER_ATTACHMENT_STATE',
}

/**
 * ExtractorEventType defines events sent from extractors to Airdrop platform
 * Extractors emit these events to inform the platform about progress
 *
 * Emitted via `adapter.emit(type, eventData?)`. Every phase has a *Done and
 * an *Error member. The data and attachments phases also have *Progress,
 * *Delay (sent together with `EventData.delay`) and delete-phase members.
 */
enum ExtractorEventType {
  // External Sync Units
  ExtractionExternalSyncUnitsDone = 'EXTRACTION_EXTERNAL_SYNC_UNITS_DONE',
  ExtractionExternalSyncUnitsError = 'EXTRACTION_EXTERNAL_SYNC_UNITS_ERROR',

  // Metadata
  ExtractionMetadataDone = 'EXTRACTION_METADATA_DONE',
  ExtractionMetadataError = 'EXTRACTION_METADATA_ERROR',

  // Data Extraction
  ExtractionDataProgress = 'EXTRACTION_DATA_PROGRESS',
  ExtractionDataDelay = 'EXTRACTION_DATA_DELAY',
  ExtractionDataDone = 'EXTRACTION_DATA_DONE',
  ExtractionDataError = 'EXTRACTION_DATA_ERROR',
  ExtractionDataDeleteDone = 'EXTRACTION_DATA_DELETE_DONE',
  ExtractionDataDeleteError = 'EXTRACTION_DATA_DELETE_ERROR',

  // Attachments Extraction
  ExtractionAttachmentsProgress = 'EXTRACTION_ATTACHMENTS_PROGRESS',
  ExtractionAttachmentsDelay = 'EXTRACTION_ATTACHMENTS_DELAY',
  ExtractionAttachmentsDone = 'EXTRACTION_ATTACHMENTS_DONE',
  ExtractionAttachmentsError = 'EXTRACTION_ATTACHMENTS_ERROR',
  ExtractionAttachmentsDeleteDone = 'EXTRACTION_ATTACHMENTS_DELETE_DONE',
  ExtractionAttachmentsDeleteError = 'EXTRACTION_ATTACHMENTS_DELETE_ERROR',

  // Unknown
  // NOTE(review): presumably a fallback for unrecognized events — confirm.
  UnknownEventType = 'UNKNOWN_EVENT_TYPE',
}

Usage Examples:

// Minimal extractor: branch on the received EventType and answer with the
// matching ExtractorEventType.
import { processTask, EventType, ExtractorEventType } from '@devrev/ts-adaas';

processTask({
  task: async ({ adapter }) => {
    // Check which event type was received
    const eventType = adapter.event.payload.event_type;

    // Event types other than the two handled here fall through without
    // emitting anything.
    if (eventType === EventType.ExtractionDataStart) {
      // Handle initial data extraction
      await extractAllData();
      await adapter.emit(ExtractorEventType.ExtractionDataDone);
    } else if (eventType === EventType.ExtractionDataContinue) {
      // Handle continued data extraction
      await extractMoreData();
      await adapter.emit(ExtractorEventType.ExtractionDataDone);
    }
  },
  // Called instead of completing `task` when the run times out; reports an error.
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

// NOTE: the two snippets below are fragments. They must run inside a `task`
// (or `onTimeout`) handler like the one above, where `adapter` is in scope.
// They are not valid at module top level.

// Emit progress events
await adapter.emit(ExtractorEventType.ExtractionDataProgress);

// Emit delay events for rate limiting
await adapter.emit(ExtractorEventType.ExtractionDataDelay, {
  delay: 60, // Wait 60 seconds
});

Airdrop Event Structure

The main event structure received from the Airdrop platform.

/**
 * AirdropEvent is the complete event structure sent to extractors
 * Contains authentication, context, and payload information
 *
 * Inside a task handler it is available as `adapter.event`.
 */
interface AirdropEvent {
  /** Context containing authentication and snap-in information */
  context: {
    secrets: {
      /** DevRev service account token for platform authentication */
      service_account_token: string;
    };
    /** Version ID of the snap-in */
    snap_in_version_id: string;
    /** ID of the snap-in */
    snap_in_id: string;
  };
  /** Event payload with connection data and event context */
  payload: AirdropMessage;
  /** Execution metadata */
  execution_metadata: {
    /** DevRev API endpoint URL */
    devrev_endpoint: string;
  };
  /**
   * Input data from DevRev TypeScript SDK
   * NOTE(review): `InputData` is not defined in this document — confirm its
   * shape in the SDK.
   */
  input_data: InputData;
}

/**
 * AirdropMessage contains the event payload (`AirdropEvent.payload`)
 */
interface AirdropMessage {
  /** Connection information for the external system */
  connection_data: ConnectionData;
  /** Event context with sync details */
  event_context: EventContext;
  /** Type of event being sent; one of EventType */
  event_type: EventType;
  /** Optional event-specific data; see EventData */
  event_data?: EventData;
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    // Split the Airdrop event into its top-level sections once.
    const { context, execution_metadata, payload } = adapter.event;

    // Service account token used to call the DevRev platform
    const token = context.secrets.service_account_token;

    // Base URL of the DevRev API
    const endpoint = execution_metadata.devrev_endpoint;

    // Credentials for the external system
    const { org_id, key } = payload.connection_data;

    // Sync details: the unit being synced and the sync mode
    // (e.g. INITIAL or INCREMENTAL)
    const { sync_unit: syncUnit, mode } = payload.event_context;
  },
  onTimeout: async ({ adapter }) => {
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Event Context

Detailed context information about the extraction event.

/**
 * EventContext provides detailed information about the extraction request
 *
 * Available as `adapter.event.payload.event_context`. Some identifiers
 * appear in more than one form; use the fields marked "preferred".
 * NOTE(review): which field each preferred one supersedes is not stated here
 * (e.g. dev_oid vs dev_org_id) — confirm against the SDK.
 */
interface EventContext {
  /** Callback URL for emitting events back to platform */
  callback_url: string;

  /** Organization ID (preferred field) */
  dev_oid: string;
  /** Organization ID */
  dev_org_id: string;
  /** User ID (preferred field) */
  dev_uid: string;

  /** Event type identifier for Airdrop */
  event_type_adaas: string;

  /** External sync unit ID (preferred field) */
  external_sync_unit_id: string;
  /** External sync unit name */
  external_sync_unit_name: string;

  /** External system ID (preferred field) */
  external_system_id: string;
  /** External system name */
  external_system_name: string;
  /** External system type */
  external_system_type: string;

  /**
   * RFC3339 timestamp to extract from (optional)
   * Relevant for time-scoped initial syncs and when `reset_extract_from` is set.
   */
  extract_from?: string;

  /** Import slug identifier */
  import_slug: string;

  /** Initial sync scope (optional); see InitialSyncScope */
  initial_sync_scope?: InitialSyncScope;

  /**
   * Sync mode: INITIAL, INCREMENTAL, or LOADING
   * Typed as a plain string, so compare against string literals
   * (e.g. `mode === 'INITIAL'`).
   */
  mode: string;

  /** Request ID */
  request_id: string;
  /** Airdrop request ID */
  request_id_adaas: string;

  /** Whether to reset extraction from extract_from timestamp (optional) */
  reset_extract_from?: boolean;

  /** Run ID (preferred field) */
  run_id: string;

  /** Sequence version */
  sequence_version: string;

  /** Snap-in slug */
  snap_in_slug: string;
  /** Snap-in version ID */
  snap_in_version_id: string;

  /** Sync tier */
  sync_tier: string;

  /**
   * Sync unit (DevRev object ID)
   * NOTE(review): `DonV2` is not defined in this document — confirm its type
   * in the SDK.
   */
  sync_unit: DonV2;
  /** Sync unit ID */
  sync_unit_id: string;

  /** Worker data URL */
  worker_data_url: string;
}

/**
 * Initial sync scope options (value of `EventContext.initial_sync_scope`)
 * NOTE(review): TIME_SCOPED presumably limits the initial sync to data from
 * `EventContext.extract_from` onward — confirm.
 */
enum InitialSyncScope {
  FULL_HISTORY = 'full-history',
  TIME_SCOPED = 'time-scoped',
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    const ctx = adapter.event.payload.event_context;

    switch (ctx.mode) {
      case 'INITIAL':
        // First sync: either the full history, or a window starting at
        // `extract_from` for any other scope.
        if (ctx.initial_sync_scope === InitialSyncScope.FULL_HISTORY) {
          await extractFullHistory();
        } else {
          await extractFromDate(ctx.extract_from);
        }
        break;

      case 'INCREMENTAL': {
        // Follow-up sync: continue from the last successful sync start,
        // unless the platform asked to reset to `extract_from`.
        const lastSyncTime = adapter.state.lastSuccessfulSyncStarted;
        await extractIncrementalData(
          ctx.reset_extract_from ? ctx.extract_from : lastSyncTime
        );
        break;
      }
    }

    // Identifiers are available regardless of the sync mode
    const syncUnit = ctx.sync_unit;
    const externalSystemId = ctx.external_system_id;
  },
  onTimeout: async ({ adapter }) => {
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Connection Data

Information about the connection to the external system.

/**
 * ConnectionData contains authentication and connection information
 *
 * Available as `adapter.event.payload.connection_data`; describes the
 * external system, not DevRev.
 */
interface ConnectionData {
  /** Organization ID */
  org_id: string;
  /** Organization name */
  org_name: string;
  /** Authentication key (e.g., API key, token) */
  key: string;
  /**
   * Type of authentication key
   * NOTE(review): the allowed values are not listed here — confirm in the SDK.
   */
  key_type: string;
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    const connection = adapter.event.payload.connection_data;

    // Build a client for the external system from the connection credentials
    const httpClient = new ExternalSystemClient({
      apiKey: connection.key,
      keyType: connection.key_type,
      orgId: connection.org_id,
    });

    await httpClient.authenticate();
  },
  onTimeout: async ({ adapter }) => {
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Event Data

Data structure for event payloads sent to and from the platform.

/**
 * EventData contains optional data included with events
 *
 * Extractors pass it as the second argument to `adapter.emit(...)`. It is
 * also the optional `event_data` of AirdropMessage and ExtractorEvent.
 */
interface EventData {
  /** External sync units to be processed (optional); sent with ExtractionExternalSyncUnitsDone */
  external_sync_units?: ExternalSyncUnit[];

  /** Error information if an error occurred (optional); sent with *Error events */
  error?: ErrorRecord;

  /** Delay duration in seconds for rate limiting (optional); sent with *Delay events */
  delay?: number;

  /**
   * Loader reports for loading operations (optional)
   * NOTE(review): `LoaderReport` is not defined in this document.
   */
  reports?: LoaderReport[];

  /** Processed file names (optional) */
  processed_files?: string[];

  /** Stats file name (optional) */
  stats_file?: string;
}

/** Error payload carried in `EventData.error`. */
interface ErrorRecord {
  /** Error message */
  message: string;
}

Usage Examples:

// These are fragments: each `adapter.emit(...)` call must run inside a `task`
// (or `onTimeout`) handler passed to `processTask`, where `adapter` is in
// scope. The second argument is an EventData object.

// Emit external sync units
await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
  external_sync_units: [
    { id: 'repo-1', name: 'Main Repo', description: 'Main repository' },
    { id: 'repo-2', name: 'Test Repo', description: 'Test repository' },
  ],
});

// Emit error
await adapter.emit(ExtractorEventType.ExtractionDataError, {
  error: { message: 'Failed to connect to external system' },
});

// Emit delay for rate limiting
await adapter.emit(ExtractorEventType.ExtractionDataDelay, {
  delay: 60, // Wait 60 seconds before retrying
});

External Sync Unit

Represents a unit of synchronization (e.g., repository, project, workspace).

/**
 * ExternalSyncUnit defines a synchronizable unit in the external system
 *
 * Extractors report these in `EventData.external_sync_units` when emitting
 * ExtractorEventType.ExtractionExternalSyncUnitsDone.
 */
interface ExternalSyncUnit {
  /** Unique identifier for the sync unit */
  id: string;
  /** Display name of the sync unit */
  name: string;
  /** Description of the sync unit */
  description: string;
  /** Optional count of items in the sync unit */
  item_count?: number;
  /** Optional type of items in the sync unit (e.g. 'issues') */
  item_type?: string;
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    // Every repository in the external system becomes one external sync unit
    const repositories = await externalApi.getRepositories();

    const syncUnits: ExternalSyncUnit[] = [];
    for (const repo of repositories) {
      syncUnits.push({
        id: repo.id,
        name: repo.name,
        // Fall back to a generated description when the repo has none
        description: repo.description || `Repository: ${repo.name}`,
        item_count: repo.issue_count,
        item_type: 'issues',
      });
    }

    // Report the discovered units to the platform
    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsDone, {
      external_sync_units: syncUnits,
    });
  },
  onTimeout: async ({ adapter }) => {
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionExternalSyncUnitsError, {
      error,
    });
  },
});

Extractor Event Structure

Events sent from extractors to the Airdrop platform.

/**
 * ExtractorEvent is the structure of events sent to Airdrop platform
 *
 * NOTE(review): presumably assembled by `adapter.emit(...)` from its
 * arguments rather than built by hand — confirm.
 */
interface ExtractorEvent {
  /** Type of event being emitted (typed as string; presumably an ExtractorEventType value) */
  event_type: string;
  /** Event context from the original Airdrop event */
  event_context: EventContext;
  /** Optional event data */
  event_data?: EventData;
  /** Optional worker metadata */
  worker_metadata?: WorkerMetadata;
}

/**
 * WorkerMetadata contains information about the worker
 */
interface WorkerMetadata {
  /** Version of the Airdrop library being used */
  adaas_library_version: string;
}

Attachment Processing Types

Types for handling attachment streaming and processing.

/**
 * Function type for streaming attachments from external systems
 *
 * Receives the normalized attachment and the Airdrop event. Resolves to an
 * HTTP stream (`httpStream`) on success, or to `error` / `delay` (seconds)
 * on failure.
 */
type ExternalSystemAttachmentStreamingFunction = (
  params: ExternalSystemAttachmentStreamingParams
) => Promise<ExternalSystemAttachmentStreamingResponse>;

/** Parameters passed to an ExternalSystemAttachmentStreamingFunction. */
interface ExternalSystemAttachmentStreamingParams {
  /** Normalized attachment to stream */
  item: NormalizedAttachment;
  /** Airdrop event context */
  event: AirdropEvent;
}

/**
 * Result of an ExternalSystemAttachmentStreamingFunction. All fields are
 * optional. NOTE(review): presumably exactly one is set per call — confirm.
 */
interface ExternalSystemAttachmentStreamingResponse {
  /** HTTP stream response (optional) */
  httpStream?: AxiosResponse;
  /** Error if streaming failed (optional) */
  error?: ErrorRecord;
  /** Delay in seconds if rate limited (optional) */
  delay?: number;
}

/**
 * Return type for attachment processing functions
 *
 * `undefined` signals the attachment was handled successfully. `delay`
 * (seconds) requests a wait, e.g. when rate limited. `error` reports a failure.
 */
type ProcessAttachmentReturnType =
  | {
      delay?: number;
      error?: { message: string };
    }
  | undefined;

/**
 * Function type for processing individual attachments
 *
 * Only `attachment` and `stream` are passed in. Anything else the
 * implementation needs (such as the Airdrop event for `stream`) must come
 * from the enclosing scope.
 */
type ExternalProcessAttachmentFunction = (params: {
  attachment: NormalizedAttachment;
  stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;

/**
 * Function type for reducing/batching attachments
 *
 * Transforms the incoming `attachments` (Batch) into a NewBatch, optionally
 * honoring `batchSize`.
 * NOTE: the generic parameter order here (<Batch, NewBatch, ConnectorState>)
 * differs from ExternalSystemAttachmentProcessors
 * (<ConnectorState, Batch, NewBatch>).
 */
type ExternalSystemAttachmentReducerFunction<Batch, NewBatch, ConnectorState> = (params: {
  attachments: Batch;
  adapter: WorkerAdapter<ConnectorState>;
  batchSize?: number;
}) => NewBatch;

/**
 * Function type for iterating over reduced attachments
 *
 * Receives the reducer's output together with the adapter and a streaming
 * function.
 */
type ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState> = (params: {
  reducedAttachments: NewBatch;
  adapter: WorkerAdapter<ConnectorState>;
  stream: ExternalSystemAttachmentStreamingFunction;
}) => Promise<ProcessAttachmentReturnType>;

/**
 * Attachment processors configuration
 *
 * Pairs a reducer with an iterator over the reducer's output (NewBatch).
 */
interface ExternalSystemAttachmentProcessors<ConnectorState, Batch, NewBatch> {
  reducer: ExternalSystemAttachmentReducerFunction<Batch, NewBatch, ConnectorState>;
  iterator: ExternalSystemAttachmentIteratorFunction<NewBatch, ConnectorState>;
}

Usage Examples:

// Define attachment streaming function.
// Returns the HTTP stream on success, `delay` (seconds) when the external
// system rate-limits us (HTTP 429), and `error` for any other failure.
const streamAttachment: ExternalSystemAttachmentStreamingFunction = async ({
  item,
}) => {
  try {
    const httpStream = await axios.get(item.url, { responseType: 'stream' });
    return { httpStream };
  } catch (error: unknown) {
    // Catch variables are `unknown` under `strict`, so narrow before reading
    // `response` or `message`.
    if (axios.isAxiosError(error) && error.response?.status === 429) {
      return { delay: 60 }; // Rate limited, wait 60 seconds
    }
    const message = error instanceof Error ? error.message : String(error);
    return { error: { message: `Failed to stream: ${message}` } };
  }
};

// Process individual attachment.
// An ExternalProcessAttachmentFunction receives only `attachment` and
// `stream`, so the Airdrop event that `stream` needs must come from the
// caller's scope. This factory captures it explicitly instead of relying on
// an `adapter` variable that is not in scope here.
const createProcessAttachment =
  (
    // The AirdropEvent (e.g. `adapter.event`)
    event: Parameters<ExternalSystemAttachmentStreamingFunction>[0]['event'],
  ): ExternalProcessAttachmentFunction =>
  async ({ attachment, stream }) => {
    const result = await stream({ item: attachment, event });

    if (result.error) {
      return { error: result.error };
    }

    if (result.delay) {
      return { delay: result.delay };
    }

    // Handle successful stream
    return undefined;
  };

// Inside a task handler, where `adapter` is in scope:
//   const processAttachment = createProcessAttachment(adapter.event);

Deprecated Types

The following types are deprecated and should not be used in new code:

/**
 * @deprecated Use SyncMode instead
 *
 * NOTE(review): SyncMode is not defined in this document, and
 * EventContext.mode is typed as a plain string — confirm where SyncMode is
 * exported in the SDK.
 */
enum ExtractionMode {
  INITIAL = 'INITIAL',
  INCREMENTAL = 'INCREMENTAL',
}

/**
 * @deprecated
 * Older form of the event context.
 * NOTE(review): presumably superseded by EventContext — confirm.
 */
interface EventContextIn {
  callback_url: string;
  dev_org: string;
  dev_org_id: string;
  dev_user: string;
  dev_user_id: string;
  external_sync_unit: string;
  external_sync_unit_id: string;
  external_sync_unit_name: string;
  external_system: string;
  external_system_type: string;
  import_slug: string;
  mode: string;
  request_id: string;
  snap_in_slug: string;
  sync_run: string;
  sync_run_id: string;
  sync_tier: string;
  sync_unit: DonV2;
  sync_unit_id: string;
  uuid: string;
  worker_data_url: string;
}

/**
 * @deprecated
 * NOTE(review): replacement type is not stated here — confirm.
 */
interface EventContextOut {
  uuid: string;
  sync_run: string;
  sync_unit?: string;
}

/**
 * @deprecated
 * Former per-domain-object extraction progress record.
 * NOTE(review): replacement type is not stated here — confirm.
 */
interface DomainObjectState {
  name: string;
  nextChunkId: number;
  pages?: { pages: number[] };
  lastModified: string;
  isDone: boolean;
  count: number;
}

Common Patterns

Event Type Flow

// Each pair is [event received from Airdrop, extractor event emitted when
// that phase completes successfully]. Not listed: the matching *Error events,
// and the *Progress / *Delay events the data and attachments phases may also
// emit (see ExtractorEventType).
const eventTypeFlow: ReadonlyArray<readonly [EventType, ExtractorEventType]> = [
  // 1. External Sync Units Discovery
  [EventType.ExtractionExternalSyncUnitsStart, ExtractorEventType.ExtractionExternalSyncUnitsDone],

  // 2. Metadata Extraction
  [EventType.ExtractionMetadataStart, ExtractorEventType.ExtractionMetadataDone],

  // 3. Data Extraction
  [EventType.ExtractionDataStart, ExtractorEventType.ExtractionDataDone],
  [EventType.ExtractionDataContinue, ExtractorEventType.ExtractionDataDone],

  // 4. Attachments Extraction
  [EventType.ExtractionAttachmentsStart, ExtractorEventType.ExtractionAttachmentsDone],
  [EventType.ExtractionAttachmentsContinue, ExtractorEventType.ExtractionAttachmentsDone],
];

Error Handling Pattern

processTask({
  task: async ({ adapter }) => {
    try {
      await performExtraction();
      await adapter.emit(ExtractorEventType.ExtractionDataDone);
    } catch (error: unknown) {
      // Catch variables are `unknown` under `strict`. Narrow before reading
      // `.message` so non-Error throwables still yield a usable message.
      const message = error instanceof Error ? error.message : String(error);
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message },
      });
    }
  },
  // Invoked when the run times out before `task` finishes.
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Lambda timeout' },
    });
  },
});