Event Sourcing for Graph Mutations

Append-only event streams as the system of record for all Digital Twin state changes

Table of Contents

  1. Architecture Overview
  2. Event Schema
    1. GraphEvent — Graph Mutations (L1-L4)
    2. TelemetryEvent — High-Frequency Sensor Data (L3)
  3. Event Stream Topics
    1. Retention Policies
  4. Event Flow by Layer
    1. L1 — Digital Thread (P1)
    2. L3 — Live Twin (P3)
  5. Concurrency Model
    1. Optimistic Concurrency Control
    2. Write Set Declaration
    3. Telemetry Write Path (L3)
  6. Propagation Control
    1. Origin Tracking
    2. Oscillation Detection
    3. Entity Locking During Constraint Evaluation
    4. Multi-Agent Event Coordination
  7. Point-in-Time Reconstruction
    1. Snapshot Optimization
  8. State Store Resolution
  9. Related Documents

Architecture Overview

The event stream is the system of record. Neo4j is a read-optimized projection. Telemetry data forks to a time-series database (InfluxDB/TimescaleDB) for high-frequency sensor data.

flowchart LR
    A["Agent / Device"] -->|"propose change"| B["Event Created"]
    B --> C["Kafka Topic"]
    C --> D["Consumer: Apply to Neo4j"]
    C --> E["Event Store (immutable)"]
    C --> F["Telemetry Fork → TSDB"]

    style B fill:#E67E22,color:#fff
    style E fill:#2C3E50,color:#fff
    style F fill:#3498db,color:#fff

This design enables:

  • Full audit trail — every mutation is recorded immutably
  • Point-in-time reconstruction — replay events to reconstruct any historical state
  • Decoupled consumers — multiple downstream systems can independently process events
  • Telemetry separation — high-frequency sensor data goes to purpose-built TSDB

Event Schema

GraphEvent — Graph Mutations (L1-L4)

All changes to the artifact graph and device state flow through GraphEvent:

# digital_twin/events.py
from pydantic import BaseModel
from datetime import datetime
from enum import Enum
from typing import Optional

class EventType(str, Enum):
    # Graph mutations (L1 — Digital Thread)
    NODE_CREATED = "node.created"
    NODE_UPDATED = "node.updated"
    NODE_DELETED = "node.deleted"
    EDGE_CREATED = "edge.created"
    EDGE_DELETED = "edge.deleted"
    PROPERTY_SET = "property.set"
    # Device events (L2-L3 — Twin Instance)
    DEVICE_PROVISIONED = "device.provisioned"
    DEVICE_STATE_UPDATED = "device.state_updated"
    DEVICE_DECOMMISSIONED = "device.decommissioned"
    TELEMETRY_RECEIVED = "telemetry.received"
    # Simulation events (L4)
    SIMULATION_STARTED = "simulation.started"
    SIMULATION_COMPLETED = "simulation.completed"
    MODEL_CALIBRATED = "model.calibrated"
    # Assistant Mode — human-edit ingestion
    ARTIFACT_FILE_CHANGED = "artifact.file_changed"
    ARTIFACT_INGESTED = "artifact.ingested"
    ARTIFACT_INGEST_FAILED = "artifact.ingest_failed"
    # Drift detection — file ↔ graph reconciliation
    DRIFT_DETECTED = "drift.detected"
    DRIFT_RESOLVED = "drift.resolved"
    # Agent Chat Channel
    CHAT_MESSAGE_SENT = "chat.message.sent"
    CHAT_MESSAGE_CHUNK = "chat.message.chunk"
    CHAT_THREAD_CREATED = "chat.thread.created"
    CHAT_AGENT_TYPING = "chat.agent.typing"

class GraphEvent(BaseModel):
    event_id: str                          # UUID
    timestamp: datetime
    session_id: str                        # Links to orchestration session
    actor_id: str                          # Agent code, user ID, or device ID
    event_type: EventType
    node_type: Optional[str] = None        # e.g. "Requirement", "DeviceInstance"
    node_id: Optional[str] = None
    edge_type: Optional[str] = None        # e.g. "SATISFIED_BY", "INSTANCE_OF"
    source_id: Optional[str] = None
    target_id: Optional[str] = None
    properties: Optional[dict] = None
    previous_properties: Optional[dict] = None
    expected_version: Optional[int] = None
    rationale: Optional[str] = None

TelemetryEvent — High-Frequency Sensor Data (L3)

Telemetry data is separated from graph events because it has fundamentally different characteristics: high frequency, append-only, no version control needed.

