Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ config :loopctl, Oban,
{"0 2 * * *", Loopctl.Workers.CostRollupWorker},
{"0 3 * * *", Loopctl.Workers.WebhookCleanupWorker},
{"0 3 * * 0", Loopctl.Workers.TokenDataArchivalWorker},
{"0 4 * * *", Loopctl.Workers.KnowledgeLintWorker, args: %{"mode" => "all_tenants"}},
{"*/5 * * * *", Loopctl.Workers.PendingEnrollmentCleanupWorker},
{"* * * * *", Loopctl.Workers.ComputeSthWorker, args: %{"mode" => "all_tenants"}},
{"* * * * *", Loopctl.Workers.RevokeExpiredDispatchesWorker}
Expand Down
136 changes: 136 additions & 0 deletions lib/loopctl/workers/knowledge_lint_worker.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
defmodule Loopctl.Workers.KnowledgeLintWorker do
@moduledoc """
Oban worker that runs the knowledge-wiki lint nightly and acts on the findings.

This is the "nightly refinement" loop that both the Karpathy `llm-wiki`
pattern (lint-and-act) and the Dan Martell second-brain workflow converge on:
the lint *engine* already exists (`Loopctl.Knowledge.lint/2`); this worker is
the orchestration that RUNS it on a schedule and takes a safe, automated
repair action on the one finding that can be auto-repaired — orphans.

## Scheduling

Configured via the Oban Cron plugin to run once nightly in `all_tenants`
mode, which fans out one per-tenant job per active tenant (mirroring
`Loopctl.Workers.ComputeSthWorker`). Lint is inherently per-tenant, so the
fan-out keeps each tenant's analysis in its own job (independent retries,
no cross-tenant coupling).

## What it does per tenant

1. Runs `Knowledge.lint/2` (stale, orphan, contradiction, coverage-gap and
broken-source detection).
2. **Acts on orphans** — published articles with zero links. For each orphan
it re-enqueues `Loopctl.Workers.ArticleLinkingWorker`, which re-runs the
proven pgvector similarity pass against the *current* corpus. This is the
reason a nightly pass is valuable rather than a no-op: an article orphaned
in January (no neighbor cleared the similarity threshold at the time) can
find neighbors that were ingested months later. Re-linking is deterministic,
cheap, and makes **no** embedding-API calls (orphans missing an embedding
simply no-op in the linking worker — backfilling those is a separate
concern, out of scope here).
3. **Surfaces all findings** via an immutable audit event
(`knowledge.lint_completed`) carrying the full lint summary, so
contradictions / coverage gaps / broken sources / stale counts are
observable in the change feed even though they are not auto-repaired
(those require human judgment).

## Scale safety

Lint caps each finding array at `max_per_category` (we request the ceiling,
500) while still reporting the *true* totals in `summary.total_per_category`.
Orphan re-link enqueues are additionally bounded by
`:knowledge_lint_max_orphan_relink` (default 500). When the true orphan count
exceeds what we act on, the gap is logged — never silently dropped — so an
operator can see that a backlog remains for the next run.
"""

use Oban.Worker,
queue: :knowledge,
max_attempts: 3,
unique: [fields: [:worker, :args], period: 60]

require Logger

import Ecto.Query

alias Loopctl.AdminRepo
alias Loopctl.Audit
alias Loopctl.Knowledge
alias Loopctl.Tenants.Tenant
alias Loopctl.Workers.ArticleLinkingWorker

# Ask lint for the ceiling so we act on as many orphans per run as the engine
# will return; the true (pre-cap) totals still come back in the summary.
@lint_max_per_category 500
@default_max_orphan_relink 500

@impl Oban.Worker
def perform(%Oban.Job{args: %{"mode" => "all_tenants"}}) do
tenant_ids =
from(t in Tenant, where: t.status == :active, select: t.id)
|> AdminRepo.all()

for tenant_id <- tenant_ids do
%{"tenant_id" => tenant_id}
|> __MODULE__.new()
|> Oban.insert()
end

:ok
end

def perform(%Oban.Job{args: %{"tenant_id" => tenant_id}}) do
{:ok, report} = Knowledge.lint(tenant_id, max_per_category: @lint_max_per_category)

relinked = enqueue_orphan_relinks(tenant_id, report)
log_audit_event(tenant_id, report, relinked)

Logger.info(
"KnowledgeLintWorker: tenant=#{tenant_id} " <>
"issues=#{report.summary.total_issues} orphans_relinked=#{relinked}"
)

:ok
end

# --- Private ---

defp enqueue_orphan_relinks(tenant_id, report) do
max_relink =
Application.get_env(:loopctl, :knowledge_lint_max_orphan_relink, @default_max_orphan_relink)

orphans = Enum.take(report.orphan_articles, max_relink)
true_total = report.summary.total_per_category.orphan_articles

if true_total > length(orphans) do
Logger.warning(
"KnowledgeLintWorker: tenant=#{tenant_id} has #{true_total} orphan articles; " <>
"re-linking #{length(orphans)} this run (cap=#{max_relink}). Remainder retried next run."
)
end

Enum.each(orphans, fn %{article_id: article_id} ->
%{"article_id" => article_id, "tenant_id" => tenant_id}
|> ArticleLinkingWorker.new()
|> Oban.insert()
end)

length(orphans)
end

