or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

http-client.mdindex.mdlogger.mdmappers.mdrepository-management.mdstate-management.mdtypes-extraction.mdtypes-loading.mdworker-management.md
tile.json

state-management.mddocs/

State Management

State management in the Airdrop SDK maintains extraction and loading state across worker invocations. The SDK automatically persists state when events are emitted, ensuring recovery from timeouts and enabling incremental processing.

Capabilities

Adapter State

The main state type combining connector-specific state with SDK state.

/**
 * AdapterState combines connector state with SDK state
 * @template ConnectorState - Your snap-in's custom state type
 */
type AdapterState<ConnectorState> = ConnectorState & SdkState;

Usage Examples:

// Define your connector state
interface MyConnectorState {
  users: { completed: boolean; lastProcessedId?: string };
  tasks: { completed: boolean; cursor?: string };
}

// Access and update state in worker
processTask({
  task: async ({ adapter }) => {
    // Get current state (includes both connector and SDK state)
    const state: AdapterState<MyConnectorState> = adapter.state;

    // Update connector state
    adapter.state = {
      ...state,
      users: { completed: true, lastProcessedId: 'user-123' },
    };

    // SDK state is automatically managed
    console.log('Last sync:', state.lastSyncStarted);
    console.log('Last successful sync:', state.lastSuccessfulSyncStarted);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

ToDevRev State

State specific to extraction workflows (data moving to DevRev).

/**
 * ToDevRev contains extraction-specific state
 * Automatically managed by SDK during extraction phases
 */
interface ToDevRev {
  /** Attachment metadata for tracking attachment extraction progress */
  attachmentsMetadata: {
    /** Array of artifact IDs containing attachment data */
    artifactIds: string[];
    /** Index of last processed attachment */
    lastProcessed: number;
    /** Optional list of processed attachment IDs */
    lastProcessedAttachmentsIdsList?: string[];
  };
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    // Access extraction state
    const toDevRev = adapter.state.toDevRev;

    if (toDevRev) {
      // Check attachment processing progress
      const artifactIds = toDevRev.attachmentsMetadata.artifactIds;
      const lastProcessed = toDevRev.attachmentsMetadata.lastProcessed;

      console.log(`Processed ${lastProcessed + 1} of ${artifactIds.length} artifacts`);
    }

    // SDK automatically updates toDevRev state during attachment extraction
    await adapter.emit(ExtractorEventType.ExtractionAttachmentsDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionAttachmentsError, {
      error: { message: 'Timeout' },
    });
  },
});

FromDevRev State

State specific to loading workflows (data coming from DevRev).

/**
 * FromDevRev contains loading-specific state
 * Tracks files and items to be loaded to external system
 */
interface FromDevRev {
  /** Array of files containing items to load */
  filesToLoad: FileToLoad[];
}

interface FileToLoad {
  /** File identifier */
  id: string;
  /** File name */
  file_name: string;
  /** Type of items in file */
  itemType: string;
  /** Total count of items in file */
  count: number;
  /** Line number of next item to process */
  lineToProcess: number;
  /** Whether file processing is completed */
  completed: boolean;
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    // Access loading state
    const fromDevRev = adapter.state.fromDevRev;

    if (!fromDevRev) {
      console.log('No files to load');
      await adapter.emit(LoaderEventType.DataLoadingDone);
      return;
    }

    // Process each file
    for (const file of fromDevRev.filesToLoad) {
      if (file.completed) {
        continue;
      }

      console.log(
        `Processing ${file.file_name}: line ${file.lineToProcess} of ${file.count}`
      );

      // Load items from file
      const items = await readItemsFromFile(file.file_name, file.lineToProcess);

      for (const item of items) {
        await loadItemToExternalSystem(item);
        file.lineToProcess++;

        // Update state periodically
        adapter.state = adapter.state;
      }

      // Mark file as completed
      file.completed = true;
    }

    await adapter.emit(LoaderEventType.DataLoadingDone);
  },
  onTimeout: async ({ adapter }) => {
    // State is automatically saved with current progress
    await adapter.emit(LoaderEventType.DataLoadingError, {
      error: { message: 'Timeout' },
    });
  },
});

SDK State

Internal SDK state fields automatically managed by the SDK.

/**
 * SdkState contains SDK-managed state fields
 * These fields are automatically updated by the SDK
 */
interface SdkState {
  /** ISO 8601 timestamp when current sync started (optional) */
  lastSyncStarted?: string;
  /** ISO 8601 timestamp when last successful sync started (optional) */
  lastSuccessfulSyncStarted?: string;
  /** Extraction-specific state (optional) */
  toDevRev?: ToDevRev;
  /** Loading-specific state (optional) */
  fromDevRev?: FromDevRev;
  /** Snap-in version ID (optional) */
  snapInVersionId?: string;
}

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    const state = adapter.state;

    // Check if this is an incremental sync
    if (state.lastSuccessfulSyncStarted) {
      console.log('Last successful sync:', state.lastSuccessfulSyncStarted);

      // Use lastSuccessfulSyncStarted for incremental extraction
      const extractFrom = state.lastSuccessfulSyncStarted;
      await extractDataSince(extractFrom);
    } else {
      console.log('This is an initial sync');
      await extractAllData();
    }

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

State Interface

Configuration interface for initializing state.

/**
 * StateInterface defines parameters for state initialization
 */
