or run

npx @tessl/cli init
Log in

Version

Tile

Overview

Evals

Files

docs

http-client.md, index.md, logger.md, mappers.md, repository-management.md, state-management.md, types-extraction.md, types-loading.md, worker-management.md
tile.json

docs/repository-management.md

Repository Management

Repository management in the Airdrop SDK provides a pattern for normalizing and batching extracted data before uploading to the Airdrop platform. Repositories handle data transformation and automatic batched uploads during extraction workflows.

Capabilities

Repository Interface

Configuration interface for defining repositories.

/**
 * RepoInterface defines the structure of a repository for extracted data.
 *
 * One definition describes one item type; `initializeRepos` accepts an array
 * of these definitions.
 */
interface RepoInterface {
  /** Type identifier for items stored in this repository (e.g. 'tasks', 'comments') */
  itemType: string;
  /**
   * Optional function to normalize a raw record into the standard format.
   * It must return either a NormalizedItem or a NormalizedAttachment.
   * NOTE(review): the behavior when this is omitted is not shown here —
   * presumably records are kept as pushed; confirm against the SDK.
   */
  normalize?: (record: object) => NormalizedItem | NormalizedAttachment;
}

Usage Examples:

import { processTask, ExtractorEventType, RepoInterface } from '@devrev/ts-adaas';

