# AI Labs Knowledge Base (RAG) — Architecture & Data Flows

> Status: as-built, reflects the code in `api/src/core/*` and `api/src/libs/*`.
> Companion doc: [architecture-sales-dashboard.md](architecture-sales-dashboard.md) (the Sales
> system is built on top of this RAG base).

## 1. What this system is

A retrieval-augmented-generation (RAG) knowledge layer over Silk Commerce's scattered
operational data. It continuously **ingests** content from many sources into a single S3
bucket, hands that bucket to an **Amazon Bedrock Knowledge Base** (vector index), and then
**answers natural-language questions** grounded in that corpus through three surfaces:

- a **web chat** UI,
- a set of **Slack / Lark bots** (`@SilkAI` + others), and
- an **MCP server** (so external MCP clients — Claude Desktop, etc. — can query the KB).

The unifying idea: every source normalizes its content into S3 objects **plus a sidecar
`*.metadata.json`**, Bedrock indexes them, and all three surfaces retrieve with the same
metadata filter vocabulary.

## 2. High-level architecture

```
   ┌──────────── SOURCES (pull crons + push webhooks) ────────────┐
   │  Gmail   Google Drive   Redmine   Slack   Lark               │
   │  Fathom  Fireflies      HubSpot                              │
   └───────────────────────────┬─────────────────────────────────┘
                                │  normalize → object + sidecar metadata
                                ▼
                       ┌─────────────────┐        mirror/dedup rows
                       │   S3  bucket     │◄──────────────────────────┐
                       │  emails/ drive/  │                           │
                       │  redmine/ slack/ │                    ┌──────────────┐
                       │  meetings/ ...   │                    │  PostgreSQL  │
                       └────────┬─────────┘                    │  (Prisma)    │
                                │ triggerKbIngestion()         └──────────────┘
                                ▼  (debounced, 1 job at a time)
                   ┌──────────────────────────┐
                   │  Bedrock Knowledge Base  │   (S3 Vectors backend)
                   │  embeddings + metadata   │
                   └────────────┬─────────────┘
                                │ Retrieve / InvokeAgent (RetrievalFilter)
              ┌─────────────────┼──────────────────┐
              ▼                 ▼                  ▼
         Web Chat          Slack / Lark          MCP server
       (chat/service)     (slack/service)      (mcp/tools.ts)
              │                 │                  │
              ▼                 ▼                  ▼
                    Bedrock Runtime (Converse / InvokeAgent)
                         answer + inline citations
```

### Tech stack
- **Runtime:** Elysia (Bun) API, single process; entry [api/src/main.ts](../api/src/main.ts).
- **Storage:** PostgreSQL via Prisma (source-of-record mirror + sync bookkeeping); S3 (the KB
  corpus); Bedrock KB (vector index over S3).
- **AI:** Amazon Bedrock — `Retrieve`, `InvokeAgent`, `Converse`/`ConverseStream`. Default model
  is Amazon **Nova Pro** (`CHAT_MODEL_ID`, etc.); switchable to Claude via env once the Bedrock
  account has it unlocked.

## 3. The two data planes

The system deliberately keeps two parallel representations of every source:

| Plane | Where | Purpose |
|---|---|---|
| **Semantic plane** | S3 object + `*.metadata.json` → Bedrock KB | Free-text retrieval for chat/Slack/MCP. |
| **Structured plane** | Postgres mirror tables (`email_messages`, `drive_files`, `redmine_*`, `slack_messages`, `fathom_meetings`, `hubspot_*`, …) | Dedup ("have I seen this id?"), sync bookkeeping, and deterministic queries (used heavily by the Sales system). |

An ingest run typically writes **both**: the object to S3 (for retrieval) and a row to Postgres
(for dedup + structured use). The Postgres row's existence is what makes the next cron tick cheap
(skip already-ingested ids).
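
The dedup step can be pictured as a set difference between freshly fetched ids and ids already mirrored. The following is a minimal sketch, not the code in any `ingest.ts`: `SourceItem`, `selectUnseen`, and the in-memory `seenIds` set (standing in for a Postgres lookup) are hypothetical names used for illustration.

```ts
// Hypothetical shape of a fetched item; only `id` matters for dedup.
type SourceItem = { id: string; body: string };

// Returns the items that still need to be written to S3 and mirrored.
// `seenIds` stands in for the ids already present in the Postgres mirror table.
function selectUnseen(items: SourceItem[], seenIds: ReadonlySet<string>): SourceItem[] {
  const pickedThisRun = new Set<string>();
  return items.filter((item) => {
    // Skip ids mirrored by an earlier run, and duplicates inside this batch.
    if (seenIds.has(item.id) || pickedThisRun.has(item.id)) return false;
    pickedThisRun.add(item.id);
    return true;
  });
}
```

With this shape, a cron tick that fetches only known ids produces an empty list and does no S3 or KB work.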

## 4. Ingestion

### 4.1 Module shape
Each source is a module under `api/src/core/<source>/` exposing:
- `controller.ts` — HTTP routes (status, manual "sync now", account CRUD, webhook).
- `service.ts` — fetch + normalize logic for that source's API.
- `ingest.ts` — the run loop: fetch → filter → write S3 (+ metadata) → upsert Postgres → trigger KB.
- `cron.ts` / `index.ts` — interval scheduler (`startXxxCron()` called from `main.ts`).

Sync runs are wrapped by `runSyncTask(...)` (the `task` module), which records a row in
`sync_tasks` / `sync_task_items` with per-item success/failure — that's what the **Tasks** page
and each source's status endpoint read. Stale tasks are recovered on boot
(`taskService.recoverStaleTasks()`).

### 4.2 Sources

| Source | Trigger | Notes |
|---|---|---|
| **Email** (Gmail) | 3-day cron + `/email/webhook` | Forwarded-email ingestion. Filters self-loops / auto-replies / bulk / mailing-list. Attachments (.pptx, images) are text-extracted (see §4.4). DMs the *forwarder* on Slack — never the customer (security note in [ingest.ts](../api/src/core/email/ingest.ts)). |
| **Drive** (Google) | cron | Mirrors files; non-text types extracted to `.txt`. |
| **Redmine** | cron | Projects / issues / members / users mirrored; rich Postgres model is the base for customer linking. |
| **Slack** | cron + Events webhook | Tracks selected channels; messages indexed with `client_id` / channel metadata. Also the primary **answer surface** (§6). |
| **Lark** | cron + webhook | Same shape as Slack (tracked chats). |
| **Fathom** | cron + `/fathom/webhook` | Meeting transcripts (`type: meeting`). **Owns the shared KB ingestion trigger** historically. |
| **Fireflies** | cron + `/fireflies/webhook` | Meeting transcripts (`type: fireflies`). |
| **HubSpot** | 10-min cron | Deals/companies/contacts → mirror tables + KB. Feeds the Sales system. |

Auth pattern: webhook controllers are **public** but verify a signature / shared secret;
everything else sits behind the JWT `auth` guard. Google email/drive OAuth callbacks are public
(browser popup, no Bearer token).

### 4.3 Metadata schema (the contract)
The authoritative metadata vocabulary lives as a doc-comment at the top of
[api/src/core/mcp/tools.ts](../api/src/core/mcp/tools.ts). Key points:

- **Shared fields:** `month` (YYYY-MM), `year` (YYYY), `updated_at` (ISO). `month`/`year` are
  **pre-computed at ingest** because the S3 Vectors backend supports only `equals`/`notEquals`
  (no range, no `startsWith`). "Meetings in April" → `equals(month, "2026-04")`.
- **Per-source fields:** e.g. meetings carry `type/title/recording_id/date/datetime/is_external/
  recorded_by`; slack carries `source/channel/channel_name/client_id/...`; email carries
  `source/account/from_addr/subject/date/direction/...`.
- **Hard cap:** ~10 filterable attributes per chunk in S3 Vectors. A field is promoted to
  metadata *only if it's filtered on*; everything else stays inside the embedded body text.
  (That's why Redmine omits `month`/`year`, and meeting `duration_minutes` lives in the body.)
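
As an illustration of the contract above, here is a sketch of building a meeting sidecar with `month`/`year` pre-computed at ingest. It assumes the Bedrock Knowledge Base sidecar convention (a `<object key>.metadata.json` file whose body wraps the fields in `metadataAttributes`) and an ISO 8601 timestamp as input. `buildMeetingSidecar`, its parameter names, and the example key are hypothetical, and only a subset of the meeting fields is shown.

```ts
// Subset of the meeting fields listed above; values are kept as strings/booleans
// so that every filter can be a plain `equals`.
type MeetingAttributes = {
  type: 'meeting';
  title: string;
  recording_id: string;
  date: string; // YYYY-MM-DD
  month: string; // YYYY-MM, pre-computed because range filters are unavailable
  year: string; // YYYY
  is_external: boolean;
};

// Hypothetical helper: returns the sidecar's S3 key and JSON body.
function buildMeetingSidecar(
  objectKey: string,
  input: { title: string; recordingId: string; isoDatetime: string; isExternal: boolean },
): { key: string; body: string } {
  // Assumes an ISO 8601 timestamp such as "2026-04-17T09:30:00Z".
  if (!/^\d{4}-\d{2}-\d{2}T/.test(input.isoDatetime)) {
    throw new Error(`not an ISO datetime: ${input.isoDatetime}`);
  }
  const date = input.isoDatetime.slice(0, 10);
  const attributes: MeetingAttributes = {
    type: 'meeting',
    title: input.title,
    recording_id: input.recordingId,
    date,
    month: date.slice(0, 7),
    year: date.slice(0, 4),
    is_external: input.isExternal,
  };
  return {
    key: `${objectKey}.metadata.json`,
    body: JSON.stringify({ metadataAttributes: attributes }, null, 2),
  };
}
```

Fields that are never filtered on (for example `duration_minutes`) are deliberately absent here and would stay in the body text.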

### 4.4 Attachment / non-text handling
`api/src/libs/extractors/` converts KB-incompatible files into indexable `.txt`:
- `.pptx` → slide text + speaker notes (`extractPptxText`).
- images (`.jpg/.png/.gif/.webp`) → vision-model description + OCR (`extractImageText`).

Email and Drive both run the same `processAttachmentForKb` path so attachments are searchable.
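
The dispatch by file type might look like the sketch below. It only decides which extractor applies; the actual `extractPptxText` / `extractImageText` signatures are not shown in this document, so `pickExtractor` and `ExtractorKind` are hypothetical names, and the `passthrough` case is an assumption about files that need no conversion.

```ts
type ExtractorKind = 'pptx' | 'image' | 'passthrough';

// Image extensions listed in this section.
const IMAGE_EXTENSIONS = new Set(['.jpg', '.png', '.gif', '.webp']);

// Hypothetical helper: choose an extractor from the file extension (case-insensitive).
function pickExtractor(filename: string): ExtractorKind {
  const dot = filename.lastIndexOf('.');
  const ext = dot >= 0 ? filename.slice(dot).toLowerCase() : '';
  if (ext === '.pptx') return 'pptx';
  if (IMAGE_EXTENSIONS.has(ext)) return 'image';
  // Assumption: anything else is handed to the KB unchanged.
  return 'passthrough';
}
```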

### 4.5 KB ingestion triggering (the 409 coalescer)
`triggerKbIngestion(label)` in [api/src/libs/bedrock/index.ts](../api/src/libs/bedrock/index.ts)
handles this. Bedrock allows **only one ingestion job per data source** at a time, and every
source writes to the *same* data source, so triggers are **coalesced**. A debounce window
(`KB_INGEST_DEBOUNCE_MS`, 20s) batches a burst of writes into one job. A `409 ConflictException`
(a job is already running) is logged quietly and **one retry at a time** is scheduled
(`KB_INGEST_RETRY_MS`, 90s), repeating until a slot frees. One global timer means at most one
pending job, never a storm.
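
The coalescing behaviour can be sketched as a single-timer state machine. This is a simplified model under stated assumptions, not the code in `libs/bedrock`: the job start is modelled as a synchronous callback returning `'started'` or `'conflict'` (the real call is asynchronous and signals the conflict as an exception), the scheduler is injected so the logic can be exercised without real timers, and triggers arriving while a timer is pending are folded in rather than resetting the window. `createCoalescer`, `StartResult`, and `Schedule` are hypothetical names.

```ts
type StartResult = 'started' | 'conflict';
type Schedule = (fn: () => void, delayMs: number) => void;

function createCoalescer(
  startJob: () => StartResult,
  schedule: Schedule,
  debounceMs = 20000, // mirrors KB_INGEST_DEBOUNCE_MS
  retryMs = 90000, // mirrors KB_INGEST_RETRY_MS
): { trigger: () => void } {
  // True while a debounce or retry timer is outstanding.
  let timerPending = false;

  const arm = (delayMs: number): void => {
    timerPending = true;
    schedule(() => {
      timerPending = false;
      // A conflict means a job is already running: wait and try once more.
      if (startJob() === 'conflict') arm(retryMs);
    }, delayMs);
  };

  return {
    trigger(): void {
      // Any trigger during a pending timer is absorbed by that timer.
      if (!timerPending) arm(debounceMs);
    },
  };
}
```

In production the scheduler argument would presumably be a thin wrapper around `setTimeout`; in a test it can be a queue that is drained by hand.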

## 5. Retrieval & filtering

All three surfaces retrieve from the KB with a Bedrock `RetrievalFilter`. The user-facing
abstraction is the **KB filter preset** ([api/src/core/agents/kb-filter.ts](../api/src/core/agents/kb-filter.ts)):

```ts
type KbFilterPreset = {
  slackScope?: 'currentChannel' | 'allChannels' | 'none';
  larkScope?:  'currentChat'    | 'allChats'    | 'none';
  meeting?: boolean; fireflies?: boolean; redmine?: boolean;
  hubspot?: boolean; email?: boolean; drive?: boolean;
};
```

`materializeFilter(preset, context)` expands it into a Bedrock `orAll([...])` of per-source
`equals` branches, substituting the current Slack channel / Lark chat id from `context`. If the
caller has no platform context (web UI), `currentChannel`/`currentChat` **degrade to `all*`**
rather than filtering everything out. An empty selection falls back to the permissive default
(Bedrock rejects an empty `orAll`).
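
A sketch of the expansion follows. The filter shape (`equals`, `andAll`, `orAll`) follows Bedrock's `RetrievalFilter`, and `type: meeting` / `type: fireflies` come from the metadata notes in this document. Everything else is an assumption for illustration: the `source` values, the `chat` key for Lark, returning `undefined` as the "permissive default", and returning a lone branch without wrapping it in `orAll`. `materializeFilterSketch` is a hypothetical name, not the real function.

```ts
type MetaFilter =
  | { equals: { key: string; value: string } }
  | { andAll: MetaFilter[] }
  | { orAll: MetaFilter[] };

type Preset = {
  slackScope?: 'currentChannel' | 'allChannels' | 'none';
  larkScope?: 'currentChat' | 'allChats' | 'none';
  meeting?: boolean; fireflies?: boolean; redmine?: boolean;
  hubspot?: boolean; email?: boolean; drive?: boolean;
};

type PlatformContext = { slackChannelId?: string; larkChatId?: string };

const eq = (key: string, value: string): MetaFilter => ({ equals: { key, value } });

// Branch for a chat platform: narrow to the current channel/chat when known.
function scopeBranch(source: string, idKey: string, scope?: string, currentId?: string): MetaFilter | undefined {
  if (scope === undefined || scope === 'none') return undefined;
  const wantsCurrent = scope === 'currentChannel' || scope === 'currentChat';
  if (wantsCurrent && currentId) return { andAll: [eq('source', source), eq(idKey, currentId)] };
  // "all*", or "current*" degraded because there is no platform context (web UI).
  return eq('source', source);
}

function materializeFilterSketch(preset: Preset, ctx: PlatformContext): MetaFilter | undefined {
  const branches: MetaFilter[] = [];
  const slack = scopeBranch('slack', 'channel', preset.slackScope, ctx.slackChannelId);
  const lark = scopeBranch('lark', 'chat', preset.larkScope, ctx.larkChatId);
  if (slack) branches.push(slack);
  if (lark) branches.push(lark);

  const flagBranches: Array<[boolean | undefined, MetaFilter]> = [
    [preset.meeting, eq('type', 'meeting')],
    [preset.fireflies, eq('type', 'fireflies')],
    [preset.redmine, eq('source', 'redmine')],
    [preset.hubspot, eq('source', 'hubspot')],
    [preset.email, eq('source', 'email')],
    [preset.drive, eq('source', 'drive')],
  ];
  for (const [enabled, branch] of flagBranches) if (enabled) branches.push(branch);

  if (branches.length === 0) return undefined; // assumption: no filter means "search everything"
  if (branches.length === 1) return branches[0]; // avoid a one-element orAll
  return { orAll: branches };
}
```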

Presets are stored per-agent on `slack_agents.kbFilter` (JSON), so each persona can be scoped to
a subset of sources.

## 6. Answer surfaces

### 6.1 Web chat — `chat/service.ts`
[api/src/core/chat/service.ts](../api/src/core/chat/service.ts) exposes a streaming generator
(`streamAsk`) with two backends sharing one event shape (`agent` → `citations` → `chunk*` →
`done`):

- **`bedrock` mode (default):** `InvokeAgentCommand` — full Bedrock-side orchestration (the
  console agent's prompt + tools + server-side memory keyed by `sessionId`, KB attached via
  `sessionState`). Best answer quality; "streaming" arrives in coarse chunks.
- **`stream` mode:** `RetrieveCommand` (we own retrieval) → `ConverseStreamCommand` for real
  token-by-token deltas. Uses `agent.instruction` as the system prompt, builds a `<context>`
  block of citations, carries client-supplied `history`.

Both paths: resolve the agent (default or by `agentId`), build the KB filter, apply
`extractTimeFilter` (turns "today / 本月 (this month) / Q2 / 2026-05" into a metadata equals filter + an
embedding hint — mirrors the MCP `get_meeting_summary` logic), then answer. Citations dedupe by
S3 URI and map to numeric `[1]` refs the UI renders inline.
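
The citation numbering can be sketched as follows: the first time an S3 URI appears it receives the next number, and later chunks from the same object reuse it. `RetrievedChunk`, `NumberedCitation`, and `numberCitations` are hypothetical names, and the `[n] text` context line format is an assumption about how the `<context>` block might be laid out.

```ts
type RetrievedChunk = { s3Uri: string; text: string };
type NumberedCitation = { ref: number; s3Uri: string };

function numberCitations(chunks: RetrievedChunk[]): {
  citations: NumberedCitation[];
  contextLines: string[];
} {
  const refByUri = new Map<string, number>();
  const citations: NumberedCitation[] = [];
  const contextLines: string[] = [];

  for (const chunk of chunks) {
    let ref = refByUri.get(chunk.s3Uri);
    if (ref === undefined) {
      // First chunk from this object: assign the next 1-based reference.
      ref = citations.length + 1;
      refByUri.set(chunk.s3Uri, ref);
      citations.push({ ref, s3Uri: chunk.s3Uri });
    }
    contextLines.push(`[${ref}] ${chunk.text}`);
  }
  return { citations, contextLines };
}
```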

> `surface: 'sales'` is the hook the Sales system uses to pin the `@SilkSales` agent and inject
> the structured daily snapshot — see the Sales doc.

### 6.2 Slack / Lark — `slack/service.ts`
Mention/thread/DM → `routeAgent()` classifies the question to an enabled agent via a cheap
`Converse` call (`CLASSIFIER_MODEL_ID`, `maxTokens: 30`, JSON intent), falling back to the
default agent on any failure. Then `invokeAgent()` runs `InvokeAgentCommand` with the agent's KB
filter scoped to the current channel. The answer is suffixed with `— answered by <byline>`.
`@SilkSales` (sales intent) is **excluded** from the general classifier and reached only via the
separate sales bot (see Sales doc).
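
The "fall back to the default agent on any failure" rule is easiest to see in the step that interprets the classifier's reply. The sketch below assumes the classifier returns a small JSON object such as `{"intent": "..."}` and that the sales persona's intent value is `'sales'`; both are assumptions, as are the names `AgentRow`, `pickAgent`, and `EXCLUDED_INTENTS`. The `Converse` call itself is omitted.

```ts
// Subset of the persona fields needed for routing.
type AgentRow = { intent: string; enabled: boolean; isDefault: boolean };

// Assumption: intents that the general classifier must never route to.
const EXCLUDED_INTENTS = new Set(['sales']);

function pickAgent(rawClassifierOutput: string, agents: AgentRow[], defaultAgent: AgentRow): AgentRow {
  try {
    const parsed: unknown = JSON.parse(rawClassifierOutput);
    if (typeof parsed === 'object' && parsed !== null && 'intent' in parsed) {
      const intent = (parsed as { intent: unknown }).intent;
      const match = agents.find(
        (agent) => agent.enabled && agent.intent === intent && !EXCLUDED_INTENTS.has(agent.intent),
      );
      if (match) return match;
    }
  } catch {
    // Malformed JSON (for example a reply truncated by the token limit): use the default.
  }
  return defaultAgent;
}
```

Every failure path (bad JSON, unknown intent, disabled or excluded agent) ends in the same place, so a classifier outage degrades routing quality rather than availability.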

### 6.3 MCP server — `mcp/tools.ts`
A standard MCP HTTP transport (key or OAuth-JWT auth) exposing four tools:
- `search_knowledge(query, source?, limit?)` — scoped or cross-source semantic search.
- `get_customer_brief(customer, focus?)` — strict `client_id` filter (Slack-only) then semantic
  fallback, merged + deduped by source.
- `get_meeting_summary(...)` — Fathom+Fireflies, with the date→`date`/`month`/`year` equals
  routing and client-side range fallback.
- `list_recent_docs(days, source?)` — lists recent S3 objects by top-level source directory.
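
The date routing in `get_meeting_summary` can be sketched like this: a single day, month, or year maps to one `equals` on the matching pre-computed field, while an arbitrary range cannot be expressed with `equals` and is filtered after retrieval. The `from..to` spec syntax and the names `DateRoute`, `routeDateSpec`, and `keepInRange` are hypothetical; the tool's real parameters are not shown in this document.

```ts
type DateRoute =
  | { kind: 'equals'; key: 'date' | 'month' | 'year'; value: string }
  | { kind: 'clientRange'; from: string; to: string };

// Hypothetical spec formats: "2026-04-17", "2026-04", "2026", or "2026-04-01..2026-04-10".
function routeDateSpec(spec: string): DateRoute | undefined {
  if (/^\d{4}-\d{2}-\d{2}$/.test(spec)) return { kind: 'equals', key: 'date', value: spec };
  if (/^\d{4}-\d{2}$/.test(spec)) return { kind: 'equals', key: 'month', value: spec };
  if (/^\d{4}$/.test(spec)) return { kind: 'equals', key: 'year', value: spec };
  if (/^\d{4}-\d{2}-\d{2}\.\.\d{4}-\d{2}-\d{2}$/.test(spec)) {
    const [from = '', to = ''] = spec.split('..');
    return { kind: 'clientRange', from, to };
  }
  return undefined;
}

// Client-side range fallback: ISO dates compare correctly as plain strings.
function keepInRange<T extends { date: string }>(hits: T[], from: string, to: string): T[] {
  return hits.filter((hit) => hit.date >= from && hit.date <= to);
}
```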

## 7. Agents (personas)
`slack_agents` rows define personas: `intent`, `byline`, `classifierHint`, `instruction`
(system prompt), `kbFilter` preset, optional Bedrock `agentId`/`agentAliasId`, `isDefault`,
`enabled`, `sortOrder`. Managed via the **Agents** page (`agentController`). Defaults are seeded
idempotently on boot (`seedDefaultsIfMissing()`) so a fresh DB still has personas. The Eval
module (`eval/`) regression-tests agent answers against `eval_test_cases`.

## 8. Data model (RAG-relevant tables)
`email_accounts` / `email_messages`, `drive_accounts` / `drive_files`,
`redmine_accounts/projects/issues/users/roles/project_members`,
`slack_tracked_channels` / `slack_messages`, `lark_tracked_chats` / `lark_messages`,
`fathom_meetings`, `fireflies_accounts` / `fireflies_meetings`,
`hubspot_accounts/companies/contacts/deals`, `slack_agents`,
`sync_tasks` / `sync_task_items`, `eval_*`. (See [api/prisma/schema.prisma](../api/prisma/schema.prisma).)

## 9. Operational notes
- **Required env:** `DATABASE_URL`, `JWT_SECRET` (≥16 characters). **KB:** `KB_ID`, `KB_DATA_SOURCE_ID`,
  `AGENT_ID`/`AGENT_ALIAS_ID` (for `bedrock` mode), `AWS_REGION`, AWS creds, `S3_BUCKET`. Models:
  `CHAT_MODEL_ID`, `CLASSIFIER_MODEL_ID` (default Nova Pro). MCP: `MCP_API_KEY`.
- **Single shared KB data source** → ingestion is serialized via the coalescer (§4.5). Don't
  fan out parallel `triggerKbIngestion` calls expecting parallel jobs.
- **S3 Vectors filter limits** drive several design choices (pre-computed `month`/`year`, the
  ~10-attribute cap, equals-only filtering). Keep these in mind when adding a new filterable field.
- After schema edits: `bun run prisma:generate` then `bun run prisma:push`.

## 10. End-to-end example (a Fathom meeting)
1. Cron tick (or `/fathom/webhook`) → `fathom` service fetches new recordings.
2. For each new `recording_id` not already in `fathom_meetings`: write transcript markdown to
   `s3://…/meetings/…` + a `.metadata.json` sidecar (`type:meeting`, `date`, `month`, `year`,
   `is_external`, …); upsert the Postgres row.
3. `triggerKbIngestion('fathom')` debounces; one Bedrock ingestion job indexes the new objects.
4. A user asks the web chat "what did we tell Acme about the launch date last month?" →
   `extractTimeFilter` adds `equals(month, '2026-05')`, the agent's preset adds the meeting
   branch, Bedrock retrieves the relevant chunks, Converse/InvokeAgent answers with `[1]` citing
   the meeting's S3 URI.
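
Step 4 relies on turning "last month" into a concrete `YYYY-MM` value. The sketch below shows one way to do that, not the actual `extractTimeFilter`: it assumes UTC calendar months, and `previousMonth` and `lastMonthFilter` are hypothetical names.

```ts
// Returns the calendar month before `now` as YYYY-MM (UTC).
function previousMonth(now: Date): string {
  const year = now.getUTCFullYear();
  const zeroBasedMonth = now.getUTCMonth(); // 0 = January
  // The 0-based index of the current month equals the 1-based number of the previous one,
  // except in January, where the previous month is December of the prior year.
  const prevYear = zeroBasedMonth === 0 ? year - 1 : year;
  const prevMonth = zeroBasedMonth === 0 ? 12 : zeroBasedMonth;
  return `${prevYear}-${String(prevMonth).padStart(2, '0')}`;
}

// Equals-only filter on the pre-computed `month` field.
function lastMonthFilter(now: Date): { equals: { key: 'month'; value: string } } {
  return { equals: { key: 'month', value: previousMonth(now) } };
}
```

For a question asked in June 2026 this yields the `equals(month, '2026-05')` filter used in the example.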
