Ctrl + K
DocumentationLog inGet started

langfuse-migration-deep-dive

tessl install github:jeremylongshore/claude-code-plugins-plus-skills --skill langfuse-migration-deep-dive
github.com/jeremylongshore/claude-code-plugins-plus-skills

Execute complex Langfuse migrations including data migration and platform changes. Use when migrating from other observability platforms, moving between Langfuse instances, or performing major infrastructure migrations. Trigger with phrases like "langfuse migration", "migrate to langfuse", "langfuse data migration", "langfuse platform migration", "switch to langfuse".

Review Score

83%

Validation Score

11/16

Implementation Score

77%

Activation Score

90%

Langfuse Migration Deep Dive

Overview

Comprehensive guide for complex migrations to or between Langfuse instances.

Prerequisites

  • Understanding of source and target systems
  • Database access (for data migrations)
  • Downtime window planned (if needed)
  • Rollback plan prepared

Migration Scenarios

ScenarioComplexityDowntimeData Loss Risk
Cloud to Cloud (same)LowNoneNone
Self-hosted to CloudMediumMinutesLow
Cloud to Self-hostedMediumMinutesLow
Other platform to LangfuseHighHoursMedium
SDK version upgradeLowNoneNone

Instructions

Scenario 1: Migrate from Cloud to Self-Hosted

Step 1: Export Data from Cloud

// scripts/export-cloud-data.ts
import { Langfuse } from "langfuse";
import fs from "fs/promises";

async function exportCloudData() {
  const langfuse = new Langfuse({
    publicKey: process.env.LANGFUSE_CLOUD_PUBLIC_KEY!,
    secretKey: process.env.LANGFUSE_CLOUD_SECRET_KEY!,
    baseUrl: "https://cloud.langfuse.com",
  });

  const exportDir = `export-${Date.now()}`;
  await fs.mkdir(exportDir, { recursive: true });

  // Export traces
  console.log("Exporting traces...");
  let page = 1;
  let totalTraces = 0;

  while (true) {
    const traces = await langfuse.fetchTraces({
      limit: 100,
      page,
    });

    if (traces.data.length === 0) break;

    await fs.writeFile(
      `${exportDir}/traces-${page}.json`,
      JSON.stringify(traces.data, null, 2)
    );

    totalTraces += traces.data.length;
    page++;

    // Rate limiting
    await new Promise((r) => setTimeout(r, 200));
  }

  console.log(`Exported ${totalTraces} traces`);

  // Export scores
  console.log("Exporting scores...");
  // Similar pagination for scores

  // Export datasets
  console.log("Exporting datasets...");
  const datasets = await langfuse.fetchDatasets({});
  await fs.writeFile(
    `${exportDir}/datasets.json`,
    JSON.stringify(datasets.data, null, 2)
  );

  return exportDir;
}

Step 2: Set Up Self-Hosted Instance

# docker-compose.self-hosted.yml
version: "3.8"

services:
  langfuse:
    image: langfuse/langfuse:latest
    ports:
      - "3000:3000"
    environment:
      - DATABASE_URL=postgresql://postgres:${DB_PASSWORD}@db:5432/langfuse
      - NEXTAUTH_SECRET=${NEXTAUTH_SECRET}
      - NEXTAUTH_URL=${LANGFUSE_URL}
      - SALT=${LANGFUSE_SALT}
      - ENCRYPTION_KEY=${ENCRYPTION_KEY}
    depends_on:
      db:
        condition: service_healthy

  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=${DB_PASSWORD}
      - POSTGRES_DB=langfuse
    volumes:
      - langfuse-db:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U postgres"]
      interval: 5s
      timeout: 5s
      retries: 5

volumes:
  langfuse-db:

Step 3: Import Data to Self-Hosted

// scripts/import-to-self-hosted.ts
import { Langfuse } from "langfuse";
import fs from "fs/promises";
import path from "path";

async function importToSelfHosted(exportDir: string) {
  const langfuse = new Langfuse({
    publicKey: process.env.LANGFUSE_SELF_HOSTED_PUBLIC_KEY!,
    secretKey: process.env.LANGFUSE_SELF_HOSTED_SECRET_KEY!,
    baseUrl: process.env.LANGFUSE_SELF_HOSTED_URL!,
  });

  // Get all trace files
  const files = await fs.readdir(exportDir);
  const traceFiles = files.filter((f) => f.startsWith("traces-"));

  for (const file of traceFiles) {
    const content = await fs.readFile(path.join(exportDir, file), "utf-8");
    const traces = JSON.parse(content);

    for (const traceData of traces) {
      // Recreate trace
      const trace = langfuse.trace({
        name: traceData.name,
        userId: traceData.userId,
        sessionId: traceData.sessionId,
        input: traceData.input,
        output: traceData.output,
        metadata: {
          ...traceData.metadata,
          migratedFrom: "cloud",
          originalId: traceData.id,
        },
      });

      // Note: Observations/generations need to be recreated separately
      // if you have access to the detailed data
    }

    await langfuse.flushAsync();
    console.log(`Imported ${file}`);
  }
}