interface StateInterface<ConnectorState> {
  /** Airdrop event */
  event: AirdropEvent;
  /** Initial connector state */
  initialState: ConnectorState;
  /** Optional initial domain mapping */
  initialDomainMapping?: InitialDomainMapping;
  /** Optional worker options */
  options?: WorkerAdapterOptions;
}

Default SDK States

Pre-defined SDK state structures for extraction and loading.

/**
 * Default SDK state for extraction workflows
 */
const extractionSdkState = {
  lastSyncStarted: '',
  lastSuccessfulSyncStarted: '',
  snapInVersionId: '',
  toDevRev: {
    attachmentsMetadata: {
      artifactIds: [],
      lastProcessed: 0,
      lastProcessedAttachmentsIdsList: [],
    },
  },
};

/**
 * Default SDK state for loading workflows
 */
const loadingSdkState = {
  snapInVersionId: '',
  fromDevRev: {
    filesToLoad: [],
  },
};

State Persistence

Automatic State Saving

State is automatically persisted when emitting events (except for stateless event types):

processTask({
  task: async ({ adapter }) => {
    // Update state
    adapter.state = {
      ...adapter.state,
      users: { completed: true },
    };

    // State is automatically saved when emitting
    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // State is saved even on timeout
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

Manual State Updates

Update state multiple times during processing:

processTask({
  task: async ({ adapter }) => {
    // Process users
    await processUsers();
    adapter.state = {
      ...adapter.state,
      users: { completed: true, lastId: 'user-100' },
    };

    // Process tasks
    await processTasks();
    adapter.state = {
      ...adapter.state,
      tasks: { completed: true, cursor: 'task-cursor-abc' },
    };

    // Final emit saves latest state
    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

Common Patterns

Incremental Extraction with State

interface ExtractorState {
  lastProcessedTimestamp?: string;
  processedCount: number;
}

processTask({
  task: async ({ adapter }) => {
    const state = adapter.state;
    const eventContext = adapter.event.payload.event_context;

    // Determine extraction start point
    let extractFrom: string;

    if (eventContext.mode === 'INCREMENTAL') {
      // Use lastSuccessfulSyncStarted or extract_from based on reset flag
      extractFrom = eventContext.reset_extract_from
        ? eventContext.extract_from!
        : state.lastSuccessfulSyncStarted || eventContext.extract_from!;
    } else {
      // Initial sync
      extractFrom = eventContext.extract_from || '1970-01-01T00:00:00Z';
    }

    // Extract data
    const data = await extractDataSince(extractFrom);

    // Update state with progress
    adapter.state = {
      ...state,
      lastProcessedTimestamp: data.lastTimestamp,
      processedCount: data.count,
    };

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

Chunked Processing with State

interface ChunkedState {
  currentPage: number;
  totalPages: number;
  itemsProcessed: number;
}

processTask({
  task: async ({ adapter }) => {
    let state = adapter.state;

    // Resume from where we left off
    const startPage = state.currentPage || 1;

    for (let page = startPage; page <= (state.totalPages || Infinity); page++) {
      const result = await fetchPage(page);

      if (!state.totalPages) {
        state.totalPages = result.totalPages;
      }

      // Process page data
      await processPageData(result.data);

      // Update state after each page
      adapter.state = {
        ...state,
        currentPage: page,
        itemsProcessed: (state.itemsProcessed || 0) + result.data.length,
      };
      state = adapter.state;

      // Check if done
      if (page >= result.totalPages) {
        break;
      }
    }

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // State already saved with currentPage progress
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

Loading Progress Tracking

processTask({
  task: async ({ adapter }) => {
    const fromDevRev = adapter.state.fromDevRev;

    if (!fromDevRev) {
      await adapter.emit(LoaderEventType.DataLoadingDone);
      return;
    }

    // Track overall progress
    let totalItems = 0;
    let processedItems = 0;

    for (const file of fromDevRev.filesToLoad) {
      totalItems += file.count;
      processedItems += file.lineToProcess;
    }

    console.log(`Overall progress: ${processedItems}/${totalItems}`);

    // Process files
    for (const file of fromDevRev.filesToLoad) {
      if (file.completed) {
        continue;
      }

      while (file.lineToProcess < file.count) {
        const item = await readItemAtLine(file.file_name, file.lineToProcess);
        await loadItem(item);

        file.lineToProcess++;
        processedItems++;

        // Update state every 100 items
        if (processedItems % 100 === 0) {
          adapter.state = adapter.state;
          console.log(`Progress: ${processedItems}/${totalItems}`);
        }
      }

      file.completed = true;
    }

    await adapter.emit(LoaderEventType.DataLoadingDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(LoaderEventType.DataLoadingError, {
      error: { message: 'Timeout' },
    });
  },
});

Best Practices

Initialize State Early

const initialState: MyConnectorState = {
  users: { completed: false },
  tasks: { completed: false },
  initialized: true,
};

await spawn({
  event,
  initialState,
  workerPath: './worker.js',
});

Use Type-Safe State

// Define clear state structure
interface ExtractorState {
  phases: {
    users: PhaseState;
    tasks: PhaseState;
    comments: PhaseState;
  };
  metadata: {
    startTime: string;
    version: string;
  };
}

interface PhaseState {
  completed: boolean;
  lastId?: string;
  count?: number;
}

Handle State Migration

processTask({
  task: async ({ adapter }) => {
    let state = adapter.state;

    // Migrate old state format to new format
    if (!state.version || state.version < '2.0') {
      state = migrateState(state);
      adapter.state = state;
    }

    // Continue processing
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});