diff --git a/config/config.exs b/config/config.exs
index 2e6f997..4ff1fc0 100644
--- a/config/config.exs
+++ b/config/config.exs
@@ -237,6 +237,7 @@ config :loopctl, Oban,
        {"0 2 * * *", Loopctl.Workers.CostRollupWorker},
        {"0 3 * * *", Loopctl.Workers.WebhookCleanupWorker},
        {"0 3 * * 0", Loopctl.Workers.TokenDataArchivalWorker},
+       {"0 4 * * *", Loopctl.Workers.KnowledgeLintWorker, args: %{"mode" => "all_tenants"}},
        {"*/5 * * * *", Loopctl.Workers.PendingEnrollmentCleanupWorker},
        {"* * * * *", Loopctl.Workers.ComputeSthWorker, args: %{"mode" => "all_tenants"}},
        {"* * * * *", Loopctl.Workers.RevokeExpiredDispatchesWorker}
diff --git a/lib/loopctl/workers/knowledge_lint_worker.ex b/lib/loopctl/workers/knowledge_lint_worker.ex
new file mode 100644
index 0000000..b029c5f
--- /dev/null
+++ b/lib/loopctl/workers/knowledge_lint_worker.ex
@@ -0,0 +1,195 @@
+defmodule Loopctl.Workers.KnowledgeLintWorker do
+  @moduledoc """
+  Oban worker that runs the knowledge-wiki lint nightly and acts on the findings.
+
+  This is the "nightly refinement" loop that both the Karpathy `llm-wiki`
+  pattern (lint-and-act) and the Dan Martell second-brain workflow converge on:
+  the lint *engine* already exists (`Loopctl.Knowledge.lint/2`); this worker is
+  the orchestration that RUNS it on a schedule and takes a safe, automated
+  repair action on the one finding that can be auto-repaired — orphans.
+
+  ## Scheduling
+
+  Configured via the Oban Cron plugin to run once nightly in `all_tenants`
+  mode, which fans out one per-tenant job per active tenant (mirroring
+  `Loopctl.Workers.ComputeSthWorker`). Lint is inherently per-tenant, so the
+  fan-out keeps each tenant's analysis in its own job (independent retries,
+  no cross-tenant coupling).
+
+  ## What it does per tenant
+
+  1. Runs `Knowledge.lint/2` (stale, orphan, contradiction, coverage-gap and
+     broken-source detection).
+  2. **Acts on orphans** — published articles with zero links. For each orphan
+     it re-enqueues `Loopctl.Workers.ArticleLinkingWorker`, which re-runs the
+     proven pgvector similarity pass against the *current* corpus. This is the
+     reason a nightly pass is valuable rather than a no-op: an article orphaned
+     in January (no neighbor cleared the similarity threshold at the time) can
+     find neighbors that were ingested months later. Re-linking is deterministic,
+     cheap, and makes **no** embedding-API calls (orphans missing an embedding
+     simply no-op in the linking worker — backfilling those is a separate
+     concern, out of scope here).
+  3. **Surfaces all findings** via an immutable audit event
+     (`knowledge.lint_completed`) carrying the full lint summary, so
+     contradictions / coverage gaps / broken sources / stale counts are
+     observable in the change feed even though they are not auto-repaired
+     (those require human judgment).
+
+  ## Scale safety
+
+  Lint caps each finding array at `max_per_category` (we request the ceiling,
+  500) while still reporting the *true* totals in `summary.total_per_category`.
+  Orphan re-link enqueues are additionally bounded by
+  `:knowledge_lint_max_orphan_relink` (default 500). When the true orphan count
+  exceeds what we act on, the gap is logged — never silently dropped — so an
+  operator can see that a backlog remains for the next run.
+
+  ## Failure handling
+
+  A job insert that Oban rejects (per-tenant fan-out or orphan re-link) is
+  logged as a warning and skipped; it does not abort the remaining inserts.
+  `orphans_relinked` counts only the re-link jobs Oban accepted.
+  """
+
+  use Oban.Worker,
+    queue: :knowledge,
+    max_attempts: 3,
+    unique: [fields: [:worker, :args], period: 60]
+
+  require Logger
+
+  import Ecto.Query
+
+  alias Loopctl.AdminRepo
+  alias Loopctl.Audit
+  alias Loopctl.Knowledge
+  alias Loopctl.Tenants.Tenant
+  alias Loopctl.Workers.ArticleLinkingWorker
+
+  # Ask lint for the ceiling so we act on as many orphans per run as the engine
+  # will return; the true (pre-cap) totals still come back in the summary.
+  @lint_max_per_category 500
+  @default_max_orphan_relink 500
+
+  @impl Oban.Worker
+  def perform(%Oban.Job{args: %{"mode" => "all_tenants"}}) do
+    tenant_ids =
+      from(t in Tenant, where: t.status == :active, select: t.id)
+      |> AdminRepo.all()
+
+    # One failed insert must not abort the fan-out for the remaining tenants,
+    # but it must not disappear either: log it so the skipped tenant is visible.
+    Enum.each(tenant_ids, fn tenant_id ->
+      case %{"tenant_id" => tenant_id} |> __MODULE__.new() |> Oban.insert() do
+        {:ok, _job} ->
+          :ok
+
+        {:error, reason} ->
+          Logger.warning(
+            "KnowledgeLintWorker: failed to enqueue lint for tenant=#{tenant_id}: " <>
+              inspect(reason)
+          )
+      end
+    end)
+
+    :ok
+  end
+
+  def perform(%Oban.Job{args: %{"tenant_id" => tenant_id}}) do
+    # Assertive match: a lint failure raises, which Oban records and retries
+    # (up to `max_attempts`).
+    {:ok, report} = Knowledge.lint(tenant_id, max_per_category: @lint_max_per_category)
+
+    relinked = enqueue_orphan_relinks(tenant_id, report)
+    log_audit_event(tenant_id, report, relinked)
+
+    Logger.info(
+      "KnowledgeLintWorker: tenant=#{tenant_id} " <>
+        "issues=#{report.summary.total_issues} orphans_relinked=#{relinked}"
+    )
+
+    :ok
+  end
+
+  # --- Private ---
+
+  # Enqueues an `ArticleLinkingWorker` job for each orphan in the report, up to
+  # the configured cap, and returns how many of those jobs Oban accepted.
+  defp enqueue_orphan_relinks(tenant_id, report) do
+    # Clamp at zero: `Enum.take/2` with a negative amount takes from the END of
+    # the list, so a negative cap would re-link orphans instead of disabling it.
+    max_relink =
+      :loopctl
+      |> Application.get_env(:knowledge_lint_max_orphan_relink, @default_max_orphan_relink)
+      |> max(0)
+
+    orphans = Enum.take(report.orphan_articles, max_relink)
+    selected = length(orphans)
+    true_total = report.summary.total_per_category.orphan_articles
+
+    if true_total > selected do
+      Logger.warning(
+        "KnowledgeLintWorker: tenant=#{tenant_id} has #{true_total} orphan articles; " <>
+          "re-linking #{selected} this run (cap=#{max_relink}). Remainder retried next run."
+      )
+    end
+
+    Enum.count(orphans, fn %{article_id: article_id} ->
+      relink_enqueued?(tenant_id, article_id)
+    end)
+  end
+
+  # Inserts one re-link job; returns `true` when Oban accepted it. A failed
+  # insert is logged and reported as `false` so it is not counted as re-linked.
+  defp relink_enqueued?(tenant_id, article_id) do
+    result =
+      %{"article_id" => article_id, "tenant_id" => tenant_id}
+      |> ArticleLinkingWorker.new()
+      |> Oban.insert()
+
+    case result do
+      {:ok, _job} ->
+        true
+
+      {:error, reason} ->
+        Logger.warning(
+          "KnowledgeLintWorker: tenant=#{tenant_id} failed to enqueue re-link for " <>
+            "article=#{article_id}: #{inspect(reason)}"
+        )
+
+        false
+    end
+  end
+
+  # Records the `knowledge.lint_completed` audit entry. The lint and re-link
+  # work has already happened by this point, so an `{:error, _}` result is
+  # logged instead of failing (and thereby re-running) the whole job.
+  defp log_audit_event(tenant_id, report, relinked) do
+    result =
+      Audit.create_log_entry(tenant_id, %{
+        entity_type: "knowledge_lint",
+        entity_id: tenant_id,
+        action: "knowledge.lint_completed",
+        actor_type: "system",
+        actor_id: nil,
+        actor_label: "worker:knowledge_lint",
+        new_state: %{
+          "summary" => report.summary,
+          "orphans_relinked" => relinked
+        }
+      })
+
+    case result do
+      {:error, reason} ->
+        Logger.warning(
+          "KnowledgeLintWorker: tenant=#{tenant_id} failed to write knowledge.lint_completed " <>
+            "audit entry: #{inspect(reason)}"
+        )
+
+        :ok
+
+      _other ->
+        :ok
+    end
+  end
+end
diff --git a/test/loopctl/workers/knowledge_lint_worker_test.exs b/test/loopctl/workers/knowledge_lint_worker_test.exs
new file mode 100644
index 0000000..75bcdf7
--- /dev/null
+++ b/test/loopctl/workers/knowledge_lint_worker_test.exs
@@ -0,0 +1,148 @@
+defmodule Loopctl.Workers.KnowledgeLintWorkerTest do
+  use Loopctl.DataCase, async: true
+  use Oban.Testing, repo: Loopctl.Repo
+
+  setup :verify_on_exit!
+
+  import Ecto.Query
+
+  alias Loopctl.AdminRepo
+  alias Loopctl.Audit.AuditLog
+  alias Loopctl.Knowledge.ArticleLink
+  alias Loopctl.Workers.KnowledgeLintWorker
+
+  # A published article with a known embedding vector, written directly via
+  # AdminRepo to bypass the inline Oban cascade (embedding -> linking) that
+  # `fixture(:article)` would otherwise trigger on publish.
+  defp published_article_with_embedding(tenant_id, embedding, attrs \\ %{}) do
+    base = %{
+      title: "Article #{System.unique_integer([:positive])}",
+      body: "Test article body.",
+      category: :pattern,
+      status: :draft,
+      tags: []
+    }
+
+    fixture(:article, Map.merge(base, Map.put(attrs, :tenant_id, tenant_id)))
+    |> Ecto.Changeset.change(%{status: :published, embedding: embedding})
+    |> AdminRepo.update!()
+  end
+
+  # Two near-identical directional vectors -> cosine similarity ~1.0, above the
+  # 0.6 linking threshold. (Cosine measures direction, not magnitude.)
+  defp similar_embedding, do: List.duplicate(1.0, 768) ++ List.duplicate(0.0, 768)
+
+  defp near_similar_embedding do
+    List.duplicate(1.0, 768)
+    |> List.update_at(0, fn _ -> 0.99 end)
+    |> List.update_at(1, fn _ -> 1.01 end)
+    |> Kernel.++(List.duplicate(0.01, 768))
+  end
+
+  defp lint_audit_entries(tenant_id) do
+    from(a in AuditLog,
+      where: a.tenant_id == ^tenant_id,
+      where: a.action == "knowledge.lint_completed"
+    )
+    |> AdminRepo.all()
+  end
+
+  describe "perform/1 per-tenant" do
+    test "logs a knowledge.lint_completed audit event carrying the lint summary" do
+      tenant = fixture(:tenant)
+      _article = published_article_with_embedding(tenant.id, similar_embedding())
+
+      assert :ok =
+               KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant.id}})
+
+      assert [entry] = lint_audit_entries(tenant.id)
+      assert entry.actor_type == "system"
+      assert entry.actor_label == "worker:knowledge_lint"
+      # new_state is jsonb -> string keys on read
+      assert entry.new_state["summary"]["total_articles"] == 1
+      assert is_integer(entry.new_state["summary"]["total_issues"])
+      assert is_integer(entry.new_state["orphans_relinked"])
+    end
+
+    test "re-links orphan articles against the current corpus" do
+      tenant = fixture(:tenant)
+      # Two similar, published, unlinked articles -> both orphans.
+      a = published_article_with_embedding(tenant.id, similar_embedding())
+      b = published_article_with_embedding(tenant.id, near_similar_embedding())
+
+      assert :ok =
+               KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant.id}})
+
+      # Orphan re-link (inline Oban) ran ArticleLinkingWorker -> a relates_to
+      # link now connects the previously-orphaned pair.
+      links =
+        from(l in ArticleLink,
+          where: l.tenant_id == ^tenant.id,
+          where: l.relationship_type == :relates_to,
+          where:
+            (l.source_article_id == ^a.id and l.target_article_id == ^b.id) or
+              (l.source_article_id == ^b.id and l.target_article_id == ^a.id)
+        )
+        |> AdminRepo.all()
+
+      assert length(links) == 1
+
+      assert [entry] = lint_audit_entries(tenant.id)
+      # Both were orphans at lint time, so both were enqueued for re-link.
+      assert entry.new_state["summary"]["total_per_category"]["orphan_articles"] == 2
+      assert entry.new_state["orphans_relinked"] == 2
+    end
+
+    test "handles a tenant with no published articles" do
+      tenant = fixture(:tenant)
+
+      assert :ok =
+               KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant.id}})
+
+      assert [entry] = lint_audit_entries(tenant.id)
+      assert entry.new_state["summary"]["total_articles"] == 0
+      assert entry.new_state["orphans_relinked"] == 0
+    end
+  end
+
+  describe "perform/1 all_tenants mode" do
+    test "fans out and lints every active tenant, skipping inactive ones" do
+      active_a = fixture(:tenant)
+      active_b = fixture(:tenant)
+      suspended = fixture(:tenant, %{status: :suspended})
+
+      for t <- [active_a, active_b, suspended] do
+        published_article_with_embedding(t.id, similar_embedding())
+      end
+
+      assert :ok =
+               KnowledgeLintWorker.perform(%Oban.Job{args: %{"mode" => "all_tenants"}})
+
+      assert [_] = lint_audit_entries(active_a.id)
+      assert [_] = lint_audit_entries(active_b.id)
+      assert [] == lint_audit_entries(suspended.id)
+    end
+  end
+
+  describe "tenant isolation" do
+    test "lint summary counts only the caller tenant's articles" do
+      tenant_a = fixture(:tenant)
+      tenant_b = fixture(:tenant)
+
+      published_article_with_embedding(tenant_a.id, similar_embedding())
+      published_article_with_embedding(tenant_a.id, near_similar_embedding())
+
+      published_article_with_embedding(tenant_b.id, similar_embedding())
+      published_article_with_embedding(tenant_b.id, near_similar_embedding())
+      published_article_with_embedding(tenant_b.id, similar_embedding())
+
+      assert :ok =
+               KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant_a.id}})
+
+      assert [entry] = lint_audit_entries(tenant_a.id)
+      assert entry.new_state["summary"]["total_articles"] == 2
+      # Tenant B was never linted.
+      assert [] == lint_audit_entries(tenant_b.id)
+    end
+  end
+end