Scenario 2: Migrate from LangSmith to Langfuse

Step 1: Create Adapter for LangSmith Data

// lib/migration/langsmith-adapter.ts

interface LangSmithRun {
  id: string;
  name: string;
  run_type: "chain" | "llm" | "tool";
  inputs: any;
  outputs: any;
  start_time: string;
  end_time: string;
  extra?: {
    metadata?: Record<string, any>;
  };
  child_runs?: LangSmithRun[];
}

interface LangfuseTraceData {
  name: string;
  input: any;
  output: any;
  metadata: Record<string, any>;
  spans: LangfuseSpanData[];
  generations: LangfuseGenerationData[];
}

interface LangfuseSpanData {
  name: string;
  input: any;
  output: any;
  startTime: Date;
  endTime: Date;
}

interface LangfuseGenerationData {
  name: string;
  model: string;
  input: any;
  output: any;
  startTime: Date;
  endTime: Date;
  usage?: { promptTokens: number; completionTokens: number };
}

function convertLangSmithRun(run: LangSmithRun): LangfuseTraceData {
  const spans: LangfuseSpanData[] = [];
  const generations: LangfuseGenerationData[] = [];

  // Process child runs
  function processRun(r: LangSmithRun, depth: number = 0) {
    if (r.run_type === "llm") {
      generations.push({
        name: r.name,
        model: r.extra?.metadata?.model || "unknown",
        input: r.inputs,
        output: r.outputs,
        startTime: new Date(r.start_time),
        endTime: new Date(r.end_time),
        usage: r.extra?.metadata?.usage,
      });
    } else {
      spans.push({
        name: r.name,
        input: r.inputs,
        output: r.outputs,
        startTime: new Date(r.start_time),
        endTime: new Date(r.end_time),
      });
    }

    for (const child of r.child_runs || []) {
      processRun(child, depth + 1);
    }
  }

  processRun(run);

  return {
    name: run.name,
    input: run.inputs,
    output: run.outputs,
    metadata: {
      ...run.extra?.metadata,
      migratedFrom: "langsmith",
      originalId: run.id,
    },
    spans,
    generations,
  };
}

Step 2: Bulk Import Converted Data

// scripts/migrate-from-langsmith.ts
import { Langfuse } from "langfuse";

async function migrateFromLangSmith(runs: LangSmithRun[]) {
  const langfuse = new Langfuse();
  let migrated = 0;
  let failed = 0;

  for (const run of runs) {
    try {
      const converted = convertLangSmithRun(run);

      const trace = langfuse.trace({
        name: converted.name,
        input: converted.input,
        output: converted.output,
        metadata: converted.metadata,
      });

      // Create spans
      for (const span of converted.spans) {
        const s = trace.span({
          name: span.name,
          input: span.input,
          startTime: span.startTime,
        });
        s.end({
          output: span.output,
          endTime: span.endTime,
        });
      }

      // Create generations
      for (const gen of converted.generations) {
        const g = trace.generation({
          name: gen.name,
          model: gen.model,
          input: gen.input,
          startTime: gen.startTime,
        });
        g.end({
          output: gen.output,
          endTime: gen.endTime,
          usage: gen.usage,
        });
      }

      migrated++;
    } catch (error) {
      console.error(`Failed to migrate run ${run.id}:`, error);
      failed++;
    }

    // Periodic flush
    if (migrated % 100 === 0) {
      await langfuse.flushAsync();
      console.log(`Progress: ${migrated} migrated, ${failed} failed`);
    }
  }

  await langfuse.flushAsync();
  return { migrated, failed };
}

Scenario 3: Zero-Downtime SDK Migration

// lib/migration/dual-write.ts

// During migration, write to both old and new systems
class DualWriteLangfuse {
  private oldClient: any; // Old observability system
  private newClient: Langfuse;
  private writeToOld: boolean = true;
  private writeToNew: boolean = true;

  constructor(config: {
    oldConfig: any;
    newConfig: ConstructorParameters<typeof Langfuse>[0];
  }) {
    this.oldClient = createOldClient(config.oldConfig);
    this.newClient = new Langfuse(config.newConfig);
  }

