diff --git a/README.md b/README.md
index 093a661a..2a00d459 100644
--- a/README.md
+++ b/README.md
@@ -358,6 +358,7 @@ OpenKB settings are initialized by `openkb init` and stored in `.openkb/config.y
model: gpt-5.4 # LLM model (any LiteLLM-supported provider)
language: en # Wiki output language
pageindex_threshold: 20 # PDF pages threshold for PageIndex
+file_processing_jobs: 2 # Files to prepare concurrently during `openkb add PATH`
```
Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/providers) (OpenAI models can omit the prefix):
@@ -372,6 +373,8 @@ Model names use `provider/model` LiteLLM [format](https://docs.litellm.ai/docs/p
Advanced options (entity_types, extra_headers, OAuth):
+`file_processing_jobs` (default `2`): number of files prepared concurrently during `openkb add PATH`. Only the preparation stage is parallelized (hashing, duplicate prefiltering, raw/source staging, conversion); live-KB mutation stays serialized under the mutation lock, so raising it helps mainly when conversion is the bottleneck.
+
`entity_types` (optional): a YAML list overriding the entity-type vocabulary used for entity pages; omit it to use the default `person`, `organization`, `place`, `product`, `work`, `event`, `other`.
`extra_headers` (optional): a YAML mapping of extra HTTP headers sent with every LLM request (forwarded to LiteLLM's `extra_headers`). Useful for providers that expect custom headers, e.g. GitHub Copilot IDE-auth headers:
diff --git a/config.yaml.example b/config.yaml.example
index 45b4b8c3..aff75f0a 100644
--- a/config.yaml.example
+++ b/config.yaml.example
@@ -1,6 +1,10 @@
model: gpt-5.4 # LLM model (any LiteLLM-supported provider)
language: en # Wiki output language
pageindex_threshold: 20 # PDF pages threshold for PageIndex
+file_processing_jobs: 2 # Number of files to prepare concurrently during `openkb add PATH`
+# Note: this parallelizes hashing/conversion/staging only. Live KB publish,
+# PageIndex indexing, LLM compilation, registry updates, and log writes remain
+# serialized under the KB mutation lock.
# Optional: extra HTTP headers sent with every LLM request (forwarded to
# LiteLLM's extra_headers). Some providers need these — e.g. GitHub Copilot
diff --git a/openkb/agent/compiler.py b/openkb/agent/compiler.py
index c2b92619..68f57691 100644
--- a/openkb/agent/compiler.py
+++ b/openkb/agent/compiler.py
@@ -31,6 +31,7 @@
from openkb import frontmatter
from openkb.config import DEFAULT_ENTITY_TYPES, get_extra_headers, resolve_entity_types
from openkb.lint import list_existing_wiki_targets, strip_ghost_wikilinks
+from openkb.locks import atomic_write_text
from openkb.schema import INDEX_SEED, get_agents_md
logger = logging.getLogger(__name__)
@@ -768,7 +769,7 @@ def _write_summary(wiki_dir: Path, doc_name: str, summary: str,
fm_lines.append(f"doc_type: {doc_type}")
fm_lines.append(_yaml_kv_line("full_text", f"sources/{doc_name}.{ext}"))
fm_block = "---\n" + "\n".join(fm_lines) + "\n---\n\n"
- (summaries_dir / f"{doc_name}.md").write_text(fm_block + summary, encoding="utf-8")
+ atomic_write_text(summaries_dir / f"{doc_name}.md", fm_block + summary)
_SAFE_NAME_RE = re.compile(r'[^\w\-]')
@@ -828,7 +829,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is
if brief:
fm_lines.append(_yaml_kv_line("description", brief))
existing = frontmatter.block(fm_lines) + clean
- path.write_text(existing, encoding="utf-8")
+ atomic_write_text(path, existing)
return
# Guarantee type + refresh description on update; remove legacy brief:.
ex_parts2 = frontmatter.split(existing)
@@ -840,7 +841,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is
# Drop legacy brief: lines (migrated to description:).
fm_block = frontmatter.drop_line(fm_block, "brief")
existing = fm_block + body
- path.write_text(existing, encoding="utf-8")
+ atomic_write_text(path, existing)
else:
clean_parts = frontmatter.split(content)
if clean_parts is not None:
@@ -852,7 +853,7 @@ def _write_concept(wiki_dir: Path, name: str, content: str, source_file: str, is
if brief:
fm_lines.append(_yaml_kv_line("description", brief))
fm_block = "---\n" + "\n".join(fm_lines) + "\n---\n\n"
- path.write_text(fm_block + content, encoding="utf-8")
+ atomic_write_text(path, fm_block + content)
def _write_entity(
@@ -916,10 +917,10 @@ def _build_entity_frontmatter(sources: list[str]) -> str:
break
merged = [source_file] + [s for s in recovered if s != source_file]
existing = _build_entity_frontmatter(merged) + clean
- path.write_text(existing, encoding="utf-8")
+ atomic_write_text(path, existing)
return
- path.write_text(_build_entity_frontmatter([source_file]) + clean, encoding="utf-8")
+ atomic_write_text(path, _build_entity_frontmatter([source_file]) + clean)
_set_fm_line = frontmatter.set_line
@@ -1024,7 +1025,7 @@ def _add_related_link(
text = _prepend_source_to_frontmatter(text, source_file)
text += f"\n\nSee also: {link}"
- path.write_text(text, encoding="utf-8")
+ atomic_write_text(path, text)
return True
@@ -1051,7 +1052,7 @@ def _backlink_summary_pages(
_ensure_h2_section(lines, section, quiet=True)
for slug in reversed(missing):
_insert_section_entry(lines, section, f"- [[{page_dir}/{slug}]]")
- summary_path.write_text("\n".join(lines), encoding="utf-8")
+ atomic_write_text(summary_path, "\n".join(lines))
def _backlink_pages(
@@ -1072,7 +1073,7 @@ def _backlink_pages(
lines = text.split("\n")
_ensure_h2_section(lines, "## Related Documents", quiet=True)
_insert_section_entry(lines, "## Related Documents", f"- {link}")
- path.write_text("\n".join(lines), encoding="utf-8")
+ atomic_write_text(path, "\n".join(lines))
def _backlink_summary(wiki_dir: Path, doc_name: str, concept_slugs: list[str]) -> None:
@@ -1178,7 +1179,7 @@ def _remove_doc_from_pages(
path.unlink()
deleted.append(path.stem)
elif new_text != text:
- path.write_text(new_text, encoding="utf-8")
+ atomic_write_text(path, new_text)
modified.append(path.stem)
return {"modified": modified, "deleted": deleted}
@@ -1274,7 +1275,7 @@ def remove_doc_from_index(wiki_dir: Path, doc_name: str, concept_slugs_deleted:
while _remove_section_entry(lines, "## Entities", entity_link):
pass
- index_path.write_text("\n".join(lines), encoding="utf-8")
+ atomic_write_text(index_path, "\n".join(lines))
def _update_index(
@@ -1298,7 +1299,7 @@ def _update_index(
index_path = wiki_dir / "index.md"
if not index_path.exists():
- index_path.write_text(INDEX_SEED, encoding="utf-8")
+ atomic_write_text(index_path, INDEX_SEED)
lines = index_path.read_text(encoding="utf-8").split("\n")
@@ -1344,7 +1345,7 @@ def _update_index(
else:
_insert_section_entry(lines, "## Entities", entry)
- index_path.write_text("\n".join(lines), encoding="utf-8")
+ atomic_write_text(index_path, "\n".join(lines))
# ---------------------------------------------------------------------------
@@ -2018,7 +2019,7 @@ async def compile_long_doc(
updated = fm_block + body
if updated != summary_content:
summary_content = updated
- summary_path.write_text(summary_content, encoding="utf-8")
+ atomic_write_text(summary_path, summary_content)
# Base context A. cache_control marker on the doc message creates a
# cache breakpoint covering (system + doc) for every concept call.
diff --git a/openkb/cli.py b/openkb/cli.py
index a6236b7d..8a8c7cb7 100644
--- a/openkb/cli.py
+++ b/openkb/cli.py
@@ -8,11 +8,16 @@
warnings.filterwarnings("ignore")
import asyncio
+from concurrent.futures import ThreadPoolExecutor
+from contextlib import contextmanager
+from dataclasses import dataclass
import json
import logging
import shutil
import sys
import time
+import unicodedata
+import uuid
from functools import wraps
from pathlib import Path
from typing import Literal
@@ -45,9 +50,10 @@ def filter(self, record: logging.LogRecord) -> bool:
DEFAULT_CONFIG, load_config, save_config, load_global_config, register_kb,
resolve_extra_headers, set_extra_headers,
)
-from openkb.converter import _registry_path, convert_document
+from openkb.converter import _convert_document_locked, _registry_path, _sanitize_stem, resolve_doc_name
from openkb.locks import atomic_write_json, atomic_write_text, kb_ingest_lock, kb_read_lock
from openkb.log import append_log
+from openkb.mutation import MutationSnapshot, publish_staged_tree, snapshot_paths
from openkb.schema import AGENTS_MD, INDEX_SEED, PAGE_CONTENT_DIRS
# Suppress warnings after all imports — markitdown overrides filters at import time
@@ -56,6 +62,8 @@ def filter(self, record: logging.LogRecord) -> bool:
load_dotenv() # load from cwd (covers running inside the KB dir)
+logger = logging.getLogger(__name__)
+
_KNOWN_PROVIDER_KEYS = (
"OPENAI_API_KEY", "ANTHROPIC_API_KEY", "GEMINI_API_KEY",
@@ -67,6 +75,7 @@ def filter(self, record: logging.LogRecord) -> bool:
# handled by LiteLLM itself) — no API key env var is needed, so the
# missing-key warning would be a false alarm for them.
_OAUTH_PROVIDERS = {"chatgpt", "github_copilot"}
+_AddOutcome = Literal["added", "skipped", "failed"]
def _extract_provider(model: str) -> str | None:
@@ -274,13 +283,477 @@ def _clear_existing_skill_dir(kb_dir: Path, name: str) -> None:
shutil.rmtree(target)
-def add_single_file(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]:
- """Convert, index, and compile a single document under the KB mutation lock."""
+@dataclass
+class _PreparedAdd:
+    """Per-file record produced before commit and consumed by ``_commit_prepared_add``."""
+
+    # Input file this record describes.
+    file_path: Path
+    # Conversion result; None when no result was produced (hash failure,
+    # prefilter skip, or conversion failure).
+    result: object | None = None
+    # Staging directory handed to the converter, if any.
+    staging_dir: Path | None = None
+    # "skipped" or "failed" once decided before commit; None means the file
+    # was prepared and still has to be committed.
+    outcome: _AddOutcome | None = None
+    # Name of the stage that failed ("Hash" or "Conversion"); empty otherwise.
+    error_stage: str = ""
+    # Exception raised by the failing stage, if any.
+    error: Exception | None = None
+
+
+def _positive_int(value: object, default: int) -> int:
+    """Coerce a config value to an integer of at least 1.
+
+    Args:
+        value: Raw setting, e.g. a value read from the loaded config mapping.
+        default: Returned unchanged when ``value`` cannot be converted.
+
+    Returns:
+        ``int(value)`` clamped to a minimum of 1, or ``default`` when the
+        conversion fails.
+    """
+    try:
+        parsed = int(value)
+    except (TypeError, ValueError, OverflowError):
+        # OverflowError covers int(float("inf")), e.g. a YAML ``.inf`` value.
+        return default
+    return max(1, parsed)
+
+
+def _log_add_timing(
+    stage: str,
+    file_path: Path | None,
+    started_at: float,
+    **fields: object,
+) -> None:
+    """Emit a DEBUG line with the time elapsed since ``started_at``.
+
+    Does nothing unless DEBUG logging is enabled. ``fields`` are appended to
+    the message as `` key=value`` pairs; a missing ``file_path`` is logged as
+    an empty subject.
+    """
+    if logger.isEnabledFor(logging.DEBUG):
+        duration = time.perf_counter() - started_at
+        name = "" if file_path is None else file_path.name
+        extras = "".join([f" {key}={value}" for key, value in fields.items()])
+        logger.debug("add %s for %s took %.3fs%s", stage, name, duration, extras)
+
+
+def _prefilter_known_files(
+    files: list[Path],
+    kb_dir: Path,
+    jobs: int,
+) -> tuple[list[Path], dict[Path, _PreparedAdd]]:
+    """Hash directory inputs before conversion and skip hashes already known.
+
+    Args:
+        files: Candidate input files.
+        kb_dir: Knowledge-base root; the registry is read from
+            ``.openkb/hashes.json`` beneath it.
+        jobs: Worker threads used for hashing; values of 1 or less hash
+            serially.
+
+    Returns:
+        ``(remaining, prepared)``: the files that still need conversion, in
+        input order, and a ready-made ``_PreparedAdd`` ("skipped" or "failed")
+        for every file that was filtered out.
+    """
+    from openkb.state import HashRegistry
+
+    started = time.perf_counter()
+    # Copy the registry into memory under the lock; the hashing below works
+    # against that copy.
+    with _kb_mutation_lock(kb_dir):
+        registry = HashRegistry.memory(
+            HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries()
+        )
+
+    def hash_one(file_path: Path) -> tuple[Path, str | None, Exception | None]:
+        # Never raises: a per-file hashing error is returned so it becomes a
+        # "failed" outcome instead of aborting the whole batch.
+        try:
+            return file_path, HashRegistry.hash_file(file_path), None
+        except Exception as exc:
+            return file_path, None, exc
+
+    if jobs <= 1:
+        # Also covers jobs <= 0, which ThreadPoolExecutor rejects.
+        hashed = [hash_one(file_path) for file_path in files]
+    else:
+        with ThreadPoolExecutor(max_workers=jobs) as executor:
+            # map() yields results in input order.
+            hashed = list(executor.map(hash_one, files))
+
+    remaining: list[Path] = []
+    prepared: dict[Path, _PreparedAdd] = {}
+    skipped = 0
+    failed = 0
+    for file_path, file_hash, error in hashed:
+        if error is not None:
+            prepared[file_path] = _PreparedAdd(
+                file_path=file_path,
+                outcome="failed",
+                error_stage="Hash",
+                error=error,
+            )
+            failed += 1
+            continue
+        if file_hash is not None and registry.is_known(file_hash):
+            prepared[file_path] = _PreparedAdd(
+                file_path=file_path,
+                outcome="skipped",
+            )
+            skipped += 1
+            continue
+        remaining.append(file_path)
+
+    # Report hash failures separately so they are not counted as skips.
+    _log_add_timing(
+        "prefilter",
+        None,
+        started,
+        total=len(files),
+        remaining=len(remaining),
+        skipped=skipped,
+        failed=failed,
+        jobs=jobs,
+    )
+    return remaining, prepared
+
+
+def _reserve_batch_doc_names(files: list[Path], kb_dir: Path) -> dict[Path, str]:
+    """Reserve doc_names in scan order so parallel prepare matches serial add.
+
+    Args:
+        files: Files about to be prepared, in scan order.
+        kb_dir: Knowledge-base root; the registry is read from
+            ``.openkb/hashes.json`` beneath it.
+
+    Returns:
+        Mapping from each input file to the doc_name resolved for it.
+    """
+    from openkb.state import HashRegistry
+
+    # In-memory registry view: same resolve contract as the on-disk registry
+    # but add() never persists. persist_legacy=True below therefore only
+    # *consumes* a matched legacy entry in memory (backfilling its path) so a
+    # later same-stem file in this batch no longer re-matches it via
+    # find_legacy_by_stem — the idempotency fix. With a persisting registry
+    # that same call would write disk on every reservation.
+    batch_registry = HashRegistry.memory(
+        HashRegistry(kb_dir / ".openkb" / "hashes.json").all_entries()
+    )
+    reserved: dict[Path, str] = {}
+    for file_path in files:
+        doc_name = resolve_doc_name(
+            file_path,
+            kb_dir,
+            batch_registry,
+            persist_legacy=True,
+        )
+        reserved[file_path] = doc_name
+        # Future files in the same batch must see this name as already taken;
+        # otherwise same-stem files prepared in parallel could all claim it.
+        # The synthetic "batch:" key only has to be unique within this
+        # in-memory view; it is not a real content hash.
+        batch_registry.add(
+            f"batch:{len(reserved)}:{_registry_path(file_path, kb_dir)}",
+            {
+                "name": file_path.name,
+                "doc_name": doc_name,
+                "path": _registry_path(file_path, kb_dir),
+            },
+        )
+    return reserved
+
+
+def _staging_dir_for(kb_dir: Path, file_path: Path) -> Path:
+    """Create and return a fresh staging directory for one input file.
+
+    The directory lives under ``.openkb/staging`` in ``kb_dir`` and is named
+    from the sanitized file stem plus eight random hex characters.
+    """
+    stem = _sanitize_stem(file_path.stem)
+    # A random token (not a timestamp) keeps same-stem files apart: they share
+    # the sanitized stem, and a repeated wall-clock value would make
+    # mkdir(exist_ok=False) fail and crash the batch.
+    token = uuid.uuid4().hex[:8]
+    staging_root = kb_dir / ".openkb" / "staging"
+    target = staging_root / f"add-{stem}-{token}"
+    target.mkdir(parents=True, exist_ok=False)
+    return target
+
+
+def _cleanup_staging(path: Path | None) -> None:
+    """Remove a staging directory tree, ignoring errors; no-op for ``None``."""
+    if path is None:
+        return
+    shutil.rmtree(path, ignore_errors=True)
+
+
+def _prepare_add_file(
+    file_path: Path,
+    kb_dir: Path,
+    staging_dir: Path | None,
+    doc_name: str | None = None,
+) -> _PreparedAdd:
+    """Convert one file, optionally into a staging directory, without committing.
+
+    A conversion error is captured in the returned record (outcome "failed",
+    error_stage "Conversion") rather than raised. A result flagged as skipped
+    yields outcome "skipped"; otherwise the outcome is left unset.
+    """
+    began = time.perf_counter()
+    try:
+        converted = _convert_document_locked(
+            file_path,
+            kb_dir,
+            staging_dir=staging_dir,
+            doc_name_override=doc_name,
+        )
+    except Exception as exc:
+        logger.debug("Conversion traceback:", exc_info=True)
+        _log_add_timing("prepare", file_path, began, outcome="failed")
+        return _PreparedAdd(
+            file_path=file_path,
+            staging_dir=staging_dir,
+            outcome="failed",
+            error_stage="Conversion",
+            error=exc,
+        )
+
+    # Skipped results carry an explicit outcome; prepared ones leave it unset.
+    if converted.skipped:
+        outcome, label = "skipped", "skipped"
+    else:
+        outcome, label = None, "prepared"
+    record = _PreparedAdd(
+        file_path=file_path,
+        result=converted,
+        staging_dir=staging_dir,
+        outcome=outcome,
+    )
+    _log_add_timing("prepare", file_path, began, outcome=label)
+    return record
+
+
+def _final_artifact_paths(result, kb_dir: Path) -> tuple[Path | None, Path | None]:
+    """Map a conversion result's raw/source files to their live-KB locations.
+
+    Returns ``(final_raw, final_source)``: the file names of
+    ``result.raw_path`` and ``result.source_path`` re-rooted under ``raw/`` and
+    ``wiki/sources/`` in ``kb_dir``. An entry is None when the corresponding
+    ``result`` path is None.
+    """
+    raw = result.raw_path
+    source = result.source_path
+    final_raw = None if raw is None else kb_dir / "raw" / raw.name
+    final_source = None if source is None else kb_dir / "wiki" / "sources" / source.name
+    return final_raw, final_source
+
+
+def _snapshot_add_paths(kb_dir: Path, doc_name: str, final_raw: Path | None, final_source: Path | None) -> list[Path]:
+    """List the paths an add of ``doc_name`` snapshots before mutating.
+
+    The fixed registry, PageIndex, and wiki paths come first, followed by
+    ``final_raw`` and ``final_source`` when they are not None.
+    """
+    state_dir = kb_dir / ".openkb"
+    wiki_dir = kb_dir / "wiki"
+    sources_dir = wiki_dir / "sources"
+    targets = [
+        state_dir / "hashes.json",
+        state_dir / "pageindex.db",
+        # SQLite may keep recently-written PageIndex state in sidecar files.
+        # Missing paths are listed too, so rollback removes sidecars created
+        # during a failed long-document ingest.
+        state_dir / "pageindex.db-wal",
+        state_dir / "pageindex.db-shm",
+        state_dir / "pageindex.db-journal",
+        state_dir / "files",
+        wiki_dir / "summaries" / f"{doc_name}.md",
+        # Long-doc only: per-page sources JSON + extracted images. Listed
+        # unconditionally because snapshot_paths no-ops on missing targets,
+        # so one helper covers short and long docs without branching.
+        sources_dir / f"{doc_name}.json",
+        sources_dir / "images" / doc_name,
+        wiki_dir / "concepts",
+        wiki_dir / "entities",
+        wiki_dir / "index.md",
+        wiki_dir / "log.md",
+    ]
+    targets.extend(extra for extra in (final_raw, final_source) if extra is not None)
+    return targets
+
+
+def _run_compile_with_retry(coro_factory, label: str) -> None:
+    """Run an async compile step, retrying a single time after a 2s pause.
+
+    ``coro_factory`` is called once per attempt because a coroutine consumed by
+    ``asyncio.run`` cannot be awaited again. If the second attempt also fails,
+    the exception propagates so the caller's snapshot-rollback path runs.
+    """
+    click.echo(f" {label}...")
+    started = time.perf_counter()
+    retried = False
+    while True:
+        try:
+            asyncio.run(coro_factory())
+            _log_add_timing("compile", None, started, label=label)
+            return
+        except Exception as exc:
+            if retried:
+                click.echo(f" [ERROR] Compilation failed: {exc}")
+                logger.debug("Compilation traceback:", exc_info=True)
+                raise
+            retried = True
+            click.echo(" Retrying compilation in 2s...")
+            time.sleep(2)
+
+
+def _commit_prepared_add(prepared: _PreparedAdd, kb_dir: Path, model: str) -> _AddOutcome:
+    """Commit a prepared add while the caller holds the KB mutation lock.
+
+    Args:
+        prepared: Record produced by the prefilter/prepare stage for one file.
+        kb_dir: Knowledge-base root directory.
+        model: Model name passed to the compile step.
+
+    Returns:
+        "added" on success, "skipped" when the document is already known, and
+        "failed" otherwise. An ``Exception`` raised between snapshot and commit
+        triggers a best-effort rollback and is reported as "failed".
+    """
+    from openkb.agent.compiler import compile_long_doc, compile_short_doc
+    from openkb.state import HashRegistry
+
+    file_path = prepared.file_path
+    started = time.perf_counter()
+
+    def finish(outcome: _AddOutcome) -> _AddOutcome:
+        _log_add_timing("commit", file_path, started, outcome=outcome)
+        return outcome
+
+    if prepared.outcome == "failed":
+        click.echo(f" [ERROR] {prepared.error_stage} failed: {prepared.error}")
+        _cleanup_staging(prepared.staging_dir)
+        return finish("failed")
+    if prepared.outcome == "skipped":
+        click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
+        _cleanup_staging(prepared.staging_dir)
+        return finish("skipped")
+    if prepared.result is None:
+        click.echo(f" [ERROR] Conversion failed: no result for {file_path.name}")
+        _cleanup_staging(prepared.staging_dir)
+        return finish("failed")
+
+    result = prepared.result
+    registry = HashRegistry(kb_dir / ".openkb" / "hashes.json")
+    # Re-check against the current registry: the hash may have been registered
+    # since the file was prepared.
+    if result.file_hash and registry.is_known(result.file_hash):
+        click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
+        _cleanup_staging(prepared.staging_dir)
+        return finish("skipped")
+
+    doc_name = result.doc_name or file_path.stem
+    norm_doc_name = unicodedata.normalize("NFKC", doc_name)
+    norm_file_name = unicodedata.normalize("NFKC", file_path.name)
+    path_key = _registry_path(file_path, kb_dir)
+    for existing_hash, meta in registry.all_entries().items():
+        existing_name = meta.get("doc_name") or Path(meta.get("name", "")).stem
+        if (
+            existing_hash != result.file_hash
+            and unicodedata.normalize("NFKC", existing_name) == norm_doc_name
+        ):
+            existing_path = meta.get("path")
+            # Same document iff it shares our path, OR it is a legacy
+            # (pre-path-index) entry with no path whose filename matches — the
+            # pre-collision form of this same document being re-ingested. A
+            # path-indexed entry must NOT be matched by filename alone: two
+            # different files can share a name, and a concurrent add could
+            # claim this doc_name in the window between reservation and commit.
+            # NFKC-normalize names: macOS reports filenames in NFD, so a raw
+            # `==` would mis-classify a same-document re-add as a conflict.
+            if existing_path == path_key or (
+                not existing_path
+                and unicodedata.normalize("NFKC", meta.get("name") or "") == norm_file_name
+            ):
+                continue
+            click.echo(
+                " [ERROR] Document name conflict after parallel preparation: "
+                f"'{doc_name}' is already used by {meta.get('name', 'another document')}. "
+                "Re-run this file after the current batch."
+            )
+            _cleanup_staging(prepared.staging_dir)
+            return finish("failed")
+
+    final_raw, final_source = _final_artifact_paths(result, kb_dir)
+    snapshot: MutationSnapshot | None = None
+    index_result = None
+
+    try:
+        # Take the snapshot inside the try: a failure here (permission,
+        # relative_to ValueError) used to propagate uncaught — aborting the
+        # whole remaining batch and leaking an unjournaled backup dir.
+        # snapshot_paths now self-cleans its backup dir on partial failure, so
+        # when snapshot stays None there is nothing to roll back and the
+        # except branch handles that case.
+        #
+        # concepts/entities/files are snapshotted via hardlinks, not copies:
+        # the wiki writers are atomic (temp+replace) and PageIndex only appends
+        # new {doc_id} blobs, so a hardlink backup stays valid while the live
+        # tree is mutated. This is what keeps a per-file snapshot O(1) instead
+        # of O(corpus) — the cost that bit large-KB batch adds.
+        snapshot = snapshot_paths(
+            kb_dir,
+            _snapshot_add_paths(kb_dir, doc_name, final_raw, final_source),
+            operation="add",
+            details={"file_hash": result.file_hash, "name": file_path.name, "doc_name": doc_name},
+            hardlink_dirs={
+                kb_dir / "wiki" / "concepts",
+                kb_dir / "wiki" / "entities",
+                kb_dir / ".openkb" / "files",
+            },
+        )
+        publish_staged_tree(prepared.staging_dir, kb_dir)
+        # From here on the result refers to the live-KB locations.
+        if final_raw is not None:
+            result.raw_path = final_raw
+        if final_source is not None:
+            result.source_path = final_source
+
+        if result.is_long_doc:
+            click.echo(" Long document detected — indexing with PageIndex...")
+            index_started = time.perf_counter()
+            try:
+                from openkb.indexer import index_long_document
+
+                index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name)
+                _log_add_timing("index", file_path, index_started, doc_name=doc_name)
+            except Exception as exc:
+                click.echo(f" [ERROR] Indexing failed: {exc}")
+                logger.debug("Indexing traceback:", exc_info=True)
+                raise
+
+            summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md"
+            _run_compile_with_retry(
+                lambda: compile_long_doc(
+                    doc_name,
+                    summary_path,
+                    index_result.doc_id,
+                    kb_dir,
+                    model,
+                    doc_description=index_result.description,
+                ),
+                label=f"Compiling long doc (doc_id={index_result.doc_id})",
+            )
+        else:
+            _run_compile_with_retry(
+                lambda: compile_short_doc(doc_name, result.source_path, kb_dir, model),
+                label="Compiling short doc",
+            )
+
+        if result.file_hash:
+            # Reuse the registry instance from the conflict scan: still valid
+            # under the same lock — nothing mutated it between scan and write.
+            doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".")
+            meta = {
+                "name": file_path.name,
+                "doc_name": doc_name,
+                "type": doc_type,
+                "path": _registry_path(file_path, kb_dir),
+            }
+            if result.raw_path is not None:
+                meta["raw_path"] = _registry_path(result.raw_path, kb_dir)
+            if result.source_path is not None:
+                meta["source_path"] = _registry_path(result.source_path, kb_dir)
+            if index_result is not None:
+                meta["doc_id"] = index_result.doc_id
+            registry.remove_by_doc_name(doc_name)
+            for existing_hash, existing_meta in list(registry.all_entries().items()):
+                # Compare filenames NFKC-normalized, exactly like the conflict
+                # scan earlier in this function: a legacy entry that the scan
+                # accepted as "same document" must also be cleared here, or an
+                # NFD-reported filename would leave a stale duplicate entry.
+                if (
+                    existing_hash != result.file_hash
+                    and not existing_meta.get("doc_name")
+                    and unicodedata.normalize("NFKC", existing_meta.get("name") or "")
+                    == norm_file_name
+                ):
+                    registry.remove_by_hash(existing_hash)
+            registry.add(result.file_hash, meta)
+
+        # Commit point: the registry write is durable. Mark the journal
+        # committed so that any failure in the post-commit cleanup below
+        # (log append, backup removal) cannot trigger a recovery rollback
+        # that discards this completed ingest. mark_committed is the last
+        # step inside the try — together with registry.add it forms the
+        # atomic commit group, so a failure here correctly rolls back.
+        snapshot.mark_committed()
+    except Exception:
+        # Indexing and compilation echo their own errors; keep the traceback
+        # available for every other failure (snapshot, publish, registry).
+        logger.debug("Add commit traceback:", exc_info=True)
+        if snapshot is None:
+            # snapshot_paths failed before any mutation ran; it already removed
+            # its own backup dir, so there is nothing to roll back.
+            click.echo(f" [ERROR] Failed to prepare mutation snapshot for {file_path.name}.")
+            return finish("failed")
+        rollback_error = snapshot.rollback_best_effort()
+        if rollback_error is None:
+            snapshot.discard_best_effort()
+        else:
+            click.echo(
+                " [ERROR] Rollback failed; mutation journal retained for recovery: "
+                f"{snapshot.journal_path}"
+            )
+        return finish("failed")
+    finally:
+        _cleanup_staging(prepared.staging_dir)
+
+    # Post-commit side effects. These run only after the mutation is
+    # committed and the journal marked committed, so a failure here must NOT
+    # roll back — best-effort only. A stale "committed" journal left behind
+    # is harmless: the next recover_pending_journals sees status "committed"
+    # and discards it.
+    try:
+        append_log(kb_dir / "wiki", "ingest", file_path.name)
+    except Exception as exc:
+        logger.warning("Failed to append ingest log for %s: %s", file_path.name, exc)
+    cleanup_error = snapshot.discard_best_effort()
+    if cleanup_error is not None:
+        click.echo(
+            f" [WARN] {file_path.name} added, but mutation journal cleanup failed: {cleanup_error}"
+        )
+    click.echo(f" [OK] {file_path.name} added to knowledge base.")
+    return finish("added")
+
+
+@contextmanager
+def _kb_mutation_lock(kb_dir: Path):
+ """Acquire the ingest lock for an add-path mutation.
+
+ Journal draining lives in :func:`openkb.locks.kb_lock` now, so every
+ exclusive-lock holder — ``add``, ``remove``, ``recompile``, ``lint``,
+ ``chat`` — drains pending journals on first acquisition, not just the add
+ path. This wrapper remains the add pipeline's entry point (prepare/commit
+ run inside the lock); it no longer drains separately, since doing so would
+ double-scan and double-log on every per-file commit in a directory batch.
+ """
+ started = time.perf_counter()
with kb_ingest_lock(kb_dir / ".openkb"):
- return _add_single_file_locked(file_path, kb_dir)
+ _log_add_timing("lock_wait", None, started)
+ yield
+
+
+def add_single_file(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome:
+    """Add one document end to end while holding the KB mutation lock.
+
+    With ``stage=True`` the conversion writes into an isolated staging dir
+    ahead of the commit snapshot, so a crash between convert and commit cannot
+    orphan raw/source files in the live KB. Callers whose file already lives
+    in ``raw/`` (watch mode, URL fetch) pass ``stage=False`` to keep convert's
+    in-place optimization.
+    """
+    with _kb_mutation_lock(kb_dir):
+        outcome = _add_single_file_locked(file_path, kb_dir, stage=stage)
+    return outcome
-def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "skipped", "failed"]:
+def _add_single_file_locked(file_path: Path, kb_dir: Path, *, stage: bool = True) -> _AddOutcome:
"""Convert, index, and compile a single document into the knowledge base.
Steps:
@@ -297,106 +770,19 @@ def _add_single_file_locked(file_path: Path, kb_dir: Path) -> Literal["added", "
be an orphan) while preserving it on failure so the user can
retry without re-downloading.
"""
- from openkb.agent.compiler import compile_long_doc, compile_short_doc
- from openkb.state import HashRegistry
-
- logger = logging.getLogger(__name__)
openkb_dir = kb_dir / ".openkb"
config = load_config(openkb_dir / "config.yaml")
_setup_llm_key(kb_dir)
model: str = config.get("model", DEFAULT_CONFIG["model"])
- # 2. Convert document
click.echo(f"Adding: {file_path.name}")
- try:
- result = convert_document(file_path, kb_dir)
- except Exception as exc:
- click.echo(f" [ERROR] Conversion failed: {exc}")
- logger.debug("Conversion traceback:", exc_info=True)
- return "failed"
-
- if result.skipped:
- click.echo(f" [SKIP] Already in knowledge base: {file_path.name}")
- return "skipped"
-
- doc_name = result.doc_name or file_path.stem
- index_result = None # populated only on the long-doc branch
-
- # 3/4. Index and compile
- if result.is_long_doc:
- click.echo(f" Long document detected — indexing with PageIndex...")
- try:
- from openkb.indexer import index_long_document
- index_result = index_long_document(result.raw_path, kb_dir, doc_name=doc_name)
- except Exception as exc:
- click.echo(f" [ERROR] Indexing failed: {exc}")
- logger.debug("Indexing traceback:", exc_info=True)
- return "failed"
-
- summary_path = kb_dir / "wiki" / "summaries" / f"{doc_name}.md"
- click.echo(f" Compiling long doc (doc_id={index_result.doc_id})...")
- for attempt in range(2):
- try:
- asyncio.run(
- compile_long_doc(doc_name, summary_path, index_result.doc_id, kb_dir, model,
- doc_description=index_result.description)
- )
- break
- except Exception as exc:
- if attempt == 0:
- click.echo(f" Retrying compilation in 2s...")
- time.sleep(2)
- else:
- click.echo(f" [ERROR] Compilation failed: {exc}")
- logger.debug("Compilation traceback:", exc_info=True)
- return "failed"
- else:
- click.echo(f" Compiling short doc...")
- for attempt in range(2):
- try:
- asyncio.run(compile_short_doc(doc_name, result.source_path, kb_dir, model))
- break
- except Exception as exc:
- if attempt == 0:
- click.echo(f" Retrying compilation in 2s...")
- time.sleep(2)
- else:
- click.echo(f" [ERROR] Compilation failed: {exc}")
- logger.debug("Compilation traceback:", exc_info=True)
- return "failed"
-
- # Register hash only after successful compilation
- if result.file_hash:
- # Construct the registry NOW, not earlier: convert_document may have
- # backfilled a legacy entry (doc_name/path) on disk via its own
- # instance, and an earlier snapshot would clobber that backfill on
- # the full rewrite in add().
- registry = HashRegistry(openkb_dir / "hashes.json")
- doc_type = "long_pdf" if result.is_long_doc else file_path.suffix.lstrip(".")
- meta = {
- "name": file_path.name,
- "doc_name": doc_name,
- "type": doc_type,
- "path": _registry_path(file_path, kb_dir),
- }
- if result.raw_path is not None:
- meta["raw_path"] = _registry_path(result.raw_path, kb_dir)
- if result.source_path is not None:
- meta["source_path"] = _registry_path(result.source_path, kb_dir)
- # For long PDFs we also persist the PageIndex doc_id so `openkb
- # remove` can later call ``Collection.delete_document(doc_id)``
- # to free the managed PDF copy + SQLite row.
- if index_result is not None:
- meta["doc_id"] = index_result.doc_id
- # An edited document arrives with a new content hash; drop the
- # stale entry for the same doc_name so the registry keeps exactly
- # one entry per document.
- registry.remove_by_doc_name(doc_name)
- registry.add(result.file_hash, meta)
-
- append_log(kb_dir / "wiki", "ingest", file_path.name)
- click.echo(f" [OK] {file_path.name} added to knowledge base.")
- return "added"
+ # Stage unless the file already lives in raw/ (watch/URL), where convert's
+ # in-place optimization applies. Staging keeps convert's writes out of the
+ # live KB so a crash between convert and the commit snapshot can't orphan
+ # raw/source files with no journal to recover them.
+ staging_dir = _staging_dir_for(kb_dir, file_path) if stage else None
+ prepared = _prepare_add_file(file_path, kb_dir, staging_dir=staging_dir)
+ return _commit_prepared_add(prepared, kb_dir, model)
# ---------------------------------------------------------------------------
@@ -602,6 +988,7 @@ def init(model, language):
"model": model,
"language": language,
"pageindex_threshold": DEFAULT_CONFIG["pageindex_threshold"],
+ "file_processing_jobs": DEFAULT_CONFIG["file_processing_jobs"],
}
save_config(openkb_dir / "config.yaml", config)
atomic_write_json(openkb_dir / "hashes.json", {})
@@ -625,7 +1012,6 @@ def init(model, language):
@cli.command()
@click.argument("path")
@click.pass_context
-@_with_kb_lock(exclusive=True)
def add(ctx, path):
"""Add a document or directory of documents at PATH to the knowledge base.
@@ -647,10 +1033,11 @@ def add(ctx, path):
# that the registry can't reach via openkb remove.
from openkb.url_ingest import looks_like_url, fetch_url_to_raw
if looks_like_url(path):
- fetched = fetch_url_to_raw(path, kb_dir)
- if fetched is None:
- return
- outcome = add_single_file(fetched, kb_dir)
+ with _kb_mutation_lock(kb_dir):
+ fetched = fetch_url_to_raw(path, kb_dir)
+ if fetched is None:
+ return
+ outcome = _add_single_file_locked(fetched, kb_dir, stage=False)
# Only clean up on dedup-skip. On "failed" we keep the file so
# the user can retry (e.g. transient LLM error during compile)
# without re-downloading — and so they don't lose data when
@@ -674,9 +1061,55 @@ def add(ctx, path):
return
total = len(files)
click.echo(f"Found {total} supported file(s) in {path}.")
- for i, f in enumerate(files, 1):
- click.echo(f"\n[{i}/{total}] ", nl=False)
- add_single_file(f, kb_dir)
+ config = load_config(kb_dir / ".openkb" / "config.yaml")
+ jobs = min(
+ total,
+ _positive_int(
+ config.get("file_processing_jobs"),
+ DEFAULT_CONFIG["file_processing_jobs"],
+ ),
+ )
+ _setup_llm_key(kb_dir)
+ model = config.get("model", DEFAULT_CONFIG["model"])
+ files_to_prepare, prepared_outcomes = _prefilter_known_files(files, kb_dir, jobs)
+ with _kb_mutation_lock(kb_dir):
+ reserved_doc_names = _reserve_batch_doc_names(files_to_prepare, kb_dir)
+ click.echo(f"Preparing files with {jobs} worker(s).")
+ # Track every staging dir this batch creates so a Ctrl+C, a failed
+ # file, or an mkdir error can't orphan the ones whose file never
+ # reaches _commit_prepared_add's per-call cleanup. Recovery only scans
+ # journal/, and the mutation lock is released between per-file commits
+ # (so another process can acquire it mid-batch), so the batch itself —
+ # not a recovery sweep — must reap its own staging set.
+ batch_staging_dirs: list[Path] = []
+ try:
+ with ThreadPoolExecutor(max_workers=jobs) as executor:
+ futures = {}
+ for f in files_to_prepare:
+ staging_dir = _staging_dir_for(kb_dir, f)
+ batch_staging_dirs.append(staging_dir)
+ futures[f] = executor.submit(
+ _prepare_add_file,
+ f,
+ kb_dir,
+ staging_dir,
+ reserved_doc_names[f],
+ )
+ # Commit in scan order even though prepare finishes out of
+ # order. That keeps log.md and CLI progress stable for audit.
+ for i, f in enumerate(files, 1):
+ prepared = prepared_outcomes.get(f)
+ if prepared is None:
+ prepared = futures[f].result()
+ click.echo(f"\n[{i}/{total}] Adding: {prepared.file_path.name}")
+ with _kb_mutation_lock(kb_dir):
+ _commit_prepared_add(prepared, kb_dir, model)
+ finally:
+ # Idempotent: dirs already removed by each commit's
+ # _cleanup_staging are a no-op here; this reaps the rest on any
+ # exit path (clean finish, raised exception, or Ctrl+C).
+ for staging_dir in batch_staging_dirs:
+ _cleanup_staging(staging_dir)
else:
if target.suffix.lower() not in SUPPORTED_EXTENSIONS:
click.echo(
@@ -1412,7 +1845,7 @@ def on_new_files(paths):
f"Supported: {', '.join(sorted(SUPPORTED_EXTENSIONS))}"
)
continue
- add_single_file(fp, kb_dir)
+ add_single_file(fp, kb_dir, stage=False)
click.echo(f"Watching {raw_dir} for new documents. Press Ctrl+C to stop.")
watch_directory(raw_dir, on_new_files)
diff --git a/openkb/config.py b/openkb/config.py
index 9d0d6cd4..4c44addd 100644
--- a/openkb/config.py
+++ b/openkb/config.py
@@ -16,6 +16,7 @@
"model": "gpt-5.4-mini",
"language": "en",
"pageindex_threshold": 20,
+ "file_processing_jobs": 2,
}
# Default entity-type vocabulary. Overridable per-KB via the optional
diff --git a/openkb/converter.py b/openkb/converter.py
index 9ebac762..b589b2db 100644
--- a/openkb/converter.py
+++ b/openkb/converter.py
@@ -14,6 +14,7 @@
from openkb.config import load_config
from openkb.images import copy_relative_images, extract_base64_images, convert_pdf_with_images
+from openkb.locks import atomic_write_text, kb_ingest_lock
from openkb.state import HashRegistry
logger = logging.getLogger(__name__)
@@ -70,7 +71,13 @@ def _name_taken(candidate: str, registry: HashRegistry) -> bool:
return False
-def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str:
+def resolve_doc_name(
+ src: Path,
+ kb_dir: Path,
+ registry: HashRegistry,
+ *,
+ persist_legacy: bool = True,
+) -> str:
"""Resolve the stable wiki name for ``src`` (Scheme A).
Identity is keyed by path: a source we've seen before (same path, even
@@ -93,9 +100,10 @@ def resolve_doc_name(src: Path, kb_dir: Path, registry: HashRegistry) -> str:
file_hash, meta = legacy
meta = dict(meta)
name = meta.get("doc_name") or Path(meta.get("name", "")).stem
- meta["doc_name"] = name
- meta["path"] = path_key
- registry.add(file_hash, meta) # backfill + persist
+ if persist_legacy:
+ meta["doc_name"] = name
+ meta["path"] = path_key
+ registry.add(file_hash, meta) # backfill + persist
return name
candidate = _sanitize_stem(src.stem)
@@ -111,9 +119,20 @@ def get_pdf_page_count(path: Path) -> int:
return doc.page_count
-def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
+def convert_document(
+ src: Path,
+ kb_dir: Path,
+ *,
+ staging_dir: Path | None = None,
+ doc_name_override: str | None = None,
+) -> ConvertResult:
"""Convert a document and integrate it into the knowledge base.
+ Acquires the KB ingest lock, then delegates to
+ :func:`_convert_document_locked`. Callers that already hold the lock
+ (the parallel add pipeline prepares files outside the lock) should call
+ ``_convert_document_locked`` directly to avoid re-entering it.
+
Steps:
1. Hash-check — skip if already known.
2. Copy source to ``raw/``.
@@ -122,12 +141,36 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
5. Otherwise — run MarkItDown, extract base64 images, save to ``wiki/sources/``.
6. Register hash in the registry.
"""
+ with kb_ingest_lock(kb_dir / ".openkb"):
+ return _convert_document_locked(
+ src,
+ kb_dir,
+ staging_dir=staging_dir,
+ doc_name_override=doc_name_override,
+ )
+
+
+def _convert_document_locked(
+ src: Path,
+ kb_dir: Path,
+ *,
+ staging_dir: Path | None = None,
+ doc_name_override: str | None = None,
+) -> ConvertResult:
+ """Lock-held body of :func:`convert_document`.
+
+ The caller must already hold the KB ingest lock. Writes go to
+ ``staging_dir`` when set (isolated, published at commit) and to
+ ``kb_dir`` otherwise (in-place, e.g. watch mode where the file is
+ already in ``raw/``).
+ """
# ------------------------------------------------------------------
# Load config & state
# ------------------------------------------------------------------
openkb_dir = kb_dir / ".openkb"
config = load_config(openkb_dir / "config.yaml")
threshold: int = config.get("pageindex_threshold", 20)
+ artifact_root = staging_dir if staging_dir is not None else kb_dir
registry = HashRegistry(openkb_dir / "hashes.json")
# ------------------------------------------------------------------
@@ -142,14 +185,21 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
file_hash=file_hash,
doc_name=stored.get("doc_name") or Path(stored.get("name", src.name)).stem,
)
- doc_name = resolve_doc_name(src, kb_dir, registry)
+ # Batch ingest reserves doc_names before parallel conversion so same-stem
+ # files behave like serial adds while still writing to isolated staging dirs.
+ doc_name = doc_name_override or resolve_doc_name(
+ src,
+ kb_dir,
+ registry,
+ persist_legacy=staging_dir is None,
+ )
# ------------------------------------------------------------------
# 2. Copy to raw/
# ------------------------------------------------------------------
- raw_dir = kb_dir / "raw"
+ raw_dir = artifact_root / "raw"
raw_dir.mkdir(parents=True, exist_ok=True)
- if src.resolve().is_relative_to(raw_dir.resolve()):
+ if staging_dir is None and src.resolve().is_relative_to(raw_dir.resolve()):
# Watch mode: the file already lives in raw/ — don't copy/rename.
raw_dest = src
else:
@@ -178,9 +228,9 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
# ------------------------------------------------------------------
# 4/5. Convert to Markdown
# ------------------------------------------------------------------
- sources_dir = kb_dir / "wiki" / "sources"
+ sources_dir = artifact_root / "wiki" / "sources"
sources_dir.mkdir(parents=True, exist_ok=True)
- images_dir = kb_dir / "wiki" / "sources" / "images" / doc_name
+ images_dir = artifact_root / "wiki" / "sources" / "images" / doc_name
images_dir.mkdir(parents=True, exist_ok=True)
if src.suffix.lower() == ".md":
@@ -197,7 +247,7 @@ def convert_document(src: Path, kb_dir: Path) -> ConvertResult:
markdown = extract_base64_images(markdown, doc_name, images_dir)
dest_md = sources_dir / f"{doc_name}.md"
- dest_md.write_text(markdown, encoding="utf-8")
+ atomic_write_text(dest_md, markdown)
return ConvertResult(
raw_path=raw_dest,
diff --git a/openkb/lint.py b/openkb/lint.py
index 8b7674b7..ce695ecc 100644
--- a/openkb/lint.py
+++ b/openkb/lint.py
@@ -16,6 +16,7 @@
import yaml
from openkb import frontmatter
+from openkb.locks import atomic_write_text
from openkb.schema import PAGE_CONTENT_DIRS
# Matches [[wikilink]] or [[subdir/link]]
@@ -249,7 +250,7 @@ def fix_broken_links(
text, known_targets, norm_index=norm_index,
)
if cleaned != text:
- md.write_text(cleaned, encoding="utf-8")
+ atomic_write_text(md, cleaned)
files_changed += 1
ghosts_stripped += len(ghosts)
return files_changed, ghosts_stripped
diff --git a/openkb/locks.py b/openkb/locks.py
index 2fa2815a..72966fc1 100644
--- a/openkb/locks.py
+++ b/openkb/locks.py
@@ -9,6 +9,7 @@
import contextlib
import json
+import logging
import os
import tempfile
import threading
@@ -97,6 +98,32 @@ def _local_lock(lock_path: Path) -> _LocalRwLock:
return lock
+def _drain_pending_journals(openkb_dir: Path) -> None:
+    """Roll back mutation journals that an interrupted process left on disk.
+
+    Recovery belongs to *acquiring* the mutation lock rather than to any one
+    command: whoever takes the exclusive lock must first bring the KB back to
+    a known state. Because ``kb_lock`` calls this helper, every exclusive
+    holder (``add``, ``remove``, ``recompile``, ``lint``, ``chat``) drains
+    before mutating, so a crashed ``add`` cannot leave an active journal that
+    a later ``add`` rolls back on top of an intervening ``remove`` or
+    ``recompile``.
+
+    ``openkb_dir`` is the ``kb_dir/.openkb`` directory handed to ``kb_lock``.
+    Journals live in ``openkb_dir/journal``, hence the KB root passed to the
+    recovery routine is ``openkb_dir.parent``.
+
+    The import is deferred to call time to break the ``locks`` <->
+    ``mutation`` import cycle (``mutation`` imports atomic-write helpers from
+    this module at top level). ``kb_lock`` invokes this only for exclusive
+    acquisitions, so read-lock holders never pay for it.
+    """
+    from openkb.mutation import recover_pending_journals
+
+    kb_root = openkb_dir.parent
+    emit_warning = logging.getLogger(__name__).warning
+    # Each recovery message is surfaced as a warning, one log record apiece.
+    for message in recover_pending_journals(kb_root):
+        emit_warning(message)
+
+
@contextlib.contextmanager
def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]:
"""Hold a KB-level advisory lock."""
@@ -134,6 +161,8 @@ def kb_lock(openkb_dir: Path, *, exclusive: bool) -> Iterator[None]:
flock(fh, exclusive=exclusive)
held[resolved] = (1, 0) if exclusive else (0, 1)
try:
+ if exclusive:
+ _drain_pending_journals(openkb_dir)
yield
finally:
held.pop(resolved, None)
diff --git a/openkb/mutation.py b/openkb/mutation.py
new file mode 100644
index 00000000..4c89cb0f
--- /dev/null
+++ b/openkb/mutation.py
@@ -0,0 +1,429 @@
+"""Transactional helpers for KB mutation paths."""
+from __future__ import annotations
+
+import errno
+import json
+import logging
+import os
+import shutil
+import tempfile
+import uuid
+from dataclasses import dataclass, field
+from pathlib import Path
+
+from openkb.locks import _fsync_directory, _target_mode, atomic_write_json
+
+logger = logging.getLogger(__name__)
+
+# Cap how many times recover_pending_journals retries an active journal whose
+# rollback keeps failing. Without a cap, a deterministically-failing rollback
+# (e.g. persistent ENOSPC) is retried on every lock acquisition forever,
+# re-doing the failed work and never releasing the backup dir + journal.
+MAX_ROLLBACK_ATTEMPTS = 5
+
+
+def _apply_mode(path: Path, mode: int) -> None:
+    """Apply permission bits ``mode`` to ``path``.
+
+    Does nothing on a platform whose ``os`` module has no ``chmod``.
+    """
+    chmod = getattr(os, "chmod", None)
+    if chmod is None:
+        return
+    chmod(path, mode)
+
+
+def _fsync_file(path: Path) -> None:
+    """Flush ``path``'s data to stable storage, ignoring any ``OSError``.
+
+    The file is opened read+write so that ``FlushFileBuffers`` works on
+    Windows, where a read-only handle can be refused. This is deliberately
+    best-effort: a failure only weakens durability of bytes that are already
+    written (OS write-back still flushes them eventually), so it must never
+    fail the publish that called it.
+    """
+    try:
+        handle = open(path, "r+b")
+        try:
+            os.fsync(handle.fileno())
+        finally:
+            handle.close()
+    except OSError:
+        # Intentional: durability hint only, see the docstring.
+        return
+
+
+def _hardlink_or_copy(src: Path, dst: Path) -> None:
+    """``copytree`` copy_function that hardlinks, copying only when it must.
+
+    Intended for directory backups the caller has declared hardlink-safe:
+    trees whose writers all use atomic temp+replace (the live file moves to a
+    new inode) or that are append-only across documents. The hardlinked
+    backup keeps referencing the old inode while the live tree changes, so
+    rollback can restore pre-mutation bytes without an up-front copy.
+
+    A real ``shutil.copy2`` is used instead when linking fails with EXDEV,
+    EPERM or EACCES (cross-device, a filesystem that forbids hardlinks, or an
+    ACL / cloud-sync folder that blocks link creation). Any other ``OSError``
+    from linking is re-raised, and an error from the fallback copy surfaces
+    unchanged.
+    """
+    copy_instead = (errno.EXDEV, errno.EPERM, errno.EACCES)
+    try:
+        os.link(src, dst)
+    except OSError as exc:
+        if exc.errno in copy_instead:
+            shutil.copy2(src, dst)
+        else:
+            raise
+
+
+def _copy_file_atomic(src: Path, dest: Path) -> None:
+    """Copy ``src`` to ``dest`` via a sibling temp file and an atomic replace.
+
+    The bytes are streamed (never held in memory as a whole), so a large raw
+    PDF does not spike peak memory, and ``dest`` only ever shows the old or
+    the complete new content because the temp file is swapped in with
+    ``os.replace``. The temp file is fsynced before the swap and the parent
+    directory afterwards. The final mode is whatever ``_target_mode(dest)``
+    reported before the temp file was created, not ``mkstemp``'s 0600.
+
+    Shared by snapshot backup creation, rollback restore, and the
+    cross-filesystem fallback of :func:`_publish_staged_file`.
+    """
+    dest_dir = dest.parent
+    dest_dir.mkdir(parents=True, exist_ok=True)
+    # Must be read before the temp file exists / the replace happens, so an
+    # existing destination's current mode is what gets carried over.
+    final_mode = _target_mode(dest)
+    handle, scratch_name = tempfile.mkstemp(
+        dir=dest_dir,
+        prefix=f".{dest.name}.",
+        suffix=".tmp",
+    )
+    scratch = Path(scratch_name)
+    try:
+        with os.fdopen(handle, "wb") as sink:
+            with src.open("rb") as source:
+                shutil.copyfileobj(source, sink)
+            sink.flush()
+            os.fsync(sink.fileno())
+        os.replace(scratch, dest)
+        _apply_mode(dest, final_mode)
+        _fsync_directory(dest_dir)
+    finally:
+        # After a successful replace the temp name is gone; this only reaps
+        # the scratch file when something above failed.
+        scratch.unlink(missing_ok=True)
+
+
+def _publish_staged_file(src: Path, dest: Path) -> None:
+    """Move one staged file to its live-KB location.
+
+    Staging normally shares a filesystem with ``raw/`` and ``wiki/sources/``,
+    so the fast path is an atomic ``os.replace`` (rename) rather than a byte
+    copy. Only an ``EXDEV`` failure (staging and the live KB on different
+    devices) falls back to :func:`_copy_file_atomic`; every other ``OSError``
+    propagates. Both paths end with the file data and parent directory
+    fsynced and the destination mode applied.
+    """
+    live_dir = dest.parent
+    live_dir.mkdir(parents=True, exist_ok=True)
+    final_mode = _target_mode(dest)
+    try:
+        os.replace(src, dest)
+    except OSError as exc:
+        if exc.errno == errno.EXDEV:
+            # The fallback already fsyncs data + directory and sets the mode.
+            _copy_file_atomic(src, dest)
+        else:
+            raise
+    else:
+        _apply_mode(dest, final_mode)
+        # The renamed inode's data may still sit in the page cache. Without
+        # this fsync a crash right after publish could leave an empty or
+        # stale raw/source file behind a directory entry that did survive.
+        _fsync_file(dest)
+        _fsync_directory(live_dir)
+
+
+@dataclass
+class MutationSnapshot:
+    """Snapshot of final KB paths touched by a mutation attempt.
+
+    Holds the mapping from each live target to its backup copy plus the
+    journal file that records the mutation's status (``active``,
+    ``committed`` or ``rolled_back``) so an interrupted mutation can be
+    rolled back later from the journal alone.
+    """
+
+    kb_dir: Path
+    backup_dir: Path
+    journal_path: Path
+    operation: str
+    details: dict = field(default_factory=dict)
+    # target -> backup path, or None when the target did not exist at
+    # snapshot time (rollback then simply removes whatever was created).
+    entries: dict[Path, Path | None] = field(default_factory=dict)
+    # Number of failed rollback attempts; persisted in the journal.
+    attempts: int = 0
+    # Dirs whose backup was hardlinked (in-process only; not persisted, so a
+    # crash-rebuilt snapshot leaves this empty and rollback falls back to the
+    # safe full-copy path). Drives O(touched) rollback via inode-diff restore.
+    hardlinked_dirs: set[Path] = field(default_factory=set)
+
+    def _journal_data(self, status: str) -> dict:
+        """Return the JSON-serializable journal payload for ``status``."""
+        return {
+            "version": 1,
+            "operation": self.operation,
+            "status": status,
+            "kb_dir": str(self.kb_dir),
+            "backup_dir": str(self.backup_dir),
+            "details": self.details,
+            "attempts": self.attempts,
+            "entries": [
+                {
+                    "target": str(target),
+                    "backup": str(backup) if backup is not None else None,
+                }
+                for target, backup in self.entries.items()
+            ],
+        }
+
+    def write_journal(self, status: str) -> None:
+        """Atomically (re)write the journal file with the given ``status``."""
+        self.journal_path.parent.mkdir(parents=True, exist_ok=True)
+        atomic_write_json(self.journal_path, self._journal_data(status))
+
+    def mark_committed(self) -> None:
+        """Mark the journal committed without removing the backup.
+
+        Call this the instant the mutation is durably applied (e.g. the
+        registry write has landed) so a subsequent
+        :func:`recover_pending_journals` discards the journal instead of
+        rolling it back. This is the commit signal; :meth:`discard` is the
+        post-commit cleanup that also removes the backup dir and journal
+        file and must itself be best-effort — it runs *after* the commit
+        point and its failure must never trigger a rollback.
+        """
+        self.write_journal("committed")
+
+    def rollback(self) -> None:
+        """Restore every recorded target to its snapshot-time state.
+
+        Raises:
+            FileNotFoundError: a recorded backup no longer exists. Raised
+                before any live path is touched, because removing a target
+                whose backup is gone would destroy data that can no longer
+                be restored.
+        """
+        # Validate first, mutate second: the restore below removes each live
+        # target before copying its backup back, so a vanished backup must be
+        # detected while the live data is still intact.
+        missing = [
+            backup
+            for backup in self.entries.values()
+            if backup is not None and not backup.exists()
+        ]
+        if missing:
+            raise FileNotFoundError(
+                errno.ENOENT,
+                "Mutation backup is missing; refusing to remove live paths "
+                "that cannot be restored",
+                str(missing[0]),
+            )
+        # Restore children before parents so directory deletes cannot remove
+        # paths that still need to be restored from a more specific backup.
+        for target, backup in sorted(
+            self.entries.items(),
+            key=lambda item: len(item[0].parts),
+            reverse=True,
+        ):
+            # A hardlinked dir backup supports an O(touched) inode-diff restore
+            # (leave untouched shared-inode files, only touch changed ones) —
+            # do NOT rmtree it first, which would discard those shared inodes.
+            if target.is_dir() and target in self.hardlinked_dirs:
+                if backup is not None and backup.is_dir():
+                    _restore_hardlinked_dir(backup, target)
+                else:
+                    shutil.rmtree(target, ignore_errors=True)  # new dir, no backup
+                continue
+            # Non-hardlinked (file, or copied dir): unconditional remove + restore.
+            if target.is_dir():
+                shutil.rmtree(target, ignore_errors=True)
+            else:
+                target.unlink(missing_ok=True)
+            if backup is None:
+                continue
+            target.parent.mkdir(parents=True, exist_ok=True)
+            if backup.is_dir():
+                shutil.copytree(backup, target)
+            else:
+                _copy_file_atomic(backup, target)
+        self.write_journal("rolled_back")
+
+    def rollback_best_effort(self) -> Exception | None:
+        """Run :meth:`rollback`; log and return the exception instead of raising."""
+        try:
+            self.rollback()
+        except Exception as exc:
+            logger.warning("Mutation rollback failed: %s", exc)
+            return exc
+        return None
+
+    def discard(self) -> None:
+        """Remove the backup directory and the journal file."""
+        # Best-effort post-commit/post-rollback cleanup: callers have already
+        # written a terminal status (mark_committed or rollback), so there is
+        # nothing to re-write here — doing so would be dead work and would
+        # silently downgrade a "rolled_back" journal to "committed" moments
+        # before it is deleted.
+        shutil.rmtree(self.backup_dir, ignore_errors=True)
+        self.journal_path.unlink(missing_ok=True)
+
+    def discard_best_effort(self) -> Exception | None:
+        """Run :meth:`discard`; log and return the exception instead of raising."""
+        try:
+            self.discard()
+        except Exception as exc:
+            logger.warning("Mutation journal cleanup failed: %s", exc)
+            return exc
+        return None
+
+
+def _restore_hardlinked_dir(backup: Path, target: Path) -> None:
+    """Restore ``target`` from a hardlinked ``backup``, touching only changes.
+
+    Live files the mutation never touched still share an inode with their
+    backup counterpart and are left alone. Files with no counterpart (new)
+    are removed; files whose inode differs (rewritten via temp+replace) or
+    that are missing (deleted) are copied back from the backup. Directories
+    that exist only in the live tree and end up empty are pruned.
+
+    When the backup was not actually hardlinked (the copy fallback fired at
+    snapshot time) every file has a different inode, so every file is
+    treated as modified and recopied — a full copy, but still correct.
+    """
+    # Relative path -> backup file, for every regular file in the backup.
+    saved: dict[Path, Path] = {}
+    for entry in backup.rglob("*"):
+        if entry.is_file():
+            saved[entry.relative_to(backup)] = entry
+
+    # Pass 1: drop live files that are new or no longer share the backup's
+    # inode; untouched files stay exactly where they are.
+    if target.exists():
+        for live in list(target.rglob("*")):
+            if live.is_file():
+                original = saved.get(live.relative_to(target))
+                if original is None or not os.path.samefile(live, original):
+                    live.unlink()
+
+    # Pass 2: copy back every backup file that is now absent or different.
+    for rel, original in saved.items():
+        restored = target / rel
+        if restored.exists() and os.path.samefile(restored, original):
+            continue
+        restored.parent.mkdir(parents=True, exist_ok=True)
+        shutil.copy2(original, restored)
+
+    # Pass 3: deepest first, remove now-empty directories the backup lacks.
+    if target.exists():
+        subdirs = [p for p in target.rglob("*") if p.is_dir()]
+        subdirs.sort(key=lambda p: len(p.parts), reverse=True)
+        for folder in subdirs:
+            in_backup = (backup / folder.relative_to(target)).exists()
+            if not in_backup and not any(folder.iterdir()):
+                folder.rmdir()
+
+
+def snapshot_paths(
+    kb_dir: Path,
+    paths: list[Path],
+    *,
+    operation: str,
+    details: dict | None = None,
+    hardlink_dirs: set[Path] | None = None,
+) -> MutationSnapshot:
+    """Back up the given KB paths and write an ``active`` journal for them.
+
+    Each path that exists is copied under a fresh
+    ``.openkb/staging/rollback-<id>`` directory; a path that does not exist
+    is recorded with no backup. Duplicate paths (after resolving) are only
+    snapshotted once.
+
+    ``hardlink_dirs`` lists directories whose backup may be built from
+    hardlinks rather than byte copies. That is only safe when every writer
+    into such a directory is either atomic temp+replace (new inode, so the
+    hardlinked backup keeps the old bytes) or append-only. This is the
+    caller's contract: an in-place writer would silently corrupt the backup
+    and turn rollback into a no-op for that file.
+
+    If anything fails before the journal is written, the partial backup
+    directory is removed and the error re-raised.
+    """
+    kb_dir = kb_dir.resolve()
+    link_ok = {candidate.resolve() for candidate in (hardlink_dirs or ())}
+    journal_id = uuid.uuid4().hex
+    openkb_dir = kb_dir / ".openkb"
+    backup_dir = openkb_dir / "staging" / f"rollback-{journal_id}"
+    backup_dir.mkdir(parents=True, exist_ok=False)
+    snapshot = MutationSnapshot(
+        kb_dir=kb_dir,
+        backup_dir=backup_dir,
+        journal_path=openkb_dir / "journal" / f"{journal_id}.json",
+        operation=operation,
+        details=details or {},
+    )
+    try:
+        for requested in paths:
+            live = requested.resolve()
+            if live in snapshot.entries:
+                continue
+            if not live.exists():
+                # Nothing to back up; rollback will just remove it.
+                snapshot.entries[live] = None
+                continue
+            saved = backup_dir / live.relative_to(kb_dir)
+            saved.parent.mkdir(parents=True, exist_ok=True)
+            if not live.is_dir():
+                _copy_file_atomic(live, saved)
+            elif live in link_ok:
+                shutil.copytree(live, saved, copy_function=_hardlink_or_copy)
+                snapshot.hardlinked_dirs.add(live)
+            else:
+                shutil.copytree(live, saved)
+            snapshot.entries[live] = saved
+        # The active journal is the recovery signal: once it exists, a later
+        # process can restore every recorded target even if this one exits.
+        snapshot.write_journal("active")
+    except Exception:
+        # No journal was written, and recovery only scans journals, so this
+        # half-built backup would otherwise be unreachable forever.
+        shutil.rmtree(backup_dir, ignore_errors=True)
+        raise
+    return snapshot
+
+
+def _snapshot_from_journal(path: Path, data: dict) -> MutationSnapshot:
+    """Rebuild a :class:`MutationSnapshot` from parsed journal ``data``.
+
+    ``kb_dir`` and ``backup_dir`` are required keys; ``operation``,
+    ``details``, ``entries`` and ``attempts`` fall back to defaults. The
+    in-process-only ``hardlinked_dirs`` set is left empty.
+    """
+    rebuilt = MutationSnapshot(
+        kb_dir=Path(data["kb_dir"]),
+        backup_dir=Path(data["backup_dir"]),
+        journal_path=path,
+        operation=data.get("operation", "mutation"),
+        details=data.get("details") or {},
+    )
+    restored_entries: dict[Path, Path | None] = {}
+    for item in data.get("entries", []):
+        live = Path(item["target"])
+        # A null/empty "backup" means the target did not exist at snapshot time.
+        restored_entries[live] = Path(item["backup"]) if item.get("backup") else None
+    rebuilt.entries = restored_entries
+    rebuilt.attempts = int(data.get("attempts", 0) or 0)
+    return rebuilt
+
+
+def recover_pending_journals(kb_dir: Path) -> list[str]:
+    """Roll back active journals left by an interrupted process.
+
+    Scans ``kb_dir/.openkb/journal/*.json`` in sorted order. For each one:
+
+    * unreadable / unreconstructable journal: removed (best-effort), since
+      there is nothing to roll back and leaving it would fail every future
+      recovery run;
+    * terminal status (``committed`` / ``rolled_back``): backup and journal
+      are cleaned up, never rolled back;
+    * anything else: rolled back, then cleaned up. A failing rollback is
+      retried on later runs up to ``MAX_ROLLBACK_ATTEMPTS`` times, after
+      which the journal and backup are discarded.
+
+    Cleanup failures are never treated as rollback failures: the journal
+    already carries a terminal status at that point, so the next run simply
+    retries the cleanup. Rewriting it as ``active`` instead would make a
+    later run roll back over committed state with the backup already gone.
+
+    Returns:
+        Human-readable messages describing what was done, one per journal.
+    """
+    journal_dir = kb_dir / ".openkb" / "journal"
+    if not journal_dir.is_dir():
+        return []
+    messages: list[str] = []
+    for journal_path in sorted(journal_dir.glob("*.json")):
+        # Phase 1: parse. Only failures here mean "unrecoverable journal".
+        try:
+            data = json.loads(journal_path.read_text(encoding="utf-8"))
+            snapshot = _snapshot_from_journal(journal_path, data)
+            status = data.get("status", "active")
+        except Exception as exc:
+            # The journal couldn't be read or reconstructed (corrupt/empty/
+            # stray .json, or missing the kb_dir/backup_dir keys recovery
+            # needs). Leaving it in place would re-trigger this failure on
+            # every future lock acquisition, so remove it; any backup_dir it
+            # referenced is unreachable now and may leak.
+            try:
+                journal_path.unlink(missing_ok=True)
+            except OSError as unlink_exc:
+                logger.warning(
+                    "Could not remove unrecoverable journal %s: %s",
+                    journal_path,
+                    unlink_exc,
+                )
+            messages.append(
+                f"Unrecoverable mutation journal {journal_path.name} "
+                f"({type(exc).__name__}: {exc}); removed so it can't block "
+                f"recovery. The KB may need manual review."
+            )
+            continue
+
+        # Phase 2: terminal journals only need cleanup. Tuple membership is
+        # used so an unexpected (unhashable) status value cannot raise.
+        if status in ("committed", "rolled_back"):
+            if snapshot.discard_best_effort() is None:
+                messages.append(f"Cleaned terminal mutation journal {journal_path.name}.")
+            else:
+                messages.append(
+                    f"Could not fully clean terminal mutation journal "
+                    f"{journal_path.name}; cleanup will be retried."
+                )
+            continue
+
+        # Phase 3: roll back. Only a rollback failure counts as an attempt.
+        try:
+            snapshot.rollback()
+        except Exception as exc:
+            # Retry a bounded number of times across recovery runs (a later
+            # attempt may succeed once the cause clears, e.g. disk space
+            # freed), then give up so it can't re-do the same failing
+            # rollback forever.
+            snapshot.attempts += 1
+            if snapshot.attempts >= MAX_ROLLBACK_ATTEMPTS:
+                snapshot.discard_best_effort()
+                messages.append(
+                    f"GAVE UP on {snapshot.operation} journal {journal_path.name} after "
+                    f"{snapshot.attempts} failed rollback(s): {type(exc).__name__}: {exc}. "
+                    f"The KB may be in a partially-rolled-back state — manual review needed."
+                )
+            else:
+                try:
+                    snapshot.write_journal("active")  # persist incremented attempts
+                except Exception as write_exc:
+                    # Losing the counter only delays giving up; it must not
+                    # abort recovery of the remaining journals.
+                    logger.warning(
+                        "Could not persist rollback attempt count for %s: %s",
+                        journal_path,
+                        write_exc,
+                    )
+                messages.append(
+                    f"Rollback of {snapshot.operation} journal {journal_path.name} failed "
+                    f"(attempt {snapshot.attempts}/{MAX_ROLLBACK_ATTEMPTS}): "
+                    f"{type(exc).__name__}: {exc}; retained for retry."
+                )
+            continue
+
+        # Phase 4: rollback() already wrote "rolled_back", so a failed cleanup
+        # here leaves a terminal journal that the next run will clean.
+        snapshot.discard_best_effort()
+        messages.append(
+            f"Rolled back interrupted {snapshot.operation} journal {journal_path.name}."
+        )
+    return messages
+
+
+def publish_staged_tree(staging_dir: Path | None, kb_dir: Path) -> None:
+    """Publish staged ``raw/`` and ``wiki/sources/`` files into the live KB.
+
+    Does nothing when ``staging_dir`` is ``None`` or does not exist. Every
+    regular file found under the two staged subtrees is handed to
+    :func:`_publish_staged_file` with the same relative path below
+    ``kb_dir``; directories themselves are not published.
+    """
+    if staging_dir is None:
+        return
+    if not staging_dir.exists():
+        return
+    for subtree in ("raw", "wiki/sources"):
+        staged_root = staging_dir / subtree
+        if staged_root.exists():
+            live_root = kb_dir / subtree
+            for staged in staged_root.rglob("*"):
+                if staged.is_file():
+                    _publish_staged_file(
+                        staged,
+                        live_root / staged.relative_to(staged_root),
+                    )
diff --git a/openkb/state.py b/openkb/state.py
index 10cb20cc..c7302770 100644
--- a/openkb/state.py
+++ b/openkb/state.py
@@ -13,12 +13,29 @@ class HashRegistry:
def __init__(self, path: Path) -> None:
self._path = path
+ self._persist_enabled = True
if path.exists():
with path.open("r", encoding="utf-8") as fh:
self._data: dict[str, dict] = json.load(fh)
else:
self._data = {}
+    @classmethod
+    def memory(cls, entries: dict[str, dict]) -> "HashRegistry":
+        """Build a registry over ``entries`` that never writes to disk.
+
+        It offers the same read/resolve contract as the on-disk registry
+        (``get_by_path``, ``find_legacy_by_stem``, ``all_entries``, ``add``),
+        so callers that only stage changes in memory — batch doc_name
+        reservation — reuse that code path instead of reimplementing it.
+        ``add`` still updates the in-memory dict; only persistence is off.
+        Each entry's metadata dict is copied one level deep, so edits made
+        through the returned registry do not replace keys in the caller's
+        dicts.
+        """
+        # Bypass __init__, which would read the registry file from disk.
+        instance = cls.__new__(cls)
+        instance._data = {file_hash: dict(meta) for file_hash, meta in entries.items()}
+        instance._persist_enabled = False
+        instance._path = None
+        return instance
+
# ------------------------------------------------------------------
# Query helpers
# ------------------------------------------------------------------
@@ -115,6 +132,8 @@ def remove_by_hash(self, file_hash: str) -> bool:
# ------------------------------------------------------------------
def _persist(self) -> None:
+ if not self._persist_enabled:
+ return
atomic_write_json(self._path, self._data)
# ------------------------------------------------------------------
diff --git a/tests/test_add_command.py b/tests/test_add_command.py
index 0199c9e2..ac432beb 100644
--- a/tests/test_add_command.py
+++ b/tests/test_add_command.py
@@ -80,17 +80,67 @@ def test_add_directory_calls_helper_for_each_file(self, tmp_path):
(docs_dir / "b.txt").write_text("B content")
(docs_dir / "ignore.xyz").write_text("skip me")
+ from openkb.cli import _PreparedAdd
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
runner = CliRunner()
- with patch("openkb.cli.add_single_file") as mock_add, \
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \
+ patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \
patch("openkb.cli._find_kb_dir", return_value=kb_dir):
runner.invoke(cli, ["add", str(docs_dir)])
- # Should be called for .md and .txt but not .xyz
- assert mock_add.call_count == 2
- called_names = {call.args[0].name for call in mock_add.call_args_list}
+ # Should be prepared/committed for .md and .txt but not .xyz
+ assert mock_prepare.call_count == 2
+ assert mock_commit.call_count == 2
+ called_names = {call.args[0].name for call in mock_prepare.call_args_list}
assert "a.md" in called_names
assert "b.txt" in called_names
assert "ignore.xyz" not in called_names
+    def test_add_directory_prefilters_known_hashes_before_prepare(self, tmp_path):
+        """A file whose hash is already registered must skip the prepare stage.
+
+        Registers ``known.md``'s hash up front, runs ``add`` on the directory,
+        and checks that only ``unknown.md`` reaches ``_prepare_add_file``
+        while both files still reach ``_commit_prepared_add`` in scan order,
+        the known one carrying the ``"skipped"`` outcome.
+        """
+        kb_dir = self._setup_kb(tmp_path)
+        (kb_dir / ".openkb" / "config.yaml").write_text(
+            "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+            encoding="utf-8",
+        )
+        docs_dir = tmp_path / "docs"
+        docs_dir.mkdir()
+        known = docs_dir / "known.md"
+        unknown = docs_dir / "unknown.md"
+        known.write_text("# Known", encoding="utf-8")
+        unknown.write_text("# Unknown", encoding="utf-8")
+
+        from openkb.cli import _PreparedAdd
+        from openkb.state import HashRegistry
+
+        # Pre-register the known file's content hash so the prefilter sees it.
+        HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+            HashRegistry.hash_file(known),
+            {
+                "name": "known.md",
+                "doc_name": "known",
+                "type": "md",
+                "path": "docs/known.md",
+            },
+        )
+
+        # Stand-in for the real prepare step: no conversion, just a result.
+        def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+            return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
+        runner = CliRunner()
+        with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare) as mock_prepare, \
+                patch("openkb.cli._commit_prepared_add", return_value="added") as mock_commit, \
+                patch("openkb.cli._find_kb_dir", return_value=kb_dir):
+            result = runner.invoke(cli, ["add", str(docs_dir)])
+
+        assert result.exception is None
+        # Only the unknown file was prepared...
+        assert [call.args[0].name for call in mock_prepare.call_args_list] == ["unknown.md"]
+        # ...but both were committed, in scan order.
+        assert [call.args[0].file_path.name for call in mock_commit.call_args_list] == [
+            "known.md",
+            "unknown.md",
+        ]
+        assert mock_commit.call_args_list[0].args[0].outcome == "skipped"
+
def test_add_unsupported_extension(self, tmp_path):
kb_dir = self._setup_kb(tmp_path)
doc = tmp_path / "file.xyz"
@@ -119,7 +169,7 @@ def test_add_skipped_file(self, tmp_path):
runner = CliRunner()
with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
- patch("openkb.cli.convert_document", return_value=mock_result), \
+ patch("openkb.cli._convert_document_locked", return_value=mock_result), \
patch("openkb.cli.asyncio.run") as mock_arun:
result = runner.invoke(cli, ["add", str(doc)])
assert "SKIP" in result.output
@@ -151,7 +201,7 @@ def test_add_short_doc_runs_compiler(self, tmp_path):
runner = CliRunner()
with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
- patch("openkb.cli.convert_document", return_value=mock_result), \
+ patch("openkb.cli._convert_document_locked", return_value=mock_result), \
patch("openkb.cli.asyncio.run") as mock_arun:
result = runner.invoke(cli, ["add", str(doc)])
mock_arun.assert_called_once()
@@ -168,6 +218,45 @@ def test_add_short_doc_runs_compiler(self, tmp_path):
assert "path" in meta
assert "stale-old-hash" not in hashes
+ def test_commit_keeps_journal_when_rollback_fails(self, tmp_path):
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ source_path = kb_dir / "wiki" / "sources" / "broken.md"
+ source_path.write_text("# Broken", encoding="utf-8")
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "broken.md",
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "broken.md",
+ source_path=source_path,
+ file_hash="beadfeed00" * 8,
+ doc_name="broken",
+ ),
+ )
+
+ class FakeSnapshot:
+ journal_path = kb_dir / ".openkb" / "journal" / "broken.json"
+
+ def __init__(self):
+ self.discard_called = False
+
+ def rollback_best_effort(self):
+ return RuntimeError("rollback failed")
+
+ def discard_best_effort(self):
+ self.discard_called = True
+
+ fake_snapshot = FakeSnapshot()
+
+ with patch("openkb.cli.snapshot_paths", return_value=fake_snapshot), \
+ patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "failed"
+ assert fake_snapshot.discard_called is False
+
def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path):
"""Editing a pre-doc_name-era document must not fork the registry.
@@ -203,3 +292,506 @@ def test_add_oldest_legacy_entry_converges_to_single_entry(self, tmp_path):
new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"]
assert len(new_entries) == 1 # …exactly one entry survives
assert new_entries[0]["path"] # with path identity persisted
+
+ def test_add_directory_legacy_entry_converges_to_single_entry(self, tmp_path):
+ import json as json_mod
+
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "old-hash", {"name": "notes.md", "type": "md"}
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ (docs_dir / "notes.md").write_text("# Notes, edited", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+ assert "OK" in result.output
+
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ assert "old-hash" not in hashes
+ new_entries = [m for m in hashes.values() if m.get("doc_name") == "notes"]
+ assert len(new_entries) == 1
+ assert new_entries[0]["path"]
+
+ def test_add_directory_same_stem_files_get_reserved_names(self, tmp_path):
+ import json as json_mod
+
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ (docs_dir / "a").mkdir(parents=True)
+ (docs_dir / "b").mkdir(parents=True)
+ (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8")
+ (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+ assert "Document name conflict" not in result.output
+ assert result.output.count("[OK]") == 2
+
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ doc_names = {meta["doc_name"] for meta in hashes.values()}
+ assert len(doc_names) == 2
+ assert "report" in doc_names
+ assert any(name.startswith("report-") for name in doc_names)
+
+ def test_add_directory_same_stem_with_legacy_entry_no_duplicate(self, tmp_path):
+ """Two same-stem files plus a legacy (path-less) entry must not both
+ reserve the legacy doc_name. ``find_legacy_by_stem`` must be consumed
+ (idempotent) across the batch so the second file gets a suffixed name
+ instead of colliding with the first.
+ """
+ import json as json_mod
+
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 2\n",
+ encoding="utf-8",
+ )
+ # Legacy entry: name + doc_name but NO path → find_legacy_by_stem matches "report".
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "legacy-hash", {"name": "report.md", "doc_name": "report", "type": "md"}
+ )
+ docs_dir = tmp_path / "docs"
+ (docs_dir / "a").mkdir(parents=True)
+ (docs_dir / "b").mkdir(parents=True)
+ (docs_dir / "a" / "report.md").write_text("# A", encoding="utf-8")
+ (docs_dir / "b" / "report.md").write_text("# B", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+ assert "Document name conflict" not in result.output
+ assert result.output.count("[OK]") == 2
+
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ report_names = [
+ m["doc_name"] for m in hashes.values() if str(m.get("doc_name", "")).startswith("report")
+ ]
+ assert len(report_names) == 2
+ assert len(set(report_names)) == 2 # no silent overwrite
+
+ def test_commit_rejects_same_filename_different_path_conflict(self, tmp_path):
+ """A path-indexed entry sharing doc_name + filename but with a
+ different path (a concurrent add of a same-named file in the
+ reservation/commit window) must be rejected, not silently
+ overwritten via the filename escape.
+ """
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ source_path = kb_dir / "wiki" / "sources" / "report.md"
+ source_path.parent.mkdir(parents=True, exist_ok=True)
+ source_path.write_text("# mine", encoding="utf-8")
+
+ # A DIFFERENT document already owns doc_name "report": different path,
+ # different hash, same filename.
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "other-hash",
+ {"name": "report.md", "doc_name": "report", "type": "md",
+ "path": "elsewhere/report.md"},
+ )
+
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "report.md",
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "report.md",
+ source_path=source_path,
+ file_hash="myhash" + "0" * 59,
+ doc_name="report",
+ ),
+ )
+
+ with patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "failed"
+ hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8"))
+ # The pre-existing document is untouched.
+ assert "other-hash" in hashes
+ assert hashes["other-hash"]["path"] == "elsewhere/report.md"
+ assert "myhash" + "0" * 59 not in hashes
+
+ def test_add_directory_jobs1_stages_each_file(self, tmp_path):
+ """jobs==1 must stage each file (pass a real staging_dir) instead of
+ writing the live KB unlocked via staging_dir=None.
+ """
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 1\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ (docs_dir / "a.md").write_text("# A", encoding="utf-8")
+ (docs_dir / "b.md").write_text("# B", encoding="utf-8")
+
+ from openkb.cli import _PreparedAdd
+
+ seen_staging: list = []
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ seen_staging.append(staging_dir)
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
+ runner = CliRunner()
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \
+ patch("openkb.cli._commit_prepared_add", return_value="added"), \
+ patch("openkb.cli._find_kb_dir", return_value=kb_dir):
+ runner.invoke(cli, ["add", str(docs_dir)])
+
+ assert len(seen_staging) == 2
+ assert all(s is not None for s in seen_staging) # regression: was None
+
+ def test_commit_returns_added_when_post_commit_cleanup_fails(self, tmp_path):
+ """Once the registry write lands, a failure in journal cleanup must
+ NOT roll back the completed ingest (regression: the discard/log used
+ to live inside the rollback try).
+ """
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ source_path = kb_dir / "wiki" / "sources" / "ok.md"
+ source_path.parent.mkdir(parents=True, exist_ok=True)
+ source_path.write_text("# OK", encoding="utf-8")
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "ok.md",
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "ok.md",
+ source_path=source_path,
+ file_hash="ok" + "0" * 62,
+ doc_name="ok",
+ ),
+ )
+
+ class FakeSnapshot:
+ journal_path = kb_dir / ".openkb" / "journal" / "ok.json"
+
+ def mark_committed(self):
+ pass
+
+ def rollback_best_effort(self):
+ return None
+
+ def discard_best_effort(self):
+ return RuntimeError("cleanup failed")
+
+ with patch("openkb.cli.snapshot_paths", return_value=FakeSnapshot()), \
+ patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "added" # regression: was "failed" (rolled back success)
+ hashes = json.loads((kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8"))
+ assert "ok" + "0" * 62 in hashes # registry write survived
+
+ def test_commit_rolls_back_real_snapshot_on_compile_failure(self, tmp_path):
+ """End-to-end rollback: a REAL snapshot + REAL publish, then a compile
+ failure, must restore the KB to its pre-add state — published raw and
+ source files removed, registry unchanged, no orphaned artifacts or
+ journal. The FakeSnapshot-based test cannot exercise this transactional
+ guarantee (the whole reason the feature exists).
+ """
+ import json as json_mod
+
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ pre_hashes = (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+
+ # A staging dir holding the converted artifacts that publish_staged_tree
+ # copies into the live KB before compile runs.
+ staging = tmp_path / "staging"
+ (staging / "raw").mkdir(parents=True)
+ (staging / "wiki" / "sources").mkdir(parents=True)
+ (staging / "raw" / "boom.md").write_text("# raw", encoding="utf-8")
+ source_md = staging / "wiki" / "sources" / "boom.md"
+ source_md.write_text("# converted", encoding="utf-8")
+
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "boom.md",
+ result=ConvertResult(
+ raw_path=staging / "raw" / "boom.md",
+ source_path=source_md,
+ file_hash="boom" + "0" * 60,
+ doc_name="boom",
+ ),
+ staging_dir=staging,
+ )
+
+ with patch("openkb.cli.asyncio.run", side_effect=RuntimeError("compile failed")), \
+ patch("openkb.cli.time.sleep"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert outcome == "failed"
+ # Published artifacts were rolled back (removed).
+ assert not (kb_dir / "raw" / "boom.md").exists()
+ assert not (kb_dir / "wiki" / "sources" / "boom.md").exists()
+ assert not (kb_dir / "wiki" / "summaries" / "boom.md").exists()
+ # Registry restored to pre-add state; no leaked boom entry.
+ hashes = json_mod.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ assert hashes == json_mod.loads(pre_hashes)
+ assert "boom" + "0" * 60 not in hashes
+ # No orphan journal/backup left behind; staging cleaned up.
+ assert not any((kb_dir / ".openkb" / "journal").glob("*.json"))
+ assert not staging.exists()
+
+ def test_add_snapshot_rolls_back_pageindex_sqlite_sidecars(self, tmp_path):
+ """Long-doc failures must not leave SQLite sidecars newer than pageindex.db."""
+ from openkb.cli import _snapshot_add_paths
+ from openkb.mutation import snapshot_paths
+
+ kb_dir = self._setup_kb(tmp_path)
+ openkb_dir = kb_dir / ".openkb"
+ (openkb_dir / "pageindex.db").write_bytes(b"before")
+
+ snapshot = snapshot_paths(
+ kb_dir,
+ _snapshot_add_paths(kb_dir, "long", None, None),
+ operation="add",
+ details={},
+ )
+
+ for suffix in ("-wal", "-shm", "-journal"):
+ (openkb_dir / f"pageindex.db{suffix}").write_bytes(b"after")
+
+ snapshot.rollback()
+ snapshot.discard()
+
+ assert (openkb_dir / "pageindex.db").read_bytes() == b"before"
+ for suffix in ("-wal", "-shm", "-journal"):
+ assert not (openkb_dir / f"pageindex.db{suffix}").exists()
+
+ def test_add_single_file_stages_unless_file_already_in_raw(self, tmp_path):
+ """stage=True (default for single-file add / chat) routes convert
+ through an isolated staging dir; stage=False (watch / URL, file
+ already in raw/) keeps convert's in-place path. The staging default
+ closes the crash-orphan window for files that don't already live in
+ raw/."""
+ from openkb.cli import _PreparedAdd, _add_single_file_locked
+
+ kb_dir = self._setup_kb(tmp_path)
+ doc = tmp_path / "test.md"
+ doc.write_text("# hi", encoding="utf-8")
+
+ captured: list = []
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ captured.append(staging_dir)
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir, outcome="skipped")
+
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \
+ patch("openkb.cli._commit_prepared_add", return_value="skipped"):
+ _add_single_file_locked(doc, kb_dir) # default stage=True
+ _add_single_file_locked(doc, kb_dir, stage=False)
+
+ assert captured[0] is not None # staged by default → no live-KB write pre-snapshot
+ assert captured[1] is None # in-place for watch/URL (file already in raw/)
+
+ def test_commit_conflict_guard_normalizes_unicode_filenames(self, tmp_path):
+ """A legacy (path-less) entry whose name is stored NFC must match a
+ file whose name the filesystem reports as NFD (macOS HFS+/APFS), so a
+ same-document re-add is allowed instead of mis-reported as a conflict.
+ The guard NFKC-normalizes both sides; a raw ``==`` would diverge."""
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+ from openkb.state import HashRegistry
+
+ kb_dir = self._setup_kb(tmp_path)
+ import unicodedata as _ud
+ nfc_name = "r\u00e9sum\u00e9.pdf" # NFC: é = U+00E9 (composed)
+ nfd_name = _ud.normalize("NFD", nfc_name) # NFD: e + U+0301 (decomposed)
+ assert nfc_name != nfd_name # raw bytes differ
+
+ source_path = kb_dir / "wiki" / "sources" / "resume.md"
+ source_path.parent.mkdir(parents=True, exist_ok=True)
+ source_path.write_text("# cv", encoding="utf-8")
+
+ HashRegistry(kb_dir / ".openkb" / "hashes.json").add(
+ "legacy-hash", {"name": nfc_name, "doc_name": "resume", "type": "pdf"}
+ )
+
+ prepared = _PreparedAdd(
+ file_path=tmp_path / nfd_name,
+ result=ConvertResult(
+ raw_path=kb_dir / "raw" / "resume.pdf",
+ source_path=source_path,
+ file_hash="new" + "0" * 61,
+ doc_name="resume",
+ ),
+ )
+
+ with patch("openkb.cli.publish_staged_tree"), \
+ patch("openkb.cli.asyncio.run"):
+ outcome = _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ # NFC-vs-NFD is the same document, not a conflict → ingest proceeds.
+ assert outcome == "added"
+
+ def test_add_directory_jobs_gt1_runs_real_pipeline(self, tmp_path):
+ """End-to-end test of the jobs>1 ThreadPoolExecutor path.
+
+ Every other jobs>1 test mocks _prepare_add_file and _commit_prepared_add, so the
+ real concurrent branch (futures submitted in scan order, _staging_dir_for allocation,
+ the prepared_outcomes.get(f) or futures[f].result() fallback, publish_staged_tree,
+ registry writes, staging cleanup) never runs. Here prepare and commit are real and
+ only the LLM compile is mocked, so the most complex new path is actually exercised.
+ """
+ kb_dir = self._setup_kb(tmp_path)
+ (kb_dir / ".openkb" / "config.yaml").write_text(
+ "model: gpt-4o-mini\nfile_processing_jobs: 3\n",
+ encoding="utf-8",
+ )
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ for letter in ("a", "b", "c"):
+ (docs_dir / f"{letter}.md").write_text(f"# {letter}", encoding="utf-8")
+
+ runner = CliRunner()
+ with patch("openkb.cli._find_kb_dir", return_value=kb_dir), \
+ patch("openkb.cli.asyncio.run"):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+
+ assert result.exception is None, result.output
+ assert result.output.count("[OK]") == 3
+ hashes = json.loads(
+ (kb_dir / ".openkb" / "hashes.json").read_text(encoding="utf-8")
+ )
+ assert len(hashes) == 3
+ assert {meta["doc_name"] for meta in hashes.values()} == {"a", "b", "c"}
+ # Staging dirs cleaned up after each commit.
+ staging = kb_dir / ".openkb" / "staging"
+ if staging.exists():
+ assert not any(p.name.startswith("add-") for p in staging.iterdir())
+ # Source artifacts published from staging into the live KB.
+ for letter in ("a", "b", "c"):
+ assert (kb_dir / "wiki" / "sources" / f"{letter}.md").exists()
+
+ def test_add_directory_interrupted_batch_does_not_leak_staging(self, tmp_path):
+ """A failure aborting the batch mid-loop must not leak the staging dirs
+ already created for files that never reach commit. Per-commit cleanup
+ only runs inside _commit_prepared_add, and recovery only scans
+ journal/ — so the batch itself must reap its own staging set.
+
+ The fake commit mimics the real one's per-call staging cleanup (try/
+ finally), so the only dir that should leak without a batch-level guard
+ is the never-committed third file's.
+ """
+ kb_dir = self._setup_kb(tmp_path)
+ docs_dir = tmp_path / "docs"
+ docs_dir.mkdir()
+ for name in ("a.md", "b.md", "c.md"):
+ (docs_dir / name).write_text(f"# {name}", encoding="utf-8")
+
+ from openkb.cli import _PreparedAdd, _cleanup_staging
+
+ def fake_prepare(file_path, kb_dir_arg, staging_dir, doc_name=None):
+ return _PreparedAdd(file_path=file_path, staging_dir=staging_dir)
+
+ commit_calls = {"n": 0}
+
+ def failing_commit(prepared, kb_dir_arg, model):
+ commit_calls["n"] += 1
+ try:
+ if commit_calls["n"] == 2:
+ raise RuntimeError("mid-batch failure aborts the loop")
+ return "added"
+ finally:
+ # mimic real _commit_prepared_add's per-call staging cleanup
+ _cleanup_staging(prepared.staging_dir)
+
+ runner = CliRunner()
+ with patch("openkb.cli._prepare_add_file", side_effect=fake_prepare), \
+ patch("openkb.cli._commit_prepared_add", side_effect=failing_commit), \
+ patch("openkb.cli._find_kb_dir", return_value=kb_dir):
+ result = runner.invoke(cli, ["add", str(docs_dir)])
+
+ assert isinstance(result.exception, RuntimeError)
+ staging_root = kb_dir / ".openkb" / "staging"
+ leaked = list(staging_root.glob("add-*")) if staging_root.exists() else []
+ assert leaked == [], f"interrupted batch leaked staging dirs: {leaked}"
+
+ def test_commit_path_hardlinks_concepts_backup(self, tmp_path):
+ """The real add-commit path must snapshot wiki/concepts (and peers)
+ via hardlinks, not copies. Spy on snapshot_paths during a real
+ _commit_prepared_add and assert the concepts backup shares the live
+ file's inode — the O(1) snapshot that keeps per-file batch cost from
+ scaling with the corpus.
+ """
+ import openkb.mutation as mut
+ from openkb.cli import _PreparedAdd, _commit_prepared_add
+ from openkb.converter import ConvertResult
+
+ kb_dir = self._setup_kb(tmp_path)
+ concepts_file = kb_dir / "wiki" / "concepts" / "keep.md"
+ concepts_file.write_text("keep", encoding="utf-8")
+ live_inode = concepts_file.stat().st_ino
+
+ captured = {}
+ real_snapshot = mut.snapshot_paths
+
+ def spy(kb_dir_arg, paths, *, operation, details=None, hardlink_dirs=None):
+ snap = real_snapshot(
+ kb_dir_arg, paths,
+ operation=operation, details=details, hardlink_dirs=hardlink_dirs,
+ )
+ backup_concepts = snap.backup_dir / "wiki" / "concepts" / "keep.md"
+ captured["hardlinked"] = (
+ backup_concepts.exists() and backup_concepts.stat().st_ino == live_inode
+ )
+ return snap
+
+ staging = tmp_path / "staging"
+ (staging / "raw").mkdir(parents=True)
+ (staging / "wiki" / "sources").mkdir(parents=True)
+ (staging / "raw" / "doc.md").write_text("# raw", encoding="utf-8")
+ source_md = staging / "wiki" / "sources" / "doc.md"
+ source_md.write_text("# converted", encoding="utf-8")
+ prepared = _PreparedAdd(
+ file_path=tmp_path / "doc.md",
+ result=ConvertResult(
+ raw_path=staging / "raw" / "doc.md",
+ source_path=source_md,
+ file_hash="d" + "0" * 63,
+ doc_name="doc",
+ ),
+ staging_dir=staging,
+ )
+
+ with patch("openkb.cli.asyncio.run"), \
+ patch("openkb.cli.snapshot_paths", side_effect=spy):
+ _commit_prepared_add(prepared, kb_dir, "gpt-4o-mini")
+
+ assert captured.get("hardlinked") is True, (
+ "real add-commit path did not hardlink the concepts backup"
+ )
diff --git a/tests/test_mutation.py b/tests/test_mutation.py
new file mode 100644
index 00000000..beffddd4
--- /dev/null
+++ b/tests/test_mutation.py
@@ -0,0 +1,520 @@
+from __future__ import annotations
+
+import errno
+import os
+from pathlib import Path
+
+import pytest
+
+from openkb.mutation import publish_staged_tree, recover_pending_journals, snapshot_paths
+
+
+def test_recover_pending_add_journal_rolls_back_files(tmp_path):
+ kb_dir = tmp_path
+ openkb_dir = kb_dir / ".openkb"
+ openkb_dir.mkdir()
+ target = kb_dir / "wiki" / "summaries" / "doc.md"
+ target.parent.mkdir(parents=True)
+ target.write_text("before", encoding="utf-8")
+ new_file = kb_dir / "wiki" / "sources" / "doc.md"
+
+ snapshot_paths(
+ kb_dir,
+ [target, new_file],
+ operation="add",
+ details={"doc_name": "doc"},
+ )
+ target.write_text("after", encoding="utf-8")
+ new_file.parent.mkdir(parents=True)
+ new_file.write_text("new", encoding="utf-8")
+
+ messages = recover_pending_journals(kb_dir)
+
+ assert any("Rolled back interrupted add journal" in message for message in messages)
+ assert target.read_text(encoding="utf-8") == "before"
+ assert not new_file.exists()
+ assert not any((openkb_dir / "journal").glob("*.json"))
+
+
+def test_mark_committed_prevents_recovery_rollback(tmp_path):
+ """A snapshot marked committed must be discarded (not rolled back) by
+ recovery — the commit signal that protects a completed mutation from
+ being undone when post-commit cleanup fails.
+ """
+ kb_dir = tmp_path
+ openkb_dir = kb_dir / ".openkb"
+ openkb_dir.mkdir()
+ target = kb_dir / "wiki" / "summaries" / "doc.md"
+ target.parent.mkdir(parents=True)
+ target.write_text("before", encoding="utf-8")
+
+ snapshot = snapshot_paths(
+ kb_dir, [target], operation="add", details={"doc_name": "doc"}
+ )
+ target.write_text("after", encoding="utf-8") # the "committed" mutation
+ snapshot.mark_committed()
+
+ messages = recover_pending_journals(kb_dir)
+
+ assert any("Cleaned terminal mutation journal" in m for m in messages)
+ assert target.read_text(encoding="utf-8") == "after" # NOT rolled back
+ assert not any((openkb_dir / "journal").glob("*.json"))
+
+
+def test_snapshot_paths_cleans_backup_dir_on_failure(tmp_path):
+ """A partially-created snapshot must not leak its backup dir: on any
+ failure before the journal is written, snapshot_paths removes the
+ rollback dir it created (recover_pending_journals only scans journals
+ and could never reach it otherwise).
+ """
+ kb_dir = tmp_path / "kb"
+ kb_dir.mkdir()
+ # A target that resolves OUTSIDE kb_dir makes relative_to(kb_dir) raise
+ # mid-loop, after backup_dir was already mkdir'd.
+ outside = tmp_path / "outside.txt"
+ outside.write_text("hi", encoding="utf-8")
+
+ with pytest.raises(ValueError):
+ snapshot_paths(kb_dir, [outside], operation="add", details={})
+
+ staging = kb_dir / ".openkb" / "staging"
+ if staging.exists():
+ assert not any(staging.iterdir()) # no orphan rollback- dir
+
+
+def test_exclusive_lock_drains_active_journal_before_yielding(tmp_path):
+ """Recovery runs on every exclusive-lock acquisition, not just the add path.
+
+ ``recover_pending_journals`` is wired into ``kb_lock``'s first exclusive
+ acquisition, so any mutation command — ``remove``/``recompile``/``lint``/
+ ``chat``, all of which take ``kb_ingest_lock`` directly — drains a crashed
+ predecessor's active journal before it mutates. This is the regression
+ guard for the bug where an ``add`` crash left an active journal that an
+ intervening ``remove`` ignored and a later ``add`` then rolled back over
+ the remove's edits.
+ """
+ from openkb.locks import kb_ingest_lock
+
+ kb_dir = tmp_path
+ openkb_dir = kb_dir / ".openkb"
+ openkb_dir.mkdir()
+ target = kb_dir / "wiki" / "summaries" / "doc.md"
+ target.parent.mkdir(parents=True)
+ target.write_text("before", encoding="utf-8")
+
+ # Simulate a crashed add: snapshot taken, file mutated, but mark_committed
+ # never ran — an ACTIVE journal is left on disk.
+ snapshot_paths(kb_dir, [target], operation="add", details={"doc_name": "doc"})
+ target.write_text("after", encoding="utf-8")
+
+ # Any exclusive-lock holder drains before its body runs.
+ with kb_ingest_lock(openkb_dir):
+ assert target.read_text(encoding="utf-8") == "before"
+
+ assert target.read_text(encoding="utf-8") == "before"
+ assert not any((openkb_dir / "journal").glob("*.json"))
+
+
+# --- publish_staged_tree: O(1) rename + durability (review #2) -------------
+
+def _staged_raw(staging: Path, name: str, payload: bytes) -> Path:
+ src = staging / "raw" / name
+ src.parent.mkdir(parents=True, exist_ok=True)
+ src.write_bytes(payload)
+ return src
+
+
+def test_publish_moves_staged_files_on_same_filesystem(tmp_path):
+ """Publish must rename staged files into place (O(1)) when staging and
+ the live KB share a filesystem, not stream-copy them. The surest
+ observable signal: after publish the staged source is GONE (moved),
+ whereas a copy leaves it behind.
+ """
+ kb_dir = tmp_path / "kb"
+ staging = kb_dir / ".openkb" / "staging" / "add-x"
+ src = _staged_raw(staging, "doc.pdf", b"%PDF-1.4 payload")
+
+ publish_staged_tree(staging, kb_dir)
+
+ published = kb_dir / "raw" / "doc.pdf"
+ assert published.read_bytes() == b"%PDF-1.4 payload"
+ assert not src.exists() # moved, not copied
+
+
+def test_published_files_keep_umask_mode_not_0600(tmp_path):
+ """Published artifacts must be created at the process umask mode, not
+ inherit tempfile.mkstemp's 0600. 0600 would make the KB's published
+ files owner-only and inconsistent with atomic_write_bytes.
+ """
+ prev_umask = os.umask(0o022)
+ try:
+ kb_dir = tmp_path / "kb"
+ staging = kb_dir / ".openkb" / "staging" / "add-y"
+ _staged_raw(staging, "doc.pdf", b"data")
+
+ publish_staged_tree(staging, kb_dir)
+
+ from openkb.locks import _default_file_mode
+
+ published = kb_dir / "raw" / "doc.pdf"
+ assert (published.stat().st_mode & 0o777) == _default_file_mode()
+ finally:
+ os.umask(prev_umask)
+
+
+def test_publish_falls_back_to_copy_on_cross_filesystem(tmp_path, monkeypatch):
+ """When staging and the live KB are on different filesystems, the publish
+ rename raises EXDEV; publish must fall back to a durable copy and still
+ land the file with correct content at the destination.
+
+ Only the cross-device publish rename raises EXDEV — the fallback copy's
+ own temp-file rename is on the destination's filesystem and must succeed,
+ so the fake raises exactly once then delegates to the real ``os.replace``.
+ """
+ import openkb.mutation as mut
+
+ kb_dir = tmp_path / "kb"
+ staging = kb_dir / ".openkb" / "staging" / "add-z"
+ _staged_raw(staging, "doc.pdf", b"cross-fs payload")
+
+ real_replace = os.replace
+ calls = {"n": 0}
+
+ def fake_replace(src, dst, *args, **kwargs):
+ calls["n"] += 1
+ if calls["n"] == 1:
+ raise OSError(errno.EXDEV, "cross-device link")
+ return real_replace(src, dst, *args, **kwargs)
+
+ monkeypatch.setattr(mut.os, "replace", fake_replace)
+
+ publish_staged_tree(staging, kb_dir)
+
+ assert calls["n"] >= 2 # publish rename failed, fallback copy renamed
+ assert (kb_dir / "raw" / "doc.pdf").read_bytes() == b"cross-fs payload"
+
+
+# --- snapshot_paths: hardlinked dir backups (review #1) --------------------
+
+def test_snapshot_hardlinks_marked_directory_trees(tmp_path):
+ """Directory snapshots the caller marks hardlink-safe must hardlink the
+ live files into the backup (shared inode) — O(1), no per-file byte copy —
+ instead of streaming a fresh copy. This is what makes per-file concept /
+ entity / PageIndex-blob snapshots cheap on a large KB.
+ """
+ kb_dir = tmp_path
+ concepts = kb_dir / "wiki" / "concepts"
+ concepts.mkdir(parents=True)
+ existing = concepts / "old.md"
+ existing.write_text("old", encoding="utf-8")
+ live_inode = existing.stat().st_ino
+
+ snapshot = snapshot_paths(
+ kb_dir,
+ [concepts],
+ operation="add",
+ details={},
+ hardlink_dirs={concepts},
+ )
+ try:
+ backup_file = snapshot.backup_dir / "wiki" / "concepts" / "old.md"
+ assert backup_file.exists()
+ assert backup_file.stat().st_ino == live_inode # hardlink, not copy
+ finally:
+ snapshot.discard_best_effort()
+
+
+def test_hardlinked_dir_rollback_correct_after_atomic_writes(tmp_path):
+ """With a hardlinked dir backup, an atomic (temp+replace) rewrite of an
+ existing page and creation of a new page must still roll back correctly:
+ existing page restored to its pre-snapshot content, new page removed.
+
+ This is the correctness invariant hardlinking relies on — the wiki
+ writers must go through atomic temp+replace so the hardlink backup keeps
+ pointing at the old inode while the live file moves to a new one.
+ """
+ from openkb.locks import atomic_write_text
+
+ kb_dir = tmp_path
+ concepts = kb_dir / "wiki" / "concepts"
+ concepts.mkdir(parents=True)
+ existing = concepts / "old.md"
+ existing.write_text("old-content", encoding="utf-8")
+
+ snapshot = snapshot_paths(
+ kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts},
+ )
+ # Mirror the (now atomic) compiler writers: rewrite the existing page via
+ # atomic temp+replace, and add a brand-new page the doc creates.
+ atomic_write_text(existing, "rewritten-content")
+ (concepts / "new.md").write_text("new", encoding="utf-8")
+
+ snapshot.rollback()
+ snapshot.discard_best_effort()
+
+ assert existing.read_text(encoding="utf-8") == "old-content"
+ assert not (concepts / "new.md").exists()
+
+
+def test_openkb_files_tree_is_hardlinked(tmp_path):
+ """The PageIndex blob store (.openkb/files) is append-only across docs —
+ each add creates new {doc_id} blobs and never modifies existing ones — so
+ it is hardlink-safe and must be snapshotted via hardlinks, not copied.
+ """
+ kb_dir = tmp_path
+ blobs = kb_dir / ".openkb" / "files" / "col"
+ blobs.mkdir(parents=True)
+ existing = blobs / "an-existing-doc.pdf"
+ existing.write_bytes(b"existing-blob")
+ live_inode = existing.stat().st_ino
+
+ snapshot = snapshot_paths(
+ kb_dir, [kb_dir / ".openkb" / "files"], operation="add",
+ details={}, hardlink_dirs={kb_dir / ".openkb" / "files"},
+ )
+ try:
+ backup = (
+ snapshot.backup_dir / ".openkb" / "files" / "col" / "an-existing-doc.pdf"
+ )
+ assert backup.stat().st_ino == live_inode
+ finally:
+ snapshot.discard_best_effort()
+
+
+ def test_concept_writer_is_atomic_so_hardlink_rollback_restores(tmp_path):
+     """Guard the hardlink invariant for ``_write_concept``'s update path.
+
+     A hardlinked snapshot shares inodes with the live pages, so a writer
+     that modifies a page in place (same inode) would also modify the backup
+     and rollback would bring back the MUTATED text. The writer therefore has
+     to go through an atomic temp+replace (new inode).
+
+     The test updates an existing concept page between a real hardlinked
+     snapshot and its rollback, then checks the original body is back.
+     """
+     from openkb.agent.compiler import _write_concept
+
+     kb_dir = tmp_path
+     wiki_dir = kb_dir / "wiki"
+     concepts = wiki_dir / "concepts"
+     concepts.mkdir(parents=True)
+     topic_page = concepts / "topic.md"
+     topic_page.write_text(
+         "---\nsources: []\n---\n\noriginal body", encoding="utf-8"
+     )
+
+     snapshot = snapshot_paths(
+         kb_dir,
+         [concepts],
+         operation="add",
+         details={},
+         hardlink_dirs={concepts},
+     )
+     # Stand-in for the compiler rewriting the page during a doc ingest; an
+     # in-place write here would corrupt the hardlinked backup.
+     _write_concept(
+         wiki_dir, "topic", "rewritten body", "summaries/doc.md", is_update=True
+     )
+
+     snapshot.rollback()
+     snapshot.discard_best_effort()
+
+     page_text = topic_page.read_text(encoding="utf-8")
+     assert "original body" in page_text
+     assert "rewritten body" not in page_text
+
+
+ def test_fix_broken_links_is_atomic_so_hardlink_rollback_restores(tmp_path):
+     """Regression guard for lint --fix/remove cleanup writers.
+
+     ``fix_broken_links`` rewrites concept/entity pages outside the add path. If
+     it writes in place, a hardlinked snapshot aliases the live inode and rollback
+     restores the cleaned content instead of the original page.
+
+     The final check compares against the exact seeded text, so it does not
+     depend on how the cleaned page happens to be worded or spaced.
+     """
+     from openkb.lint import fix_broken_links
+
+     kb_dir = tmp_path
+     wiki = kb_dir / "wiki"
+     concepts = wiki / "concepts"
+     concepts.mkdir(parents=True)
+     page = concepts / "topic.md"
+     original_text = "# Topic\n\nGhost [[concepts/missing]] link.\n"
+     page.write_text(original_text, encoding="utf-8")
+
+     snapshot = snapshot_paths(
+         kb_dir, [concepts], operation="add", details={}, hardlink_dirs={concepts},
+     )
+     fix_broken_links(wiki, restrict_to=[page])
+
+     snapshot.rollback()
+     snapshot.discard_best_effort()
+
+     restored = page.read_text(encoding="utf-8")
+     # Checked first so a lost ghost link produces a focused failure message.
+     assert "[[concepts/missing]]" in restored
+     # Rollback must bring back the page exactly as seeded; a substring probe
+     # for one guessed cleaned form could pass vacuously.
+     assert restored == original_text
+
+
+ def test_hardlink_falls_back_to_copy_on_eacces(tmp_path, monkeypatch):
+     """EACCES from ``os.link`` must trigger the copy fallback.
+
+     A hardlink refused by a Windows ACL / OneDrive sync folder shows up as
+     EACCES rather than EXDEV/EPERM. With ``os.link`` patched to raise that
+     errno, the snapshot is still expected to succeed, and the backup must be
+     a genuine copy (same content, different inode) instead of aborting the
+     add.
+     """
+     import openkb.mutation as mut
+
+     kb_dir = tmp_path
+     concepts = kb_dir / "wiki" / "concepts"
+     concepts.mkdir(parents=True)
+     live_page = concepts / "page.md"
+     live_page.write_text("content", encoding="utf-8")
+
+     def _deny_link(*_args, **_kwargs):
+         # Every hardlink attempt fails the way a blocked Windows link would.
+         raise OSError(errno.EACCES, "simulated Windows ACL hardlink block")
+
+     monkeypatch.setattr(mut.os, "link", _deny_link)
+
+     snapshot = snapshot_paths(
+         kb_dir,
+         [concepts],
+         operation="add",
+         details={},
+         hardlink_dirs={concepts},
+     )
+     try:
+         backed_up = snapshot.backup_dir / "wiki" / "concepts" / "page.md"
+         # The fallback copy landed with the right content ...
+         assert backed_up.read_text(encoding="utf-8") == "content"
+         # ... and is a separate file, not a link to the live page.
+         assert backed_up.stat().st_ino != live_page.stat().st_ino
+     finally:
+         snapshot.discard_best_effort()
+
+
+# --- recover_pending_journals: bounded retry (pre-existing issue) ----------
+
+ def test_recovery_gives_up_on_persistently_failing_journal(tmp_path, monkeypatch):
+     """Recovery must stop retrying a journal whose rollback always fails.
+
+     If rollback keeps raising (e.g. persistent ENOSPC), retrying forever
+     would leak the backup dir + journal and repeat the same failing rollback
+     on every future lock acquisition. The test leaves an active journal
+     behind, forces ``MutationSnapshot.rollback`` to raise, runs recovery
+     ``MAX_ROLLBACK_ATTEMPTS + 1`` times, and expects no journal file to
+     remain afterwards.
+     """
+     import openkb.mutation as mut
+
+     kb_dir = tmp_path
+     (kb_dir / ".openkb").mkdir()
+     summaries = kb_dir / "wiki" / "summaries"
+     summaries.mkdir(parents=True)
+     target = summaries / "doc.md"
+     target.write_text("before", encoding="utf-8")
+     # Snapshot without rollback/discard: this is the ACTIVE journal a crashed
+     # add would leave behind.
+     snapshot_paths(kb_dir, [target], operation="add", details={})
+     target.write_text("after", encoding="utf-8")
+
+     def _always_fail(self):
+         # Deterministic rollback failure for every recovery attempt.
+         raise OSError("persistent rollback failure")
+
+     monkeypatch.setattr(mut.MutationSnapshot, "rollback", _always_fail)
+
+     attempts = mut.MAX_ROLLBACK_ATTEMPTS + 1
+     for _ in range(attempts):
+         recover_pending_journals(kb_dir)
+
+     # The journal was given up on and discarded rather than kept forever.
+     leftover_journals = list((kb_dir / ".openkb" / "journal").glob("*.json"))
+     assert not leftover_journals
+
+
+ @pytest.mark.parametrize(
+     "payload",
+     [
+         "",  # empty file -> JSONDecodeError
+         "{not json",  # truncated/invalid -> JSONDecodeError
+         '{"status": "active"}',  # valid JSON missing kb_dir/backup_dir -> KeyError
+         '{"not": "a journal"}',  # valid JSON, wrong shape -> KeyError
+     ],
+ )
+ def test_recover_skips_malformed_journal_without_bricking_lock(tmp_path, payload):
+     """Recovery must survive a corrupt, empty, or stray journal file.
+
+     Background: ``snapshot`` is bound inside the ``try`` (after json.loads /
+     _snapshot_from_journal) while the ``except`` block used it
+     unconditionally, so one malformed journal raised NameError out of
+     recovery. Because draining runs on the first exclusive kb_lock
+     acquisition, that blocked add/remove/recompile/chat for the whole KB.
+
+     Expected behaviour: the unrecoverable journal is removed, a loud message
+     is returned, and the mutation lock can still be acquired.
+     """
+     from openkb.locks import kb_ingest_lock
+
+     kb_dir = tmp_path
+     openkb_dir = kb_dir / ".openkb"
+     journal_dir = openkb_dir / "journal"
+     journal_dir.mkdir(parents=True)
+     bad_journal = journal_dir / "deadbeef.json"
+     bad_journal.write_text(payload, encoding="utf-8")
+
+     # Must return normally instead of raising NameError.
+     messages = recover_pending_journals(kb_dir)
+     assert any("Unrecoverable mutation journal" in m for m in messages)
+     # The bad journal is deleted, not left to fail again next time.
+     assert not list(journal_dir.glob("*.json"))
+
+     # The real goal: the KB's mutation lock is still acquirable afterwards.
+     with kb_ingest_lock(openkb_dir):
+         pass
+
+
+# --- O(touched) rollback for hardlinked dirs (pre-existing issue) ----------
+
+ def test_hardlinked_dir_rollback_leaves_untouched_files_in_place(tmp_path):
+     """Rollback of a hardlinked dir must not recopy files nobody touched.
+
+     A file left unmodified after the snapshot still shares its inode with
+     the backup, so an inode-aware (O(touched)) rollback keeps it where it
+     is. A delete + recopy rollback would hand it a fresh inode, which is the
+     regression this test detects.
+     """
+     kb_dir = tmp_path
+     concepts = kb_dir / "wiki" / "concepts"
+     concepts.mkdir(parents=True)
+     untouched = concepts / "keep.md"
+     untouched.write_text("keep", encoding="utf-8")
+     inode_before = untouched.stat().st_ino
+
+     snapshot = snapshot_paths(
+         kb_dir,
+         [concepts],
+         operation="add",
+         details={},
+         hardlink_dirs={concepts},
+     )
+     # No mutation between snapshot and rollback: keep.md stays linked to
+     # its backup.
+     snapshot.rollback()
+     snapshot.discard_best_effort()
+
+     assert untouched.exists()
+     assert untouched.read_text(encoding="utf-8") == "keep"
+     # Same inode as before means the file was NOT recopied.
+     assert untouched.stat().st_ino == inode_before
+
+
+ def test_hardlinked_dir_rollback_removes_new_and_restores_modified(tmp_path):
+     """Rollback of a hardlinked dir undoes both creations and rewrites.
+
+     After the snapshot the test creates a new page and atomically rewrites
+     an existing one. Rollback is expected to delete the new page, restore
+     the rewritten page's original text, and leave the untouched page as is.
+     """
+     from openkb.locks import atomic_write_text
+
+     kb_dir = tmp_path
+     concepts = kb_dir / "wiki" / "concepts"
+     concepts.mkdir(parents=True)
+     old_page = concepts / "old.md"
+     old_page.write_text("old", encoding="utf-8")
+     rewritten_page = concepts / "page.md"
+     rewritten_page.write_text("original", encoding="utf-8")
+     new_page = concepts / "new.md"
+
+     snapshot = snapshot_paths(
+         kb_dir,
+         [concepts],
+         operation="add",
+         details={},
+         hardlink_dirs={concepts},
+     )
+     # Simulated commit: one brand-new page plus one atomic rewrite.
+     new_page.write_text("new", encoding="utf-8")
+     atomic_write_text(rewritten_page, "rewritten")
+
+     snapshot.rollback()
+     snapshot.discard_best_effort()
+
+     assert old_page.read_text(encoding="utf-8") == "old"
+     assert rewritten_page.read_text(encoding="utf-8") == "original"
+     assert not new_page.exists()
+
+
+ def test_hardlinked_dir_rollback_prunes_new_nested_blob_dirs(tmp_path):
+     """Rollback removes a new doc's blob and its nested image directory.
+
+     Models the PageIndex blob store: the pre-existing blob is never touched
+     (so it keeps its shared inode and stays in place), while everything the
+     new doc added (``newdoc.pdf`` and ``newdoc/images/p1.png``) disappears
+     on rollback, including the ``newdoc/`` directory itself once empty.
+     """
+     kb_dir = tmp_path
+     files = kb_dir / ".openkb" / "files"
+     collection = files / "col"
+     collection.mkdir(parents=True)
+     existing = collection / "existing.pdf"
+     existing.write_bytes(b"existing")
+     inode_before = existing.stat().st_ino
+
+     snapshot = snapshot_paths(
+         kb_dir,
+         [files],
+         operation="add",
+         details={},
+         hardlink_dirs={files},
+     )
+     new_blob = collection / "newdoc.pdf"
+     new_doc_dir = collection / "newdoc"
+     new_images = new_doc_dir / "images"
+     new_blob.write_bytes(b"new")
+     new_images.mkdir(parents=True)
+     (new_images / "p1.png").write_bytes(b"png")
+
+     snapshot.rollback()
+     snapshot.discard_best_effort()
+
+     assert existing.read_bytes() == b"existing"
+     # Unchanged inode: the existing blob was left alone, not recopied.
+     assert existing.stat().st_ino == inode_before
+     assert not new_blob.exists()
+     # The directory created for the new doc is pruned along with its contents.
+     assert not new_doc_dir.exists()
diff --git a/tests/test_remove.py b/tests/test_remove.py
index 75daadf0..f99cd5ab 100644
--- a/tests/test_remove.py
+++ b/tests/test_remove.py
@@ -742,7 +742,7 @@ def test_add_persists_doc_name_for_later_remove(tmp_path):
runner = CliRunner()
# Mock convert_document + asyncio.run to skip the LLM-driven compile.
with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \
- patch("openkb.cli.convert_document", return_value=mock_result), \
+ patch("openkb.cli._convert_document_locked", return_value=mock_result), \
patch("openkb.cli.asyncio.run"):
add_res = runner.invoke(cli, ["add", str(doc)])
assert add_res.exit_code == 0, add_res.output
@@ -985,7 +985,7 @@ def test_add_long_pdf_persists_doc_id_to_registry(tmp_path):
runner = CliRunner()
with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \
- patch("openkb.cli.convert_document", return_value=convert_mock), \
+ patch("openkb.cli._convert_document_locked", return_value=convert_mock), \
patch("openkb.indexer.index_long_document", return_value=index_mock), \
patch("openkb.cli.asyncio.run"):
result = runner.invoke(cli, ["add", str(pdf)])
diff --git a/tests/test_url_ingest.py b/tests/test_url_ingest.py
index 1b8548ee..fba9c748 100644
--- a/tests/test_url_ingest.py
+++ b/tests/test_url_ingest.py
@@ -488,7 +488,7 @@ def test_add_single_file_returns_added_on_success(tmp_path):
is_long_doc=False, file_hash="cafe" * 16,
)
- with patch("openkb.cli.convert_document", return_value=mock_result), \
+ with patch("openkb.cli._convert_document_locked", return_value=mock_result), \
patch("openkb.cli.asyncio.run"):
outcome = add_single_file(doc, tmp_path)
@@ -507,7 +507,7 @@ def test_add_single_file_returns_skipped_on_dedup(tmp_path):
doc.write_text("# Hello")
skipped = ConvertResult(skipped=True)
- with patch("openkb.cli.convert_document", return_value=skipped):
+ with patch("openkb.cli._convert_document_locked", return_value=skipped):
outcome = add_single_file(doc, tmp_path)
assert outcome == "skipped"
@@ -539,7 +539,7 @@ def test_add_single_file_returns_failed_on_pipeline_error(tmp_path):
)
# Make both compile attempts raise to drive the failure path.
- with patch("openkb.cli.convert_document", return_value=mock_result), \
+ with patch("openkb.cli._convert_document_locked", return_value=mock_result), \
patch("openkb.cli.asyncio.run", side_effect=RuntimeError("LLM 503")), \
patch("openkb.cli.time.sleep"):
outcome = add_single_file(doc, tmp_path)
@@ -570,7 +570,7 @@ def test_url_ingest_cleans_up_orphan_on_dedup_skip(tmp_path, monkeypatch):
# source module — that's where the `from ... import` resolves.
with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \
patch("openkb.url_ingest.fetch_url_to_raw", return_value=fetched_path), \
- patch("openkb.cli.convert_document",
+ patch("openkb.cli._convert_document_locked",
return_value=ConvertResult(skipped=True)):
result = runner.invoke(cli, ["add", "https://example.com/paper.pdf"])
@@ -610,7 +610,7 @@ def test_url_ingest_keeps_raw_file_on_pipeline_failure(tmp_path):
runner = CliRunner()
with patch("openkb.cli._find_kb_dir", return_value=tmp_path), \
patch("openkb.url_ingest.fetch_url_to_raw", return_value=fetched_path), \
- patch("openkb.cli.convert_document", return_value=mock_result), \
+ patch("openkb.cli._convert_document_locked", return_value=mock_result), \
patch("openkb.cli.asyncio.run", side_effect=RuntimeError("LLM 503")), \
patch("openkb.cli.time.sleep"):
result = runner.invoke(cli, ["add", "https://example.com/paper.pdf"])