defp log_audit_event(tenant_id, report, relinked) do
Audit.create_log_entry(tenant_id, %{
entity_type: "knowledge_lint",
entity_id: tenant_id,
action: "knowledge.lint_completed",
actor_type: "system",
actor_id: nil,
actor_label: "worker:knowledge_lint",
new_state: %{
"summary" => report.summary,
"orphans_relinked" => relinked
}
})
end
end
148 changes: 148 additions & 0 deletions test/loopctl/workers/knowledge_lint_worker_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
defmodule Loopctl.Workers.KnowledgeLintWorkerTest do
use Loopctl.DataCase, async: true
use Oban.Testing, repo: Loopctl.Repo

setup :verify_on_exit!

import Ecto.Query

alias Loopctl.AdminRepo
alias Loopctl.Audit.AuditLog
alias Loopctl.Knowledge.ArticleLink
alias Loopctl.Workers.KnowledgeLintWorker

# A published article with a known embedding vector, written directly via
# AdminRepo to bypass the inline Oban cascade (embedding -> linking) that
# `fixture(:article)` would otherwise trigger on publish.
defp published_article_with_embedding(tenant_id, embedding, attrs \\ %{}) do
base = %{
title: "Article #{System.unique_integer([:positive])}",
body: "Test article body.",
category: :pattern,
status: :draft,
tags: []
}

fixture(:article, Map.merge(base, Map.put(attrs, :tenant_id, tenant_id)))
|> Ecto.Changeset.change(%{status: :published, embedding: embedding})
|> AdminRepo.update!()
end

# Two near-identical directional vectors -> cosine similarity ~1.0, above the
# 0.6 linking threshold. (Cosine measures direction, not magnitude.)
defp similar_embedding, do: List.duplicate(1.0, 768) ++ List.duplicate(0.0, 768)

defp near_similar_embedding do
List.duplicate(1.0, 768)
|> List.update_at(0, fn _ -> 0.99 end)
|> List.update_at(1, fn _ -> 1.01 end)
|> Kernel.++(List.duplicate(0.01, 768))
end

defp lint_audit_entries(tenant_id) do
from(a in AuditLog,
where: a.tenant_id == ^tenant_id,
where: a.action == "knowledge.lint_completed"
)
|> AdminRepo.all()
end

describe "perform/1 per-tenant" do
test "logs a knowledge.lint_completed audit event carrying the lint summary" do
tenant = fixture(:tenant)
_article = published_article_with_embedding(tenant.id, similar_embedding())

assert :ok =
KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant.id}})

assert [entry] = lint_audit_entries(tenant.id)
assert entry.actor_type == "system"
assert entry.actor_label == "worker:knowledge_lint"
# new_state is jsonb -> string keys on read
assert entry.new_state["summary"]["total_articles"] == 1
assert is_integer(entry.new_state["summary"]["total_issues"])
assert is_integer(entry.new_state["orphans_relinked"])
end

test "re-links orphan articles against the current corpus" do
tenant = fixture(:tenant)
# Two similar, published, unlinked articles -> both orphans.
a = published_article_with_embedding(tenant.id, similar_embedding())
b = published_article_with_embedding(tenant.id, near_similar_embedding())

assert :ok =
KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant.id}})

# Orphan re-link (inline Oban) ran ArticleLinkingWorker -> a relates_to
# link now connects the previously-orphaned pair.
links =
from(l in ArticleLink,
where: l.tenant_id == ^tenant.id,
where: l.relationship_type == :relates_to,
where:
(l.source_article_id == ^a.id and l.target_article_id == ^b.id) or
(l.source_article_id == ^b.id and l.target_article_id == ^a.id)
)
|> AdminRepo.all()

assert length(links) == 1

assert [entry] = lint_audit_entries(tenant.id)
# Both were orphans at lint time, so both were enqueued for re-link.
assert entry.new_state["summary"]["total_per_category"]["orphan_articles"] == 2
assert entry.new_state["orphans_relinked"] == 2
end

test "handles a tenant with no published articles" do
tenant = fixture(:tenant)

assert :ok =
KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant.id}})

assert [entry] = lint_audit_entries(tenant.id)
assert entry.new_state["summary"]["total_articles"] == 0
assert entry.new_state["orphans_relinked"] == 0
end
end

describe "perform/1 all_tenants mode" do
test "fans out and lints every active tenant, skipping inactive ones" do
active_a = fixture(:tenant)
active_b = fixture(:tenant)
suspended = fixture(:tenant, %{status: :suspended})

for t <- [active_a, active_b, suspended] do
published_article_with_embedding(t.id, similar_embedding())
end

assert :ok =
KnowledgeLintWorker.perform(%Oban.Job{args: %{"mode" => "all_tenants"}})

assert [_] = lint_audit_entries(active_a.id)
assert [_] = lint_audit_entries(active_b.id)
assert [] == lint_audit_entries(suspended.id)
end
end

describe "tenant isolation" do
test "lint summary counts only the caller tenant's articles" do
tenant_a = fixture(:tenant)
tenant_b = fixture(:tenant)

published_article_with_embedding(tenant_a.id, similar_embedding())
published_article_with_embedding(tenant_a.id, near_similar_embedding())

published_article_with_embedding(tenant_b.id, similar_embedding())
published_article_with_embedding(tenant_b.id, near_similar_embedding())
published_article_with_embedding(tenant_b.id, similar_embedding())

assert :ok =
KnowledgeLintWorker.perform(%Oban.Job{args: %{"tenant_id" => tenant_a.id}})

assert [entry] = lint_audit_entries(tenant_a.id)
assert entry.new_state["summary"]["total_articles"] == 2
# Tenant B was never linted.
assert [] == lint_audit_entries(tenant_b.id)
end
end
end
Loading