import fs from "node:fs";
import path from "node:path";
import type { DatabaseSync } from "node:sqlite";
import { extractTranscriptMetadata, ensureArchiveDir } from "../../gateway/session-archive.js";
import { indexSession, initHistoryDb, type SessionHistoryRow } from "./history-db.js";

/** Counters reported by {@link migrateOrphanedSessions}. */
export type MigrationResult = {
  /** Transcript files that passed the filename filter and were examined. */
  processed: number;
  /** Transcripts renamed into the archive directory and left there. */
  moved: number;
  /** History rows written via `indexSession`. */
  indexed: number;
  /** Per-file failures, plus one if the migration as a whole aborted. */
  errors: number;
  /** Files skipped: active in sessions.json, already indexed, or not UUID-named. */
  skipped: number;
};

/**
 * Name of the archive subdirectory inside the sessions directory.
 * NOTE(review): assumed to match the directory created by `ensureArchiveDir` — confirm.
 */
const ARCHIVE_DIR_NAME = "archive";

/** Filename markers of backup/reset/deleted transcript copies that must never be migrated. */
const EXCLUDED_FILE_MARKERS = [".bak.", ".reset.", ".deleted."] as const;

/** Basic UUID shape: 8-4-4-4-12 hex digits, case-insensitive. */
const SESSION_ID_PATTERN = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i;

/**
 * Collect the string `sessionId` of every entry in `sessions.json`.
 *
 * @returns The set of active session ids. It is empty when the file is missing or
 *   cannot be read or parsed; read/parse errors are logged, not thrown.
 */
function readActiveSessionIds(sessionsJsonPath: string): Set<string> {
  const active = new Set<string>();
  if (!fs.existsSync(sessionsJsonPath)) {
    return active;
  }
  try {
    const sessionsData: unknown = JSON.parse(fs.readFileSync(sessionsJsonPath, "utf-8"));
    if (typeof sessionsData === "object" && sessionsData !== null) {
      const entries: unknown[] = Object.values(sessionsData);
      for (const entry of entries) {
        if (
          entry &&
          typeof entry === "object" &&
          "sessionId" in entry &&
          typeof entry.sessionId === "string"
        ) {
          active.add(entry.sessionId);
        }
      }
    }
  } catch (error) {
    console.error("Failed to parse sessions.json:", error);
  }
  return active;
}

/** List `.jsonl` files in `sessionsDir`, excluding backup/reset/deleted copies. */
function listTranscriptFiles(sessionsDir: string): string[] {
  return fs
    .readdirSync(sessionsDir)
    .filter(
      (file) =>
        file.endsWith(".jsonl") && !EXCLUDED_FILE_MARKERS.some((marker) => file.includes(marker)),
    );
}

/**
 * Choose the destination path for `fileName` inside `archiveDir`.
 *
 * If a file of that name is already archived, a `.migrated.<ISO timestamp>` suffix is
 * inserted before the extension. Colons are replaced so the name is valid on every platform.
 */
function resolveArchivePath(archiveDir: string, fileName: string): string {
  const target = path.join(archiveDir, fileName);
  if (!fs.existsSync(target)) {
    return target;
  }
  const ts = new Date().toISOString().replaceAll(":", "-");
  const ext = path.extname(fileName);
  const base = path.basename(fileName, ext);
  return path.join(archiveDir, `${base}.migrated.${ts}${ext}`);
}

/**
 * One-time migration that archives and indexes existing orphaned `.jsonl` transcripts.
 *
 * A transcript is migrated when all of the following hold:
 * - its basename is UUID-shaped;
 * - it is not referenced by `sessions.json`;
 * - it has no `session_history` row for `agentId`.
 *
 * Each migrated file is renamed into the archive directory and then indexed. If indexing
 * fails, the file is moved back so it is not stranded in the archive without a history row.
 *
 * The `migration_completed` meta flag is written even when individual files fail. Failed
 * files are therefore not retried automatically on later runs.
 *
 * @param sessionsDir Directory containing `sessions.json` and the transcripts.
 * @param agentId Agent whose history the transcripts are indexed under.
 * @param historyDb Optional already-initialized history database. When omitted,
 *   `initHistoryDb(sessionsDir)` is called.
 * @returns Counters describing what happened. This function never throws; a fatal
 *   failure is logged and counted in `errors`.
 */
export function migrateOrphanedSessions(
  sessionsDir: string,
  agentId: string,
  historyDb?: DatabaseSync,
): MigrationResult {
  const result: MigrationResult = {
    processed: 0,
    moved: 0,
    indexed: 0,
    errors: 0,
    skipped: 0,
  };

  try {
    const db = historyDb ?? initHistoryDb(sessionsDir);

    ensureArchiveDir(sessionsDir);
    const archiveDir = path.join(sessionsDir, ARCHIVE_DIR_NAME);

    // Sessions still referenced by sessions.json are live and must stay in place.
    const sessionsJsonPath = path.join(sessionsDir, "sessions.json");
    const activeSessions = readActiveSessionIds(sessionsJsonPath);

    // Sessions already in the history table are skipped to avoid duplicate rows.
    const existingRows = db
      .prepare("SELECT sessionId FROM session_history WHERE agentId = ?")
      .all(agentId) as Array<{ sessionId: string }>;
    const existingSessions = new Set(existingRows.map((row) => row.sessionId));

    for (const fileName of listTranscriptFiles(sessionsDir)) {
      result.processed++;
      const filePath = path.join(sessionsDir, fileName);

      try {
        // Transcripts are expected to be named `{uuid}.jsonl`.
        const sessionId = path.basename(fileName, ".jsonl");
        if (
          activeSessions.has(sessionId) ||
          existingSessions.has(sessionId) ||
          !SESSION_ID_PATTERN.test(sessionId)
        ) {
          result.skipped++;
          continue;
        }

        // Metadata is read while the transcript is still at its original location.
        // NOTE(review): argument order mirrors the original call — confirm against
        // the extractTranscriptMetadata signature.
        const metadata = extractTranscriptMetadata(sessionId, sessionsJsonPath, fileName, agentId);

        const archivePath = resolveArchivePath(archiveDir, fileName);
        fs.renameSync(filePath, archivePath);

        const now = Date.now();
        const historyEntry: SessionHistoryRow = {
          sessionId,
          agentId,
          sessionKey: `agent:${agentId}:main`, // Default session key pattern
          displayName: undefined, // Let auto-titling handle this
          createdAt: metadata.createdAt || now,
          updatedAt: metadata.updatedAt || now,
          archivedAt: now,
          messageCount: metadata.messageCount,
          filePath: archivePath,
          firstMessage: metadata.firstMessage,
          channel: undefined,
          chatType: "direct",
          totalTokens: metadata.totalTokens,
          status: "archived",
        };

        try {
          indexSession(db, historyEntry);
        } catch (indexError) {
          // Without a history row the archived transcript would be unreachable, so put it
          // back where it was found. If even that fails, it stays (moved) in the archive.
          try {
            fs.renameSync(archivePath, filePath);
          } catch (restoreError) {
            result.moved++;
            console.error(`Failed to restore ${fileName} after indexing error:`, restoreError);
          }
          throw indexError;
        }

        result.moved++;
        result.indexed++;
      } catch (error) {
        console.error(`Error migrating ${fileName}:`, error);
        result.errors++;
      }
    }

    // Recorded regardless of per-file errors (see the function doc).
    db.prepare(
      "INSERT OR REPLACE INTO meta (key, value) VALUES ('migration_completed', ?)",
    ).run(Date.now().toString());

    return result;
  } catch (error) {
    console.error("Migration failed:", error);
    result.errors++;
    return result;
  }
}

/**
 * Check whether the `migration_completed` flag is present in the history database.
 *
 * @param sessionsDir Directory expected to contain `history.db`.
 * @param historyDb Optional already-initialized history database. When omitted, the check
 *   returns `false` without touching the database if `history.db` does not exist yet;
 *   otherwise `initHistoryDb(sessionsDir)` is called.
 * @returns `true` only if the flag row exists. Any error (for example a missing `meta`
 *   table) yields `false`.
 */
export function isMigrationCompleted(sessionsDir: string, historyDb?: DatabaseSync): boolean {
  try {
    // NOTE(review): assumes initHistoryDb stores the database at `<sessionsDir>/history.db`.
    const db =
      historyDb ??
      (fs.existsSync(path.join(sessionsDir, "history.db")) ? initHistoryDb(sessionsDir) : undefined);
    if (!db) {
      return false;
    }

    const row = db
      .prepare("SELECT value FROM meta WHERE key = 'migration_completed'")
      .get() as { value: string } | undefined;
    return row !== undefined;
  } catch {
    // Database unreadable or meta table not created yet.
    return false;
  }
}

/** Basic validation for session ID format (UUID-like). */
function isValidSessionId(sessionId: string): boolean {
  return SESSION_ID_PATTERN.test(sessionId);
}

/**
 * Initialize the history database and, on first use, run the orphaned-session migration.
 *
 * The same database handle is passed to the completion check and to the migration, so
 * neither of them re-initializes the database.
 *
 * @returns The initialized history database. Errors from `initHistoryDb` propagate.
 */
export function initHistoryDbWithMigration(sessionsDir: string, agentId: string): DatabaseSync {
  const historyDb = initHistoryDb(sessionsDir);

  if (!isMigrationCompleted(sessionsDir, historyDb)) {
    console.log("Running session history migration...");
    const result = migrateOrphanedSessions(sessionsDir, agentId, historyDb);
    console.log(
      `Migration completed: processed=${result.processed}, moved=${result.moved}, indexed=${result.indexed}, errors=${result.errors}, skipped=${result.skipped}`,
    );
  }

  return historyDb;
}