class TelemetryEvent(BaseModel):
    """High-frequency sensor data — goes to TSDB, NOT Neo4j."""
    event_id: str
    device_id: str                         # DeviceInstance.serialNumber
    source_id: str                         # TelemetrySource.id
    timestamp: datetime
    values: dict[str, float]               # sensor_name → reading
    quality: Optional[str] = None          # "good", "uncertain", "bad"

Event Stream Topics

topics:
  - "graph.mutations"         # All graph changes (append-only)
  - "graph.projections"       # Materialized view updates (Neo4j sync)
  - "graph.snapshots"         # Periodic full-graph snapshots
  - "device.telemetry"        # High-frequency sensor data → TSDB
  - "device.state"            # Device state changes → Neo4j
  - "simulation.events"       # Simulation lifecycle events
  - "knowledge.ingest"        # Manual document ingestion requests (P1.5)
  - "knowledge.indexed"       # Confirmation of knowledge embedding (P1.5)
  - "artifact.ingest"        # Human-edit ingestion events (Assistant Mode)
  - "twin.drift"             # Drift detection and resolution events
  - "agent.chat"             # Chat messages, typing indicators, thread lifecycle

Retention Policies

Topic Retention Rationale
graph.mutations Infinite Never delete — audit trail and reconstruction source
graph.projections 7 days Ephemeral materialized view updates
graph.snapshots 90 days 3 months of periodic full-graph snapshots
device.telemetry 30 days Hot data in Kafka; older data in TSDB cold storage
device.state Infinite Device lifecycle audit trail
simulation.events 365 days 1 year of simulation history
knowledge.ingest 30 days Manual ingestion requests — replayable from MinIO originals
knowledge.indexed 7 days Ephemeral confirmation events for dashboard updates
agent.chat 90 days Conversation history for context; older threads archived to PostgreSQL

Event Flow by Layer

L1 — Digital Thread (P1)

sequenceDiagram
    participant Agent as Domain Agent
    participant API as Twin API
    participant Kafka as Kafka
    participant Consumer as Event Consumer
    participant Neo4j as Neo4j

    Agent->>API: Propose node update (with expected_version)
    API->>Kafka: Publish GraphEvent to "graph.mutations"
    Kafka->>Consumer: Consume event
    Consumer->>Neo4j: Apply with optimistic concurrency check
    Neo4j-->>Consumer: Success (version incremented)
    Consumer->>Kafka: Publish to "graph.projections"

L3 — Live Twin (P3)

sequenceDiagram
    participant Device as Physical Device
    participant MQTT as MQTT Broker
    participant Router as Telemetry Router
    participant Kafka as Kafka
    participant TSDB as InfluxDB
    participant Neo4j as Neo4j

    Device->>MQTT: Publish sensor readings
    MQTT->>Router: Forward telemetry
    Router->>Kafka: Publish TelemetryEvent to "device.telemetry"
    Router->>Kafka: Publish state change to "device.state" (if state changed)
    Kafka->>TSDB: Write time-series data
    Kafka->>Neo4j: Update DeviceInstance node (state changes only)

Concurrency Model

Optimistic Concurrency Control

Every graph node carries a version integer. Mutations must include expected_version:

async def apply_event(event: GraphEvent):
    if event.event_type == EventType.NODE_UPDATED:
        result = await neo4j.run("""
            MATCH (n {id: $node_id})
            WHERE n.version = $expected_version
            SET n += $properties, n.version = n.version + 1, n.updatedAt = datetime()
            RETURN n
        """, node_id=event.node_id, expected_version=event.expected_version,
             properties=event.properties)
        if not result.records:
            raise ConcurrencyConflict(
                f"Node {event.node_id} modified by another session"
            )

Write Set Declaration

Agents declare intended modifications before execution. The scheduler serializes conflicting write sets:

class TaskDispatch(BaseModel):
    agent_code: str
    session_id: str
    write_set: list[str]  # Node IDs or patterns like "BOMItem:*"

Telemetry Write Path (L3)

Telemetry data bypasses Neo4j concurrency — it writes directly to the TSDB. Only device state changes (firmware version, status, location) go through the graph event pipeline with version control.

Sensor data (100 Hz) → Kafka "device.telemetry" → TSDB (no version control needed)
Device state change   → Kafka "device.state"     → Neo4j (version-controlled)

Propagation Control

When a graph mutation triggers downstream effects (constraint re-evaluation, cross-domain propagation, agent reactions), the system must prevent infinite loops and oscillation.

Origin Tracking

Every event carries an origin_id — the ID of the root event that started the causal chain. Downstream events inherit this origin. The event consumer rejects any event whose origin_id matches an event already applied in the current propagation chain:

class GraphEvent(BaseModel):
    # ... existing fields ...
    origin_id: Optional[str] = None   # Root event that started this chain
    causal_depth: int = 0             # How many hops from the origin

