Event Sourcing for Graph Mutations
Append-only event streams as the system of record for all Digital Twin state changes
Table of Contents
- Architecture Overview
- Event Schema
- Event Stream Topics
- Event Flow by Layer
- Concurrency Model
- Propagation Control
- Point-in-Time Reconstruction
- State Store Resolution
- Related Documents
Architecture Overview
The event stream is the system of record. Neo4j is a read-optimized projection. Telemetry data forks to a time-series database (InfluxDB/TimescaleDB) for high-frequency sensor data.
```mermaid
flowchart LR
    A["Agent / Device"] -->|"propose change"| B["Event Created"]
    B --> C["Kafka Topic"]
    C --> D["Consumer: Apply to Neo4j"]
    C --> E["Event Store (immutable)"]
    C --> F["Telemetry Fork → TSDB"]
    style B fill:#E67E22,color:#fff
    style E fill:#2C3E50,color:#fff
    style F fill:#3498db,color:#fff
```
This design enables:
- Full audit trail — every mutation is recorded immutably
- Point-in-time reconstruction — replay events to reconstruct any historical state
- Decoupled consumers — multiple downstream systems can independently process events
- Telemetry separation — high-frequency sensor data goes to purpose-built TSDB
Event Schema
GraphEvent — Graph Mutations (L1-L4)
All changes to the artifact graph and device state flow through GraphEvent:
```python
# digital_twin/events.py
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
from typing import Optional

class EventType(str, Enum):
    # Graph mutations (L1 — Digital Thread)
    NODE_CREATED = "node.created"
    NODE_UPDATED = "node.updated"
    NODE_DELETED = "node.deleted"
    EDGE_CREATED = "edge.created"
    EDGE_DELETED = "edge.deleted"
    PROPERTY_SET = "property.set"
    # Device events (L2-L3 — Twin Instance)
    DEVICE_PROVISIONED = "device.provisioned"
    DEVICE_STATE_UPDATED = "device.state_updated"
    DEVICE_DECOMMISSIONED = "device.decommissioned"
    TELEMETRY_RECEIVED = "telemetry.received"
    # Simulation events (L4)
    SIMULATION_STARTED = "simulation.started"
    SIMULATION_COMPLETED = "simulation.completed"
    MODEL_CALIBRATED = "model.calibrated"
    # Assistant Mode — human-edit ingestion
    ARTIFACT_FILE_CHANGED = "artifact.file_changed"
    ARTIFACT_INGESTED = "artifact.ingested"
    ARTIFACT_INGEST_FAILED = "artifact.ingest_failed"
    # Drift detection — file ↔ graph reconciliation
    DRIFT_DETECTED = "drift.detected"
    DRIFT_RESOLVED = "drift.resolved"
    # Agent Chat Channel
    CHAT_MESSAGE_SENT = "chat.message.sent"
    CHAT_MESSAGE_CHUNK = "chat.message.chunk"
    CHAT_THREAD_CREATED = "chat.thread.created"
    CHAT_AGENT_TYPING = "chat.agent.typing"

class GraphEvent(BaseModel):
    event_id: str                              # UUID
    timestamp: datetime
    session_id: str                            # Links to orchestration session
    actor_id: str                              # Agent code, user ID, or device ID
    event_type: EventType
    node_type: Optional[str] = None            # e.g. "Requirement", "DeviceInstance"
    node_id: Optional[str] = None
    edge_type: Optional[str] = None            # e.g. "SATISFIED_BY", "INSTANCE_OF"
    source_id: Optional[str] = None
    target_id: Optional[str] = None
    properties: Optional[dict] = None
    previous_properties: Optional[dict] = None
    expected_version: Optional[int] = None
    rationale: Optional[str] = None
```
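On the wire, a GraphEvent is serialized to JSON before being published to Kafka. A sketch of what an `edge.created` event might look like; the IDs, session, and rationale values below are illustrative, not taken from a real stream:

```python
import json
import uuid
from datetime import datetime, timezone

# Illustrative payload: IDs and session values are made up for the example.
event = {
    "event_id": str(uuid.uuid4()),
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "session_id": "sess-042",
    "actor_id": "agent.electrical",
    "event_type": "edge.created",
    "edge_type": "SATISFIED_BY",
    "source_id": "REQ-0017",
    "target_id": "DES-0003",
    "rationale": "Requirement satisfied by revised power-stage design",
}

payload = json.dumps(event)    # what gets published to "graph.mutations"
decoded = json.loads(payload)  # what the consumer deserializes
assert decoded["event_type"] == "edge.created"
```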
TelemetryEvent — High-Frequency Sensor Data (L3)
Telemetry data is separated from graph events because it has fundamentally different characteristics: high frequency, append-only, no version control needed.
```python
class TelemetryEvent(BaseModel):
    """High-frequency sensor data — goes to TSDB, NOT Neo4j."""
    event_id: str
    device_id: str                # DeviceInstance.serialNumber
    source_id: str                # TelemetrySource.id
    timestamp: datetime
    values: dict[str, float]      # sensor_name → reading
    quality: Optional[str] = None # "good", "uncertain", "bad"
```
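A sample TelemetryEvent payload, showing the shape of the values map; the sensor names and readings are made up for illustration:

```python
from datetime import datetime, timezone

# Illustrative payload: sensor names and readings are invented.
telemetry = {
    "event_id": "tel-000001",
    "device_id": "FC-001",          # DeviceInstance.serialNumber
    "source_id": "imu-primary",
    "timestamp": datetime.now(timezone.utc).isoformat(),
    "values": {"temp_c": 41.7, "vibration_g": 0.02},  # sensor_name → reading
    "quality": "good",
}

# Routed by topic, not by inspection: this goes to "device.telemetry" and the
# TSDB, never through the Neo4j projection, so no version field is present.
assert "expected_version" not in telemetry
```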
Event Stream Topics
```yaml
topics:
  - "graph.mutations"     # All graph changes (append-only)
  - "graph.projections"   # Materialized view updates (Neo4j sync)
  - "graph.snapshots"     # Periodic full-graph snapshots
  - "device.telemetry"    # High-frequency sensor data → TSDB
  - "device.state"        # Device state changes → Neo4j
  - "simulation.events"   # Simulation lifecycle events
  - "knowledge.ingest"    # Manual document ingestion requests (P1.5)
  - "knowledge.indexed"   # Confirmation of knowledge embedding (P1.5)
  - "artifact.ingest"     # Human-edit ingestion events (Assistant Mode)
  - "twin.drift"          # Drift detection and resolution events
  - "agent.chat"          # Chat messages, typing indicators, thread lifecycle
```
Retention Policies
| Topic | Retention | Rationale |
|---|---|---|
| graph.mutations | Infinite | Never delete — audit trail and reconstruction source |
| graph.projections | 7 days | Ephemeral materialized view updates |
| graph.snapshots | 90 days | 3 months of periodic full-graph snapshots |
| device.telemetry | 30 days | Hot data in Kafka; older data in TSDB cold storage |
| device.state | Infinite | Device lifecycle audit trail |
| simulation.events | 365 days | 1 year of simulation history |
| knowledge.ingest | 30 days | Manual ingestion requests — replayable from MinIO originals |
| knowledge.indexed | 7 days | Ephemeral confirmation events for dashboard updates |
| agent.chat | 90 days | Conversation history for context; older threads archived to PostgreSQL |
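These policies could be applied at topic-creation time with Kafka's standard tooling. A sketch for the first two topics; the broker address, partition counts, and replication factors are assumptions, not prescribed values (`retention.ms=-1` is Kafka's setting for unlimited retention):

```shell
# Illustrative topic creation. Broker address, partitions, and replication
# factor are assumptions; only the retention values mirror the table above.
kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic graph.mutations --partitions 6 --replication-factor 3 \
  --config retention.ms=-1

kafka-topics.sh --bootstrap-server localhost:9092 --create \
  --topic graph.projections --partitions 6 --replication-factor 3 \
  --config retention.ms=604800000   # 7 days in milliseconds
```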
Event Flow by Layer
L1 — Digital Thread (P1)
```mermaid
sequenceDiagram
    participant Agent as Domain Agent
    participant API as Twin API
    participant Kafka as Kafka
    participant Consumer as Event Consumer
    participant Neo4j as Neo4j
    Agent->>API: Propose node update (with expected_version)
    API->>Kafka: Publish GraphEvent to "graph.mutations"
    Kafka->>Consumer: Consume event
    Consumer->>Neo4j: Apply with optimistic concurrency check
    Neo4j-->>Consumer: Success (version incremented)
    Consumer->>Kafka: Publish to "graph.projections"
```
L3 — Live Twin (P3)
```mermaid
sequenceDiagram
    participant Device as Physical Device
    participant MQTT as MQTT Broker
    participant Router as Telemetry Router
    participant Kafka as Kafka
    participant TSDB as InfluxDB
    participant Neo4j as Neo4j
    Device->>MQTT: Publish sensor readings
    MQTT->>Router: Forward telemetry
    Router->>Kafka: Publish TelemetryEvent to "device.telemetry"
    Router->>Kafka: Publish state change to "device.state" (if state changed)
    Kafka->>TSDB: Write time-series data
    Kafka->>Neo4j: Update DeviceInstance node (state changes only)
```
Concurrency Model
Optimistic Concurrency Control
Every graph node carries a version integer. Mutations must include expected_version:
```python
async def apply_event(event: GraphEvent):
    if event.event_type == EventType.NODE_UPDATED:
        result = await neo4j.run(
            """
            MATCH (n {id: $node_id})
            WHERE n.version = $expected_version
            SET n += $properties, n.version = n.version + 1, n.updatedAt = datetime()
            RETURN n
            """,
            node_id=event.node_id,
            expected_version=event.expected_version,
            properties=event.properties,
        )
        if not result.records:
            raise ConcurrencyConflict(
                f"Node {event.node_id} modified by another session"
            )
```
Write Set Declaration
Agents declare intended modifications before execution. The scheduler serializes conflicting write sets:
```python
class TaskDispatch(BaseModel):
    agent_code: str
    session_id: str
    write_set: list[str]  # Node IDs or patterns like "BOMItem:*"
```
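A sketch of the overlap check the scheduler might run before dispatching two agents in parallel, assuming glob-style semantics for patterns like "BOMItem:*"; the helper name is hypothetical:

```python
from fnmatch import fnmatch

def write_sets_conflict(a: list[str], b: list[str]) -> bool:
    """True if any entry in one set matches an entry (or pattern) in the other."""
    return any(
        fnmatch(x, y) or fnmatch(y, x)
        for x in a
        for y in b
    )

# "BOMItem:*" overlaps any concrete BOMItem node, so these agents serialize
assert write_sets_conflict(["BOMItem:bolt-m3"], ["BOMItem:*"])
# Disjoint node types can run in parallel
assert not write_sets_conflict(["Requirement:REQ-0017"], ["BOMItem:*"])
```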
Telemetry Write Path (L3)
Telemetry data bypasses Neo4j concurrency — it writes directly to the TSDB. Only device state changes (firmware version, status, location) go through the graph event pipeline with version control.
```
Sensor data (100 Hz) → Kafka "device.telemetry" → TSDB  (no version control needed)
Device state change  → Kafka "device.state"    → Neo4j (version-controlled)
```
Propagation Control
When a graph mutation triggers downstream effects (constraint re-evaluation, cross-domain propagation, agent reactions), the system must prevent infinite loops and oscillation.
Origin Tracking
Every event carries an origin_id — the ID of the root event that started the causal chain. Downstream events inherit this origin. The event consumer rejects any event whose origin_id matches an event already applied in the current propagation chain:
```python
class GraphEvent(BaseModel):
    # ... existing fields ...
    origin_id: Optional[str] = None  # Root event that started this chain
    causal_depth: int = 0            # How many hops from the origin

MAX_CAUSAL_DEPTH = 10  # Hard limit on propagation chains

async def should_apply(event: GraphEvent, chain: set[str]) -> bool:
    """Reject events that would create cycles or exceed depth."""
    if event.origin_id in chain:
        return False  # Cycle detected
    if event.causal_depth > MAX_CAUSAL_DEPTH:
        log.warning("Propagation depth exceeded", origin=event.origin_id)
        return False
    return True
```
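A simplified, synchronous illustration of how a consumer could track the chain while applying cascading events; the Event dataclass here is a stand-in for GraphEvent:

```python
from dataclasses import dataclass

@dataclass
class Event:
    event_id: str
    origin_id: str
    causal_depth: int

MAX_CAUSAL_DEPTH = 10

def should_apply(event: Event, chain: set[str]) -> bool:
    if event.origin_id in chain:
        return False  # cycle: this origin already produced an applied event
    if event.causal_depth > MAX_CAUSAL_DEPTH:
        return False  # runaway propagation
    return True

chain: set[str] = set()
root = Event("e1", origin_id="e1", causal_depth=0)
assert should_apply(root, chain)
chain.add(root.origin_id)  # record the origin once the event is applied

# A downstream event looping back with the same origin is rejected
echo = Event("e2", origin_id="e1", causal_depth=1)
assert not should_apply(echo, chain)
```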
Oscillation Detection
When two agents repeatedly modify the same node (Agent A sets value to X, Agent B resets to Y, triggering Agent A again), the system detects the pattern:
```python
class OscillationDetector:
    """Tracks node mutation frequency to detect feedback loops."""

    async def check(self, node_id: str, window_s: float = 60.0) -> bool:
        """Return True if the node is oscillating."""
        recent = await self.get_mutations(node_id, window_s)
        if len(recent) < 4:
            return False
        # Oscillating: four or more mutations that alternate between
        # at most two distinct property states
        states = {frozenset(e.properties.items()) for e in recent if e.properties}
        return len(states) <= 2
```
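The heart of that check can be illustrated on plain dicts; the mutation data below is synthetic:

```python
def is_oscillating(recent_props: list[dict]) -> bool:
    """True if >= 4 mutations alternate between at most two distinct states."""
    if len(recent_props) < 4:
        return False
    states = {frozenset(p.items()) for p in recent_props if p}
    return len(states) <= 2

# Agent A sets 3.3 V, Agent B resets to 5.0 V, repeatedly: a feedback loop
ping_pong = [{"voltage": 3.3}, {"voltage": 5.0}, {"voltage": 3.3}, {"voltage": 5.0}]
assert is_oscillating(ping_pong)

# Four distinct values is ordinary churn, not a two-state feedback loop
progress = [{"v": 1}, {"v": 2}, {"v": 3}, {"v": 4}]
assert not is_oscillating(progress)
```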
When oscillation is detected:
- The node is locked from further automatic mutations
- A CONFLICT event is published to the event bus
- The orchestrator surfaces the conflict to the engineer for manual resolution
Entity Locking During Constraint Evaluation
When the constraint engine evaluates a node, that node is temporarily locked to prevent concurrent mutations from invalidating the evaluation:
```python
class EvaluationLock:
    """Short-lived lock during constraint evaluation."""
    node_id: str
    session_id: str
    acquired_at: datetime
    ttl_ms: int = 5000  # 5 second max hold time — auto-releases

    async def acquire(self, node_id: str) -> bool:
        """Non-blocking acquire. Returns False if already held."""
        ...

    async def release(self, node_id: str) -> None:
        """Explicit release after evaluation completes."""
        ...
```
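A minimal in-process sketch of such a TTL lock, keyed by node ID and using monotonic time. A deployed version would more likely live in Redis or the database; the class and field names here are illustrative:

```python
import time

class EvaluationLocks:
    """Non-blocking, auto-expiring locks keyed by node ID (in-process sketch)."""

    def __init__(self, ttl_ms: int = 5000):
        self.ttl_s = ttl_ms / 1000.0
        self._held: dict[str, float] = {}  # node_id → acquisition time

    def acquire(self, node_id: str) -> bool:
        now = time.monotonic()
        acquired = self._held.get(node_id)
        if acquired is not None and now - acquired < self.ttl_s:
            return False  # still held and TTL not yet expired
        self._held[node_id] = now
        return True

    def release(self, node_id: str) -> None:
        self._held.pop(node_id, None)

locks = EvaluationLocks(ttl_ms=5000)
assert locks.acquire("REQ-0017")
assert not locks.acquire("REQ-0017")   # second acquire fails while held
locks.release("REQ-0017")
assert locks.acquire("REQ-0017")       # re-acquirable after explicit release
```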
Multi-Agent Event Coordination
When multiple agents subscribe to the same event types, the orchestrator ensures orderly processing:
| Pattern | Description |
|---|---|
| Fan-out with write sets | Each agent declares its write set before processing. The scheduler serializes agents with overlapping write sets. |
| Priority ordering | Safety-critical agents (constraint validation) process events before optimization agents (cost reduction). |
| Idempotent consumers | Every event consumer must be idempotent — replaying the same event produces the same result. |
| Dead letter queue | Events that fail processing after 3 retries go to a dead letter topic for manual investigation. |
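The idempotent-consumer pattern in the table above is usually implemented by deduplicating on event_id before applying. A sketch; in practice the processed-ID set would be persisted alongside the projection:

```python
processed: set[str] = set()
applied: list[str] = []

def handle(event: dict) -> None:
    """Apply an event exactly once, even if Kafka redelivers it."""
    if event["event_id"] in processed:
        return  # duplicate delivery, already applied
    applied.append(event["event_id"])  # stand-in for the real side effect
    processed.add(event["event_id"])

evt = {"event_id": "e-1", "event_type": "node.updated"}
handle(evt)
handle(evt)  # redelivery is a no-op
assert applied == ["e-1"]
```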
Point-in-Time Reconstruction
The append-only event stream enables reconstructing the graph at any historical point:
```python
async def reconstruct_graph_at(timestamp: datetime) -> GraphState:
    """Replay events up to timestamp to reconstruct historical state."""
    events = await event_store.read_until(timestamp)
    graph = InMemoryGraph()
    for event in events:
        graph.apply(event)
    return graph
```
Use cases:
- “Show me the traceability graph as it was when we passed the EVT gate”
- “Show me device FC-001’s state at the time of the anomaly”
- “What was the BOM when we submitted for FCC certification?”
- “Reconstruct the power budget calculation from 3 months ago”
Snapshot Optimization
For large graphs, periodic snapshots accelerate reconstruction:
Snapshot (every 24h) + Events since snapshot = Current state
Snapshots are published to the graph.snapshots topic and stored in MinIO.
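Reconstruction then becomes: load the latest snapshot at or before the target time, then replay only the events that follow it. A sketch over synthetic in-memory data:

```python
from datetime import datetime

# Synthetic snapshot and event log for illustration
snapshot_time = datetime(2026, 1, 1)
snapshot_state = {"REQ-0017": {"status": "draft"}}
events = [
    (datetime(2026, 1, 5), "REQ-0017", {"status": "review"}),
    (datetime(2026, 2, 1), "REQ-0017", {"status": "approved"}),
]

def reconstruct_at(target: datetime) -> dict:
    """Snapshot plus replay of events in (snapshot_time, target]."""
    state = {k: dict(v) for k, v in snapshot_state.items()}  # copy the snapshot
    for ts, node_id, props in events:
        if snapshot_time < ts <= target:
            state.setdefault(node_id, {}).update(props)
    return state

assert reconstruct_at(datetime(2026, 1, 10))["REQ-0017"]["status"] == "review"
assert reconstruct_at(datetime(2026, 3, 1))["REQ-0017"]["status"] == "approved"
```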
State Store Resolution
Three separate stores with a clear consistency model:
| Data | System of Record | Rationale |
|---|---|---|
| Traceability graph | Kafka event stream → Neo4j projection | Event-sourced, auditable, reconstructable |
| Binary artifacts | MinIO | Content-addressable, immutable, S3-compatible |
| Session state | PostgreSQL (team) / SQLite (local) | Structured, queryable |
| Telemetry (L3) | Kafka → InfluxDB/TimescaleDB | Optimized for time-series queries |
| Workspace config | .forge/config.json (local) | Only local state in .forge/ |
Migration from .forge/ flat files:
- `.forge/sessions/` and `.forge/traces/` → PostgreSQL/SQLite
- `.forge/artifacts/` → MinIO
Related Documents
| Document | Description |
|---|---|
| Graph Schema | Node types, relationships, and Neo4j constraints |
| Constraint Engine | Engineering constraint evaluation triggered by events |
| Digital Twin Evolution | Synchronization and simulation architecture |
| AI Memory & Knowledge | Knowledge Layer consuming graph.mutations, knowledge.ingest, and knowledge.indexed topics |
| System Observability | Kafka consumer lag metrics, DLQ monitoring, oscillation detection alerting |
| Assistant Mode | Dual-mode operation — human edits and AI workflows sharing the same event pipeline |
| Agent Chat Channel | Real-time conversational interface — uses agent.chat topic and 4 new EventType values |
| Architecture Overview | System architecture and data flows |
Document Version: v1.1 Last Updated: 2026-02-28 Status: Technical Architecture Document