feat: concurrent add pipeline with crash-safe mutation recovery #104
gwokhou wants to merge 12 commits
Add openkb/mutation.py, the transactional layer for KB mutations, plus tests. MutationSnapshot snapshots the target paths, journals the intent, and restores on failure across active/committed/rolled_back states (children restored before parents; self-cleans its backup dir on partial failure). snapshot_paths writes the active journal as the recovery signal; recover_pending_journals rolls back active journals and discards terminal ones. publish_staged_tree copies staged raw/source artifacts into place. Module and tests only — wired into the add path in the next commit.
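The snapshot, journal, and recover cycle described above can be sketched for a single file as follows. This is a minimal illustration, not the code in openkb/mutation.py: the function names `snapshot_file` and `recover`, the JSON journal layout, and the `existed` field are all assumptions made for the example.

```python
import json
import shutil
import tempfile
from pathlib import Path


def snapshot_file(target: Path, backup_dir: Path, journal: Path) -> None:
    """Back up `target`, then write an 'active' journal as the recovery signal."""
    backup_dir.mkdir(parents=True, exist_ok=True)
    existed = target.exists()
    if existed:
        shutil.copy2(target, backup_dir / target.name)
    # Hypothetical journal layout; the real format is not shown in the PR text.
    journal.write_text(json.dumps({
        "state": "active",
        "target": str(target),
        "backup_dir": str(backup_dir),
        "existed": existed,
    }))


def recover(journal: Path) -> str:
    """Roll back an active journal, discard a terminal one; return the action."""
    entry = json.loads(journal.read_text())
    action = "discarded"
    if entry["state"] == "active":
        target = Path(entry["target"])
        if entry["existed"]:
            # Restore the bytes saved before the interrupted mutation.
            shutil.copy2(Path(entry["backup_dir"]) / target.name, target)
        else:
            # The target was created by the interrupted mutation: remove it.
            target.unlink(missing_ok=True)
        action = "rolled_back"
    shutil.rmtree(entry["backup_dir"], ignore_errors=True)
    journal.unlink()
    return action
```

A caller would take the snapshot before mutating and flip the journal state to a terminal value on success, so that only interrupted runs leave an `active` journal behind.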
Parallelize directory `add`: prepare files concurrently (hash, prefilter, staging, convert) while live-KB mutation stays serialized under the mutation lock.

- Split add into prepare (file-local, into an isolated staging dir) and commit (under the lock): snapshot -> publish -> index -> compile -> registry write -> mark_committed, with snapshot rollback on failure.
- convert_document gains assume_locked/staging_dir/doc_name_override so parallel prepares never write the live KB unlocked.
- Reserve doc_names in scan order via HashRegistry.memory so same-stem files behave like serial adds.
- New file_processing_jobs config (default 2).
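The prepare/commit split can be sketched as below: preparation runs in worker threads, while commits happen one at a time under a lock and in submission (scan) order rather than completion order. The names `prepare`, `add_directory`, and `mutation_lock` are placeholders for this example, and the uppercase transform merely stands in for hashing, staging, and conversion.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

# Stand-in for the KB mutation lock (assumption: a plain in-process lock).
mutation_lock = threading.Lock()


def prepare(name: str) -> str:
    """File-local work that is safe to run in parallel (placeholder transform)."""
    return name.upper()


def add_directory(names, jobs=2):
    """Prepare concurrently, commit serially in scan order."""
    committed = []
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        futures = [pool.submit(prepare, name) for name in names]
        # Iterate the futures list itself, not as_completed(), so commits
        # follow scan order even when later files finish preparing first.
        for future in futures:
            prepared = future.result()
            with mutation_lock:
                committed.append(prepared)  # placeholder for the real commit
    return committed
```

Waiting on futures in list order trades a little latency for deterministic log and CLI output, which matches the "commits run in scan order" behavior the PR describes.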
Document file_processing_jobs: add it to the Settings yaml example and note (README Advanced options + config.yaml.example) that only file preparation is parallelized while live-KB mutation stays serialized, so raising it helps mainly when conversion is the bottleneck.
Add DEBUG-level timing logs across the add pipeline (lock_wait, prefilter, prepare, index, compile, commit) via _log_add_timing, gated behind isEnabledFor(DEBUG) so there is no cost when disabled. Surfaces where time goes during concurrent directory add.
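A gated timing helper along these lines keeps the disabled case cheap. This is a sketch only: the logger name and the `log_add_timing` signature are assumptions, not the actual `_log_add_timing` in the PR.

```python
import logging
import time

# Hypothetical logger name for the example.
logger = logging.getLogger("openkb.add.sketch")


def log_add_timing(stage: str, started: float) -> bool:
    """Emit a DEBUG timing line; return False without formatting when disabled."""
    if not logger.isEnabledFor(logging.DEBUG):
        return False
    elapsed = time.perf_counter() - started
    logger.debug("add timing: %s took %.3fs", stage, elapsed)
    return True
```

Typical use is `started = time.perf_counter()` before a stage and `log_add_timing("prepare", started)` after it.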
Hash directory inputs up front and skip files whose hash is already in the registry before spawning prepare workers, so known duplicates no longer go through conversion and staging. Hashing runs across the jobs workers; files that fail to hash surface as per-file "failed" outcomes instead of aborting the batch.
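The up-front hash prefilter might look like the following sketch. SHA-256, the `prefilter` name, and the `(path, status, detail)` result tuple are assumptions for illustration; the PR text does not say which hash or result type the registry uses.

```python
import hashlib
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path


def file_hash(path: Path) -> str:
    """Stream the file through SHA-256 (assumed hash) in 1 MiB chunks."""
    digest = hashlib.sha256()
    with path.open("rb") as handle:
        for chunk in iter(lambda: handle.read(1 << 20), b""):
            digest.update(chunk)
    return digest.hexdigest()


def prefilter(paths, known_hashes, jobs=2):
    """Classify each path as 'skipped', 'new', or 'failed' without aborting."""

    def classify(path):
        try:
            value = file_hash(path)
        except OSError as exc:
            # A hashing failure is a per-file outcome, not a batch failure.
            return path, "failed", str(exc)
        status = "skipped" if value in known_hashes else "new"
        return path, status, value

    with ThreadPoolExecutor(max_workers=jobs) as pool:
        # pool.map preserves input order, so results stay in scan order.
        return list(pool.map(classify, paths))
```

Only the paths classified as `new` would then be handed to the prepare workers.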
recover_pending_journals was wired only into _kb_mutation_lock (the add path), so remove/recompile/lint/chat, which take kb_ingest_lock directly via _with_kb_lock, never drained. An `openkb add` that crashed mid-commit left an ACTIVE journal that an intervening `openkb remove` ignored and a later `openkb add` then rolled back, clobbering the remove's hashes.json edits and resurrecting the removed document.

Move draining into kb_lock's first exclusive acquisition so every mutation entry point restores the KB to a known state before mutating. Delay-import mutation from locks to break the locks<->mutation cycle, and drop the now-redundant drain (plus its double-scan/double-log per file) from _kb_mutation_lock.

Co-Authored-By: Claude <noreply@anthropic.com>
- test_exclusive_lock_drains_active_journal_before_yielding: regression guard for the lock-level drain. An ACTIVE journal left by a crashed add is rolled back the moment any exclusive lock is taken, not only on the add path.
- test_add_directory_jobs_gt1_runs_real_pipeline: end-to-end exercise of the jobs>1 ThreadPoolExecutor branch (real prepare + real commit, only LLM compile mocked). Every prior jobs>1 test mocked both halves, so futures ordering, staging publish, registry writes, and cleanup were never run.

Co-Authored-By: Claude <noreply@anthropic.com>
114abea to a48d054
Cleanup pass over the concurrent-ingestion + mutation-journal feature. Behavior preserved (full suite: 820 passed).

- mutation: stream _copy_file_atomic (mkstemp + copyfileobj + fsync + os.replace) instead of buffering whole files; route snapshot_paths file backups through it so every file copy in the module is atomic and streaming, and large raw PDFs no longer spike peak memory.
- converter: drop the dead ConvertResult.staging_dir field (only _PreparedAdd.staging_dir is ever read); split convert_document into a lock-acquiring wrapper + _convert_document_locked so the parallel prepare path calls the locked form directly, removing the assume_locked flag and its recursive self-call.
- cli: hoist a module-level logger (drop four local rebinding sites); unify the jobs==1 and jobs>1 commit loops into one ThreadPoolExecutor path; document the defensive long-doc snapshot paths.

Co-Authored-By: Claude <noreply@anthropic.com>
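The streaming atomic copy named in the mutation bullet (mkstemp + copyfileobj + fsync + os.replace) can be sketched as below. The function name `copy_file_atomic` and its error handling are assumptions for the example; it also leaves out the parent-directory fsync and mode handling.

```python
import os
import shutil
import tempfile
from pathlib import Path


def copy_file_atomic(src: Path, dst: Path) -> None:
    """Copy src to dst via a temp file in dst's directory, then rename over dst."""
    dst.parent.mkdir(parents=True, exist_ok=True)
    # Create the temp file next to dst so os.replace stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=dst.parent, prefix=".tmp-")
    try:
        with os.fdopen(fd, "wb") as out, src.open("rb") as inp:
            shutil.copyfileobj(inp, out)  # streams in chunks, no full-file buffer
            out.flush()
            os.fsync(out.fileno())  # push the data to disk before the rename
        os.replace(tmp_name, dst)  # readers see the old file or the new one
    except BaseException:
        # Do not leave a partial temp file behind on failure or interrupt.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
```

Because the destination only ever appears via `os.replace`, a crash mid-copy leaves at worst a stray temp file rather than a truncated target.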
KylinMountain left a comment
Overall the design holds up well — staging isolation, serialized commit, the journal-based recovery, and the commit-time re-checks are all correct, and test coverage is solid. Nothing here blocks merge on day-to-day correctness. Three things are worth fixing first, though — #1 is the one that bites in normal use:
1. Every per-file commit deep-copies the whole KB for the rollback backup — openkb/cli.py:475, openkb/mutation.py:165
_snapshot_add_paths lists the entire wiki/concepts and wiki/entities trees plus the .openkb/files PDF blob store, and snapshot_paths copytrees them on each file's commit. A 50-file batch copies the whole KB ~50 times — O(files × corpus) I/O that scales with the KB and can fill the disk mid-batch; even adding a 2 KB .md copies the full blob store. Suggest snapshotting only the pages the doc actually touches (or hardlink/CoW), or taking the snapshot once per batch rather than once per file.
2. Publish copies bytes instead of renaming, and drops the dir-fsync + chmod — openkb/mutation.py:18
Staging is on the same filesystem as raw/ and wiki/sources/, so os.replace would be an O(1) atomic rename instead of a full streaming copy + fsync. _copy_file_atomic also omits the parent-dir fsync and fchmod that locks.atomic_write_bytes performs, so published files are less durable across a crash and inherit mkstemp's 0600 mode instead of the umask mode. Suggest os.replace with a copy fallback on EXDEV, reusing the atomic_write_bytes fsync/chmod path.
3. Interrupted batches leak staging dirs — openkb/cli.py:1067
Staging dirs are all mkdir'd up front in the futures comprehension, but _cleanup_staging only runs inside each per-file commit and there's no batch-level try/finally. A Ctrl+C, a failed file, or an mkdir error orphans the already-created add-* dirs, and recovery only scans journal/ so it never reclaims them. Suggest a batch-level cleanup, or folding staging into the recovery sweep.
Happy to send a patch for #1 if useful. The rest of what I found is rarer crash/concurrency edges and design follow-ups — fine as separate issues.
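For item 2, a rename-first publish with a copy fallback on EXDEV could look like this sketch. `publish_file` and the `.tmp` suffix are hypothetical, and the fsync and mode handling discussed in the review are deliberately left out to keep the example short.

```python
import errno
import os
import shutil
from pathlib import Path


def publish_file(staged: Path, live: Path) -> str:
    """Move a staged file into place; return which path was taken."""
    live.parent.mkdir(parents=True, exist_ok=True)
    try:
        os.replace(staged, live)  # atomic rename on the same filesystem
        return "renamed"
    except OSError as exc:
        if exc.errno != errno.EXDEV:
            raise
    # Cross-device: copy next to the target, then rename within that filesystem.
    tmp = live.with_name(live.name + ".tmp")
    shutil.copyfile(staged, tmp)
    os.replace(tmp, live)
    staged.unlink()
    return "copied"
```

The fallback still finishes with `os.replace`, so the live path never exposes a partially copied file.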
…, batch cleanup

Addresses reviewer (KylinMountain) feedback on the concurrent-add / crash-recovery PR. No day-to-day-correctness blocker; perf, durability, and a resource leak.

#1: per-file snapshot no longer copies the whole KB.
- snapshot_paths gains a hardlink_dirs opt-in: wiki/concepts, wiki/entities, and .openkb/files are backed up via os.link (O(1)) instead of copytree (EXDEV/EPERM/EACCES → copy fallback, incl. Windows OneDrive/ACL hardlink blocks).
- Per-file snapshot is now O(touched) instead of O(files × corpus), so the "2 KB .md copies the full blob store" case is gone.
- Hardlinks required the wiki writers (all 14 sites) to move from path.write_text (in-place, same inode) to atomic_write_text (temp+replace, new inode) so a backup keeps the old bytes while the live tree is mutated.
- .openkb/files is hardlink-safe because PageIndex only appends new {doc_id} blobs.
- Per-file failure isolation preserved (did not take the once-per-batch suggestion, which would roll back earlier files' shared-index changes on a later failure).

#2: publish renames; copies are durable + umask-mode.
- publish_staged_tree → os.replace (O(1)) with EXDEV→copy fallback (_publish_staged_file); the rename path fsyncs the file DATA as well as the parent dir (parity with _copy_file_atomic, so a crash right after publish can't leave committed metadata pointing at un-durable bytes).
- _copy_file_atomic (snapshot/rollback/EXDEV-fallback) now fsyncs the parent dir and sets the umask mode instead of inheriting mkstemp's 0600, reusing locks' _fsync_directory/_target_mode.

#3: interrupted batches no longer leak staging dirs.
- The add batch tracks its staging dirs in a try/finally that reaps any whose file never reached _commit_prepared_add's per-call cleanup.
- Batch-local rather than a recovery sweep, since the lock is released between per-file commits and a sweep could reap a mid-batch acquisition's pending dirs.

Tests: +10 covering publish rename/durability/EXDEV, hardlink snapshot + rollback correctness, hardlink EACCES fallback (Windows), interrupted-batch leak, a hardlink-invariant regression guard (mutation-verified), and real-commit-path hardlink wiring. Full suite: 837 passed.
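The reason hardlink backups force the writers onto temp+replace can be seen in a small sketch. Both function names below are stand-ins written for this example (the real atomic_write_text lives elsewhere and is not shown in the PR text); the fallback errno set follows the commit message.

```python
import errno
import os
import shutil
import tempfile
from pathlib import Path


def snapshot_dir_hardlinked(src: Path, backup: Path) -> None:
    """Mirror src into backup using hardlinks, copying where links are refused."""
    backup.mkdir(parents=True, exist_ok=True)
    for path in src.rglob("*"):
        dest = backup / path.relative_to(src)
        if path.is_dir():
            dest.mkdir(parents=True, exist_ok=True)
            continue
        dest.parent.mkdir(parents=True, exist_ok=True)
        try:
            os.link(path, dest)  # no data copied: backup shares the inode
        except OSError as exc:
            if exc.errno not in (errno.EXDEV, errno.EPERM, errno.EACCES):
                raise
            shutil.copy2(path, dest)  # fallback when hardlinks are unavailable


def atomic_write_text(path: Path, text: str) -> None:
    """Write via temp file + os.replace so the live path gets a NEW inode."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent)
    with os.fdopen(fd, "w", encoding="utf-8") as handle:
        handle.write(text)
    os.replace(tmp_name, path)
```

An in-place `write_text` on the live file would modify the shared inode and silently change the backup too; replacing the path leaves the backup's link pointing at the old bytes.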
Two pre-existing resource/crash issues surfaced by the self-review of the review-fix commit, addressed here:

- Recovery retry is now bounded (recover_pending_journals). An active journal whose rollback keeps failing (e.g. persistent ENOSPC) was retried on every lock acquisition forever, re-doing the failing rollback and never releasing the backup dir + journal. The journal now carries an additive "attempts" counter (default 0, backward-compatible with old journals); after MAX_ROLLBACK_ATTEMPTS (5) failed rollbacks recovery discards it with a loud "manual review needed" message, bounding on-disk retention instead of leaking indefinitely.
- Rollback of a hardlinked dir is now O(touched), not O(corpus). rollback() previously did rmtree(target) + copytree(backup) for every dir, recopying the whole tree: the .openkb/files blob store (potentially GB) on every failed long-doc add, and able to ENOSPC mid-rollback (feeding the unbounded-retry leak above). For dirs snapshotted via hardlinks, rollback now does an inode-diff restore (_restore_hardlinked_dir): untouched files already share the backup's inode and are left in place; only new/modified/deleted files are touched. Crash-rebuilt snapshots (no hardlink info on disk) keep the safe full-copy path, so the journal format is unchanged and recovery stays conservative.

Tests: +4 (bounded-recovery give-up; hardlinked rollback leaves untouched files in place / removes new + restores modified / prunes new nested blob dirs). Full suite: 841 passed.
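The bounded-retry bookkeeping can be sketched as a counter stored in the journal itself. `record_failed_rollback` and the JSON layout are assumptions for illustration; only the `attempts` field (default 0) and the limit of 5 come from the commit message.

```python
import json
from pathlib import Path

MAX_ROLLBACK_ATTEMPTS = 5  # value stated in the commit message


def record_failed_rollback(journal: Path) -> bool:
    """Bump the attempts counter; return True once recovery should give up."""
    entry = json.loads(journal.read_text())
    # Old journals have no counter, so a missing key is treated as 0.
    entry["attempts"] = entry.get("attempts", 0) + 1
    if entry["attempts"] >= MAX_ROLLBACK_ATTEMPTS:
        journal.unlink()  # discard; the caller logs "manual review needed"
        return True
    journal.write_text(json.dumps(entry))
    return False
```

Persisting the counter in the journal means the bound holds across process restarts, not just within one run.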
@KylinMountain Thanks for your review! All three items are addressed in the latest commits, with tests added for each change (full suite: 841 passed). I also found two further issues during self-review, noted below.

Solved issues from your review (981e491)

1. Per-file snapshot cost reduced to O(touched). wiki/concepts, wiki/entities, and .openkb/files are now snapshotted with hardlinks (os.link) rather than copytree, with EXDEV/EPERM/EACCES falling back to a copy. This removes the O(files × corpus) copy, including the case where a small add copied the entire blob store. Notes:

   One assumption worth confirming: .openkb/files relies on PageIndex appending only new {doc_id} blobs per add (verified in its local backend: fresh UUID per add, dedup returns early, no in-place rewrite). If you would prefer not to rely on that, the alternative is to track removals for new blobs in that tree. Please share your perspective on the patch you had in mind. If a simpler approach than hardlinks plus atomic writers exists, I would prefer to adopt it.

2. Publish uses rename, is durable, and uses the umask mode. publish_staged_tree now uses os.replace (O(1)) with an EXDEV→copy fallback. The renamed file's data and parent directory are fsync'd, and the mode is the umask default rather than mkstemp's 0600, reusing locks._fsync_directory/_target_mode as suggested.

3. Interrupted batches reclaim their staging directories. The batch records its staging directories and reclaims any that do not reach commit via a try/finally. It is scoped to the batch rather than folded into recovery, because the lock is released between per-file commits, and a recovery sweep could reclaim another in-flight batch's directories. Remaining limitation: a hard kill or power loss mid-batch can still orphan staging directories, since the finally block does not run and recovery scans only journal/.

Solved issues from self-review (5fb53b8)

Additionally, recovery retry is now bounded (a persistently failing rollback journal previously retried indefinitely; it now gives up after five attempts with an explicit warning), and rollback for hardlinked directories is O(touched) via inode comparison, leaving untouched files in place.

Further discussion

I am happy to hear the follow-up suggestions you mentioned (rarer crash/concurrency edges and design follow-ups).
recover_pending_journals' except block referenced `snapshot` unconditionally, but it is assigned inside the try — so a corrupt/empty/stray .json (or one missing kb_dir/backup_dir) made json.loads/_snapshot_from_journal raise before the assignment. The handler then threw UnboundLocalError, which propagates out of recovery and out of every exclusive kb_lock acquisition (draining runs on first acquisition), bricking add/remove/recompile/chat for the whole KB off a single bad journal. Initialize snapshot=None per iteration; when it is still None in the except (parse/reconstruct failed), best-effort remove the unrecoverable journal, log loudly, and continue instead of crashing. Adds a parametrized regression test over malformed-journal payloads plus a lock-acquire assertion.
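The fix pattern (bind `snapshot = None` at the top of each iteration, then branch on it in the handler) can be sketched as follows. `snapshot_from_journal`, `drain_journals`, the outcome strings, and the injected `rollback` callable are all hypothetical; only the required kb_dir/backup_dir keys come from the commit message.

```python
import json
import logging
from pathlib import Path

logger = logging.getLogger("openkb.recovery.sketch")


def snapshot_from_journal(raw: str) -> dict:
    """Parse and validate a journal payload; raises on malformed input."""
    data = json.loads(raw)
    if not isinstance(data, dict):
        raise ValueError("journal is not a JSON object")
    for key in ("kb_dir", "backup_dir"):
        if key not in data:
            raise KeyError(key)
    return data


def drain_journals(journal_dir: Path, rollback) -> dict:
    """Process every journal; one bad file must not abort the whole drain."""
    outcomes = {}
    for journal in sorted(journal_dir.glob("*.json")):
        snapshot = None  # bound before the try, so the handler can test it
        try:
            snapshot = snapshot_from_journal(journal.read_text())
            rollback(snapshot)
            journal.unlink()
            outcomes[journal.name] = "rolled_back"
        except Exception as exc:
            if snapshot is None:
                # Parse/reconstruct failed: nothing to retry, so discard it.
                logger.error("discarding unrecoverable journal %s: %s", journal, exc)
                journal.unlink(missing_ok=True)
                outcomes[journal.name] = "discarded_malformed"
                continue
            outcomes[journal.name] = "rollback_failed"
    return outcomes
```

With the early binding, a parse failure and a rollback failure take different branches instead of both falling into code that assumes a snapshot exists.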
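One more solved issue from self-review (bdef059): a malformed recovery journal no longer bricks the KB lock

While self-reviewing this PR I caught a latent crash in recover_pending_journals.

Bug

The except block referenced `snapshot` unconditionally, although it is assigned inside the try:

```python
except Exception as exc:
    snapshot.attempts += 1  # ← snapshot unbound → UnboundLocalError
```

Because journal draining runs on every first exclusive kb_lock acquisition, a single bad journal bricked add/remove/recompile/chat for the whole KB. Normal writes go through …

Fix

Initialize snapshot=None per iteration; when it is still None in the except (parse/reconstruct failed), best-effort remove the unrecoverable journal, log loudly, and continue instead of crashing.

Test

Added a parametrized regression test over malformed-journal payloads, plus a lock-acquire assertion.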
Summary

Parallelize `openkb add <dir>` and make every KB mutation crash-recoverable through a journaled snapshot/rollback layer.

Why

Directory `add` was strictly serial: ingest time scaled linearly with file count and was dominated by conversion. Worse, a crash between convert and the registry write orphaned raw/source artifacts in the live KB with no way to recover or clean them up.

Changes

- Files are prepared concurrently in a ThreadPoolExecutor (file_processing_jobs, default 2); live-KB mutation (publish, PageIndex indexing, LLM compile, registry write, log append) stays under the mutation lock. Commits run in scan order for stable log.md / CLI output.
- Journaled mutation layer (openkb/mutation.py). Each commit snapshots the KB paths it will touch, journals the intent, and rolls back on failure. recover_pending_journals rolls back any active journal left by an interrupted run and discards committed/rolled_back ones. SQLite sidecars (pageindex.db-wal/-shm/-journal) are snapshotted too, so long-doc failures don't leave sidecars newer than the db.
- Recovery runs when the KB lock is first taken exclusively, so remove/recompile/lint/chat also restore the KB before mutating, not just add. Prevents a crashed add's journal from later clobbering an intervening remove.
- convert_document writes into an isolated staging dir published atomically at commit, so a crash can no longer orphan live-KB artifacts.

Tests

test_mutation.py + test_add_command.py cover snapshot rollback, commit protection, partial-failure cleanup, lock-driven recovery, and the jobs>1 pipeline end-to-end (real prepare + commit, only LLM compile mocked). Full suite: 820 passed.

🤖 Generated with Claude Code