MAX_CAUSAL_DEPTH = 10  # Hard limit on propagation chains

async def should_apply(event: GraphEvent, chain: set[str]) -> bool:
    """Reject events that would create cycles or exceed depth."""
    if event.origin_id in chain:
        return False  # Cycle detected
    if event.causal_depth > MAX_CAUSAL_DEPTH:
        log.warning("Propagation depth exceeded", origin=event.origin_id)
        return False
    return True

Oscillation Detection

When two agents repeatedly modify the same node (Agent A sets value to X, Agent B resets to Y, triggering Agent A again), the system detects the pattern:

class OscillationDetector:
    """Tracks node mutation frequency to detect feedback loops."""

    async def check(self, node_id: str, window_s: float = 60.0) -> bool:
        """Returns True if node is oscillating."""
        recent = await self.get_mutations(node_id, window_s)
        if len(recent) < 4:
            return False
        # Check if values alternate between two states
        values = [e.properties for e in recent]
        unique = set(map(frozenset, [v.items() for v in values if v]))
        return len(unique) <= 2 and len(recent) >= 4

When oscillation is detected:

  1. The node is locked from further automatic mutations
  2. A CONFLICT event is published to the event bus
  3. The orchestrator surfaces the conflict to the engineer for manual resolution

Entity Locking During Constraint Evaluation

When the constraint engine evaluates a node, that node is temporarily locked to prevent concurrent mutations from invalidating the evaluation:

class EvaluationLock:
    """Short-lived lock during constraint evaluation."""
    node_id: str
    session_id: str
    acquired_at: datetime
    ttl_ms: int = 5000  # 5 second max hold time — auto-releases

    async def acquire(self, node_id: str) -> bool:
        """Non-blocking acquire. Returns False if already held."""
        ...

    async def release(self, node_id: str) -> None:
        """Explicit release after evaluation completes."""
        ...

Multi-Agent Event Coordination

When multiple agents subscribe to the same event types, the orchestrator ensures orderly processing:

Pattern Description
Fan-out with write sets Each agent declares its write set before processing. The scheduler serializes agents with overlapping write sets.
Priority ordering Safety-critical agents (constraint validation) process events before optimization agents (cost reduction).
Idempotent consumers Every event consumer must be idempotent — replaying the same event produces the same result.
Dead letter queue Events that fail processing after 3 retries go to a dead letter topic for manual investigation.

Point-in-Time Reconstruction

The append-only event stream enables reconstructing the graph at any historical point:

async def reconstruct_graph_at(timestamp: datetime) -> GraphState:
    """Replay events up to timestamp to reconstruct historical state."""
    events = await event_store.read_until(timestamp)
    graph = InMemoryGraph()
    for event in events:
        graph.apply(event)
    return graph

Use cases:

  • “Show me the traceability graph as it was when we passed the EVT gate”
  • “Show me device FC-001’s state at the time of the anomaly”
  • “What was the BOM when we submitted for FCC certification?”
  • “Reconstruct the power budget calculation from 3 months ago”

Snapshot Optimization

For large graphs, periodic snapshots accelerate reconstruction:

Snapshot (every 24h) + Events since snapshot = Current state

Snapshots are published to the graph.snapshots topic and stored in MinIO.


State Store Resolution

Three separate stores with a clear consistency model:

Data System of Record Rationale
Traceability graph Kafka event stream → Neo4j projection Event-sourced, auditable, reconstructable
Binary artifacts MinIO Content-addressable, immutable, S3-compatible
Session state PostgreSQL (team) / SQLite (local) Structured, queryable
Telemetry (L3) Kafka → InfluxDB/TimescaleDB Optimized for time-series queries
Workspace config .forge/config.json (local) Only local state in .forge/

Migration from .forge/ flat files:

  • .forge/sessions/ and .forge/traces/ → PostgreSQL/SQLite
  • .forge/artifacts/ → MinIO

Document Description
Graph Schema Node types, relationships, and Neo4j constraints
Constraint Engine Engineering constraint evaluation triggered by events
Digital Twin Evolution Synchronization and simulation architecture
AI Memory & Knowledge Knowledge Layer consuming graph.mutations, knowledge.ingest, and knowledge.indexed topics
System Observability Kafka consumer lag metrics, DLQ monitoring, oscillation detection alerting
Assistant Mode Dual-mode operation — human edits and AI workflows sharing the same event pipeline
Agent Chat Channel Real-time conversational interface — uses agent.chat topic and 4 new EventType values
Architecture Overview System architecture and data flows

Document Version: v1.1 Last Updated: 2026-02-28 Status: Technical Architecture Document

← Graph Schema Constraint Engine →