  trace(params: any) {
    const traces: any[] = [];

    if (this.writeToOld) {
      try {
        traces.push({ type: "old", trace: this.oldClient.trace(params) });
      } catch (error) {
        console.error("Old client trace failed:", error);
      }
    }

    if (this.writeToNew) {
      try {
        traces.push({ type: "new", trace: this.newClient.trace(params) });
      } catch (error) {
        console.error("New client trace failed:", error);
      }
    }

    // Return proxy that writes to both
    return new Proxy(traces[0]?.trace || {}, {
      get: (target, prop) => {
        if (prop === "span" || prop === "generation") {
          return (...args: any[]) => {
            for (const { trace } of traces) {
              trace[prop]?.(...args);
            }
            return target[prop]?.(...args);
          };
        }
        return target[prop];
      },
    });
  }

  // Gradual cutover
  setWriteMode(options: { old: boolean; new: boolean }) {
    this.writeToOld = options.old;
    this.writeToNew = options.new;
  }

  async flush() {
    await Promise.all([
      this.writeToOld && this.oldClient.flush?.(),
      this.writeToNew && this.newClient.flushAsync(),
    ]);
  }
}

// Migration timeline
// Week 1: writeToOld: true, writeToNew: true (dual write)
// Week 2: writeToOld: true, writeToNew: true (verify new system)
// Week 3: writeToOld: false, writeToNew: true (cutover)
// Week 4: Remove old client code

Step 4: Validation and Verification

// scripts/validate-migration.ts

interface MigrationValidation {
  source: {
    traceCount: number;
    generationCount: number;
    dateRange: { start: Date; end: Date };
  };
  target: {
    traceCount: number;
    generationCount: number;
    dateRange: { start: Date; end: Date };
  };
  discrepancies: string[];
}

async function validateMigration(
  sourceClient: any,
  targetClient: Langfuse
): Promise<MigrationValidation> {
  const discrepancies: string[] = [];

  // Count source data
  const sourceTraces = await sourceClient.fetchTraces({ limit: 1 });
  const sourceCount = sourceTraces.totalCount || sourceTraces.data.length;

  // Count target data
  const targetTraces = await targetClient.fetchTraces({ limit: 1 });
  const targetCount = targetTraces.totalCount || targetTraces.data.length;

  // Compare counts
  const countDiff = Math.abs(sourceCount - targetCount);
  if (countDiff > sourceCount * 0.01) {
    discrepancies.push(
      `Trace count mismatch: source=${sourceCount}, target=${targetCount}`
    );
  }

  // Spot check random traces
  const sampleTraces = await sourceClient.fetchTraces({ limit: 10 });
  for (const trace of sampleTraces.data) {
    const targetTrace = await targetClient.fetchTraces({
      filter: { metadata: { originalId: trace.id } },
    });

    if (targetTrace.data.length === 0) {
      discrepancies.push(`Missing trace: ${trace.id}`);
    }
  }

  return {
    source: {
      traceCount: sourceCount,
      generationCount: 0, // Calculate as needed
      dateRange: { start: new Date(), end: new Date() },
    },
    target: {
      traceCount: targetCount,
      generationCount: 0,
      dateRange: { start: new Date(), end: new Date() },
    },
    discrepancies,
  };
}

Migration Checklist

PhaseTaskStatus
Planning
Inventory source data[ ]
Plan downtime window[ ]
Create rollback plan[ ]
Notify stakeholders[ ]
Execution
Export source data[ ]
Set up target system[ ]
Import data[ ]
Update application config[ ]
Validation
Verify trace counts[ ]
Spot check data quality[ ]
Test new integration[ ]
Monitor for errors[ ]
Cleanup
Remove old config[ ]
Archive source data[ ]
Update documentation[ ]

Error Handling

IssueCauseSolution
Data lossIncomplete exportRe-run with pagination
Duplicate tracesRe-importDedupe by originalId
Missing metadataFormat mismatchUpdate adapter
Performance issuesLarge importUse batch processing

Rollback Plan

// If migration fails, rollback to old system
async function rollback() {
  // 1. Update environment variables
  process.env.LANGFUSE_PUBLIC_KEY = process.env.OLD_LANGFUSE_PUBLIC_KEY;
  process.env.LANGFUSE_SECRET_KEY = process.env.OLD_LANGFUSE_SECRET_KEY;
  process.env.LANGFUSE_HOST = process.env.OLD_LANGFUSE_HOST;

  // 2. Restart application
  console.log("Rollback complete. Restart application to apply.");

  // 3. Notify team
  await sendSlackNotification("Langfuse migration rolled back");
}

Resources

  • Langfuse Self-Hosting
  • Langfuse API Reference
  • Data Migration Best Practices

Next Steps

Migration complete. Return to langfuse-install-auth for new project setup.