Skip to content

feat: concurrent add pipeline with crash-safe mutation recovery#104

Open
gwokhou wants to merge 12 commits into
VectifyAI:mainfrom
gwokhou:feat/concurrent-ingestion-recovery
Open

feat: concurrent add pipeline with crash-safe mutation recovery#104
gwokhou wants to merge 12 commits into
VectifyAI:mainfrom
gwokhou:feat/concurrent-ingestion-recovery

Conversation

@gwokhou

@gwokhou gwokhou commented Jun 18, 2026

Copy link
Copy Markdown
Contributor

Summary

Parallelize openkb add <dir> and make every KB mutation crash-recoverable through a journaled snapshot/rollback layer.

Why

Directory add was strictly serial — ingest time scaled linearly with file count and was dominated by conversion. Worse, a crash between convert and the registry write orphaned raw/source artifacts in the live KB with no way to recover or clean them up.

Changes

  • Concurrent prepare, serialized commit. File preparation (hash, duplicate prefilter, staging, conversion) runs on a ThreadPoolExecutor (file_processing_jobs, default 2); live-KB mutation — publish, PageIndex indexing, LLM compile, registry write, log append — stays under the mutation lock. Commits run in scan order for stable log.md / CLI output.
  • Crash-safe mutation journal (openkb/mutation.py). Each commit snapshots the KB paths it will touch, journals the intent, and rolls back on failure. recover_pending_journals rolls back any active journal left by an interrupted run and discards committed/rolled_back ones. SQLite sidecars (pageindex.db-wal/-shm/-journal) are snapshotted too, so long-doc failures don't leave sidecars newer than the db.
  • Recovery wired into the lock. Draining runs on every exclusive-lock acquisition, so remove/recompile/lint/chat also restore the KB before mutating — not just add. Prevents a crashed add's journal from later clobbering an intervening remove.
  • Staged conversion. convert_document writes into an isolated staging dir published atomically at commit, so a crash can no longer orphan live-KB artifacts.
  • Perf & diagnostics. Prefilter known hashes before preparing; DEBUG-level per-stage timing logs.

Tests

test_mutation.py + test_add_command.py cover snapshot rollback, commit protection, partial-failure cleanup, lock-driven recovery, and the jobs>1 pipeline end-to-end (real prepare + commit, only LLM compile mocked). Full suite: 820 passed.

🤖 Generated with Claude Code

@gwokhou gwokhou marked this pull request as ready for review June 18, 2026 16:15
gwokhou and others added 7 commits June 20, 2026 00:54
Add openkb/mutation.py, the transactional layer for KB mutations, plus
tests. MutationSnapshot snapshots the target paths, journals the intent,
and restores on failure across active/committed/rolled_back states
(children restored before parents; self-cleans its backup dir on partial
failure). snapshot_paths writes the active journal as the recovery signal;
recover_pending_journals rolls back active journals and discards terminal
ones. publish_staged_tree copies staged raw/source artifacts into place.

Module and tests only — wired into the add path in the next commit.
Parallelize directory `add`: prepare files concurrently (hash, prefilter,
staging, convert) while live-KB mutation stays serialized under the
mutation lock.

- Split add into prepare (file-local, into an isolated staging dir) and
  commit (under the lock): snapshot -> publish -> index -> compile ->
  registry write -> mark_committed, with snapshot rollback on failure.
- convert_document gains assume_locked/staging_dir/doc_name_override so
  parallel prepares never write the live KB unlocked.
- Reserve doc_names in scan order via HashRegistry.memory so same-stem
  files behave like serial adds.
- New file_processing_jobs config (default 2).
Document file_processing_jobs: add it to the Settings yaml example and
note (README Advanced options + config.yaml.example) that only file
preparation is parallelized while live-KB mutation stays serialized, so
raising it helps mainly when conversion is the bottleneck.
Add DEBUG-level timing logs across the add pipeline (lock_wait, prefilter,
prepare, index, compile, commit) via _log_add_timing, gated behind
isEnabledFor(DEBUG) so there is no cost when disabled. Surfaces where time
goes during concurrent directory add.
Hash directory inputs up front and skip files whose hash is already in the
registry before spawning prepare workers, so known duplicates no longer go
through conversion and staging. Hashing runs across the jobs workers;
files that fail to hash surface as per-file "failed" outcomes instead of
aborting the batch.
recover_pending_journals was wired only into _kb_mutation_lock (the add
path), so remove/recompile/lint/chat — which take kb_ingest_lock directly
via _with_kb_lock — never drained. An `openkb add` that crashed mid-commit
left an ACTIVE journal that an intervening `openkb remove` ignored and a
later `openkb add` then rolled back, clobbering the remove's hashes.json
edits and resurrecting the removed document.

