| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559 |
- import fs from 'fs/promises';
- import os from 'os';
- import path from 'path';
- import { DatabaseSync } from 'node:sqlite';
- import {
- ALL_PROJECTS_MEMORY_EXPORT_FORMAT_VERSION,
- EdgeClawMemoryService,
- MemoryBundleValidationError,
- hashText,
- } from '../../../src/context/memory/edgeclaw-memory-core/lib/index.js';
- import { extractProjectDirectory } from '../projects.js';
- import {
- buildMemoryDefaults,
- readPilotDeckConfigFile,
- } from './pilotdeckConfig.js';
- const MEMORY_ROOT_DIR = path.join(process.env.PILOT_HOME || path.join(os.homedir(), '.pilotdeck'), 'memory');
- const MEMORY_WORKSPACES_ROOT = path.join(MEMORY_ROOT_DIR, 'workspaces');
- const MEMORY_GLOBAL_ROOT = path.join(MEMORY_ROOT_DIR, 'global');
- const MEMORY_SCHEDULER_INTERVAL_MS = 60_000;
- const GLOBAL_MAINTENANCE_TASK_KEY = '__edgeclaw_memory_global_maintenance__';
- const servicesByDataDir = new Map();
- const workspaceTaskChains = new Map();
- let schedulerTimer = null;
- let schedulerCyclePromise = null;
- function normalizePath(projectPath) {
- return typeof projectPath === 'string' && projectPath.trim()
- ? path.resolve(projectPath.trim())
- : '';
- }
- function resolveWorkspaceDataDir(projectPath) {
- return path.join(MEMORY_WORKSPACES_ROOT, hashText(path.resolve(projectPath)));
- }
- function buildServiceForDataDir(dataDir, workspaceDir = dataDir) {
- let memoryDefaults = {};
- try {
- memoryDefaults = buildMemoryDefaults(readPilotDeckConfigFile().config);
- } catch {
- memoryDefaults = {};
- }
- const service = new EdgeClawMemoryService({
- workspaceDir,
- rootDir: MEMORY_ROOT_DIR,
- dbPath: path.join(dataDir, 'control.sqlite'),
- memoryDir: path.join(dataDir, 'memory'),
- source: 'pilotdeck',
- ...memoryDefaults,
- });
- if (memoryDefaults.defaultIndexingSettings) {
- service.saveSettings(memoryDefaults.defaultIndexingSettings);
- }
- return service;
- }
- function readWorkspaceDirFromDataDir(dataDir) {
- const dbPath = path.join(dataDir, 'control.sqlite');
- try {
- const db = new DatabaseSync(dbPath);
- try {
- const row = db.prepare(
- 'SELECT state_json FROM pipeline_state WHERE state_key = ?',
- ).get('workspaceDir');
- if (!row || typeof row.state_json !== 'string') {
- return null;
- }
- const parsed = JSON.parse(row.state_json);
- return typeof parsed === 'string' && parsed.trim()
- ? path.resolve(parsed.trim())
- : null;
- } finally {
- db.close();
- }
- } catch {
- return null;
- }
- }
- function getOrCreateServiceForDataDir(dataDir, workspaceDir = dataDir) {
- const normalizedDataDir = path.resolve(dataDir);
- let service = servicesByDataDir.get(normalizedDataDir);
- if (!service) {
- const restoredWorkspaceDir = readWorkspaceDirFromDataDir(normalizedDataDir);
- service = buildServiceForDataDir(normalizedDataDir, restoredWorkspaceDir ?? workspaceDir);
- servicesByDataDir.set(normalizedDataDir, service);
- }
- return {
- dataDir: normalizedDataDir,
- service,
- };
- }
- function getOrCreateServiceForProjectPath(projectPath) {
- const normalizedProjectPath = normalizePath(projectPath);
- if (!normalizedProjectPath) {
- throw new Error('projectPath is required');
- }
- const dataDir = resolveWorkspaceDataDir(normalizedProjectPath);
- const existing = servicesByDataDir.get(path.resolve(dataDir));
- if (existing && existing.workspaceDir !== normalizedProjectPath) {
- try {
- existing.close();
- } catch {
- // ignore close failures when refreshing workspace context
- }
- servicesByDataDir.delete(path.resolve(dataDir));
- }
- return {
- projectPath: normalizedProjectPath,
- ...getOrCreateServiceForDataDir(dataDir, normalizedProjectPath),
- };
- }
- function enqueueTaskWithKeys(keys, task) {
- const normalizedKeys = Array.from(new Set(
- keys
- .map((key) => String(key || '').trim())
- .filter(Boolean),
- ));
- const previous = Promise.all(
- normalizedKeys.map((key) => (workspaceTaskChains.get(key) ?? Promise.resolve()).catch(() => undefined)),
- );
- const next = previous.then(task);
- const sentinel = next.then(() => undefined, () => undefined);
- normalizedKeys.forEach((key) => workspaceTaskChains.set(key, sentinel));
- sentinel.finally(() => {
- normalizedKeys.forEach((key) => {
- if (workspaceTaskChains.get(key) === sentinel) {
- workspaceTaskChains.delete(key);
- }
- });
- });
- return next;
- }
- function enqueueWorkspaceTask(dataDir, task) {
- return enqueueTaskWithKeys([path.resolve(dataDir)], task);
- }
- function enqueueMaintenanceTask(dataDir, task) {
- return enqueueTaskWithKeys([path.resolve(dataDir), GLOBAL_MAINTENANCE_TASK_KEY], task);
- }
- async function pathExists(targetPath) {
- try {
- await fs.access(targetPath);
- return true;
- } catch {
- return false;
- }
- }
- function normalizeSnapshotRelativePath(relativePath, label) {
- if (typeof relativePath !== 'string' || !relativePath.trim()) {
- throw new MemoryBundleValidationError(`Invalid ${label}.relativePath`);
- }
- const segments = relativePath.replace(/\\/g, '/').split('/').filter(Boolean);
- if (segments.length === 0 || segments.some((segment) => segment === '.' || segment === '..')) {
- throw new MemoryBundleValidationError(`Invalid ${label}.relativePath`);
- }
- return segments.join('/');
- }
- function normalizeSnapshotFileRecord(value, label) {
- if (!value || typeof value !== 'object' || Array.isArray(value)) {
- throw new MemoryBundleValidationError(`Invalid ${label}`);
- }
- if (typeof value.content !== 'string') {
- throw new MemoryBundleValidationError(`Invalid ${label}.content`);
- }
- return {
- relativePath: normalizeSnapshotRelativePath(value.relativePath, label),
- content: value.content,
- };
- }
- async function listSnapshotFiles(rootDir) {
- if (!(await pathExists(rootDir))) {
- return [];
- }
- const files = [];
- async function walk(currentDir) {
- const entries = await fs.readdir(currentDir, { withFileTypes: true });
- entries.sort((left, right) => left.name.localeCompare(right.name));
- for (const entry of entries) {
- const absolutePath = path.join(currentDir, entry.name);
- if (entry.isDirectory()) {
- await walk(absolutePath);
- continue;
- }
- if (!entry.isFile()) {
- continue;
- }
- const relativePath = path.relative(rootDir, absolutePath).replace(/\\/g, '/');
- files.push({
- relativePath,
- content: await fs.readFile(absolutePath, 'utf8'),
- });
- }
- }
- await walk(rootDir);
- return files;
- }
- async function replaceSnapshotFiles(rootDir, files) {
- await fs.rm(rootDir, { recursive: true, force: true });
- await fs.mkdir(rootDir, { recursive: true });
- for (let index = 0; index < files.length; index += 1) {
- const record = normalizeSnapshotFileRecord(files[index], `files[${index}]`);
- const absolutePath = path.resolve(rootDir, record.relativePath);
- const relativeCheck = path.relative(rootDir, absolutePath);
- if (
- !relativeCheck
- || relativeCheck.startsWith('..')
- || path.isAbsolute(relativeCheck)
- ) {
- throw new MemoryBundleValidationError(`Invalid files[${index}].relativePath`);
- }
- await fs.mkdir(path.dirname(absolutePath), { recursive: true });
- await fs.writeFile(absolutePath, record.content, 'utf8');
- }
- }
- function createEmptyTransferCounts() {
- return {
- managedFiles: 0,
- memoryFiles: 0,
- project: 0,
- feedback: 0,
- user: 0,
- tmp: 0,
- projectMetas: 0,
- };
- }
- function addTransferCounts(total, partial) {
- total.managedFiles += Number(partial?.managedFiles ?? 0);
- total.memoryFiles += Number(partial?.memoryFiles ?? 0);
- total.project += Number(partial?.project ?? 0);
- total.feedback += Number(partial?.feedback ?? 0);
- total.user += Number(partial?.user ?? 0);
- total.tmp += Number(partial?.tmp ?? 0);
- total.projectMetas += Number(partial?.projectMetas ?? 0);
- }
- function normalizeAllProjectsBundle(value) {
- if (!value || typeof value !== 'object' || Array.isArray(value)) {
- throw new MemoryBundleValidationError('Invalid all-projects memory bundle');
- }
- if (value.formatVersion !== ALL_PROJECTS_MEMORY_EXPORT_FORMAT_VERSION) {
- throw new MemoryBundleValidationError(
- `Unsupported all-projects memory bundle formatVersion. Expected ${ALL_PROJECTS_MEMORY_EXPORT_FORMAT_VERSION}.`,
- );
- }
- if (value.scope !== 'all_projects') {
- throw new MemoryBundleValidationError('Unsupported all-projects memory bundle scope.');
- }
- if (!Array.isArray(value.projects)) {
- throw new MemoryBundleValidationError('Invalid all-projects memory bundle projects.');
- }
- const globalFiles = Array.isArray(value.globalFiles)
- ? value.globalFiles.map((item, index) => normalizeSnapshotFileRecord(item, `globalFiles[${index}]`))
- : [];
- const seenGlobalPaths = new Set();
- for (const record of globalFiles) {
- if (seenGlobalPaths.has(record.relativePath)) {
- throw new MemoryBundleValidationError(`Duplicate globalFiles path: ${record.relativePath}`);
- }
- seenGlobalPaths.add(record.relativePath);
- }
- const seenProjectPaths = new Set();
- const projects = value.projects.map((project, index) => {
- if (!project || typeof project !== 'object' || Array.isArray(project)) {
- throw new MemoryBundleValidationError(`Invalid projects[${index}]`);
- }
- const projectPath = normalizePath(project.projectPath);
- if (!projectPath) {
- throw new MemoryBundleValidationError(`Invalid projects[${index}].projectPath`);
- }
- if (seenProjectPaths.has(projectPath)) {
- throw new MemoryBundleValidationError(`Duplicate projectPath in projects[${index}]`);
- }
- seenProjectPaths.add(projectPath);
- if (!project.bundle || typeof project.bundle !== 'object' || Array.isArray(project.bundle)) {
- throw new MemoryBundleValidationError(`Invalid projects[${index}].bundle`);
- }
- return {
- projectPath,
- projectName: typeof project.projectName === 'string' && project.projectName.trim()
- ? project.projectName.trim()
- : path.basename(projectPath),
- bundle: project.bundle,
- };
- });
- return {
- formatVersion: ALL_PROJECTS_MEMORY_EXPORT_FORMAT_VERSION,
- scope: 'all_projects',
- exportedAt: typeof value.exportedAt === 'string' && value.exportedAt.trim()
- ? value.exportedAt.trim()
- : new Date().toISOString(),
- projects,
- globalFiles,
- };
- }
- async function listWorkspaceDataDirs() {
- try {
- const entries = await fs.readdir(MEMORY_WORKSPACES_ROOT, { withFileTypes: true });
- const dirs = [];
- for (const entry of entries) {
- if (!entry.isDirectory()) continue;
- const dataDir = path.join(MEMORY_WORKSPACES_ROOT, entry.name);
- const hasDb = await pathExists(path.join(dataDir, 'control.sqlite'));
- const hasMemoryDir = await pathExists(path.join(dataDir, 'memory'));
- if (hasDb && hasMemoryDir) {
- dirs.push(dataDir);
- }
- }
- return dirs.sort((left, right) => left.localeCompare(right));
- } catch {
- return [];
- }
- }
- async function executeScheduledMaintenanceForDataDir(dataDir) {
- const { service } = getOrCreateServiceForDataDir(dataDir);
- return enqueueMaintenanceTask(dataDir, async () => service.runDueScheduledMaintenance('scheduled:server_scheduler'));
- }
- export async function resolveProjectPathFromRequest(req) {
- const queryProjectPath = normalizePath(req.query?.projectPath);
- if (queryProjectPath) {
- return queryProjectPath;
- }
- const bodyProjectPath = normalizePath(req.body?.projectPath);
- if (bodyProjectPath) {
- return bodyProjectPath;
- }
- const projectName = typeof req.query?.projectName === 'string'
- ? req.query.projectName.trim()
- : typeof req.params?.projectName === 'string'
- ? req.params.projectName.trim()
- : '';
- if (!projectName) {
- throw new Error('projectPath or projectName is required');
- }
- return path.resolve(await extractProjectDirectory(projectName));
- }
- export async function getMemoryServiceForRequest(req) {
- const projectPath = await resolveProjectPathFromRequest(req);
- return getOrCreateServiceForProjectPath(projectPath);
- }
- export async function runManualMemoryFlush(service, dataDir, options = {}) {
- return enqueueMaintenanceTask(dataDir, async () => service.flush({
- reason: options.reason ?? 'manual',
- ...(typeof options.batchSize === 'number' ? { batchSize: options.batchSize } : {}),
- ...(Array.isArray(options.sessionKeys) ? { sessionKeys: options.sessionKeys } : {}),
- }));
- }
- export async function runManualMemoryDream(service, dataDir) {
- return enqueueMaintenanceTask(dataDir, async () => service.dream('manual'));
- }
- export async function rollbackLastMemoryDream(service, dataDir) {
- return enqueueMaintenanceTask(dataDir, async () => service.rollbackLastDream());
- }
- export async function runMemorySchedulerCycle() {
- try {
- if (!readPilotDeckConfigFile().config.memory?.enabled) {
- return null;
- }
- } catch {
- // If config cannot be read, keep the scheduler's default enabled behavior.
- }
- if (schedulerCyclePromise) {
- return schedulerCyclePromise;
- }
- schedulerCyclePromise = (async () => {
- const workspaceDataDirs = await listWorkspaceDataDirs();
- for (const dataDir of workspaceDataDirs) {
- try {
- await executeScheduledMaintenanceForDataDir(dataDir);
- } catch (error) {
- console.error(`[memory-scheduler] scheduled maintenance failed for ${dataDir}:`, error);
- }
- }
- })().finally(() => {
- schedulerCyclePromise = null;
- });
- return schedulerCyclePromise;
- }
- export function startMemoryScheduler() {
- try {
- if (!readPilotDeckConfigFile().config.memory?.enabled) {
- return;
- }
- } catch {
- // If config cannot be read, keep the scheduler's default enabled behavior.
- }
- if (schedulerTimer) {
- return;
- }
- schedulerTimer = setInterval(() => {
- void runMemorySchedulerCycle();
- }, MEMORY_SCHEDULER_INTERVAL_MS);
- if (typeof schedulerTimer.unref === 'function') {
- schedulerTimer.unref();
- }
- void runMemorySchedulerCycle();
- }
- export function stopMemoryScheduler() {
- if (schedulerTimer) {
- clearInterval(schedulerTimer);
- schedulerTimer = null;
- }
- }
- export function getMemorySchedulerStatus() {
- let enabled = true;
- let configError = null;
- try {
- enabled = readPilotDeckConfigFile().config.memory?.enabled !== false;
- } catch (error) {
- configError = error instanceof Error ? error.message : String(error);
- }
- return {
- enabled,
- running: Boolean(schedulerTimer),
- intervalMs: MEMORY_SCHEDULER_INTERVAL_MS,
- ...(configError ? { configError } : {}),
- };
- }
- export function closeMemoryServices() {
- stopMemoryScheduler();
- for (const service of servicesByDataDir.values()) {
- try {
- service.close();
- } catch {
- // ignore close failures during shutdown
- }
- }
- servicesByDataDir.clear();
- workspaceTaskChains.clear();
- }
- export async function clearAllMemoryData() {
- for (const service of servicesByDataDir.values()) {
- try {
- service.close();
- } catch {
- // ignore close failures during clear
- }
- }
- servicesByDataDir.clear();
- workspaceTaskChains.clear();
- await fs.rm(MEMORY_ROOT_DIR, { recursive: true, force: true });
- await fs.mkdir(MEMORY_WORKSPACES_ROOT, { recursive: true });
- await fs.mkdir(MEMORY_GLOBAL_ROOT, { recursive: true });
- return {
- scope: 'all_memory',
- clearedAt: new Date().toISOString(),
- cleared: {
- l0Sessions: 0,
- pipelineState: 0,
- memoryFiles: 0,
- projectMetas: 0,
- },
- };
- }
- export async function exportAllProjectsMemoryBundle() {
- const workspaceDataDirs = await listWorkspaceDataDirs();
- const projects = [];
- for (const dataDir of workspaceDataDirs) {
- const restoredWorkspaceDir = readWorkspaceDirFromDataDir(dataDir);
- const { service } = getOrCreateServiceForDataDir(dataDir, restoredWorkspaceDir ?? dataDir);
- const projectMeta = service.getProjectMeta();
- projects.push({
- projectPath: service.workspaceDir,
- projectName: projectMeta?.projectName || path.basename(service.workspaceDir),
- bundle: service.exportBundle(),
- });
- }
- return {
- formatVersion: ALL_PROJECTS_MEMORY_EXPORT_FORMAT_VERSION,
- scope: 'all_projects',
- exportedAt: new Date().toISOString(),
- globalFiles: await listSnapshotFiles(MEMORY_GLOBAL_ROOT),
- projects,
- };
- }
- export async function importAllProjectsMemoryBundle(bundle) {
- const normalized = normalizeAllProjectsBundle(bundle);
- await clearAllMemoryData();
- const imported = createEmptyTransferCounts();
- const warnings = [];
- for (const project of normalized.projects) {
- const { service } = getOrCreateServiceForProjectPath(project.projectPath);
- const result = service.importBundle(project.bundle);
- addTransferCounts(imported, result.imported);
- if (Array.isArray(result.warnings) && result.warnings.length > 0) {
- warnings.push(...result.warnings.map((warning) => `[${project.projectName}] ${warning}`));
- }
- }
- await replaceSnapshotFiles(MEMORY_GLOBAL_ROOT, normalized.globalFiles);
- return {
- formatVersion: ALL_PROJECTS_MEMORY_EXPORT_FORMAT_VERSION,
- scope: 'all_projects',
- importedAt: new Date().toISOString(),
- projectCount: normalized.projects.length,
- imported,
- ...(warnings.length > 0 ? { warnings } : {}),
- };
- }
|