Repository management in the Airdrop SDK provides a pattern for normalizing and batching extracted data before uploading to the Airdrop platform. Repositories handle data transformation and automatic batched uploads during extraction workflows.
Configuration interface for defining repositories.
/**
* RepoInterface defines the structure of a repository for extracted data
*/
interface RepoInterface {
/** Type identifier for items stored in this repository */
itemType: string;
/** Optional function to normalize raw data into standard format */
normalize?: (record: object) => NormalizedItem | NormalizedAttachment;
}

Usage Examples:
import { processTask, ExtractorEventType, RepoInterface } from '@devrev/ts-adaas';
processTask({
task: async ({ adapter }) => {
// Define repositories
const repos: RepoInterface[] = [
{
itemType: 'tasks',
normalize: (rawTask: any) => ({
id: rawTask.id,
created_date: rawTask.created_at,
modified_date: rawTask.updated_at,
data: rawTask,
}),
},
{
itemType: 'comments',
normalize: (rawComment: any) => ({
id: rawComment.id,
created_date: rawComment.created_at,
modified_date: rawComment.updated_at,
data: rawComment,
}),
},
];
// Initialize repositories
adapter.initializeRepos(repos);
// Fetch and push data
const tasks = await externalApi.getTasks();
await adapter.getRepo('tasks')?.push(tasks);
const comments = await externalApi.getComments();
await adapter.getRepo('comments')?.push(comments);
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Standard structure for normalized data items.
/**
* NormalizedItem represents the standardized structure of an item after normalization
*/
interface NormalizedItem {
/** Unique identifier for the item */
id: string;
/** RFC3339 timestamp when the item was created */
created_date: string;
/** RFC3339 timestamp when the item was last modified */
modified_date: string;
/** The actual data payload of the item */
data: object;
}

Usage Examples:
// Normalize raw API data to NormalizedItem
/**
 * Maps a raw task record from the external API onto the NormalizedItem shape.
 *
 * Only a curated subset of the raw fields is carried over into `data`;
 * `assignee_id` is exposed there under the shorter key `assignee`.
 *
 * @param rawTask - Task object as returned by the external system
 * @returns The task in NormalizedItem form
 */
const normalizeTask = (rawTask: any): NormalizedItem => {
  const {
    id,
    created_at: createdAt,
    updated_at: updatedAt,
    title,
    description,
    status,
    assignee_id: assignee,
    priority,
  } = rawTask;

  const data = { title, description, status, assignee, priority };

  return {
    id,
    created_date: createdAt,
    modified_date: updatedAt,
    data,
  };
};
// Use in repository
adapter.initializeRepos([
{
itemType: 'tasks',
normalize: normalizeTask,
},
]);

Standard structure for normalized attachments.
/**
* NormalizedAttachment represents the standardized structure of an attachment
*/
interface NormalizedAttachment {
/** URL where the attachment can be accessed */
url: string;
/** Unique identifier for the attachment */
id: string;
/** File name of the attachment */
file_name: string;
/** ID of the parent item this attachment belongs to */
parent_id: string;
/** Optional ID of the author/creator */
author_id?: string;
/** Optional flag indicating if attachment is inline */
inline?: boolean;
/** Optional ID of grandparent entity (number for backwards compatibility) */
grand_parent_id?: number | string;
}

Usage Examples:
// Normalize raw attachment data
/**
 * Maps a raw attachment record from the external API onto the
 * NormalizedAttachment shape.
 *
 * The owning task becomes `parent_id` and the owning project becomes
 * `grand_parent_id`. A falsy `is_inline` is reported as `false`.
 *
 * @param rawAttachment - Attachment object as returned by the external system
 * @returns The attachment in NormalizedAttachment form
 */
const normalizeAttachment = (rawAttachment: any): NormalizedAttachment => {
  const {
    download_url: url,
    id,
    filename,
    task_id: taskId,
    uploaded_by_id: uploaderId,
    is_inline: isInline,
    project_id: projectId,
  } = rawAttachment;

  return {
    url,
    id,
    file_name: filename,
    parent_id: taskId,
    author_id: uploaderId,
    inline: isInline || false,
    grand_parent_id: projectId,
  };
};
// Use in repository
adapter.initializeRepos([
{
itemType: 'attachments',
normalize: normalizeAttachment,
},
]);
// Push attachments
const attachments = await externalApi.getAttachments();
await adapter.getRepo('attachments')?.push(attachments);

Initialize repositories for data extraction.
/**
* Initialize repositories for data extraction
* Call this before pushing data to repositories
*/
WorkerAdapter.prototype.initializeRepos(repos: RepoInterface[]): void;

Usage Examples:
processTask({
task: async ({ adapter }) => {
// Initialize multiple repositories
adapter.initializeRepos([
{
itemType: 'users',
normalize: (raw: any) => ({
id: raw.id,
created_date: raw.created_at,
modified_date: raw.updated_at,
data: raw,
}),
},
{
itemType: 'tasks',
normalize: (raw: any) => ({
id: raw.id,
created_date: raw.created_at,
modified_date: raw.updated_at,
data: raw,
}),
},
{
itemType: 'attachments',
normalize: (raw: any) => ({
url: raw.url,
id: raw.id,
file_name: raw.name,
parent_id: raw.parent_id,
}),
},
]);
// Now repositories are ready to receive data
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

Retrieve an initialized repository by item type.
/**
* Get a repository by item type
* Returns undefined if repository not found
*/
WorkerAdapter.prototype.getRepo(itemType: string): Repo | undefined;

Usage Examples:
processTask({
task: async ({ adapter }) => {
adapter.initializeRepos([
{ itemType: 'tasks', normalize: normalizeTask },
{ itemType: 'comments', normalize: normalizeComment },
]);
// Get and use repositories
const tasksRepo = adapter.getRepo('tasks');
if (tasksRepo) {
const tasks = await externalApi.getTasks();
await tasksRepo.push(tasks);
}
const commentsRepo = adapter.getRepo('comments');
if (commentsRepo) {
const comments = await externalApi.getComments();
await commentsRepo.push(comments);
}
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});

The Repo class provides methods for managing data within an initialized repository.
/**
* Repo class manages data normalization, batching, and upload for a specific item type
*
* This is an API summary: members are listed as signatures only, without bodies.
* Instances are obtained through `adapter.getRepo(itemType)` after
* `adapter.initializeRepos(...)` has been called, as the usage examples show.
* `Artifact` and `ErrorRecord` are referenced here but not defined in this section.
*/
class Repo {
/** The item type identifier for this repository (read-only) */
readonly itemType: string;
/**
* Array of artifacts that have been uploaded from this repository
* The usage example reads its `length` after uploading to report how many artifacts were produced.
*/
uploadedArtifacts: Artifact[];
/**
* Push items to the repository
* Items are normalized automatically and uploaded when batch size is reached
* (the pagination example in this document cites a default batch size of 2000).
* Failure is signalled through the resolved boolean, so callers should check the result.
* @param items - Array of raw items to push
* @returns Promise resolving to true on success, false on error
*/
push(items: Item[]): Promise<boolean>;
/**
* Upload the current batch of items to the platform
* Per the return type, a failure is delivered as a resolved ErrorRecord, so callers
* need to inspect the resolved value to detect it.
* NOTE(review): whether the promise can also reject is not shown here; confirm against the SDK.
* @param batch - Optional specific batch to upload (if not provided, uploads all items in repo)
* @returns Promise resolving when upload completes, or ErrorRecord on error
*/
upload(batch?: (NormalizedItem | NormalizedAttachment | Item)[]): Promise<void | ErrorRecord>;
/**
* Get all items currently in the repository (not yet uploaded)
* @returns Array of items in the repository; elements may be normalized items,
* normalized attachments, or plain `Item` records, per the return type
*/
getItems(): (NormalizedItem | NormalizedAttachment | Item)[];
}
type Item = Record<string, any>;

Usage Examples:
import { processTask, ExtractorEventType } from '@devrev/ts-adaas';
/**
 * Example worker task: push tasks into a repository, flush the remainder
 * manually, and report the outcome.
 *
 * Every failure path emits ExtractionDataError and returns early, so
 * ExtractionDataDone is only emitted when the data was actually handed over:
 *   - the 'tasks' repository is missing (`getRepo` returns `Repo | undefined`),
 *   - `push` resolves to `false`,
 *   - `upload` resolves with an ErrorRecord instead of `void`.
 */
processTask({
  task: async ({ adapter }) => {
    // Initialize repository
    adapter.initializeRepos([
      {
        itemType: 'tasks',
        normalize: (raw: any) => ({
          id: raw.id,
          created_date: raw.created_at,
          modified_date: raw.updated_at,
          data: raw,
        }),
      },
    ]);

    // Get repository. Narrow once here so that a missing repository is
    // reported as such rather than being mistaken for a failed push.
    const tasksRepo = adapter.getRepo('tasks');
    if (!tasksRepo) {
      console.error("Repository 'tasks' is not initialized");
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message: "Repository 'tasks' is not initialized" },
      });
      return;
    }

    // Push data to repository
    const tasks = await externalApi.getTasks();
    const success = await tasksRepo.push(tasks);
    if (!success) {
      console.error('Failed to push tasks to repository');
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message: 'Failed to push tasks to repository' },
      });
      return;
    }

    // Repository automatically uploads when batch size is reached.
    // You can also manually upload remaining items; upload() reports a
    // failure through its resolved value, so the result must be checked.
    const uploadError = await tasksRepo.upload();
    if (uploadError) {
      console.error('Failed to upload tasks from repository');
      await adapter.emit(ExtractorEventType.ExtractionDataError, {
        error: { message: 'Failed to upload tasks from repository' },
      });
      return;
    }

    // Access uploaded artifacts if needed
    console.log(`Uploaded ${tasksRepo.uploadedArtifacts.length} artifacts`);

    await adapter.emit(ExtractorEventType.ExtractionDataDone);
  },
  onTimeout: async ({ adapter }) => {
    await adapter.emit(ExtractorEventType.ExtractionDataError, {
      error: { message: 'Timeout' },
    });
  },
});
// Paginated extraction: pages are pushed one at a time; the repository batches and uploads automatically
processTask({
task: async ({ adapter }) => {
adapter.initializeRepos([
{
itemType: 'tasks',
normalize: normalizeTask,
},
]);
const repo = adapter.getRepo('tasks');
let page = 1;
let hasMore = true;
while (hasMore) {
const response = await externalApi.getTasks({ page, limit: 1000 });
// Push items - automatic normalization happens here
await repo?.push(response.tasks);
// Repository automatically uploads when batch size (default 2000) is reached
hasMore = response.has_more;
page++;
}
// Emit completion (automatically uploads remaining items)
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});processTask({
task: async ({ adapter }) => {
// 1. Initialize repositories
adapter.initializeRepos([
{
itemType: 'tasks',
normalize: (raw: any) => ({
id: raw.id,
created_date: raw.created_at,
modified_date: raw.updated_at,
data: raw,
}),
},
]);
// 2. Fetch data from external system
const tasks = await externalApi.getTasks();
// 3. Push to repository (normalization happens automatically)
await adapter.getRepo('tasks')?.push(tasks);
// 4. Repository automatically batches and uploads data
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});processTask({
task: async ({ adapter }) => {
adapter.initializeRepos([
{
itemType: 'tasks',
normalize: (raw: any) => ({
id: raw.id,
created_date: raw.created_at,
modified_date: raw.updated_at,
data: raw,
}),
},
]);
const repo = adapter.getRepo('tasks');
let page = 1;
let hasMore = true;
// Process pages
while (hasMore) {
const response = await externalApi.getTasks({ page });
// Push batch to repository
await repo?.push(response.tasks);
hasMore = response.has_more;
page++;
// Repository automatically uploads when batch size is reached
}
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});interface RawTask {
id: string;
created: string; // Unix timestamp
modified: string; // Unix timestamp
title: string;
description: string;
assignee: { id: string; name: string };
tags: string[];
}
/**
 * Converts a Unix timestamp given in seconds (as a decimal string) into the
 * string produced by `Date.prototype.toISOString`.
 *
 * @param value - Seconds since the Unix epoch, e.g. `'1700000000'`
 * @param field - Name of the source field; used only to build the error message
 * @returns The timestamp as an ISO 8601 / RFC3339 string in UTC
 * @throws RangeError if `value` does not yield a valid date
 */
const unixSecondsToRfc3339 = (value: string, field: string): string => {
  // Explicit radix: without it a '0x…' prefix would be read as hexadecimal.
  const seconds = Number.parseInt(value, 10);
  const date = new Date(seconds * 1000);
  // Fail with a message that names the offending field instead of letting
  // toISOString() throw a bare "Invalid time value".
  if (Number.isNaN(date.getTime())) {
    throw new RangeError(`Invalid Unix timestamp in '${field}': ${JSON.stringify(value)}`);
  }
  return date.toISOString();
};

/**
 * Normalizes a RawTask into a NormalizedItem.
 *
 * Timestamps are converted from Unix seconds to RFC3339, the nested assignee
 * is flattened into `assignee_id` / `assignee_name`, and tags are joined into
 * a single comma-separated string.
 *
 * @param raw - Task record from the external system
 * @returns The task in NormalizedItem form
 * @throws RangeError if `created` or `modified` is not a parsable timestamp
 */
const normalizeTask = (raw: RawTask): NormalizedItem => {
  return {
    id: raw.id,
    // Convert Unix timestamps to RFC3339
    created_date: unixSecondsToRfc3339(raw.created, 'created'),
    modified_date: unixSecondsToRfc3339(raw.modified, 'modified'),
    data: {
      // Transform data structure
      title: raw.title,
      // Default only when the description is absent
      description: raw.description ?? '',
      assignee_id: raw.assignee.id,
      assignee_name: raw.assignee.name,
      tags: raw.tags.join(','),
    },
  };
};
processTask({
task: async ({ adapter }) => {
adapter.initializeRepos([
{
itemType: 'tasks',
normalize: normalizeTask,
},
]);
const tasks = await externalApi.getTasks();
await adapter.getRepo('tasks')?.push(tasks);
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});processTask({
task: async ({ adapter }) => {
// Initialize repositories for all item types
adapter.initializeRepos([
{ itemType: 'users', normalize: normalizeUser },
{ itemType: 'projects', normalize: normalizeProject },
{ itemType: 'tasks', normalize: normalizeTask },
{ itemType: 'comments', normalize: normalizeComment },
{ itemType: 'attachments', normalize: normalizeAttachment },
]);
// Extract users first (often needed as references)
const users = await externalApi.getUsers();
await adapter.getRepo('users')?.push(users);
// Extract projects
const projects = await externalApi.getProjects();
await adapter.getRepo('projects')?.push(projects);
// Extract tasks for each project
for (const project of projects) {
const tasks = await externalApi.getTasks(project.id);
await adapter.getRepo('tasks')?.push(tasks);
}
// Extract comments and attachments
const comments = await externalApi.getComments();
await adapter.getRepo('comments')?.push(comments);
const attachments = await externalApi.getAttachments();
await adapter.getRepo('attachments')?.push(attachments);
await adapter.emit(ExtractorEventType.ExtractionDataDone);
},
onTimeout: async ({ adapter }) => {
await adapter.emit(ExtractorEventType.ExtractionDataError, {
error: { message: 'Timeout' },
});
},
});const normalizeItem = (raw: any): NormalizedItem | NormalizedAttachment => {
// Check if item is an attachment based on presence of URL
if (raw.download_url) {
return {
url: raw.download_url,
id: raw.id,
file_name: raw.filename,
parent_id: raw.parent_id,
};
}
// Regular item
return {
id: raw.id,
created_date: raw.created_at,
modified_date: raw.updated_at,
data: raw,
};
};
adapter.initializeRepos([
{
itemType: 'items',
normalize: normalizeItem,
},
]);const normalizeTask = (raw: any): NormalizedItem => {
if (!raw.id || !raw.created_at || !raw.updated_at) {
throw new Error(`Invalid task data: missing required fields`);
}
return {
id: raw.id,
created_date: raw.created_at,
modified_date: raw.updated_at,
data: raw,
};
};const normalizeAttachment = (raw: any): NormalizedAttachment => {
return {
url: raw.download_url,
id: raw.id,
file_name: raw.filename || 'untitled',
parent_id: raw.parent_id,
author_id: raw.author_id || undefined,
inline: raw.inline === true,
};
};interface NormalizedTask extends NormalizedItem {
data: {
title: string;
status: string;
};
}
/**
 * Builds a NormalizedTask from a raw task record.
 *
 * Unlike a plain NormalizedItem, the `data` payload is narrowed to the two
 * fields that NormalizedTask declares: `title` and `status`.
 *
 * @param raw - Task object as returned by the external system
 * @returns The task in NormalizedTask form
 */
const normalizeTask = (raw: any): NormalizedTask => {
  const {
    id,
    created_at: createdAt,
    updated_at: updatedAt,
    title,
    status,
  } = raw;

  const data = { title, status };

  return {
    id,
    created_date: createdAt,
    modified_date: updatedAt,
    data,
  };
};