Move draining into kb_lock's first exclusive acquisition so every mutation
entry point restores the KB to a known state before mutating. Delay-import
mutation from locks to break the locks<->mutation cycle, and drop the
now-redundant drain (plus its double-scan/double-log per file) from
_kb_mutation_lock.

Co-Authored-By: Claude <noreply@anthropic.com>
- test_exclusive_lock_drains_active_journal_before_yielding: regression
  guard for the lock-level drain — an ACTIVE journal left by a crashed add
  is rolled back the moment any exclusive lock is taken, not only on the
  add path.
- test_add_directory_jobs_gt1_runs_real_pipeline: end-to-end exercise of the
  jobs>1 ThreadPoolExecutor branch (real prepare + real commit, only LLM
  compile mocked). Every prior jobs>1 test mocked both halves, so futures
  ordering, staging publish, registry writes, and cleanup were never run.

Co-Authored-By: Claude <noreply@anthropic.com>
@gwokhou gwokhou force-pushed the feat/concurrent-ingestion-recovery branch 2 times, most recently from 114abea to a48d054 Compare June 19, 2026 17:20
@gwokhou gwokhou changed the title feat: add ingestion recovery and concurrent add pipeline feat: concurrent add pipeline with crash-safe mutation recovery Jun 19, 2026
gwokhou and others added 2 commits June 20, 2026 11:21
Cleanup pass over the concurrent-ingestion + mutation-journal feature.
Behavior preserved (full suite: 820 passed).

- mutation: stream _copy_file_atomic (mkstemp + copyfileobj + fsync +
  os.replace) instead of buffering whole files; route snapshot_paths
  file backups through it so every file copy in the module is atomic and
  streaming — large raw PDFs no longer spike peak memory.
- converter: drop the dead ConvertResult.staging_dir field (only
  _PreparedAdd.staging_dir is ever read); split convert_document into a
  lock-acquiring wrapper + _convert_document_locked so the parallel
  prepare path calls the locked form directly, removing the
  assume_locked flag and its recursive self-call.
- cli: hoist a module-level logger (drop four local rebinding sites);
  unify the jobs==1 and jobs>1 commit loops into one ThreadPoolExecutor
  path; document the defensive long-doc snapshot paths.

Co-Authored-By: Claude <noreply@anthropic.com>

@KylinMountain KylinMountain left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall the design holds up well — staging isolation, serialized commit, the journal-based recovery, and the commit-time re-checks are all correct, and test coverage is solid. Nothing here blocks merge on day-to-day correctness. Three things are worth fixing first, though — #1 is the one that bites in normal use:

1. Every per-file commit deep-copies the whole KB for the rollback backupopenkb/cli.py:475, openkb/mutation.py:165

_snapshot_add_paths lists the entire wiki/concepts and wiki/entities trees plus the .openkb/files PDF blob store, and snapshot_paths copytrees them on each file's commit. A 50-file batch copies the whole KB ~50 times — O(files × corpus) I/O that scales with the KB and can fill the disk mid-batch; even adding a 2 KB .md copies the full blob store. Suggest snapshotting only the pages the doc actually touches (or hardlink/CoW), or taking the snapshot once per batch rather than once per file.

2. Publish copies bytes instead of renaming, and drops the dir-fsync + chmodopenkb/mutation.py:18

Staging is on the same filesystem as raw/ and wiki/sources/, so os.replace would be an O(1) atomic rename instead of a full streaming copy + fsync. _copy_file_atomic also omits the parent-dir fsync and fchmod that locks.atomic_write_bytes performs, so published files are less durable across a crash and inherit mkstemp's 0600 mode instead of the umask mode. Suggest os.replace with a copy fallback on EXDEV, reusing the atomic_write_bytes fsync/chmod path.

3. Interrupted batches leak staging dirsopenkb/cli.py:1067

Staging dirs are all mkdir'd up front in the futures comprehension, but _cleanup_staging only runs inside each per-file commit and there's no batch-level try/finally. A Ctrl+C, a failed file, or an mkdir error orphans the already-created add-* dirs, and recovery only scans journal/ so it never reclaims them. Suggest a batch-level cleanup, or folding staging into the recovery sweep.

Happy to send a patch for #1 if useful. The rest of what I found is rarer crash/concurrency edges and design follow-ups — fine as separate issues.

gwokhou added 2 commits June 24, 2026 21:40
…, batch cleanup