processTask({
  task: async ({ adapter }) => {
    // Tasks and comments expose the same id/timestamp fields, so a single
    // mapper serves both repositories.
    const toNormalizedItem = (record: any) => ({
      id: record.id,
      created_date: record.created_at,
      modified_date: record.updated_at,
      data: record,
    });

    // Register one repository per item type before any data is pushed
    const repoDefinitions: RepoInterface[] = [
      { itemType: 'tasks', normalize: toNormalizedItem },
      { itemType: 'comments', normalize: toNormalizedItem },
    ];
    adapter.initializeRepos(repoDefinitions);

    // Fetch each collection, then hand it to its repository
    const fetchedTasks = await externalApi.getTasks();
    const taskRepo = adapter.getRepo('tasks');
    await taskRepo?.push(fetchedTasks);

    const fetchedComments = await externalApi.getComments();
    const commentRepo = adapter.getRepo('comments');
    await commentRepo?.push(fetchedComments);

    // Signal that data extraction has finished
    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Normalized Item

Standard structure for normalized data items.

/**
 * NormalizedItem represents the standardized structure of an item after normalization.
 *
 * It is one of the two shapes a repository's `normalize` function may return
 * (the other being NormalizedAttachment). All four fields are required.
 */
interface NormalizedItem {
  /** Unique identifier for the item */
  id: string;
  /** RFC3339 timestamp when the item was created (e.g. "2024-01-15T09:30:00Z") */
  created_date: string;
  /** RFC3339 timestamp when the item was last modified */
  modified_date: string;
  /** The actual data payload of the item */
  data: object;
}

Usage Examples:

// Map a raw task from the external API onto the NormalizedItem shape
const normalizeTask = (rawTask: any): NormalizedItem => {
  const {
    id,
    created_at,
    updated_at,
    title,
    description,
    status,
    assignee_id,
    priority,
  } = rawTask;

  // Only the fields of interest are carried over into the payload
  const data = { title, description, status, assignee: assignee_id, priority };

  return { id, created_date: created_at, modified_date: updated_at, data };
};

// Register the mapper as the normalizer of the 'tasks' repository
const taskRepoDefinition = { itemType: 'tasks', normalize: normalizeTask };
adapter.initializeRepos([taskRepoDefinition]);

Normalized Attachment

Standard structure for normalized attachments.

/**
 * NormalizedAttachment represents the standardized structure of an attachment.
 *
 * `url`, `id`, `file_name` and `parent_id` are required; the remaining fields
 * are optional. It is one of the two shapes a repository's `normalize`
 * function may return (the other being NormalizedItem).
 */
interface NormalizedAttachment {
  /** URL where the attachment can be accessed */
  url: string;
  /** Unique identifier for the attachment */
  id: string;
  /** File name of the attachment */
  file_name: string;
  /** ID of the parent item this attachment belongs to */
  parent_id: string;
  /** Optional ID of the author/creator */
  author_id?: string;
  /** Optional flag indicating if attachment is inline */
  inline?: boolean;
  /** Optional ID of grandparent entity (number is accepted for backwards compatibility) */
  grand_parent_id?: number | string;
}

Usage Examples:

// Map a raw attachment record onto the NormalizedAttachment shape
const normalizeAttachment = (rawAttachment: any): NormalizedAttachment => {
  // Pull the source fields out under their normalized names
  const {
    download_url: url,
    id,
    filename: file_name,
    task_id: parent_id,
    uploaded_by_id: author_id,
    is_inline,
    project_id: grand_parent_id,
  } = rawAttachment;

  // A falsy or missing inline flag is reported as false
  const inline = is_inline || false;

  return { url, id, file_name, parent_id, author_id, inline, grand_parent_id };
};

// Register the mapper as the normalizer of the 'attachments' repository
const attachmentRepoDefinition = {
  itemType: 'attachments',
  normalize: normalizeAttachment,
};
adapter.initializeRepos([attachmentRepoDefinition]);

// Fetch the raw attachments, then push them through the repository
const rawAttachments = await externalApi.getAttachments();
const attachmentRepo = adapter.getRepo('attachments');
await attachmentRepo?.push(rawAttachments);

Initialize Repositories

Initialize repositories for data extraction.

/**
 * Initialize repositories for data extraction.
 * Call this before pushing data to repositories.
 *
 * @param repos - Repository definitions (item type plus optional normalizer)
 */
WorkerAdapter.prototype.initializeRepos(repos: RepoInterface[]): void;

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    // Users and tasks share the same timestamped-item mapping
    const asItem = (raw: any) => ({
      id: raw.id,
      created_date: raw.created_at,
      modified_date: raw.updated_at,
      data: raw,
    });

    // Attachments map onto the attachment shape instead
    const asAttachment = (raw: any) => ({
      url: raw.url,
      id: raw.id,
      file_name: raw.name,
      parent_id: raw.parent_id,
    });

    // Initialize multiple repositories in a single call
    const definitions = [
      { itemType: 'users', normalize: asItem },
      { itemType: 'tasks', normalize: asItem },
      { itemType: 'attachments', normalize: asAttachment },
    ];
    adapter.initializeRepos(definitions);

    // Now repositories are ready to receive data
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const timeoutPayload = { error: { message: 'Timeout' } };
    await adapter.emit(ExtractorEventType.ExtractionDataError, timeoutPayload);
  },
});

Get Repository

Retrieve an initialized repository by item type.

/**
 * Get a repository by item type.
 * Returns undefined if repository not found, so callers should guard the
 * result (optional chaining or an explicit check) before using it.
 *
 * @param itemType - Item type identifier to look up
 * @returns The matching Repo, or undefined if none was found
 */
WorkerAdapter.prototype.getRepo(itemType: string): Repo | undefined;

Usage Examples:

processTask({
  task: async ({ adapter }) => {
    adapter.initializeRepos([
      { itemType: 'tasks', normalize: normalizeTask },
      { itemType: 'comments', normalize: normalizeComment },
    ]);

    // Each item type is paired with the call that fetches its raw records
    const sources = [
      { itemType: 'tasks', fetch: () => externalApi.getTasks() },
      { itemType: 'comments', fetch: () => externalApi.getComments() },
    ];

    for (const { itemType, fetch } of sources) {
      // Look the repository up first; skip the fetch entirely when it is absent
      const repo = adapter.getRepo(itemType);
      if (!repo) {
        continue;
      }
      const records = await fetch();
      await repo.push(records);
    }

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Repo Class

The Repo class provides methods for managing data within an initialized repository.

/**
 * Repo class manages data normalization, batching, and upload for a specific item type.
 *
 * Failures are reported through return values rather than documented as
 * exceptions: `push` resolves to a boolean and `upload` may resolve to an
 * ErrorRecord, so callers need to inspect the results.
 */
class Repo {
  /** The item type identifier for this repository (read-only) */
  readonly itemType: string;

  /** Array of artifacts that have been uploaded from this repository */
  uploadedArtifacts: Artifact[];

  /**
   * Push items to the repository.
   * Items are normalized automatically and uploaded when batch size is reached.
   * @param items - Array of items to push
   * @returns Promise resolving to true on success, false on error
   */
  push(items: Item[]): Promise<boolean>;

  /**
   * Upload the current batch of items to the platform.
   * @param batch - Optional specific batch to upload (if not provided, uploads all items in repo)
   * @returns Promise resolving when upload completes, or ErrorRecord on error
   *   (the error is the resolved value, so check it after awaiting)
   */
  upload(batch?: (NormalizedItem | NormalizedAttachment | Item)[]): Promise<void | ErrorRecord>;

  /**
   * Get all items currently in the repository (not yet uploaded).
   * @returns Array of items in the repository
   */
  getItems(): (NormalizedItem | NormalizedAttachment | Item)[];
}

/** Arbitrary string-keyed record; the element type accepted by `Repo.push` */
type Item = Record<string, any>;

Usage Examples:

import { processTask, ExtractorEventType } from '@devrev/ts-adaas';

processTask({
  task: async ({ adapter }) => {
    // Initialize repository
    adapter.initializeRepos([
      {
        itemType: 'tasks',
        normalize: (raw: any) => ({
          id: raw.id,
          created_date: raw.created_at,
          modified_date: raw.updated_at,
          data: raw,
        }),
      },
    ]);

    // Get repository. getRepo returns undefined for an unknown item type, so
    // report that explicitly instead of silently skipping every later step.
    const tasksRepo = adapter.getRepo('tasks');
    if (!tasksRepo) {
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message: "Repository 'tasks' is not initialized" },
      });
      return;
    }

    // Push data to repository
    const tasks = await externalApi.getTasks();
    const success = await tasksRepo.push(tasks);

    if (!success) {
      // push resolves to false on error: do not report the extraction as done
      console.error('Failed to push tasks to repository');
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message: 'Failed to push tasks to repository' },
      });
      return;
    }

    // Repository automatically uploads when batch size is reached
    // You can also manually upload remaining items
    // upload resolves to an ErrorRecord on failure, so inspect the result
    const uploadError = await tasksRepo.upload();
    if (uploadError) {
      console.error('Failed to upload tasks');
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message: 'Failed to upload tasks' },
      });
      return;
    }

    // Access uploaded artifacts if needed
    console.log(`Uploaded ${tasksRepo.uploadedArtifacts.length} artifacts`);

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});

// Page-by-page extraction; the repository takes care of batching
processTask({
  task: async ({ adapter }) => {
    const taskRepoDefinition = { itemType: 'tasks', normalize: normalizeTask };
    adapter.initializeRepos([taskRepoDefinition]);

    const repo = adapter.getRepo('tasks');

    // Keep requesting pages until the API reports there are no more
    for (let page = 1, hasMore = true; hasMore; page++) {
      const response = await externalApi.getTasks({ page, limit: 1000 });

      // Pushing normalizes the items; the repository uploads on its own
      // once the batch size (default 2000) is reached
      await repo?.push(response.tasks);

      hasMore = response.has_more;
    }

    // Emit completion (automatically uploads remaining items)
    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const timeoutPayload = { error: { message: 'Timeout' } };
    await adapter.emit(ExtractorEventType.ExtractionDataError, timeoutPayload);
  },
});

Common Patterns

Basic Extraction with Repository

processTask({
  task: async ({ adapter }) => {
    // 1. Initialize repositories
    const normalizeRawTask = (raw: any) => ({
      id: raw.id,
      created_date: raw.created_at,
      modified_date: raw.updated_at,
      data: raw,
    });
    adapter.initializeRepos([{ itemType: 'tasks', normalize: normalizeRawTask }]);

    // 2. Fetch data from external system
    const fetchedTasks = await externalApi.getTasks();

    // 3. Push to repository (normalization happens automatically)
    const taskRepo = adapter.getRepo('tasks');
    await taskRepo?.push(fetchedTasks);

    // 4. Repository automatically batches and uploads data
    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Paginated Extraction

processTask({
  task: async ({ adapter }) => {
    const normalizeRawTask = (raw: any) => ({
      id: raw.id,
      created_date: raw.created_at,
      modified_date: raw.updated_at,
      data: raw,
    });
    adapter.initializeRepos([{ itemType: 'tasks', normalize: normalizeRawTask }]);

    const repo = adapter.getRepo('tasks');
    let page = 1;
    let hasMore: boolean;

    // Process pages; the first page is always requested
    do {
      const response = await externalApi.getTasks({ page });

      // Push batch to repository; it uploads automatically when the
      // batch size is reached
      await repo?.push(response.tasks);

      hasMore = response.has_more;
      page += 1;
    } while (hasMore);

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const timeoutPayload = { error: { message: 'Timeout' } };
    await adapter.emit(ExtractorEventType.ExtractionDataError, timeoutPayload);
  },
});

Complex Normalization

/**
 * Shape of a task as returned by the external system, before normalization.
 */
interface RawTask {
  id: string;
  created: string; // Unix timestamp in seconds, encoded as a numeric string
  modified: string; // Unix timestamp in seconds, encoded as a numeric string
  title: string;
  description: string;
  assignee: { id: string; name: string }; // flattened to assignee_id / assignee_name when normalized
  tags: string[]; // joined into a comma-separated string when normalized
}

/**
 * Normalizes a RawTask into a NormalizedItem.
 *
 * Converts the second-based Unix timestamps to RFC3339 strings, flattens the
 * assignee object and joins the tags into a comma-separated string.
 *
 * @param raw - Task as returned by the external system
 * @returns The normalized item
 * @throws RangeError if `created` or `modified` is not a valid Unix timestamp
 */
const normalizeTask = (raw: RawTask): NormalizedItem => {
  // Convert a Unix timestamp in seconds (numeric string) to RFC3339
  const toRfc3339 = (value: string, field: string): string => {
    // Explicit radix so inputs such as '0x10' are not read as hexadecimal
    const date = new Date(Number.parseInt(value, 10) * 1000);
    // Fail with the offending field named, not a bare "Invalid time value"
    if (Number.isNaN(date.getTime())) {
      throw new RangeError(
        `Invalid task data: '${field}' is not a valid Unix timestamp: ${value}`,
      );
    }
    return date.toISOString();
  };

  return {
    id: raw.id,
    created_date: toRfc3339(raw.created, 'created'),
    modified_date: toRfc3339(raw.modified, 'modified'),
    data: {
      // Transform data structure
      title: raw.title,
      description: raw.description || '',
      assignee_id: raw.assignee.id,
      assignee_name: raw.assignee.name,
      tags: raw.tags.join(','),
    },
  };
};

processTask({
  task: async ({ adapter }) => {
    // Register the custom normalizer for the 'tasks' repository
    const taskRepoDefinition = { itemType: 'tasks', normalize: normalizeTask };
    adapter.initializeRepos([taskRepoDefinition]);

    // Fetch the raw tasks and push them; normalization runs inside the repository
    const rawTasks = await externalApi.getTasks();
    const taskRepo = adapter.getRepo('tasks');
    await taskRepo?.push(rawTasks);

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const error = { message: 'Timeout' };
    await adapter.emit(ExtractorEventType.ExtractionDataError, { error });
  },
});

Multiple Item Types

processTask({
  task: async ({ adapter }) => {
    // Initialize repositories for all item types
    adapter.initializeRepos([
      { itemType: 'users', normalize: normalizeUser },
      { itemType: 'projects', normalize: normalizeProject },
      { itemType: 'tasks', normalize: normalizeTask },
      { itemType: 'comments', normalize: normalizeComment },
      { itemType: 'attachments', normalize: normalizeAttachment },
    ]);

    // Push records into the repository of the given type, if it exists
    const pushTo = async (itemType: string, records: any) =>
      adapter.getRepo(itemType)?.push(records);

    // Extract users first (often needed as references)
    await pushTo('users', await externalApi.getUsers());

    // Extract projects; the list is kept to drive the per-project task fetch
    const projects = await externalApi.getProjects();
    await pushTo('projects', projects);

    // Extract tasks for each project, one project at a time
    for (const { id: projectId } of projects) {
      await pushTo('tasks', await externalApi.getTasks(projectId));
    }

    // Extract comments and attachments
    await pushTo('comments', await externalApi.getComments());
    await pushTo('attachments', await externalApi.getAttachments());

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    // Report the timeout as an extraction error
    const timeoutPayload = { error: { message: 'Timeout' } };
    await adapter.emit(ExtractorEventType.ExtractionDataError, timeoutPayload);
  },
});

Conditional Normalization

/**
 * Normalizes a raw record as an attachment when it carries a download URL,
 * and as a regular item otherwise.
 */
const normalizeItem = (raw: any): NormalizedItem | NormalizedAttachment => {
  const downloadUrl = raw.download_url;

  // No (truthy) download URL: treat the record as a regular item
  if (!downloadUrl) {
    const item = {
      id: raw.id,
      created_date: raw.created_at,
      modified_date: raw.updated_at,
      data: raw,
    };
    return item;
  }

  // Presence of a URL marks the record as an attachment
  const attachment = {
    url: downloadUrl,
    id: raw.id,
    file_name: raw.filename,
    parent_id: raw.parent_id,
  };
  return attachment;
};

// One repository receives both shapes; the normalizer decides per record
const itemRepoDefinition = { itemType: 'items', normalize: normalizeItem };
adapter.initializeRepos([itemRepoDefinition]);

Best Practices

Validate Data Before Normalization

/**
 * Normalizes a raw task, rejecting records that lack an id or either timestamp.
 *
 * @throws Error when `id`, `created_at` or `updated_at` is missing or falsy
 */
const normalizeTask = (raw: any): NormalizedItem => {
  // All three fields must be present (truthy) before the record is mapped
  const hasRequiredFields = raw.id && raw.created_at && raw.updated_at;
  if (!hasRequiredFields) {
    throw new Error('Invalid task data: missing required fields');
  }

  const { id, created_at: created_date, updated_at: modified_date } = raw;
  return { id, created_date, modified_date, data: raw };
};

Handle Missing Optional Fields

/**
 * Normalizes a raw attachment, filling in defaults for optional fields.
 */
const normalizeAttachment = (raw: any): NormalizedAttachment => {
  const { download_url: url, id, filename, parent_id, author_id, inline } = raw;

  // Fall back to a placeholder name when the source has none
  const file_name = filename || 'untitled';
  // Any falsy author (empty string, null, ...) is reported as undefined
  const author = author_id || undefined;
  // Only a literal `true` counts as inline
  const isInline = inline === true;

  return { url, id, file_name, parent_id, author_id: author, inline: isInline };
};

Use Specific Types (Narrowed Interfaces)

/**
 * NormalizedItem whose `data` payload is narrowed from `object` to the
 * task-specific fields, giving typed access to `title` and `status`.
 */
interface NormalizedTask extends NormalizedItem {
  data: {
    /** Task title */
    title: string;
    /** Task status */
    status: string;
  };
}

/**
 * Normalizes a raw task into a NormalizedTask, keeping only the title and
 * status in the payload.
 */
const normalizeTask = (raw: any): NormalizedTask => {
  const { id, created_at, updated_at, title, status } = raw;

  // The payload is limited to the fields declared on NormalizedTask
  const data = { title, status };

  return { id, created_date: created_at, modified_date: updated_at, data };
};