AI Memory & Knowledge Layer

Semantic memory that lets agents learn from past sessions, search historical decisions, and ground prompts in real project knowledge

Table of Contents

  1. Problem Statement
  2. Architecture Overview
  3. Vector DB Choice: pgvector
    1. Why pgvector over a dedicated vector DB
    2. Schema
    3. Hybrid Query Example
  4. Knowledge Types
    1. Content Templates
  5. Embedding Pipeline
    1. Architecture
    2. KnowledgeConsumer
    3. EmbeddingService
    4. Document Chunker
  6. Retrieval Patterns
    1. retrieve_knowledge Skill
    2. RAG for Agent Prompting
    3. REST API
  7. Event Synchronization
    1. Real-Time Pipeline
    2. Periodic Reconciliation
    3. Re-Embedding Batch Job
  8. Integration with Dual State Machine
  9. ingest_knowledge Skill
  10. Pydantic Models
  11. Repository Structure
  12. Phased Delivery
    1. Phase Dependencies
    2. P1.5 Deliverables (Detail)
  13. Kafka Topics
  14. Performance Targets
  15. Related Documents

Problem Statement

MetaForge agents are stateless per-session. They read context from Neo4j, run skills, produce artifacts, and forget everything. There is no vector database, no embedding store, no semantic search, and no agent memory anywhere in the current architecture. This means:

  • No learning from past sessions — “last time we picked STM32F4 for a drone FC, here’s what worked/failed” is lost
  • No semantic search over historical decisions, constraints, or design rationale
  • No accumulated domain knowledge — component preferences, failure modes, supplier reliability
  • No RAG (Retrieval-Augmented Generation) for agent prompting
  • No indexed datasheets or application notes

This document defines a Knowledge Layer — a pgvector-backed semantic memory that agents use via pure skills (retrieve_knowledge, ingest_knowledge) and that stays in sync with the event stream.


Architecture Overview

The Knowledge Layer is a derived, read-only projection of the event stream and document store — architecturally identical to how Neo4j projects from Kafka. It never writes to Neo4j or Git, and is fully reconstructable from Kafka events plus MinIO documents.

flowchart TD
    KAFKA["Kafka Topics<br/>graph.mutations<br/>session.completed<br/>knowledge.ingest"] --> KC["KnowledgeConsumer<br/>Event Processor"]
    MINIO["MinIO<br/>PDFs, App Notes"] --> KC
    KC --> ES["EmbeddingService<br/>Local or Cloud"]
    ES --> PG["PostgreSQL + pgvector<br/>knowledge_embeddings table"]

    AGENT["Domain Agent"] -->|"retrieve_knowledge skill"| PG
    PG -->|"ranked results"| AGENT
    AGENT -->|"RAG context injection"| LLM["LLM Provider"]

    API["REST API<br/>GET /api/v1/knowledge/search"] --> PG
    DASH["Dashboard"] --> API

    style KAFKA fill:#E67E22,color:#fff
    style PG fill:#27ae60,color:#fff
    style AGENT fill:#9b59b6,color:#fff
    style ES fill:#3498db,color:#fff

Key architectural properties:

  • Zero new infrastructure: PostgreSQL already exists for session state, and a single CREATE EXTENSION vector; statement adds pgvector (the pgvector package must be installed on the PostgreSQL server)
  • Hybrid queries: structured WHERE filters and vector cosine distance in a single SQL query
  • ACID transactions: each write commits atomically, so searches never see half-written entries (the projection as a whole still lags the event stream by the pipeline latency)
  • Local-first: no cloud dependency for embedding (a local model is the default)
  • Fully reconstructable: replay Kafka and re-index MinIO to rebuild from scratch

Vector DB Choice: pgvector

Why pgvector over a dedicated vector DB

| Criterion | pgvector (PostgreSQL) | Dedicated (Qdrant, Weaviate) |
|---|---|---|
| New infrastructure | None (PostgreSQL already deployed) | New service to deploy, monitor, back up |
| Hybrid queries | Native SQL WHERE + vector distance | Separate filter syntax, varies by vendor |
| ACID transactions | Full PostgreSQL transactions | Eventually consistent (most vendors) |
| Local-first | Runs inside the existing PostgreSQL as an extension | Extra container |
| Operational cost | Zero incremental | Separate backup, upgrade, scaling |
| Scale ceiling | ~10M vectors with HNSW | ~100M+ vectors |

Decision: pgvector for P1.5 through P3. If vector count exceeds 10M or query latency exceeds 50ms at the 95th percentile, migrate to Qdrant (P4 option).
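The migration trigger above can be expressed as a small check. This is a sketch under assumptions: search latencies are sampled in milliseconds, the 95th percentile is computed with Python's `statistics.quantiles`, and the helper names `p95` and `should_migrate_to_qdrant` are hypothetical.

```python
import statistics

# Thresholds from the pgvector decision: >10M vectors or p95 latency > 50 ms.
MAX_VECTORS = 10_000_000
MAX_P95_MS = 50.0


def p95(latencies_ms: list[float]) -> float:
    """95th percentile of search latencies (needs at least 2 samples)."""
    # n=20 yields 19 cut points at 5%, 10%, ..., 95%; the last one is p95.
    return statistics.quantiles(latencies_ms, n=20, method="inclusive")[-1]


def should_migrate_to_qdrant(vector_count: int, latencies_ms: list[float]) -> bool:
    """Hypothetical helper: True when either migration trigger is exceeded."""
    return vector_count > MAX_VECTORS or p95(latencies_ms) > MAX_P95_MS
```

Using a percentile rather than the mean keeps a handful of slow, heavily filtered queries from being averaged away.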

Schema

CREATE EXTENSION IF NOT EXISTS vector;

CREATE TABLE knowledge_embeddings (
    id              UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    knowledge_type  VARCHAR(50) NOT NULL,      -- 'design_decision', 'component_rationale', etc.
    source_event_id VARCHAR(64),               -- Kafka event ID that created this
    source_node_id  VARCHAR(64),               -- Neo4j node ID (if applicable)
    domain          VARCHAR(30),               -- 'electrical', 'mechanical', 'firmware', 'system'
    phase           VARCHAR(10),               -- 'P1', 'P2', etc.
    project_id      VARCHAR(64),               -- Project scope
    content_text    TEXT NOT NULL,              -- Human-readable text that was embedded
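    embedding       vector(1536) NOT NULL,     -- Vector embedding (384-dim local vectors are zero-padded)
    metadata        JSONB DEFAULT '{}',        -- Flexible metadata
    created_at      TIMESTAMPTZ DEFAULT now(),
    updated_at      TIMESTAMPTZ DEFAULT now()  -- DEFAULT only applies on INSERT; set it explicitly on upsert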
);

-- HNSW index for approximate nearest neighbor search
CREATE INDEX idx_knowledge_embedding_hnsw
    ON knowledge_embeddings
    USING hnsw (embedding vector_cosine_ops)
    WITH (m = 16, ef_construction = 64);

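-- One row per (knowledge type, source node), so node.updated events can upsert via
-- INSERT ... ON CONFLICT (knowledge_type, source_node_id) WHERE source_node_id IS NOT NULL
CREATE UNIQUE INDEX idx_knowledge_source_unique
    ON knowledge_embeddings (knowledge_type, source_node_id)
    WHERE source_node_id IS NOT NULL;

-- Structured filter indexes
CREATE INDEX idx_knowledge_type ON knowledge_embeddings (knowledge_type);
CREATE INDEX idx_knowledge_domain ON knowledge_embeddings (domain);
CREATE INDEX idx_knowledge_project ON knowledge_embeddings (project_id);
CREATE INDEX idx_knowledge_source_node ON knowledge_embeddings (source_node_id);
CREATE INDEX idx_knowledge_metadata ON knowledge_embeddings USING gin (metadata);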

Hybrid Query Example

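-- "Find design decisions about power management in the electrical domain"
-- $1 = query embedding, $2 = project_id, $3 = minimum similarity
SELECT id, knowledge_type, content_text, metadata,
       1 - (embedding <=> $1) AS similarity     -- <=> is cosine distance
FROM knowledge_embeddings
WHERE knowledge_type = 'design_decision'
  AND domain = 'electrical'
  AND project_id = $2
  AND 1 - (embedding <=> $1) >= $3
ORDER BY embedding <=> $1                      -- served by the HNSW index
LIMIT 10;
-- With HNSW, WHERE filters are applied to the candidates the index scan returns
-- (hnsw.ef_search, default 40), so selective filters can return fewer than 10 rows;
-- raise hnsw.ef_search for such queries.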
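Every filter in the retrieval skill and REST API is optional, so `store.search` has to assemble the WHERE clause at runtime. A minimal sketch of that assembly with a hypothetical `build_search_query` helper, assuming a driver such as asyncpg that uses `$n` placeholders and accepts a Python list for `ANY($n)`; the embedding parameter additionally needs a pgvector codec registered on the connection. Only filters that are set add clauses, and values always travel as parameters, never interpolated into the SQL.

```python
from typing import Any, Optional


def build_search_query(
    embedding: list[float],
    knowledge_types: Optional[list[str]] = None,
    domain: Optional[str] = None,
    project_id: Optional[str] = None,
    top_k: int = 10,
    similarity_threshold: float = 0.0,
) -> tuple[str, list[Any]]:
    """Return (sql, params) for a filtered cosine-similarity search."""
    params: list[Any] = [embedding]  # $1
    where: list[str] = []

    def add(clause: str, value: Any) -> None:
        # Append first so {n} becomes this value's 1-based placeholder number.
        params.append(value)
        where.append(clause.format(n=len(params)))

    if knowledge_types:
        add("knowledge_type = ANY(${n})", knowledge_types)
    if domain is not None:
        add("domain = ${n}", domain)
    if project_id is not None:
        add("project_id = ${n}", project_id)
    # <=> is cosine distance, so similarity = 1 - distance.
    add("1 - (embedding <=> $1) >= ${n}", similarity_threshold)
    params.append(top_k)

    sql = (
        "SELECT id, knowledge_type, content_text, domain, metadata, "
        "1 - (embedding <=> $1) AS similarity "
        "FROM knowledge_embeddings "
        f"WHERE {' AND '.join(where)} "
        f"ORDER BY embedding <=> $1 LIMIT ${len(params)}"
    )
    return sql, params
```

For the example query above, `build_search_query(vec, ["design_decision"], "electrical", top_k=5, similarity_threshold=0.7)` yields placeholders `$2` through `$5` in filter order.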

Knowledge Types

Seven types of knowledge are indexed, each with a specific Kafka trigger and content template.

| Type | Source | Kafka Trigger | Phase |
|---|---|---|---|
| Design Decisions | DecisionRecord nodes | node.created / node.updated on graph.mutations | P1.5 |
| Component Selection Rationale | BOMItem + Session traces | node.created on graph.mutations | P1.5 |
| Failure Modes & Lessons | TestExecution (failed) + CAPARecord | node.created on graph.mutations | P2 |
| Constraint Violations | ConstraintViolation results | node.updated on graph.mutations | P2 |
| Session Summaries | Agent session traces | session.completed | P1.5 |
| Datasheet Chunks | PDFs in MinIO | knowledge.ingest (manual) | P2.5 |
| Application Notes | Markdown/docs in MinIO | knowledge.ingest (manual) | P2.5 |

Content Templates

Each knowledge type has a structured template that produces high-quality embedding text:

# digital_twin/knowledge/templates.py

TEMPLATES = {
    "design_decision": """
Design Decision: {title}
Status: {status} | Date: {date} | Domain: {domain}
Context: {context}
Options Considered: {options}
Outcome: {outcome}
Rationale: {rationale}
""",
    "component_rationale": """
Component Selection: {mpn} ({manufacturer})
For: {design_element_name} | Domain: {domain}
Package: {package} | Unit Price: ${unit_price}
Selection Rationale: {rationale}
Key Specifications: {specifications}
Alternatives Considered: {alternatives}
""",
    "failure_mode": """
Failure: {test_name} — {status}
Procedure: {procedure_name} | Category: {category}
Root Cause: {root_cause}
Corrective Action: {action}
Lesson Learned: {lesson}
Affected Components: {components}
""",
    "constraint_violation": """
Constraint Violation: {constraint_name}
Domain: {domain} | Severity: {severity}
Expected: {expected} | Actual: {actual}
Affected Nodes: {affected_nodes}
Resolution: {resolution}
""",
    "session_summary": """
Session: {skill} by {agent_code}
Duration: {duration} | Status: {status}
Input Summary: {input_summary}
Output Summary: {output_summary}
Key Decisions: {decisions}
Nodes Modified: {modified_nodes}
""",
    "datasheet_chunk": """
Datasheet: {part_number} — {manufacturer}
Section: {section_title}
Content: {chunk_text}
""",
    "application_note": """
Application Note: {title}
Source: {filename} | Section: {section_title}
Content: {chunk_text}
Tags: {tags}
""",
}
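Rendering with plain `str.format` raises `KeyError` as soon as an event lacks one of the template's fields, which would drop the whole knowledge entry. One option, sketched with a hypothetical `render` helper and a trimmed copy of the design-decision template, is `str.format_map` with a dict subclass whose `__missing__` supplies a placeholder.

```python
class _Missing(dict):
    """dict that renders unknown template fields as 'n/a' instead of raising."""

    def __missing__(self, key: str) -> str:
        return "n/a"


def render(template: str, fields: dict) -> str:
    # format_map looks fields up via __getitem__, so __missing__ applies.
    return template.format_map(_Missing(fields)).strip()


DESIGN_DECISION = """
Design Decision: {title}
Status: {status} | Date: {date} | Domain: {domain}
Context: {context}
"""
```

In the consumer this would be called as, for example, `render(TEMPLATES["design_decision"], event.properties)`, assuming node properties carry the template's field names.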

Embedding Pipeline

Architecture

flowchart LR
    subgraph "Event Sources"
        K["Kafka Consumer"]
        M["MinIO Watcher"]
    end

    subgraph "Processing Pipeline"
        KC["KnowledgeConsumer<br/>Route by event type"]
        T["Template Renderer<br/>Structured text"]
        CH["Document Chunker<br/>512 tokens, 64 overlap"]
    end

    subgraph "Embedding"
        ES["EmbeddingService"]
        LOCAL["Local: all-MiniLM-L6-v2<br/>384 dims"]
        CLOUD["Cloud: text-embedding-3-small<br/>1536 dims"]
    end

    subgraph "Storage"
        PG["pgvector<br/>knowledge_embeddings"]
    end

    K --> KC
    M --> KC
    KC -->|graph events| T
    KC -->|documents| CH
    T --> ES
    CH --> ES
    ES --> LOCAL
    ES --> CLOUD
    LOCAL --> PG
    CLOUD --> PG

    style K fill:#E67E22,color:#fff
    style PG fill:#27ae60,color:#fff
    style ES fill:#3498db,color:#fff

KnowledgeConsumer

The KnowledgeConsumer is a Kafka consumer subscribing to three topics:

# digital_twin/knowledge/consumer.py
# GraphEvent, EventType, the *Handler classes, self.embedding_service and
# self.store come from the digital_twin package (wiring not shown).

class KnowledgeConsumer:
    """Kafka consumer that processes events into knowledge embeddings."""

    SUBSCRIPTIONS = [
        "graph.mutations",      # Node created/updated events
        "session.completed",    # Agent session completion
        "knowledge.ingest",     # Manual document ingestion requests
    ]

    # Graph node type -> handler that renders its template and metadata
    NODE_HANDLERS = {
        "DecisionRecord": DecisionHandler(),
        "BOMItem": ComponentRationaleHandler(),
        "TestExecution": FailureModeHandler(),     # failed executions only
        "CAPARecord": FailureModeHandler(),
        "ConstraintViolation": ConstraintViolationHandler(),
    }

    async def process_event(self, topic: str, event) -> None:
        """Route an event by the topic it arrived on."""
        if topic == "graph.mutations":
            if event.event_type in (EventType.NODE_CREATED, EventType.NODE_UPDATED):
                await self._handle_node_event(event)
        elif topic == "session.completed":
            await self._handle_session_completed(event)
        elif topic == "knowledge.ingest":
            await self._handle_document_ingestion(event)

    async def _handle_node_event(self, event: GraphEvent) -> None:
        """Index graph node changes as knowledge (upsert: one row per node)."""
        handler = self.NODE_HANDLERS.get(event.node_type)
        if handler is None:
            return  # this node type is not indexed
        content = handler.render_template(event)
        embedding = await self.embedding_service.embed(content)
        await self.store.upsert(
            knowledge_type=handler.knowledge_type,
            source_event_id=event.event_id,
            source_node_id=event.node_id,
            domain=event.properties.get("domain"),
            content_text=content,
            embedding=embedding,
            metadata=handler.extract_metadata(event),
        )
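Because `node.updated` re-renders a node that is already indexed, `store.upsert` should replace that node's row rather than add another; otherwise stale versions of a decision keep surfacing in search. A minimal in-memory sketch of that contract, assuming the key is `(knowledge_type, source_node_id)`; `InMemoryKnowledgeStore` is hypothetical, and in PostgreSQL the same behavior would typically come from `INSERT ... ON CONFLICT ... DO UPDATE` backed by a matching unique index.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Row:
    knowledge_type: str
    source_node_id: str
    content_text: str
    embedding: list[float]


class InMemoryKnowledgeStore:
    """Hypothetical stand-in for the pgvector store, showing upsert semantics."""

    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], Row] = {}

    def upsert(self, knowledge_type: str, source_node_id: str,
               content_text: str, embedding: list[float]) -> None:
        # Same (type, node) key: node.updated overwrites the earlier version.
        key = (knowledge_type, source_node_id)
        self._rows[key] = Row(knowledge_type, source_node_id, content_text, embedding)

    def get(self, knowledge_type: str, source_node_id: str) -> Optional[Row]:
        return self._rows.get((knowledge_type, source_node_id))

    def count_by_type(self, knowledge_type: str) -> int:
        return sum(1 for kt, _ in self._rows if kt == knowledge_type)
```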

EmbeddingService

An abstraction over local and cloud embedding providers:

# digital_twin/knowledge/embedding_service.py
import asyncio
from typing import Literal


class EmbeddingService:
    """Unified embedding interface: local or cloud."""

    def __init__(self, provider: Literal["local", "openai"] = "local"):
        self.provider = provider
        if provider == "local":
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer("all-MiniLM-L6-v2")
            self.dimensions = 384   # the store zero-pads to the vector(1536) column
        else:
            import openai
            # Async client, because embeddings.create is awaited below
            self.client = openai.AsyncOpenAI()
            self.dimensions = 1536

    async def embed(self, text: str) -> list[float]:
        """Generate embedding vector for text."""
        if self.provider == "local":
            # encode() is CPU-bound and blocking; run it off the event loop
            vector = await asyncio.to_thread(self.model.encode, text)
            return vector.tolist()
        response = await self.client.embeddings.create(
            model="text-embedding-3-small",
            input=text,
        )
        return response.data[0].embedding

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Batch embedding for efficiency."""
        if self.provider == "local":
            vectors = await asyncio.to_thread(self.model.encode, texts)
            return vectors.tolist()
        response = await self.client.embeddings.create(
            model="text-embedding-3-small",
            input=texts,
        )
        return [d.embedding for d in response.data]

Embedding model selection:

| Model | Dimensions | Speed | Quality | Use Case |
|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | Fast (local) | Good | Default: local-first, no API key needed |
| text-embedding-3-small | 1536 | Medium (API) | Better | Cloud option: higher quality for production |

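The pgvector schema uses vector(1536) to fit the larger cloud model. When the 384-dimension local model is used, vectors are zero-padded to 1536 dimensions. Local and cloud vectors come from different models and are not comparable with each other, so after any model change, run the re-embedding batch job so that every row comes from the same model.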
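Zero-padding is safe for cosine search because trailing zeros add nothing to a vector's dot products or to its norm, so the similarity between two padded local vectors equals their similarity before padding. A minimal sketch, with hypothetical `pad_to` and `cosine` helpers:

```python
import math

TARGET_DIMS = 1536  # width of the knowledge_embeddings.embedding column


def pad_to(vec: list[float], dims: int = TARGET_DIMS) -> list[float]:
    """Right-pad an embedding with zeros so it fits vector(1536)."""
    if len(vec) > dims:
        raise ValueError(f"embedding has {len(vec)} dims, column holds {dims}")
    return vec + [0.0] * (dims - len(vec))


def cosine(a: list[float], b: list[float]) -> float:
    """Cosine similarity of two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))
```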

Document Chunker

For PDFs and markdown documents (datasheets, application notes):

# digital_twin/knowledge/chunker.py
# tokenize(), detokenize() and extract_pdf_text() wrap the tokenizer and the
# PDF text extractor (not shown); ChunkResult is defined in models.py.

class DocumentChunker:
    """Split documents into overlapping chunks for embedding."""

    def __init__(
        self,
        chunk_size: int = 512,      # tokens per chunk
        chunk_overlap: int = 64,    # overlapping tokens between chunks
    ):
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_text(self, text: str, metadata: dict) -> list[ChunkResult]:
        """Split text into overlapping chunks with metadata."""
        tokens = self.tokenize(text)
        chunks = []
        start = 0
        while start < len(tokens):
            end = min(start + self.chunk_size, len(tokens))
            index = len(chunks)
            chunks.append(ChunkResult(
                text=self.detokenize(tokens[start:end]),
                chunk_index=index,
                start_token=start,
                end_token=end,
                metadata={**metadata, "chunk_index": index},
            ))
            if end == len(tokens):
                break  # this chunk reaches the end; avoid a redundant tail chunk
            start += self.chunk_size - self.chunk_overlap
        return chunks

    def chunk_pdf(self, pdf_bytes: bytes, metadata: dict) -> list[ChunkResult]:
        """Extract text from PDF content (e.g. bytes downloaded from MinIO) and chunk it."""
        text = self.extract_pdf_text(pdf_bytes)
        return self.chunk_text(text, metadata)
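The windowing arithmetic can be isolated into a pure function and tested without a tokenizer. This sketch (hypothetical `chunk_spans`) uses the same 512/64 defaults, stops once a window reaches the end of the document so the tail is not emitted twice, and rejects an overlap that would keep the window from advancing.

```python
def chunk_spans(n_tokens: int, chunk_size: int = 512,
                chunk_overlap: int = 64) -> list[tuple[int, int]]:
    """Return [start, end) token spans for overlapping chunks."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be >= 0 and smaller than chunk_size")
    spans: list[tuple[int, int]] = []
    stride = chunk_size - chunk_overlap  # 448 tokens with the defaults
    start = 0
    while start < n_tokens:
        end = min(start + chunk_size, n_tokens)
        spans.append((start, end))
        if end == n_tokens:  # the last window already covers the tail
            break
        start += stride
    return spans
```

With the defaults, a 1,000-token datasheet yields three chunks: tokens 0-512, 448-960, and 896-1000.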

Retrieval Patterns

1. retrieve_knowledge Skill

A pure skill (no side effects) that agents call to search historical knowledge:

# domain_agents/shared/skills/retrieve_knowledge/handler.py
import time
from typing import Optional

from pydantic import BaseModel

class RetrieveKnowledgeInput(BaseModel):
    query: str                                       # Natural language query
    knowledge_types: Optional[list[str]] = None      # Filter by type
    domain: Optional[str] = None                     # Filter by domain
    project_id: Optional[str] = None                 # Filter by project
    top_k: int = 5                                   # Number of results
    similarity_threshold: float = 0.7                # Minimum similarity

class KnowledgeResult(BaseModel):   # defined before the output model that uses it
    id: str
    knowledge_type: str
    content_text: str
    similarity: float
    domain: Optional[str] = None
    metadata: dict = {}

class RetrieveKnowledgeOutput(BaseModel):
    results: list[KnowledgeResult]
    query_embedding_time_ms: float
    search_time_ms: float

class RetrieveKnowledgeSkill(Skill):   # Skill: agent framework base class
    """Pure skill: semantic search over project knowledge."""

    async def execute(self, input: RetrieveKnowledgeInput) -> RetrieveKnowledgeOutput:
        t0 = time.perf_counter()
        query_embedding = await self.embedding_service.embed(input.query)
        t1 = time.perf_counter()
        results = await self.store.search(
            embedding=query_embedding,
            knowledge_types=input.knowledge_types,
            domain=input.domain,
            project_id=input.project_id,
            top_k=input.top_k,
            similarity_threshold=input.similarity_threshold,
        )
        t2 = time.perf_counter()
        return RetrieveKnowledgeOutput(
            results=results,
            query_embedding_time_ms=(t1 - t0) * 1000,
            search_time_ms=(t2 - t1) * 1000,
        )
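HNSW is approximate, so an exact reference is useful when testing `store.search`: a brute-force scan with the same threshold and `top_k` semantics, compared against pgvector on a small fixture. A sketch in plain Python over in-memory rows; `exact_search` and `recall` are hypothetical test helpers, not part of the store.

```python
import math
from typing import Iterable


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0


def exact_search(query: list[float], rows: Iterable[tuple[str, list[float]]],
                 top_k: int = 5, similarity_threshold: float = 0.7) -> list[tuple[str, float]]:
    """Exact nearest neighbours by cosine similarity, highest first."""
    scored = [(row_id, cosine_similarity(query, emb)) for row_id, emb in rows]
    kept = [s for s in scored if s[1] >= similarity_threshold]
    kept.sort(key=lambda s: s[1], reverse=True)
    return kept[:top_k]


def recall(approx_ids: list[str], exact_ids: list[str]) -> float:
    """Fraction of the exact top-k IDs that the approximate search also returned."""
    if not exact_ids:
        return 1.0
    return len(set(approx_ids) & set(exact_ids)) / len(exact_ids)
```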

2. RAG for Agent Prompting

Agents call retrieve_knowledge before LLM calls and inject historical context into prompts:

# Example: BOM Agent using RAG (methods excerpted from the BOM Agent class)
import json

async def select_component(self, requirements: dict) -> BOMItem:
    # Step 1: Retrieve relevant historical knowledge
    knowledge = await self.run_skill("retrieve_knowledge", {
        "query": f"component selection for {requirements['function']} "
                 f"in {requirements['domain']}",
        "knowledge_types": ["component_rationale", "failure_mode"],
        "domain": requirements["domain"],
        "top_k": 5,
    })

    # Step 2: Build prompt with RAG context
    prompt = f"""Select the best component for: {requirements['description']}

## Historical Knowledge
{self._format_knowledge_context(knowledge.results)}

## Current Requirements
{json.dumps(requirements, indent=2)}

Based on the historical knowledge above and current requirements,
select and justify a component.
"""

    # Step 3: Call LLM with enriched context
    response = await self.llm.complete(prompt)
    return self.parse_bom_item(response)

def _format_knowledge_context(self, results: list[KnowledgeResult]) -> str:
    """Format knowledge results for prompt injection."""
    if not results:
        return "_No relevant historical knowledge found._"
    sections = []
    for r in results:
        sections.append(
            f"### {r.knowledge_type} (similarity: {r.similarity:.2f})\n"
            f"{r.content_text}"
        )
    return "\n\n".join(sections)
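Injected context competes with the rest of the prompt for the model's context window. A hedged sketch of a budgeted variant of `_format_knowledge_context`: results stay in ranked order, and sections stop being added once a budget is reached. The `max_chars` parameter, its default, and counting characters rather than tokens are illustrative assumptions.

```python
from dataclasses import dataclass


@dataclass
class KnowledgeResult:  # minimal stand-in for the Pydantic model
    knowledge_type: str
    content_text: str
    similarity: float


def format_knowledge_context(results: list[KnowledgeResult], max_chars: int = 4000) -> str:
    """Format ranked results for prompt injection within a character budget."""
    if not results:
        return "_No relevant historical knowledge found._"
    sections: list[str] = []
    used = 0
    for r in results:  # results arrive best-first
        section = f"### {r.knowledge_type} (similarity: {r.similarity:.2f})\n{r.content_text}"
        cost = len(section) + (2 if sections else 0)  # "\n\n" separator
        if used + cost > max_chars:
            if not sections:  # always keep the best match, truncated
                sections.append(section[:max_chars])
            break
        sections.append(section)
        used += cost
    return "\n\n".join(sections)
```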

3. REST API

# digital_twin/knowledge/api.py
import time
from typing import Optional

from fastapi import APIRouter, Query

router = APIRouter(prefix="/api/v1/knowledge")
# embedding_service and store are module-level singletons (wiring not shown)

# GET /api/v1/knowledge/search
@router.get("/search")
async def search_knowledge(
    query: str,
    knowledge_types: Optional[list[str]] = Query(None),   # repeatable query parameter
    domain: Optional[str] = None,
    project_id: Optional[str] = None,
    top_k: int = 10,
    similarity_threshold: float = 0.6,
) -> KnowledgeSearchResponse:
    """Semantic search over project knowledge."""
    started = time.perf_counter()
    embedding = await embedding_service.embed(query)
    results = await store.search(
        embedding=embedding,
        knowledge_types=knowledge_types,
        domain=domain,
        project_id=project_id,
        top_k=top_k,
        similarity_threshold=similarity_threshold,
    )
    return KnowledgeSearchResponse(
        results=results,
        total=len(results),
        query_time_ms=(time.perf_counter() - started) * 1000,
    )
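Because `knowledge_types` is declared with `Query(None)`, FastAPI reads it from repeated query parameters (`?knowledge_types=a&knowledge_types=b`). A sketch of building such a request URL with the standard library; the host, port, and the `search_url` helper are assumptions.

```python
from typing import Optional
from urllib.parse import urlencode

BASE_URL = "http://localhost:8000/api/v1/knowledge/search"  # assumed host/port


def search_url(query: str, knowledge_types: Optional[list[str]] = None, **filters) -> str:
    """Build a search URL; None-valued filters are omitted."""
    params: dict = {"query": query, **{k: v for k, v in filters.items() if v is not None}}
    if knowledge_types:
        params["knowledge_types"] = knowledge_types
    # doseq=True expands the list into one knowledge_types=... pair per item.
    return f"{BASE_URL}?{urlencode(params, doseq=True)}"
```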

Event Synchronization

Real-Time Pipeline

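The KnowledgeConsumer processes events from Kafka in real time, targeting < 2s end-to-end latency from event publication to searchable embedding. The per-stage estimates below sum to about 350ms (the ~200ms embedding figure matches the cloud model; the local model targets < 50ms), leaving the rest of the budget for Kafka delivery and consumer lag: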

Event published → Kafka → KnowledgeConsumer → Template render → Embed → pgvector INSERT
                                              (~100ms)          (~200ms)   (~50ms)

Periodic Reconciliation

A daily job compares Neo4j node counts against pgvector counts to detect drift:

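# digital_twin/knowledge/reconciler.py
from collections import defaultdict


class KnowledgeReconciler:
    """Daily reconciliation between Neo4j and pgvector."""

    RECONCILE_TYPES = {
        "DecisionRecord": "design_decision",
        "BOMItem": "component_rationale",
        "TestExecution": "failure_mode",   # only failed executions are indexed
        "CAPARecord": "failure_mode",
    }

    async def reconcile(self) -> ReconciliationReport:
        """Compare Neo4j node counts with pgvector embedding counts."""
        report = ReconciliationReport()
        # TestExecution and CAPARecord both map to failure_mode, so Neo4j
        # counts are summed per knowledge type before comparing.
        node_types_by_kt: dict[str, list[str]] = defaultdict(list)
        for node_type, knowledge_type in self.RECONCILE_TYPES.items():
            node_types_by_kt[knowledge_type].append(node_type)

        for knowledge_type, node_types in node_types_by_kt.items():
            # count_nodes must apply the consumer's filters (failed TestExecutions only)
            neo4j_count = sum([await self.neo4j.count_nodes(t) for t in node_types])
            pgvector_count = await self.store.count_by_type(knowledge_type)
            if neo4j_count != pgvector_count:
                report.add_drift("+".join(node_types), neo4j_count, pgvector_count)
                for node_type in node_types:
                    await self._backfill_missing(node_type, knowledge_type)
        return report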
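A count check cannot tell a missing embedding apart from an orphaned one that happens to offset it, and it cannot say which nodes to backfill. Comparing ID sets can. A sketch, assuming both sides can list IDs per node type (Neo4j node IDs on one side, `source_node_id` values on the other); `IdDrift` and `diff_ids` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class IdDrift:
    node_type: str
    missing: set[str]   # in Neo4j but not embedded: backfill candidates
    orphaned: set[str]  # embedded but absent from Neo4j: deletion candidates


def diff_ids(node_type: str, neo4j_ids: set[str], pgvector_ids: set[str]) -> IdDrift:
    """Set difference in both directions between the graph and the projection."""
    return IdDrift(
        node_type=node_type,
        missing=neo4j_ids - pgvector_ids,
        orphaned=pgvector_ids - neo4j_ids,
    )
```

For example, `diff_ids("DecisionRecord", {"n1", "n2", "n3"}, {"n2", "n3", "n9"})` reports `n1` as missing and `n9` as orphaned, even though both sides hold three IDs and a count comparison would report no drift.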

Re-Embedding Batch Job

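When the embedding model changes (e.g., switching from local to cloud, or upgrading the model version), a batch job re-embeds all existing knowledge. Until it finishes, the table holds vectors from both models, so search results are unreliable while the job runs: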

async def re_embed_all(self, batch_size: int = 100) -> int:
    """Re-embed all knowledge entries with current model. Run on model change."""
    total = 0
    last_id = None
    while True:
        # Keyset pagination ordered by id: stable while rows are being updated
        # and cheaper than a growing OFFSET on a large table.
        entries = await self.store.fetch_batch(after_id=last_id, limit=batch_size)
        if not entries:
            break
        texts = [e.content_text for e in entries]
        embeddings = await self.embedding_service.embed_batch(texts)
        await self.store.update_embeddings(
            ids=[e.id for e in entries],
            embeddings=embeddings,
        )
        total += len(entries)
        last_id = entries[-1].id
    return total

Integration with Dual State Machine

The Knowledge Layer fits into the existing Dual State Machine as a third projection — a derived, read-only view alongside Neo4j and Git:

flowchart TD
    KAFKA["Kafka Event Stream<br/>(System of Record)"] --> NEO4J["Neo4j Projection<br/>Semantic State"]
    KAFKA --> GIT["Git Projection<br/>Artifact State"]
    KAFKA --> PG["pgvector Projection<br/>Knowledge State"]
    MINIO["MinIO Documents"] --> PG

    AGENT["Domain Agents"] -->|"read graph"| NEO4J
    AGENT -->|"read artifacts"| GIT
    AGENT -->|"retrieve_knowledge"| PG

    style KAFKA fill:#E67E22,color:#fff
    style NEO4J fill:#2C3E50,color:#fff
    style GIT fill:#27ae60,color:#fff
    style PG fill:#3498db,color:#fff
    style AGENT fill:#9b59b6,color:#fff

| Property | Neo4j Projection | Git Projection | Knowledge Projection (pgvector) |
|---|---|---|---|
| Data | Entity graph | Artifact files | Embedded knowledge fragments |
| Writes to | Neo4j | Git repo | PostgreSQL |
| Reads from | Kafka graph.mutations | Kafka (projection trigger) | Kafka + MinIO |
| Query pattern | Cypher traversal | File path | Semantic similarity + filters |
| Reconstructable | Replay events | Replay projections | Replay events + re-index docs |
| Update frequency | Every mutation | On explicit projection | Every mutation + manual ingest |

Key constraint: The Knowledge Layer never writes to Neo4j or Git. It is a consumer-only projection. This preserves the existing event-sourcing architecture and avoids circular dependencies.


ingest_knowledge Skill

For manual document ingestion (datasheets, application notes):

# domain_agents/shared/skills/ingest_knowledge/handler.py
from typing import Literal

from pydantic import BaseModel

class IngestKnowledgeInput(BaseModel):
    source_url: str              # MinIO object path
    knowledge_type: Literal["datasheet_chunk", "application_note"]
    metadata: dict               # part_number, manufacturer, tags, etc.

class IngestKnowledgeOutput(BaseModel):
    fragments_created: int
    source_id: str

class IngestKnowledgeSkill(Skill):
    """Pure skill: chunk and embed a document from MinIO."""

    async def execute(self, input: IngestKnowledgeInput) -> IngestKnowledgeOutput:
        # Download the document's bytes from MinIO
        document = await self.minio.get_object(input.source_url)

        # Chunk the document: PDFs need text extraction, markdown/text does not
        if input.source_url.lower().endswith(".pdf"):
            chunks = self.chunker.chunk_pdf(document, input.metadata)
        else:
            chunks = self.chunker.chunk_text(document.decode("utf-8"), input.metadata)

        # Embed all chunks in one batch, then store each fragment
        texts = [c.text for c in chunks]
        embeddings = await self.embedding_service.embed_batch(texts)
        for chunk, embedding in zip(chunks, embeddings):
            await self.store.insert(
                knowledge_type=input.knowledge_type,
                content_text=chunk.text,
                embedding=embedding,
                # keep the source path so a document's fragments can be found again
                metadata={**chunk.metadata, "source_url": input.source_url},
            )

        return IngestKnowledgeOutput(
            fragments_created=len(chunks),
            source_id=input.source_url,
        )
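Re-ingesting the same document (for example, a revised datasheet uploaded under the same object path) would otherwise insert a second set of chunks. One option, sketched here as an assumption rather than part of the design above, is to derive each chunk's UUID deterministically from the source path and chunk index with `uuid.uuid5`, so a re-run can upsert over the earlier rows. `KNOWLEDGE_NS` and `chunk_id` are hypothetical.

```python
import uuid

# Hypothetical namespace for knowledge-layer IDs; any fixed UUID works.
KNOWLEDGE_NS = uuid.uuid5(uuid.NAMESPACE_URL, "metaforge:knowledge")


def chunk_id(source_url: str, chunk_index: int) -> uuid.UUID:
    """Stable ID: same document path and chunk index give the same UUID every run."""
    return uuid.uuid5(KNOWLEDGE_NS, f"{source_url}#{chunk_index}")
```

If a revision produces fewer chunks than before, the leftover higher-index rows for that source still need to be deleted.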

Pydantic Models

# digital_twin/knowledge/models.py

from datetime import datetime, timezone
from typing import Optional

from pydantic import BaseModel, Field

class KnowledgeEntry(BaseModel):
    """A single knowledge embedding in pgvector."""
    id: str
    knowledge_type: str
    source_event_id: Optional[str] = None
    source_node_id: Optional[str] = None
    domain: Optional[str] = None
    phase: Optional[str] = None
    project_id: Optional[str] = None
    content_text: str
    metadata: dict = {}
    created_at: datetime
    updated_at: datetime

class KnowledgeSearchRequest(BaseModel):
    """Search request for the knowledge API."""
    query: str
    knowledge_types: Optional[list[str]] = None
    domain: Optional[str] = None
    project_id: Optional[str] = None
    top_k: int = 10
    similarity_threshold: float = 0.6

class KnowledgeSearchResult(BaseModel):
    """A single search result with similarity score."""
    id: str
    knowledge_type: str
    content_text: str
    similarity: float
    domain: Optional[str] = None
    metadata: dict = {}

class KnowledgeSearchResponse(BaseModel):
    """Response from the knowledge search API."""
    results: list[KnowledgeSearchResult]
    total: int
    query_time_ms: float

class DriftEntry(BaseModel):   # defined before ReconciliationReport, which uses it
    """A single drift between Neo4j and pgvector."""
    node_type: str
    neo4j_count: int
    pgvector_count: int
    difference: int

class ReconciliationReport(BaseModel):
    """Report from the daily reconciliation job."""
    timestamp: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
    drifts: list[DriftEntry] = Field(default_factory=list)
    backfilled: int = 0

    def add_drift(self, node_type: str, neo4j_count: int, pgvector_count: int) -> None:
        """Record a count mismatch (positive difference = missing embeddings)."""
        self.drifts.append(DriftEntry(
            node_type=node_type,
            neo4j_count=neo4j_count,
            pgvector_count=pgvector_count,
            difference=neo4j_count - pgvector_count,
        ))

class ChunkResult(BaseModel):
    """A single chunk from document chunking."""
    text: str
    chunk_index: int
    start_token: int
    end_token: int
    metadata: dict = {}

Repository Structure

digital_twin/
├── knowledge/                     # NEW — Knowledge Layer
│   ├── store.py                   # pgvector CRUD + semantic search
│   ├── embedding_service.py       # Local + cloud embedding abstraction
│   ├── consumer.py                # Kafka event processor (KnowledgeConsumer)
│   ├── reconciler.py              # Periodic reconciliation (Neo4j ↔ pgvector)
│   ├── chunker.py                 # Document chunking (PDF/markdown)
│   ├── templates.py               # Embedding content templates per knowledge type
│   ├── models.py                  # Pydantic models
│   └── api.py                     # Knowledge query REST API

domain_agents/
└── shared/                        # NEW — Shared cross-agent skills
    └── skills/
        ├── retrieve_knowledge/    # Semantic search skill (pure, read-only)
        │   ├── definition.json
        │   ├── SKILL.md
        │   ├── schema.py
        │   ├── handler.py
        │   └── tests.py
        └── ingest_knowledge/      # Document ingestion skill (pure)
            ├── definition.json
            ├── SKILL.md
            ├── schema.py
            ├── handler.py
            └── tests.py

See Repository Structure for the full platform layout.


Phased Delivery

| Phase | Timeline | Deliverable |
|---|---|---|
| P1.5 | Months 5-6 | pgvector setup, local embedding (all-MiniLM-L6-v2), retrieve_knowledge skill, design decision + session summary indexing |
| P2 | Months 7-10 | All 5 graph knowledge types indexed, RAG integrated in 5+ agents, search API, dashboard knowledge panel |
| P2.5 | Months 10-12 | Datasheet chunking, application note indexing, ingest_knowledge skill, manual ingestion API |
| P3 | Months 13-18 | Cross-project knowledge sharing, supplier reliability patterns, knowledge quality scoring |
| P4 | Months 21-24+ | Fleet knowledge aggregation, anomaly pattern embeddings, optional Qdrant migration, knowledge marketplace |

Phase Dependencies

flowchart LR
    P1["P1: Digital Thread<br/>Months 1-6"] --> P15["P1.5: Knowledge Layer<br/>Months 5-6"]
    P15 --> P2["P2: Full Indexing + RAG<br/>Months 7-10"]
    P2 --> P25["P2.5: Document Ingestion<br/>Months 10-12"]
    P25 --> P3["P3: Cross-Project<br/>Months 13-18"]
    P3 --> P4["P4: Fleet Knowledge<br/>Months 21-24+"]

    style P1 fill:#27ae60,color:#fff
    style P15 fill:#2ecc71,color:#fff
    style P2 fill:#3498db,color:#fff
    style P25 fill:#3498db,color:#fff
    style P3 fill:#9b59b6,color:#fff
    style P4 fill:#E67E22,color:#fff

P1.5 Deliverables (Detail)

P1.5 overlaps with the tail of P1 (months 5-6) and delivers the foundational knowledge infrastructure:

  1. PostgreSQL pgvector extension: CREATE EXTENSION vector; on the existing session state database
  2. knowledge_embeddings table — schema, HNSW index, filter indexes
  3. EmbeddingService — local model (all-MiniLM-L6-v2) as default, cloud model as configuration option
  4. KnowledgeConsumer — Kafka consumer for graph.mutations and session.completed
  5. Two knowledge types indexed: design_decision (from DecisionRecord nodes) and session_summary (from session completion events)
  6. retrieve_knowledge skill — pure skill available to all agents
  7. Basic RAG integration — one agent (BOM Agent) demonstrates knowledge-augmented prompting

Kafka Topics

Two new Kafka topics support the Knowledge Layer:

| Topic | Purpose | Publisher | Consumer | Retention |
|---|---|---|---|---|
| knowledge.ingest | Manual document ingestion requests | API / CLI | KnowledgeConsumer | 30 days |
| knowledge.indexed | Confirmation that knowledge was embedded | KnowledgeConsumer | Dashboard (optional) | 7 days |

See Event Sourcing for the complete topic list and retention policies.


Performance Targets

| Operation | Target | Notes |
|---|---|---|
| Embedding (local) | < 50ms per text | all-MiniLM-L6-v2 on CPU |
| Embedding (cloud) | < 200ms per text | OpenAI API call |
| Semantic search (10 results) | < 100ms | pgvector HNSW with filters |
| End-to-end event → searchable | < 2s | Kafka → embed → INSERT |
| Batch re-embedding (1000 entries) | < 60s | Local model, batch processing |
| Reconciliation job | < 5 min | Daily, full scan |

Related Documents

| Document | Description |
|---|---|
| Event Sourcing | Event stream architecture, Kafka topics, and the knowledge.ingest/knowledge.indexed topics |
| Graph Schema | Node types that feed into knowledge embeddings, plus the KnowledgeSource node type |
| Digital Twin Evolution | Phased delivery timeline (P1.5 knowledge layer) and technology stack |
| Repository Structure | digital_twin/knowledge/ and domain_agents/shared/skills/ directory layout |
| Constraint Engine | Constraint violations that become knowledge entries |
| Architecture Overview | System architecture showing knowledge layer in the data flow |

Document Version: v1.0 | Last Updated: 2026-02-28 | Status: Technical Architecture Document