Addresses reviewer (KylinMountain) feedback on the concurrent-add /
crash-recovery PR. No day-to-day-correctness blocker; perf, durability,
and a resource leak.

VectifyAI#1 — per-file snapshot no longer copies the whole KB.
  snapshot_paths gains a hardlink_dirs opt-in: wiki/concepts, wiki/entities,
  and .openkb/files are backed up via os.link (O(1)) instead of copytree
  (EXDEV/EPERM/EACCES → copy fallback, incl. Windows OneDrive/ACL hardlink
  blocks). Per-file snapshot is now O(touched) instead of O(files × corpus),
  so the "2 KB .md copies the full blob store" case is gone. Hardlinks
  required the wiki writers (all 14 sites) to move from path.write_text
  (in-place, same inode) to atomic_write_text (temp+replace, new inode) so a
  backup keeps the old bytes while the live tree is mutated. .openkb/files is
  hardlink-safe because PageIndex only appends new {doc_id} blobs. Per-file
  failure isolation preserved (did not take the once-per-batch suggestion,
  which would roll back earlier files' shared-index changes on a later
  failure).

VectifyAI#2 — publish renames; copies are durable + umask-mode.
  publish_staged_tree → os.replace (O(1)) with EXDEV→copy fallback
  (_publish_staged_file); the rename path fsyncs the file DATA as well as the
  parent dir (parity with _copy_file_atomic, so a crash right after publish
  can't leave committed metadata pointing at un-durable bytes). _copy_file_atomic
  (snapshot/rollback/EXDEV-fallback) now fsyncs the parent dir and sets the
  umask mode instead of inheriting mkstemp's 0600, reusing locks'
  _fsync_directory/_target_mode.

VectifyAI#3 — interrupted batches no longer leak staging dirs.
  The add batch tracks its staging dirs in a try/finally that reaps any whose
  file never reached _commit_prepared_add's per-call cleanup. Batch-local
  rather than a recovery sweep, since the lock is released between per-file
  commits and a sweep could reap a mid-batch acquisition's pending dirs.

Tests: +10 covering publish rename/durability/EXDEV, hardlink snapshot +
rollback correctness, hardlink EACCES fallback (Windows), interrupted-batch
leak, a hardlink-invariant regression guard (mutation-verified), and
real-commit-path hardlink wiring. Full suite: 837 passed.
Two pre-existing resource/crash issues surfaced by the self-review of the
review-fix commit, addressed here:

recovery retry is now bounded (recover_pending_journals).
  An active journal whose rollback keeps failing (e.g. persistent ENOSPC) was
  retried on every lock acquisition forever — re-doing the failing rollback
  and never releasing the backup dir + journal. The journal now carries an
  additive "attempts" counter (default 0, backward-compatible with old
  journals); after MAX_ROLLBACK_ATTEMPTS (5) failed rollbacks recovery
  discards it with a loud "manual review needed" message, bounding on-disk
  retention instead of leaking indefinitely.

rollback of a hardlinked dir is now O(touched), not O(corpus).
  rollback() previously did rmtree(target) + copytree(backup) for every dir,
  recopying the whole tree — the .openkb/files blob store (potentially GB) on
  every failed long-doc add, and able to ENOSPC mid-rollback (feeding the
  unbounded-retry leak above). For dirs snapshotted via hardlinks, rollback
  now does an inode-diff restore (_restore_hardlinked_dir): untouched files
  already share the backup's inode and are left in place; only new/modified/
  deleted files are touched. Crash-rebuilt snapshots (no hardlink info on
  disk) keep the safe full-copy path, so the journal format is unchanged and
  recovery stays conservative.

Tests: +4 (bounded-recovery give-up; hardlinked rollback leaves untouched
files in place / removes new + restores modified / prunes new nested blob
dirs). Full suite: 841 passed.
@gwokhou

gwokhou commented Jun 24, 2026

Copy link
Copy Markdown
Contributor Author

@KylinMountain Thanks your review! All three items are addressed in the latest commits, with tests added for each change (full suite: 841 passed). I also found two further issues during self-review, noted below.

Solved issues from your review 981e491

1. Per-file snapshot cost reduced to O(touched).

wiki/concepts, wiki/entities, and .openkb/files are now snapshotted with hardlinks (os.link) rather than copytree, with EXDEV/EPERM/EACCES falling back to a copy. This removes the O(files × corpus) copy, including the case where a small add copied the entire blob store.

Notes:

  • Hardlinks require every writer into these trees to write atomically (temp + replace, yielding a new inode). The wiki writers used in-place write_text, which would invalidate a hardlink backup on rewrite; all 14 have been switched to atomic_write_text. A regression test guards this — it fails if any writer reverts to in-place writes.
  • Per-file rollback granularity is preserved. A single batch-level snapshot was not adopted because it would discard shared-index changes from earlier files when a later file fails.

One assumption worth confirming: .openkb/files relies on PageIndex appending only new {doc_id} blobs per add (verified in its local backend — fresh UUID per add, dedup returns early, no in-place rewrite). If you would prefer not to rely on that, the alternative is to track removals for new blobs in that tree.

Please share your perspective on the patch you had in mind. If a simpler approach than hardlinks plus atomic writers exists, I would prefer to adopt it.

2. Publish uses rename, durable, and umask-mode.

publish_staged_tree now uses os.replace (O(1)) with an EXDEV→copy fallback. The renamed file's data and parent directory are fsync'd, and the mode is the umask default rather than mkstemp's 0600, reusing locks._fsync_directory/_target_mode as suggested.

3. Interrupted batches reclaim their staging directories.

The batch records its staging directories and reclaims any that do not reach commit via a try/finally. It is scoped to the batch rather than folded into recovery, because the lock is released between per-file commits, and a recovery sweep could reclaim another in-flight batch's directories. Remaining limitation: a hard kill or power loss mid-batch can still orphan staging directories, since the final step does not run and recovery scans only journal/.

Solved issues from self-review 5fb53b8

  • The publish rename path initially omitted the file-data fsync that the copy path performs, leaving committed metadata referencing undurable bytes after a crash. Fixed.
  • The hardlink fallback handled only EXDEV/EPERM; on Windows, OneDrive/ACL hardlink blocks surface as EACCES and would abort the snapshot. EACCES is now included in the fallback set.

Additionally, recovery retry is now bounded (a persistently-failing rollback journal previously retried indefinitely; it now gives up after five attempts with an explicit warning), and rollback for hardlinked directories is O(touched) via inode comparison, leaving untouched files in place.

Further Discussions

I am happy to hear the follow-up suggestions you mentioned (about rarer concurrency, crashes, and design).

@gwokhou gwokhou requested a review from KylinMountain June 24, 2026 14:59
recover_pending_journals' except block referenced `snapshot` unconditionally, but it is assigned inside the try — so a corrupt/empty/stray .json (or one missing kb_dir/backup_dir) made json.loads/_snapshot_from_journal raise before the assignment. The handler then threw UnboundLocalError, which propagates out of recovery and out of every exclusive kb_lock acquisition (draining runs on first acquisition), bricking add/remove/recompile/chat for the whole KB off a single bad journal.

Initialize snapshot=None per iteration; when it is still None in the except (parse/reconstruct failed), best-effort remove the unrecoverable journal, log loudly, and continue instead of crashing. Adds a parametrized regression test over malformed-journal payloads plus a lock-acquire assertion.
@gwokhou

gwokhou commented Jun 25, 2026

Copy link
Copy Markdown
Contributor Author

One more solved issue from self-review bdef059

a malformed recovery journal no longer bricks the KB lock

While self-reviewing this PR I caught a latent crash in recover_pending_journals (openkb/mutation.py).

Bug

The except Exception handler referenced snapshot unconditionally, but snapshot is assigned inside the try (after json.loads / _snapshot_from_journal). A malformed journal — a corrupt/empty/stray .json in .openkb/journal/, or a valid JSON missing the kb_dir/backup_dir keys — makes parsing/construction raise before the assignment, so the handler itself threw UnboundLocalError:

except Exception as exc:
    snapshot.attempts += 1   # ← snapshot unbound → UnboundLocalError

Because journal draining runs on every first exclusive kb_lock acquisition (locks._drain_pending_journals), that propagated up through kb_lock: a single bad journal file would make every subsequent add / remove / recompile / chat fail with an opaque UnboundLocalError, bricking mutations for the entire KB.

Normal writes go through atomic_write_json, so the trigger for a malformed journal is uncommon (external corruption, manual edits, or a stray .json dropped in the dir) — but recovery must be robust to it regardless, and it wasn't.

Fix

Initialize snapshot = None per loop iteration, and in the except handle the snapshot is None case: best-effort unlink the unrecoverable journal, log loudly, and continue instead of crashing. The existing bounded-retry logic is left untouched for the case where snapshot is bound (a rollback/discard failure).

Test

Added a parametrized regression test (tests/test_mutation.py) over four malformed-journal payloads (empty, invalid JSON, missing keys, wrong shape), asserting that recovery returns cleanly and removes the bad journal — and, the key assertion, that the KB's mutation lock still acquires afterward. Full suite green (845 passed).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants