AI Memory & Knowledge Layer
Semantic memory that lets agents learn from past sessions, search historical decisions, and ground prompts in real project knowledge
Table of Contents
- Problem Statement
- Architecture Overview
- Vector DB Choice: pgvector
- Knowledge Types
- Embedding Pipeline
- Retrieval Patterns
- Event Synchronization
- Integration with Dual State Machine
- ingest_knowledge Skill
- Pydantic Models
- Repository Structure
- Phased Delivery
- Kafka Topics
- Performance Targets
- Related Documents
Problem Statement
MetaForge agents are stateless per-session. They read context from Neo4j, run skills, produce artifacts, and forget everything. There is no vector database, no embedding store, no semantic search, and no agent memory anywhere in the current architecture. This means:
- No learning from past sessions — “last time we picked STM32F4 for a drone FC, here’s what worked/failed” is lost
- No semantic search over historical decisions, constraints, or design rationale
- No accumulated domain knowledge — component preferences, failure modes, supplier reliability
- No RAG (Retrieval-Augmented Generation) for agent prompting
- No indexed datasheets or application notes
This document defines a Knowledge Layer — a pgvector-backed semantic memory that agents use via pure skills (retrieve_knowledge, ingest_knowledge) and that stays in sync with the event stream.
Architecture Overview
The Knowledge Layer is a derived, read-only projection of the event stream and document store — architecturally identical to how Neo4j projects from Kafka. It never writes to Neo4j or Git, and is fully reconstructable from Kafka events plus MinIO documents.
flowchart TD
KAFKA["Kafka Topics<br/>graph.mutations<br/>session.completed<br/>knowledge.ingest"] --> KC["KnowledgeConsumer<br/>Event Processor"]
MINIO["MinIO<br/>PDFs, App Notes"] --> KC
KC --> ES["EmbeddingService<br/>Local or Cloud"]
ES --> PG["PostgreSQL + pgvector<br/>knowledge_embeddings table"]
AGENT["Domain Agent"] -->|"retrieve_knowledge skill"| PG
PG -->|"ranked results"| AGENT
AGENT -->|"RAG context injection"| LLM["LLM Provider"]
API["REST API<br/>GET /api/v1/knowledge/search"] --> PG
DASH["Dashboard"] --> API
style KAFKA fill:#E67E22,color:#fff
style PG fill:#27ae60,color:#fff
style AGENT fill:#9b59b6,color:#fff
style ES fill:#3498db,color:#fff
Key architectural properties:
- Zero new infrastructure: PostgreSQL already exists for session state; `CREATE EXTENSION vector;` adds pgvector
- Hybrid queries: structured `WHERE` filters + vector cosine distance in a single SQL query
- ACID transactions: consistent writes, no eventual consistency concerns
- Local-first: no cloud dependency for embedding (local model as default)
- Fully reconstructable: replay Kafka + re-index MinIO to rebuild from scratch
Vector DB Choice: pgvector
Why pgvector over a dedicated vector DB
| Criterion | pgvector (PostgreSQL) | Dedicated (Qdrant, Weaviate) |
|---|---|---|
| New infrastructure | None — PostgreSQL already deployed | New service to deploy, monitor, backup |
| Hybrid queries | Native SQL WHERE + vector distance | Separate filter syntax, varies by vendor |
| ACID transactions | Full PostgreSQL transactions | Eventually consistent (most vendors) |
| Local-first | Ships with PostgreSQL | Extra container |
| Operational cost | Zero incremental | Separate backup, upgrade, scaling |
| Scale ceiling | ~10M vectors with HNSW | ~100M+ vectors |
Decision: pgvector for P1.5 through P3. If vector count exceeds 10M or query latency exceeds 50ms at the 95th percentile, migrate to Qdrant (P4 option).
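The migration rule can be expressed as a small check. This is a sketch only. The nearest-rank p95 method, the helper names `p95` and `should_migrate_to_qdrant`, and the idea of feeding it sampled search latencies are assumptions, not part of the existing design.

```python
import math

# Thresholds taken from the migration decision above
MAX_VECTORS = 10_000_000
MAX_P95_LATENCY_MS = 50.0


def p95(samples_ms: list[float]) -> float:
    """95th percentile using the nearest-rank method (an assumed choice)."""
    if not samples_ms:
        raise ValueError("need at least one latency sample")
    ordered = sorted(samples_ms)
    rank = math.ceil(0.95 * len(ordered))  # 1-based nearest rank
    return ordered[rank - 1]


def should_migrate_to_qdrant(vector_count: int, samples_ms: list[float]) -> bool:
    """Hypothetical helper: True once either migration trigger is exceeded."""
    return vector_count > MAX_VECTORS or p95(samples_ms) > MAX_P95_LATENCY_MS
```

In this sketch, either trigger alone is enough. A small index with a slow tail (for example, six of 100 samples at 60 ms) already qualifies.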
Schema
CREATE EXTENSION IF NOT EXISTS vector;
CREATE TABLE knowledge_embeddings (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
knowledge_type VARCHAR(50) NOT NULL, -- 'design_decision', 'component_rationale', etc.
source_event_id VARCHAR(64), -- Kafka event ID that created this
source_node_id VARCHAR(64), -- Neo4j node ID (if applicable)
domain VARCHAR(30), -- 'electrical', 'mechanical', 'firmware', 'system'
phase VARCHAR(10), -- 'P1', 'P2', etc.
project_id VARCHAR(64), -- Project scope
content_text TEXT NOT NULL, -- Human-readable text that was embedded
embedding vector(1536) NOT NULL, -- Vector embedding
metadata JSONB DEFAULT '{}', -- Flexible metadata
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
-- HNSW index for approximate nearest neighbor search
CREATE INDEX idx_knowledge_embedding_hnsw
ON knowledge_embeddings
USING hnsw (embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- Structured filter indexes
CREATE INDEX idx_knowledge_type ON knowledge_embeddings (knowledge_type);
CREATE INDEX idx_knowledge_domain ON knowledge_embeddings (domain);
CREATE INDEX idx_knowledge_project ON knowledge_embeddings (project_id);
CREATE INDEX idx_knowledge_source_node ON knowledge_embeddings (source_node_id);
CREATE INDEX idx_knowledge_metadata ON knowledge_embeddings USING gin (metadata);
Hybrid Query Example
-- "Find design decisions about power management in the electrical domain"
-- $1 = query embedding (vector), $2 = project ID
SELECT id, knowledge_type, content_text, metadata,
       1 - (embedding <=> $1) AS similarity  -- <=> is cosine distance
FROM knowledge_embeddings
WHERE knowledge_type = 'design_decision'
  AND domain = 'electrical'
  AND project_id = $2
ORDER BY embedding <=> $1
LIMIT 10;
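In application code, the same hybrid query is usually assembled from optional filters. The sketch below makes several assumptions: a psycopg-style driver (`%s` placeholders, Python lists adapted to arrays), the embedding passed in pgvector's text format (`'[x,y,...]'`) and cast to `vector`, and illustrative helper names `to_vector_literal` and `build_search_query` whose parameters mirror `retrieve_knowledge`. Because `<=>` returns cosine distance and similarity is `1 - distance`, the similarity threshold becomes a distance bound.

```python
from typing import Optional


def to_vector_literal(embedding: list[float]) -> str:
    """Render floats in pgvector's text input format, e.g. '[0.5,0.25]'."""
    return "[" + ",".join(repr(float(x)) for x in embedding) + "]"


def build_search_query(
    embedding: list[float],
    knowledge_types: Optional[list[str]] = None,
    domain: Optional[str] = None,
    project_id: Optional[str] = None,
    top_k: int = 10,
    similarity_threshold: float = 0.6,
) -> tuple[str, list]:
    """Return (sql, params) for a filtered cosine-similarity search."""
    vec = to_vector_literal(embedding)
    # similarity >= t  is equivalent to  cosine distance <= 1 - t
    where = ["(embedding <=> %s::vector) <= %s"]
    params: list = [vec, 1.0 - similarity_threshold]
    if knowledge_types:
        where.append("knowledge_type = ANY(%s)")  # list is sent as an array
        params.append(list(knowledge_types))
    if domain is not None:
        where.append("domain = %s")
        params.append(domain)
    if project_id is not None:
        where.append("project_id = %s")
        params.append(project_id)
    sql = (
        "SELECT id, knowledge_type, content_text, domain, metadata, "
        "1 - (embedding <=> %s::vector) AS similarity "
        "FROM knowledge_embeddings "
        "WHERE " + " AND ".join(where) + " "
        "ORDER BY embedding <=> %s::vector "
        "LIMIT %s"
    )
    # Parameters in placeholder order: SELECT, WHERE clauses, ORDER BY, LIMIT
    return sql, [vec, *params, vec, top_k]
```

pgvector applies `WHERE` filters after the HNSW index scan. A highly selective filter can therefore return fewer than `top_k` rows. The `hnsw.ef_search` setting controls how many candidates the scan considers.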
Knowledge Types
Seven types of knowledge are indexed, each with a specific Kafka trigger and content template.
| Type | Source | Kafka Trigger | Phase |
|---|---|---|---|
| Design Decisions | DecisionRecord nodes | node.created / node.updated on graph.mutations | P1.5 |
| Component Selection Rationale | BOMItem + Session traces | node.created on graph.mutations | P1.5 |
| Failure Modes & Lessons | TestExecution (failed) + CAPARecord | node.created on graph.mutations | P2 |
| Constraint Violations | ConstraintViolation results | node.updated on graph.mutations | P2 |
| Session Summaries | Agent session traces | session.completed | P1.5 |
| Datasheet Chunks | PDFs in MinIO | knowledge.ingest (manual) | P2.5 |
| Application Notes | Markdown/docs in MinIO | knowledge.ingest (manual) | P2.5 |
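The trigger column amounts to a routing rule from an incoming event to a knowledge type. The minimal sketch below follows the table literally. Three parts of it are assumptions about the event payload, not the actual event schema: the function name `route_event`, the `status == "failed"` check on `TestExecution` properties, and a `knowledge_type` field on ingestion requests.

```python
from typing import Optional

# Node label -> (accepted graph.mutations event types, knowledge type), per the table
GRAPH_TRIGGERS = {
    "DecisionRecord": ({"node.created", "node.updated"}, "design_decision"),
    "BOMItem": ({"node.created"}, "component_rationale"),
    "TestExecution": ({"node.created"}, "failure_mode"),
    "CAPARecord": ({"node.created"}, "failure_mode"),
    "ConstraintViolation": ({"node.updated"}, "constraint_violation"),
}


def route_event(topic: str, event_type: str, node_type: Optional[str] = None,
                properties: Optional[dict] = None) -> Optional[str]:
    """Return the knowledge type an event should produce, or None to skip it."""
    properties = properties or {}
    if topic == "session.completed":
        return "session_summary"
    if topic == "knowledge.ingest":
        # Assumed payload field: the request names datasheet_chunk or application_note
        return properties.get("knowledge_type")
    if topic != "graph.mutations" or node_type not in GRAPH_TRIGGERS:
        return None
    triggers, knowledge_type = GRAPH_TRIGGERS[node_type]
    if event_type not in triggers:
        return None
    if node_type == "TestExecution" and properties.get("status") != "failed":
        return None  # only failed test runs become failure-mode knowledge
    return knowledge_type
```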
Content Templates
Each knowledge type has a structured template that produces high-quality embedding text:
# digital_twin/knowledge/templates.py
TEMPLATES = {
"design_decision": """
Design Decision: {title}
Status: {status} | Date: {date} | Domain: {domain}
Context: {context}
Options Considered: {options}
Outcome: {outcome}
Rationale: {rationale}
""",
"component_rationale": """
Component Selection: {mpn} ({manufacturer})
For: {design_element_name} | Domain: {domain}
Package: {package} | Unit Price: ${unit_price}
Selection Rationale: {rationale}
Key Specifications: {specifications}
Alternatives Considered: {alternatives}
""",
"failure_mode": """
Failure: {test_name} — {status}
Procedure: {procedure_name} | Category: {category}
Root Cause: {root_cause}
Corrective Action: {action}
Lesson Learned: {lesson}
Affected Components: {components}
""",
"constraint_violation": """
Constraint Violation: {constraint_name}
Domain: {domain} | Severity: {severity}
Expected: {expected} | Actual: {actual}
Affected Nodes: {affected_nodes}
Resolution: {resolution}
""",
"session_summary": """
Session: {skill} by {agent_code}
Duration: {duration} | Status: {status}
Input Summary: {input_summary}
Output Summary: {output_summary}
Key Decisions: {decisions}
Nodes Modified: {modified_nodes}
""",
"datasheet_chunk": """
Datasheet: {part_number} — {manufacturer}
Section: {section_title}
Content: {chunk_text}
""",
"application_note": """
Application Note: {title}
Source: {filename} | Section: {section_title}
Content: {chunk_text}
Tags: {tags}
""",
}
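A template can be filled with Python's `str.format_map`. The sketch below shows one way to render missing or `None` graph properties as a placeholder instead of raising `KeyError`, and to strip the surrounding blank lines before embedding. The `render_knowledge_text` name and the `"n/a"` placeholder are assumptions. To run on its own, the sketch uses a trimmed copy of the `constraint_violation` template.

```python
class _Defaults(dict):
    """dict that renders unknown template fields as 'n/a' instead of failing."""

    def __missing__(self, key: str) -> str:
        return "n/a"


# Trimmed copy of the constraint_violation template above
CONSTRAINT_VIOLATION = """
Constraint Violation: {constraint_name}
Domain: {domain} | Severity: {severity}
Expected: {expected} | Actual: {actual}
Resolution: {resolution}
"""


def render_knowledge_text(template: str, fields: dict) -> str:
    """Fill a template, treating absent or None fields as 'n/a', and trim it."""
    present = {k: v for k, v in fields.items() if v is not None}
    return template.format_map(_Defaults(present)).strip()
```

Rendering a violation whose `resolution` is still `None` yields text ending in `Resolution: n/a`, so partially populated nodes can still be indexed.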
Embedding Pipeline
Architecture
flowchart LR
subgraph "Event Sources"
K["Kafka Consumer"]
M["MinIO Watcher"]
end
subgraph "Processing Pipeline"
KC["KnowledgeConsumer<br/>Route by event type"]
T["Template Renderer<br/>Structured text"]
CH["Document Chunker<br/>512 tokens, 64 overlap"]
end
subgraph "Embedding"
ES["EmbeddingService"]
LOCAL["Local: all-MiniLM-L6-v2<br/>384 dims"]
CLOUD["Cloud: text-embedding-3-small<br/>1536 dims"]
end
subgraph "Storage"
PG["pgvector<br/>knowledge_embeddings"]
end
K --> KC
M --> KC
KC -->|graph events| T
KC -->|documents| CH
T --> ES
CH --> ES
ES --> LOCAL
ES --> CLOUD
LOCAL --> PG
CLOUD --> PG
style K fill:#E67E22,color:#fff
style PG fill:#27ae60,color:#fff
style ES fill:#3498db,color:#fff
KnowledgeConsumer
The KnowledgeConsumer is a Kafka consumer subscribing to three topics:
# digital_twin/knowledge/consumer.py
from __future__ import annotations

from .embedding_service import EmbeddingService

# GraphEvent, EventType and the *Handler classes come from the event-sourcing
# and handler modules (not shown).


class KnowledgeConsumer:
    """Kafka consumer that processes events into knowledge embeddings."""

    SUBSCRIPTIONS = [
        "graph.mutations",    # Node created/updated events
        "session.completed",  # Agent session completion
        "knowledge.ingest",   # Manual document ingestion requests
    ]

    # Neo4j node label -> handler that renders and indexes it.
    # The TestExecution handler is expected to skip passing runs.
    NODE_HANDLERS = {
        "DecisionRecord": DecisionHandler(),
        "BOMItem": ComponentRationaleHandler(),
        "TestExecution": FailureModeHandler(),
        "CAPARecord": FailureModeHandler(),
        "ConstraintViolation": ConstraintViolationHandler(),
    }

    def __init__(self, embedding_service: EmbeddingService, store) -> None:
        self.embedding_service = embedding_service
        self.store = store  # pgvector store from store.py

    async def process_event(self, topic: str, event: GraphEvent) -> None:
        """Route an event to the right handler based on the topic it arrived on."""
        if topic == "graph.mutations":
            if event.event_type in (EventType.NODE_CREATED, EventType.NODE_UPDATED):
                await self._handle_node_event(event)
        elif topic == "session.completed":
            await self._handle_session_completed(event)
        elif topic == "knowledge.ingest":
            await self._handle_document_ingestion(event)

    async def _handle_node_event(self, event: GraphEvent) -> None:
        """Index graph node changes as knowledge."""
        handler = self.NODE_HANDLERS.get(event.node_type)
        if handler is None:
            return  # node type is not indexed as knowledge
        content = handler.render_template(event)
        embedding = await self.embedding_service.embed(content)
        # Upsert so a NODE_UPDATED event replaces the node's entry instead of duplicating it
        await self.store.upsert(
            knowledge_type=handler.knowledge_type,
            source_event_id=event.event_id,
            source_node_id=event.node_id,
            domain=event.properties.get("domain"),
            content_text=content,
            embedding=embedding,
            metadata=handler.extract_metadata(event),
        )
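`store.upsert` is what makes reprocessing safe. Kafka consumers normally get at-least-once delivery, so the same `graph.mutations` event can arrive twice. A later `node.updated` should also replace the earlier entry rather than add a second one. The in-memory stand-in below illustrates those semantics, keyed on `(knowledge_type, source_node_id)`. That key, the `InMemoryKnowledgeStore` class, and its use as a test double are assumptions. In PostgreSQL, the equivalent would be a unique index on those two columns plus `INSERT ... ON CONFLICT ... DO UPDATE`.

```python
import asyncio
import uuid
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StoredEntry:
    id: str
    knowledge_type: str
    source_node_id: Optional[str]
    content_text: str
    embedding: list[float]
    metadata: dict = field(default_factory=dict)


class InMemoryKnowledgeStore:
    """Hypothetical test double with the upsert semantics described above."""

    def __init__(self) -> None:
        self._rows: dict[tuple[str, str], StoredEntry] = {}

    async def upsert(
        self,
        *,
        knowledge_type: str,
        source_node_id: Optional[str],
        content_text: str,
        embedding: list[float],
        metadata: Optional[dict] = None,
        **_other_columns,  # source_event_id, domain, ... accepted but not modeled
    ) -> StoredEntry:
        # Entries without a source node (e.g. document chunks) never collide,
        # matching how NULLs behave in a default PostgreSQL unique index.
        key = (knowledge_type, source_node_id or f"doc:{uuid.uuid4()}")
        existing = self._rows.get(key)
        entry = StoredEntry(
            id=existing.id if existing else str(uuid.uuid4()),  # stable id on update
            knowledge_type=knowledge_type,
            source_node_id=source_node_id,
            content_text=content_text,
            embedding=embedding,
            metadata=metadata or {},
        )
        self._rows[key] = entry  # replace, never duplicate
        return entry

    async def count_by_type(self, knowledge_type: str) -> int:
        return sum(1 for kt, _ in self._rows if kt == knowledge_type)


if __name__ == "__main__":
    async def demo() -> None:
        store = InMemoryKnowledgeStore()
        for version in ("v1", "v2"):  # the same node delivered twice
            await store.upsert(knowledge_type="design_decision", source_node_id="n1",
                               content_text=version, embedding=[0.1])
        print(await store.count_by_type("design_decision"))  # 1

    asyncio.run(demo())
```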
EmbeddingService
An abstraction over local and cloud embedding providers:
# digital_twin/knowledge/embedding_service.py
import asyncio
from typing import Literal


class EmbeddingService:
    """Unified embedding interface: local or cloud."""

    CLOUD_MODEL = "text-embedding-3-small"

    def __init__(self, provider: Literal["local", "openai"] = "local"):
        self.provider = provider
        if provider == "local":
            from sentence_transformers import SentenceTransformer
            self.model = SentenceTransformer("all-MiniLM-L6-v2")
            self.dimensions = 384
        else:
            import openai
            # Async client, so embedding calls can be awaited
            self.client = openai.AsyncOpenAI()
            self.dimensions = 1536

    async def embed(self, text: str) -> list[float]:
        """Generate an embedding vector for a single text."""
        return (await self.embed_batch([text]))[0]

    async def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Batch embedding for efficiency."""
        if self.provider == "local":
            # encode() is CPU-bound; run it off the event loop
            vectors = await asyncio.to_thread(self.model.encode, texts)
            return vectors.tolist()
        response = await self.client.embeddings.create(
            model=self.CLOUD_MODEL,
            input=texts,
        )
        return [d.embedding for d in response.data]
Embedding model selection:
| Model | Dimensions | Speed | Quality | Use Case |
|---|---|---|---|---|
| all-MiniLM-L6-v2 | 384 | Fast (local) | Good | Default: local-first, no API key needed |
| text-embedding-3-small | 1536 | Medium (API) | Better | Cloud option: higher quality for production |
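The pgvector schema uses `vector(1536)` to accommodate the larger cloud model. When using the 384-dim local model, vectors are zero-padded to 1536 dimensions. Padded local vectors and native cloud vectors come from unrelated embedding spaces and must never be compared with each other. On model migration, run the re-embedding batch job so that every row is embedded by the same model.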
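Zero-padding is harmless for cosine similarity, because appended zeros contribute nothing to the dot product or to either vector's norm. Below is a minimal sketch of the padding step plus a check of that property. The `pad_embedding` helper and the choice to pad on the write path are assumptions, not part of the design above.

```python
import math

TARGET_DIMS = 1536  # width of knowledge_embeddings.embedding


def pad_embedding(vector: list[float], target: int = TARGET_DIMS) -> list[float]:
    """Right-pad with zeros so a 384-dim local vector fits vector(1536)."""
    if len(vector) > target:
        raise ValueError(f"embedding has {len(vector)} dims, column holds {target}")
    return vector + [0.0] * (target - len(vector))


def cosine_similarity(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))
```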
Document Chunker
For PDFs and markdown documents (datasheets, application notes):
# digital_twin/knowledge/chunker.py
from .models import ChunkResult

# tokenize(), detokenize() and extract_pdf_text() are helpers defined elsewhere.


class DocumentChunker:
    """Split documents into overlapping chunks for embedding."""

    def __init__(
        self,
        chunk_size: int = 512,    # tokens per chunk
        chunk_overlap: int = 64,  # overlapping tokens between chunks
    ):
        if not 0 <= chunk_overlap < chunk_size:
            raise ValueError("chunk_overlap must be in [0, chunk_size)")
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

    def chunk_text(self, text: str, metadata: dict) -> list[ChunkResult]:
        """Split text into overlapping chunks with metadata."""
        tokens = self.tokenize(text)
        chunks = []
        start = 0
        while start < len(tokens):
            end = min(start + self.chunk_size, len(tokens))
            chunks.append(ChunkResult(
                text=self.detokenize(tokens[start:end]),
                chunk_index=len(chunks),
                start_token=start,
                end_token=end,
                metadata={**metadata, "chunk_index": len(chunks)},
            ))
            if end == len(tokens):
                break  # last chunk reached; avoid a redundant overlap-only tail
            start += self.chunk_size - self.chunk_overlap
        return chunks

    def chunk_pdf(self, pdf_bytes: bytes, metadata: dict) -> list[ChunkResult]:
        """Extract text from PDF bytes and chunk it."""
        text = self.extract_pdf_text(pdf_bytes)
        return self.chunk_text(text, metadata)
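With the default settings, each chunk starts 512 - 64 = 448 tokens after the previous one. The hypothetical `chunk_windows` function below isolates that window arithmetic from tokenization and PDF extraction, so the boundaries can be checked directly. It is a sketch, not the `DocumentChunker` implementation.

```python
def chunk_windows(n_tokens: int, chunk_size: int = 512,
                  chunk_overlap: int = 64) -> list[tuple[int, int]]:
    """Return (start, end) token offsets of each chunk, end exclusive."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be in [0, chunk_size)")
    step = chunk_size - chunk_overlap
    windows = []
    start = 0
    while start < n_tokens:
        end = min(start + chunk_size, n_tokens)
        windows.append((start, end))
        if end == n_tokens:
            break  # the last window already reaches the end of the text
        start += step
    return windows
```

For a 1,200-token section, this gives three chunks: (0, 512), (448, 960) and (896, 1200).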
Retrieval Patterns
1. retrieve_knowledge Skill
A pure skill (no side effects) that agents call to search historical knowledge:
# domain_agents/shared/skills/retrieve_knowledge/handler.py
import time
from typing import Optional

from pydantic import BaseModel

# Skill base class is provided by the agent framework (not shown).


class RetrieveKnowledgeInput(BaseModel):
    query: str                                   # Natural language query
    knowledge_types: Optional[list[str]] = None  # Filter by type
    domain: Optional[str] = None                 # Filter by domain
    project_id: Optional[str] = None             # Filter by project
    top_k: int = 5                               # Number of results
    similarity_threshold: float = 0.7            # Minimum cosine similarity


class KnowledgeResult(BaseModel):
    id: str
    knowledge_type: str
    content_text: str
    similarity: float
    domain: Optional[str] = None
    metadata: dict = {}


# Defined after KnowledgeResult, which its annotation references
class RetrieveKnowledgeOutput(BaseModel):
    results: list[KnowledgeResult]
    query_embedding_time_ms: float
    search_time_ms: float


class RetrieveKnowledgeSkill(Skill):
    """Pure skill: semantic search over project knowledge."""

    async def execute(self, input: RetrieveKnowledgeInput) -> RetrieveKnowledgeOutput:
        t0 = time.perf_counter()
        query_embedding = await self.embedding_service.embed(input.query)
        t1 = time.perf_counter()
        results = await self.store.search(
            embedding=query_embedding,
            knowledge_types=input.knowledge_types,
            domain=input.domain,
            project_id=input.project_id,
            top_k=input.top_k,
            similarity_threshold=input.similarity_threshold,
        )
        t2 = time.perf_counter()
        return RetrieveKnowledgeOutput(
            results=results,
            query_embedding_time_ms=(t1 - t0) * 1000,
            search_time_ms=(t2 - t1) * 1000,
        )
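`store.search` is expected to do three things: apply the structured filters, keep entries whose similarity meets the threshold, and return at most `top_k` of them from most to least similar. An exhaustive in-memory version of that contract can serve as a test oracle for the pgvector-backed store. Because HNSW is approximate, results may differ slightly at the margins. The dict-based entry shape and the `search_in_memory` name are assumptions.

```python
import math
from typing import Optional


def cosine_similarity(a: list[float], b: list[float]) -> float:
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    if norm == 0.0:
        return 0.0  # treat zero vectors as dissimilar to everything
    return sum(x * y for x, y in zip(a, b)) / norm


def search_in_memory(
    entries: list[dict],
    embedding: list[float],
    knowledge_types: Optional[list[str]] = None,
    domain: Optional[str] = None,
    project_id: Optional[str] = None,
    top_k: int = 5,
    similarity_threshold: float = 0.7,
) -> list[dict]:
    """Exact (brute-force) version of the search contract, for use as a test oracle."""
    scored = []
    for entry in entries:
        # Structured filters first, mirroring the SQL WHERE clause
        if knowledge_types and entry["knowledge_type"] not in knowledge_types:
            continue
        if domain is not None and entry.get("domain") != domain:
            continue
        if project_id is not None and entry.get("project_id") != project_id:
            continue
        similarity = cosine_similarity(embedding, entry["embedding"])
        if similarity >= similarity_threshold:
            scored.append({**entry, "similarity": similarity})
    # Highest similarity first, equivalent to ORDER BY cosine distance ASC
    scored.sort(key=lambda r: r["similarity"], reverse=True)
    return scored[:top_k]
```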
2. RAG for Agent Prompting
Agents call retrieve_knowledge before LLM calls and inject historical context into prompts:
# Example: BOM Agent using RAG (methods excerpted from the agent class)
from __future__ import annotations

import json


async def select_component(self, requirements: dict) -> BOMItem:
    # Step 1: Retrieve relevant historical knowledge
    knowledge = await self.run_skill("retrieve_knowledge", {
        "query": f"component selection for {requirements['function']} "
                 f"in {requirements['domain']}",
        "knowledge_types": ["component_rationale", "failure_mode"],
        "domain": requirements["domain"],
        "top_k": 5,
    })

    # Step 2: Build prompt with RAG context
    prompt = f"""Select the best component for: {requirements['description']}

## Historical Knowledge
{self._format_knowledge_context(knowledge.results)}

## Current Requirements
{json.dumps(requirements, indent=2)}

Based on the historical knowledge above and current requirements,
select and justify a component.
"""

    # Step 3: Call LLM with enriched context
    response = await self.llm.complete(prompt)
    return self.parse_bom_item(response)


def _format_knowledge_context(self, results: list[KnowledgeResult]) -> str:
    """Format knowledge results for prompt injection."""
    if not results:
        return "_No relevant historical knowledge found._"
    sections = []
    for r in results:
        sections.append(
            f"### {r.knowledge_type} (similarity: {r.similarity:.2f})\n"
            f"{r.content_text}"
        )
    return "\n\n".join(sections)
3. REST API
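# digital_twin/knowledge/api.py
import time
from typing import Optional

from fastapi import APIRouter, Query

from .models import KnowledgeSearchResponse

# Router mounted under /api/v1/knowledge; embedding_service and store are
# module-level instances created at application startup (not shown).
router = APIRouter(prefix="/api/v1/knowledge")


# GET /api/v1/knowledge/search
@router.get("/search")
async def search_knowledge(
    query: str,
    knowledge_types: Optional[list[str]] = Query(None),
    domain: Optional[str] = None,
    project_id: Optional[str] = None,
    top_k: int = 10,
    similarity_threshold: float = 0.6,
) -> KnowledgeSearchResponse:
    """Semantic search over project knowledge."""
    started = time.perf_counter()
    embedding = await embedding_service.embed(query)
    results = await store.search(
        embedding=embedding,
        knowledge_types=knowledge_types,
        domain=domain,
        project_id=project_id,
        top_k=top_k,
        similarity_threshold=similarity_threshold,
    )
    return KnowledgeSearchResponse(
        results=results,
        total=len(results),
        query_time_ms=(time.perf_counter() - started) * 1000,
    )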
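Because `knowledge_types` is declared with `Query(None)`, FastAPI reads it from repeated query parameters (`?knowledge_types=a&knowledge_types=b`). Below is a small client-side sketch that uses only the standard library. The base URL is a placeholder and `build_search_url` is a hypothetical helper.

```python
from typing import Optional
from urllib.parse import urlencode

BASE_URL = "http://localhost:8000"  # placeholder host, not a documented deployment


def build_search_url(
    query: str,
    knowledge_types: Optional[list[str]] = None,
    domain: Optional[str] = None,
    project_id: Optional[str] = None,
    top_k: int = 10,
    similarity_threshold: float = 0.6,
) -> str:
    """Build a GET URL for the knowledge search endpoint."""
    params: dict = {"query": query, "top_k": top_k,
                    "similarity_threshold": similarity_threshold}
    if knowledge_types:
        params["knowledge_types"] = knowledge_types
    if domain is not None:
        params["domain"] = domain
    if project_id is not None:
        params["project_id"] = project_id
    # doseq=True expands the list into repeated knowledge_types=... pairs
    return f"{BASE_URL}/api/v1/knowledge/search?{urlencode(params, doseq=True)}"
```

The resulting URL can be fetched with `urllib.request.urlopen` and the body parsed with `json.load`.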
Event Synchronization
Real-Time Pipeline
The KnowledgeConsumer processes events from Kafka in real time, targeting < 2s end-to-end latency from event publication to searchable embedding:
Event published → Kafka → KnowledgeConsumer → Template render → Embed → pgvector INSERT
(~100ms) (~200ms) (~50ms)
Periodic Reconciliation
A daily job compares Neo4j node counts against pgvector counts to detect drift:
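# digital_twin/knowledge/reconciler.py
from datetime import datetime, timezone

from .models import DriftEntry, ReconciliationReport


class KnowledgeReconciler:
    """Daily reconciliation between Neo4j and pgvector."""

    # knowledge_type -> Neo4j node types that feed it. failure_mode has two
    # sources, so their counts are summed before comparing.
    RECONCILE_TYPES = {
        "design_decision": ["DecisionRecord"],
        "component_rationale": ["BOMItem"],
        "failure_mode": ["TestExecution", "CAPARecord"],
    }

    async def reconcile(self) -> ReconciliationReport:
        """Compare Neo4j node counts with pgvector embedding counts."""
        report = ReconciliationReport(timestamp=datetime.now(timezone.utc))
        for knowledge_type, node_types in self.RECONCILE_TYPES.items():
            # count_nodes must apply the same filter as the consumer
            # (only failed TestExecution runs are indexed)
            neo4j_count = sum([await self.neo4j.count_nodes(t) for t in node_types])
            pgvector_count = await self.store.count_by_type(knowledge_type)
            if neo4j_count != pgvector_count:
                report.drifts.append(DriftEntry(
                    node_type=", ".join(node_types),
                    neo4j_count=neo4j_count,
                    pgvector_count=pgvector_count,
                    difference=neo4j_count - pgvector_count,
                ))
                report.backfilled += await self._backfill_missing(node_types, knowledge_type)
        return report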
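The core of the reconciliation is a per-knowledge-type count comparison. The pure-function sketch below separates that step from the Neo4j and pgvector I/O so it can be unit-tested. `compute_drift` and its dict-based inputs are assumptions. The `TestExecution` count passed in is assumed to be restricted to failed runs.

```python
RECONCILE_TYPES = {
    "design_decision": ["DecisionRecord"],
    "component_rationale": ["BOMItem"],
    "failure_mode": ["TestExecution", "CAPARecord"],
}


def compute_drift(neo4j_counts: dict[str, int],
                  pgvector_counts: dict[str, int]) -> list[dict]:
    """Return one drift record per knowledge type whose counts disagree."""
    drifts = []
    for knowledge_type, node_types in RECONCILE_TYPES.items():
        expected = sum(neo4j_counts.get(t, 0) for t in node_types)
        actual = pgvector_counts.get(knowledge_type, 0)
        if expected != actual:
            drifts.append({
                "knowledge_type": knowledge_type,
                "node_types": node_types,
                "neo4j_count": expected,
                "pgvector_count": actual,
                "difference": expected - actual,
            })
    return drifts
```

Comparing each node type on its own against the shared `failure_mode` total would report drift on every run. That is why the counts are summed per knowledge type first.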
Re-Embedding Batch Job
When the embedding model changes (e.g., switching from local to cloud, or upgrading model version), a batch job re-embeds all existing knowledge:
async def re_embed_all(self, batch_size: int = 100) -> int:
    """Re-embed all knowledge entries with the current model. Run on model change."""
    total = 0
    offset = 0
    while True:
        # Assumes fetch_batch pages in a stable order (e.g. created_at, id),
        # so rows inserted while the job runs land after the current offset
        entries = await self.store.fetch_batch(offset=offset, limit=batch_size)
        if not entries:
            break
        texts = [e.content_text for e in entries]
        embeddings = await self.embedding_service.embed_batch(texts)
        await self.store.update_embeddings(
            ids=[e.id for e in entries],
            embeddings=embeddings,
        )
        total += len(entries)
        offset += batch_size
    return total
Integration with Dual State Machine
The Knowledge Layer fits into the existing Dual State Machine as a third projection — a derived, read-only view alongside Neo4j and Git:
flowchart TD
KAFKA["Kafka Event Stream<br/>(System of Record)"] --> NEO4J["Neo4j Projection<br/>Semantic State"]
KAFKA --> GIT["Git Projection<br/>Artifact State"]
KAFKA --> PG["pgvector Projection<br/>Knowledge State"]
MINIO["MinIO Documents"] --> PG
AGENT["Domain Agents"] -->|"read graph"| NEO4J
AGENT -->|"read artifacts"| GIT
AGENT -->|"retrieve_knowledge"| PG
style KAFKA fill:#E67E22,color:#fff
style NEO4J fill:#2C3E50,color:#fff
style GIT fill:#27ae60,color:#fff
style PG fill:#3498db,color:#fff
style AGENT fill:#9b59b6,color:#fff
| Property | Neo4j Projection | Git Projection | Knowledge Projection (pgvector) |
|---|---|---|---|
| Data | Entity graph | Artifact files | Embedded knowledge fragments |
| Writes to | Neo4j | Git repo | PostgreSQL |
| Reads from | Kafka graph.mutations | Kafka (projection trigger) | Kafka + MinIO |
| Query pattern | Cypher traversal | File path | Semantic similarity + filters |
| Reconstructable | Replay events | Replay projections | Replay events + re-index docs |
| Update frequency | Every mutation | On explicit projection | Every mutation + manual ingest |
Key constraint: The Knowledge Layer never writes to Neo4j or Git. It is a consumer-only projection. This preserves the existing event-sourcing architecture and avoids circular dependencies.
ingest_knowledge Skill
For manual document ingestion (datasheets, application notes):
# domain_agents/shared/skills/ingest_knowledge/handler.py
from typing import Literal

from pydantic import BaseModel

# Skill base class is provided by the agent framework (not shown).


class IngestKnowledgeInput(BaseModel):
    source_url: str  # MinIO object path
    knowledge_type: Literal["datasheet_chunk", "application_note"]
    metadata: dict   # part_number, manufacturer, tags, etc.


class IngestKnowledgeOutput(BaseModel):
    fragments_created: int
    source_id: str


class IngestKnowledgeSkill(Skill):
    """Pure skill: chunk and embed a document from MinIO."""

    async def execute(self, input: IngestKnowledgeInput) -> IngestKnowledgeOutput:
        # Download document bytes from MinIO (self.minio is an async wrapper)
        document: bytes = await self.minio.get_object(input.source_url)

        # PDFs go through text extraction; Markdown/text is decoded directly
        metadata = {**input.metadata, "source_url": input.source_url}
        if input.source_url.lower().endswith(".pdf"):
            chunks = self.chunker.chunk_pdf(document, metadata)
        else:
            chunks = self.chunker.chunk_text(document.decode("utf-8"), metadata)

        # Embed all chunks in one batch, then store them
        embeddings = await self.embedding_service.embed_batch([c.text for c in chunks])
        for chunk, embedding in zip(chunks, embeddings):
            await self.store.insert(
                knowledge_type=input.knowledge_type,
                content_text=chunk.text,
                embedding=embedding,
                metadata=chunk.metadata,
            )

        return IngestKnowledgeOutput(
            fragments_created=len(chunks),
            source_id=input.source_url,
        )
Pydantic Models
# digital_twin/knowledge/models.py
from datetime import datetime
from typing import Optional

from pydantic import BaseModel


class KnowledgeEntry(BaseModel):
    """A single knowledge embedding in pgvector."""
    id: str
    knowledge_type: str
    source_event_id: Optional[str] = None
    source_node_id: Optional[str] = None
    domain: Optional[str] = None
    phase: Optional[str] = None
    project_id: Optional[str] = None
    content_text: str
    metadata: dict = {}
    created_at: datetime
    updated_at: datetime


class KnowledgeSearchRequest(BaseModel):
    """Search request for the knowledge API."""
    query: str
    knowledge_types: Optional[list[str]] = None
    domain: Optional[str] = None
    project_id: Optional[str] = None
    top_k: int = 10
    similarity_threshold: float = 0.6


class KnowledgeSearchResult(BaseModel):
    """A single search result with similarity score."""
    id: str
    knowledge_type: str
    content_text: str
    similarity: float
    domain: Optional[str] = None
    metadata: dict = {}


class KnowledgeSearchResponse(BaseModel):
    """Response from the knowledge search API."""
    results: list[KnowledgeSearchResult]
    total: int
    query_time_ms: float


# DriftEntry must be defined before ReconciliationReport, whose
# annotation references it when the class is created.
class DriftEntry(BaseModel):
    """A single drift between Neo4j and pgvector."""
    node_type: str
    neo4j_count: int
    pgvector_count: int
    difference: int


class ReconciliationReport(BaseModel):
    """Report from the daily reconciliation job."""
    timestamp: datetime
    drifts: list[DriftEntry] = []
    backfilled: int = 0


class ChunkResult(BaseModel):
    """A single chunk from document chunking."""
    text: str
    chunk_index: int
    start_token: int
    end_token: int
    metadata: dict = {}
Repository Structure
digital_twin/
├── knowledge/ # NEW — Knowledge Layer
│ ├── store.py # pgvector CRUD + semantic search
│ ├── embedding_service.py # Local + cloud embedding abstraction
│ ├── consumer.py # Kafka event processor (KnowledgeConsumer)
│ ├── reconciler.py # Periodic reconciliation (Neo4j ↔ pgvector)
│ ├── chunker.py # Document chunking (PDF/markdown)
│ ├── templates.py # Embedding content templates per knowledge type
│ ├── models.py # Pydantic models
│ └── api.py # Knowledge query REST API
domain_agents/
└── shared/ # NEW — Shared cross-agent skills
└── skills/
├── retrieve_knowledge/ # Semantic search skill (pure, read-only)
│ ├── definition.json
│ ├── SKILL.md
│ ├── schema.py
│ ├── handler.py
│ └── tests.py
└── ingest_knowledge/ # Document ingestion skill (pure)
├── definition.json
├── SKILL.md
├── schema.py
├── handler.py
└── tests.py
See Repository Structure for the full platform layout.
Phased Delivery
| Phase | Timeline | Deliverable |
|---|---|---|
| P1.5 | Months 5-6 | pgvector setup, local embedding (all-MiniLM-L6-v2), retrieve_knowledge skill, design decision + session summary indexing |
| P2 | Months 7-10 | All 5 event-driven knowledge types indexed (4 graph node types + session summaries), RAG integrated in 5+ agents, search API, dashboard knowledge panel |
| P2.5 | Months 10-12 | Datasheet chunking, application note indexing, ingest_knowledge skill, manual ingestion API |
| P3 | Months 13-18 | Cross-project knowledge sharing, supplier reliability patterns, knowledge quality scoring |
| P4 | Months 21-24+ | Fleet knowledge aggregation, anomaly pattern embeddings, optional Qdrant migration, knowledge marketplace |
Phase Dependencies
flowchart LR
P1["P1: Digital Thread<br/>Months 1-6"] --> P15["P1.5: Knowledge Layer<br/>Months 5-6"]
P15 --> P2["P2: Full Indexing + RAG<br/>Months 7-10"]
P2 --> P25["P2.5: Document Ingestion<br/>Months 10-12"]
P25 --> P3["P3: Cross-Project<br/>Months 13-18"]
P3 --> P4["P4: Fleet Knowledge<br/>Months 21-24+"]
style P1 fill:#27ae60,color:#fff
style P15 fill:#2ecc71,color:#fff
style P2 fill:#3498db,color:#fff
style P25 fill:#3498db,color:#fff
style P3 fill:#9b59b6,color:#fff
style P4 fill:#E67E22,color:#fff
P1.5 Deliverables (Detail)
P1.5 overlaps with the tail of P1 (months 5-6) and delivers the foundational knowledge infrastructure:
- PostgreSQL pgvector extension: `CREATE EXTENSION vector;` on the existing session state database
- `knowledge_embeddings` table: schema, HNSW index, filter indexes
- `EmbeddingService`: local model (all-MiniLM-L6-v2) as default, cloud model as a configuration option
- `KnowledgeConsumer`: Kafka consumer for `graph.mutations` and `session.completed`
- Two knowledge types indexed: `design_decision` (from `DecisionRecord` nodes) and `session_summary` (from session completion events)
- `retrieve_knowledge` skill: pure skill available to all agents
- Basic RAG integration: one agent (BOM Agent) demonstrates knowledge-augmented prompting
Kafka Topics
Two new Kafka topics support the Knowledge Layer:
| Topic | Purpose | Publisher | Consumer | Retention |
|---|---|---|---|---|
| knowledge.ingest | Manual document ingestion requests | API / CLI | KnowledgeConsumer | 30 days |
| knowledge.indexed | Confirmation that knowledge was embedded | KnowledgeConsumer | Dashboard (optional) | 7 days |
See Event Sourcing for the complete topic list and retention policies.
Performance Targets
| Operation | Target | Notes |
|---|---|---|
| Embedding (local) | < 50ms per text | all-MiniLM-L6-v2 on CPU |
| Embedding (cloud) | < 200ms per text | OpenAI API call |
| Semantic search (10 results) | < 100ms | pgvector HNSW with filters |
| End-to-end event → searchable | < 2s | Kafka → embed → INSERT |
| Batch re-embedding (1000 entries) | < 60s | Local model, batch processing |
| Reconciliation job | < 5 min | Daily, full scan |
Related Documents
| Document | Description |
|---|---|
| Event Sourcing | Event stream architecture, Kafka topics, and the knowledge.ingest/knowledge.indexed topics |
| Graph Schema | Node types that feed into knowledge embeddings, plus the KnowledgeSource node type |
| Digital Twin Evolution | Phased delivery timeline (P1.5 knowledge layer) and technology stack |
| Repository Structure | digital_twin/knowledge/ and domain_agents/shared/skills/ directory layout |
| Constraint Engine | Constraint violations that become knowledge entries |
| Architecture Overview | System architecture showing knowledge layer in the data flow |
Document Version: v1.0 · Last Updated: 2026-02-28 · Status: Technical Architecture Document