Tool Adapters

Integrating external EDA tools, simulators, and APIs with MetaForge

Table of contents

  1. Overview
  2. Tool-Agent Mapping
    1. EDA Tools
    2. Simulation & Analysis
    3. Supply Chain APIs
    4. CAD Tools
    5. Firmware Toolchain
    6. Lab Automation
    7. Manufacturing
    8. Data Layer
    9. CI/CD Integration
  3. Adapter Interface
    1. Base Adapter
    2. Capability Detection
  4. KiCad Adapter
    1. Installation Detection
    2. Schematic Operations
  5. SPICE Adapter
    1. Simulation Interface
  6. Supplier API Adapters
    1. Octopart Adapter
  7. Digi-Key API Adapter
    1. Overview
    2. Authentication
    3. Part Search
    4. Pricing & Stock
    5. Lifecycle Status
  8. Mouser API Adapter
    1. Overview
    2. Authentication
    3. Part Search
    4. Stock Check (Real-Time)
  9. GitHub Actions Integration
    1. Overview
    2. Webhook Listener
  10. Neo4j Integration
    1. Overview
    2. Graph Schema
    3. Schema Definition
    4. Example Queries
    5. Adapter Implementation
  11. Creating a Custom Adapter
    1. Step 1: Define Interface
    2. Step 2: Implement Detection
    3. Step 3: Implement Operations
    4. Step 4: Add Error Handling
  12. Testing Adapters
    1. Unit Tests
    2. Mock Adapter for Testing
  13. Best Practices
    1. 1. Graceful Degradation
    2. 2. Caching
    3. 3. Timeout Handling
  14. Next Steps

Overview

Tool adapters provide clean interfaces between MetaForge agents and external tools (KiCad, SPICE, supplier APIs, etc.).

flowchart LR
    A[Agent] --> B[Tool Adapter]
    B --> C{Tool Type}

    C -->|EDA| D[KiCad CLI]
    C -->|Simulation| E[ngspice]
    C -->|Supply Chain| F[Octopart API]
    C -->|Firmware| G[GCC Toolchain]

    style B fill:#e67e22,color:#fff

Tool-Agent Mapping

Reverse mapping showing which agents use each tool adapter. See Agent System for the forward mapping.

EDA Tools

Adapter Agents Capabilities Used Phase
KiCadAdapter EE ERC, DRC, BOM export, Gerber export, netlist (P1 read-only; P2 adds schematic gen, PCB auto-routing) P1+
KiCadAdapter FW Pin mapping, netlist reading P1
KiCadAdapter SIM Netlist export for SPICE simulation P1
KiCadAdapter TST ERC validation P1
KiCadAdapter MFG DRC, Gerber export, pick-and-place export P1
AltiumAdapter EE Design rule checks, variant management, release management P2
EagleAdapter EE Legacy EDA import/export P2

Simulation & Analysis

Adapter Agents Capabilities Used Phase
NGSpiceAdapter EE DC, AC, transient circuit simulation P1
NGSpiceAdapter SIM Full SPICE analysis suite P1
NGSpiceAdapter ROB Control loop simulation P2
LTspiceAdapter EE Alternative SPICE simulation P2
LTspiceAdapter SIM Alternative SPICE analysis P2
CalculiXAdapter ME Static stress, modal, thermal FEA (Abaqus-compatible .inp) P2
OpenFOAMAdapter ME CFD thermal/flow simulation (buoyantSimpleFoam, snappyHexMesh) P2
AnsysFEAAdapter SIM Structural/thermal validation P2
AnsysFEAAdapter ROB Vibration, structural analysis P2
AnsysFEAAdapter REL Fatigue, lifetime prediction P2

Supply Chain APIs

Adapter Agents Capabilities Used Phase
DigiKeyAdapter EE, SC Part search, pricing, stock, lifecycle, lead times P1
MouserAdapter EE, SC Part search, pricing, stock, dual-sourcing P1
NexarAdapter EE, SC Aggregated part data, lifecycle tracking, alternates P1
LCSCAdapter EE, SC Low-cost sourcing, JLCPCB parts integration P2

CAD Tools

Adapter Agents Capabilities Used Phase
FreeCADAdapter ME Headless parametric CAD (freecadcmd), STEP/STL export, mesh generation, boolean ops P2
SolidWorksAdapter ID STEP export, drawings, mechanical BOM, DFM checks P2
Fusion360Adapter ID Parametric CAD, model export, version control P2

Firmware Toolchain

Adapter Agents Capabilities Used Phase
GCCToolchainAdapter FW ARM cross-compilation, linking, symbol generation P1
CMakeAdapter FW Build system configuration, incremental builds P1
FlashToolAdapter FW, FIELD Firmware deployment via OpenOCD/pyOCD P2

Lab Automation

Adapter Agents Capabilities Used Phase
SCPIAdapter TST, FIELD Oscilloscope, spectrum analyzer, power supply control P2
VISAAdapter TST, FIELD USB/GPIB/LAN instrument communication P2
OpenTAPAdapter TST, FIELD Automated test sequencing, result capture P2

Manufacturing

Adapter Agents Capabilities Used Phase
MacroFabAdapter MFG PCB fab/assembly quoting, ordering, build tracking P2
JLCPCBAdapter MFG Low-cost PCB fabrication, design upload, ordering P2

Data Layer

Adapter Agents Capabilities Used Phase
Neo4jAdapter ALL Digital thread: traceability, EOL risk, coverage, impact queries P1
MinIOAdapter ALL Object storage: CAD files, EDA projects, sim results, test logs P1

CI/CD Integration

Adapter Agents Capabilities Used Phase
GitHubActionsAdapter TST, QA Webhook test ingestion, artifact download, JUnit parsing P1
GitLabCIAdapter TST, QA Pipeline events, build artifacts, test reports P2

Adapter Interface

Base Adapter

interface ToolAdapter {
  // Metadata
  name: string;
  version: string;
  capabilities: Capability[];

  // Lifecycle
  detect(): Promise<ToolInstall | null>;
  initialize(config: ToolConfig): Promise<void>;

  // Operations
  execute(action: Action): Promise<Result>;

  // Health
  healthCheck(): Promise<HealthStatus>;
}

Capability Detection

interface Capability {
  name: string;
  supported: boolean;
  version?: string;
  requiredVersion?: string;
}

class KiCadAdapter implements ToolAdapter {
  async detect(): Promise<ToolInstall | null> {
    try {
      const result = await exec('kicad-cli --version');
      const version = this.parseVersion(result.stdout);

      return {
        found: true,
        version,
        path: result.path,
        capabilities: this.getCapabilities(version)
      };
    } catch {
      return null;
    }
  }

  private getCapabilities(version: string): Capability[] {
    return [
      {
        name: 'erc',
        supported: semver.gte(version, '7.0.0')
      },
      {
        name: 'export-netlist',
        supported: true
      },
      {
        name: 'export-bom',
        supported: semver.gte(version, '8.0.0')
      }
    ];
  }
}

KiCad Adapter

Installation Detection

class KiCadAdapter implements ToolAdapter {
  name = 'kicad';
  version = '1.0.0';

  async detect(): Promise<ToolInstall | null> {
    const searchPaths = [
      '/usr/bin/kicad-cli',
      '/usr/local/bin/kicad-cli',
      'C:\\Program Files\\KiCad\\bin\\kicad-cli.exe',
      '/Applications/KiCad/kicad.app/Contents/MacOS/kicad-cli'
    ];

    for (const path of searchPaths) {
      if (await exists(path)) {
        const version = await this.getVersion(path);
        return { found: true, version, path };
      }
    }

    return null;
  }
}

Schematic Operations

interface KiCadAdapter extends ToolAdapter {
  // ERC (Electrical Rules Check)
  runERC(schematic: string): Promise<ERCResult>;

  // Netlist export
  exportNetlist(schematic: string, format: 'kicad' | 'spice'): Promise<string>;

  // BOM export
  exportBOM(schematic: string, format: 'csv' | 'xml'): Promise<BOMData>;

  // Component info
  getComponents(schematic: string): Promise<Component[]>;

  // Symbol operations
  searchSymbols(query: string): Promise<Symbol[]>;
  addSymbol(schematic: string, symbol: Symbol, position: Position): Promise<void>;
}

Example Implementation:

class KiCadAdapter implements ToolAdapter {
  async runERC(schematic: string): Promise<ERCResult> {
    const result = await exec(`kicad-cli sch erc ${schematic}`);

    return {
      errors: this.parseErrors(result.stdout),
      warnings: this.parseWarnings(result.stdout),
      passed: result.exitCode === 0
    };
  }

  async exportBOM(schematic: string, format: 'csv' | 'xml'): Promise<BOMData> {
    const tempFile = `/tmp/bom.${format}`;

    await exec(`kicad-cli sch export bom --format ${format} ${schematic} -o ${tempFile}`);

    const content = await readFile(tempFile);
    return this.parseBOM(content, format);
  }

  private parseBOM(content: string, format: 'csv' | 'xml'): BOMData {
    if (format === 'csv') {
      return this.parseCSVBOM(content);
    } else {
      return this.parseXMLBOM(content);
    }
  }
}

SPICE Adapter

Simulation Interface

interface SPICEAdapter extends ToolAdapter {
  // Simulation
  runSimulation(netlist: string, analysis: AnalysisType): Promise<SimResult>;

  // Analysis types
  runDC(netlist: string, params: DCParams): Promise<DCResult>;
  runAC(netlist: string, params: ACParams): Promise<ACResult>;
  runTransient(netlist: string, params: TransientParams): Promise<TransientResult>;
}

Example:

class NGSpiceAdapter implements SPICEAdapter {
  name = 'ngspice';

  async runTransient(netlist: string, params: TransientParams): Promise<TransientResult> {
    const script = `
      ${netlist}
      .tran ${params.step} ${params.stop}
      .print tran v(out) i(vin)
      .end
    `;

    const result = await exec(`ngspice -b -`, { input: script });

    return this.parseTransientOutput(result.stdout);
  }

  private parseTransientOutput(output: string): TransientResult {
    // Parse ngspice output format
    const lines = output.split('\n');
    const data: DataPoint[] = [];

    for (const line of lines) {
      if (line.startsWith(' ')) {
        const [time, voltage, current] = line.trim().split(/\s+/);
        data.push({
          time: parseFloat(time),
          voltage: parseFloat(voltage),
          current: parseFloat(current)
        });
      }
    }

    return { data };
  }
}

Supplier API Adapters

Octopart Adapter

interface SupplierAdapter extends ToolAdapter {
  search(query: ComponentQuery): Promise<SearchResult[]>;
  getPartDetails(mpn: string): Promise<PartDetails>;
  getPricing(mpn: string, quantity: number): Promise<PriceBreakdown>;
  checkStock(mpn: string): Promise<StockInfo>;
}

Implementation:

class OctopartAdapter implements SupplierAdapter {
  private apiKey: string;
  private baseURL = 'https://octopart.com/api/v4';

  async search(query: ComponentQuery): Promise<SearchResult[]> {
    const gqlQuery = `
      query {
        search(q: "${query.keyword}", limit: ${query.limit}) {
          results {
            part {
              mpn
              manufacturer { name }
              short_description
              specs {
                attribute { name }
                value
              }
              sellers {
                company { name }
                offers {
                  sku
                  prices {
                    quantity
                    price
                    currency
                  }
                  inventory_level
                }
              }
            }
          }
        }
      }
    `;

    const response = await fetch(this.baseURL + '/graphql', {
      method: 'POST',
      headers: {
        'Authorization': `Token ${this.apiKey}`,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({ query: gqlQuery })
    });

    const data = await response.json();
    return this.mapSearchResults(data);
  }

  async getPricing(mpn: string, quantity: number): Promise<PriceBreakdown> {
    const details = await this.getPartDetails(mpn);

    const prices = details.sellers.flatMap(seller =>
      seller.offers.flatMap(offer =>
        offer.prices.map(price => ({
          supplier: seller.company.name,
          quantity: price.quantity,
          unitPrice: price.price,
          currency: price.currency,
          totalPrice: price.price * quantity
        }))
      )
    );

    return {
      mpn,
      quantity,
      prices: prices.sort((a, b) => a.unitPrice - b.unitPrice)
    };
  }
}

Digi-Key API Adapter

Overview

Digi-Key is the largest electronics component distributor (80%+ market share). The adapter provides real-time access to part search, pricing, stock levels, and lifecycle data.

API Documentation: https://developer.digikey.com/

Authentication

class DigiKeyAdapter implements SupplierAdapter {
  name = 'digikey';
  private clientId: string;
  private clientSecret: string;
  private baseURL = 'https://api.digikey.com/v1';

  async initialize(config: ToolConfig): Promise<void> {
    this.clientId = config.digikey_client_id;
    this.clientSecret = config.digikey_client_secret;

    // OAuth 2.0 client credentials flow
    await this.authenticate();
  }

  private async authenticate(): Promise<void> {
    const response = await fetch('https://api.digikey.com/v1/oauth2/token', {
      method: 'POST',
      headers: { 'Content-Type': 'application/x-www-form-urlencoded' },
      body: new URLSearchParams({
        client_id: this.clientId,
        client_secret: this.clientSecret,
        grant_type: 'client_credentials'
      })
    });

    const data = await response.json();
    this.accessToken = data.access_token;
  }
}
interface DigiKeySearchParams {
  keyword: string;
  filters?: {
    category?: string;
    manufacturer?: string;
    in_stock?: boolean;
    rohs_compliant?: boolean;
  };
  limit?: number;
}

class DigiKeyAdapter {
  async search(params: DigiKeySearchParams): Promise<SearchResult[]> {
    const response = await fetch(`${this.baseURL}/Search/v3/Products/Keyword`, {
      method: 'POST',
      headers: {
        'Authorization': `Bearer ${this.accessToken}`,
        'Content-Type': 'application/json',
        'X-DIGIKEY-Client-Id': this.clientId
      },
      body: JSON.stringify({
        Keywords: params.keyword,
        RecordCount: params.limit || 25,
        Filters: {
          StockStatus: params.filters?.in_stock ? 'InStock' : undefined,
          RoHS: params.filters?.rohs_compliant ? 'Compliant' : undefined
        }
      })
    });

    const data = await response.json();
    return this.mapProducts(data.Products);
  }

  private mapProducts(products: any[]): SearchResult[] {
    return products.map(p => ({
      mpn: p.ManufacturerPartNumber,
      manufacturer: p.Manufacturer.Name,
      description: p.DetailedDescription,
      datasheet: p.PrimaryDatasheet,
      category: p.Category.Name,
      pricing: p.StandardPricing.map(tier => ({
        quantity: tier.BreakQuantity,
        unitPrice: tier.UnitPrice,
        currency: 'USD'
      })),
      stock: {
        quantity: p.QuantityAvailable,
        location: 'US'
      },
      lifecycle: p.ProductStatus,
      rohs: p.RohsStatus === 'Compliant'
    }));
  }
}

Pricing & Stock

interface DigiKeyPricing {
  mpn: string;
  pricing: PriceTier[];
  stock: number;
  leadTime: string;
  moq: number; // minimum order quantity
}

class DigiKeyAdapter {
  async getPricing(mpn: string): Promise<DigiKeyPricing> {
    const response = await fetch(
      `${this.baseURL}/Search/v3/Products/${encodeURIComponent(mpn)}`,
      {
        headers: {
          'Authorization': `Bearer ${this.accessToken}`,
          'X-DIGIKEY-Client-Id': this.clientId
        }
      }
    );

    const product = await response.json();

    return {
      mpn: product.ManufacturerPartNumber,
      pricing: product.StandardPricing.map(tier => ({
        quantity: tier.BreakQuantity,
        unitPrice: tier.UnitPrice,
        currency: 'USD',
        totalPrice: tier.BreakQuantity * tier.UnitPrice
      })),
      stock: product.QuantityAvailable,
      leadTime: product.ManufacturerLeadWeeks
        ? `${product.ManufacturerLeadWeeks} weeks`
        : 'In stock',
      moq: product.MinimumOrderQuantity
    };
  }
}

Lifecycle Status

async getLifecycleStatus(mpn: string): Promise<LifecycleStatus> {
  const product = await this.getProduct(mpn);

  return {
    mpn,
    status: this.mapProductStatus(product.ProductStatus),
    last_time_buy: product.DiscontinuedDate,
    recommended_replacement: product.SuggestedReplacement?.ManufacturerPartNumber
  };
}

private mapProductStatus(status: string): LifecycleStatus['status'] {
  const mapping = {
    'Active': 'active',
    'Not For New Designs': 'nrnd',
    'Obsolete': 'obsolete',
    'Last Time Buy': 'ltb'
  };
  return mapping[status] || 'unknown';
}

Mouser API Adapter

Overview

Mouser Electronics is a major distributor, especially strong in APAC and EMEA regions. Excellent for dual-sourcing strategy.

API Documentation: https://www.mouser.com/api-hub/

Authentication

class MouserAdapter implements SupplierAdapter {
  name = 'mouser';
  private apiKey: string;
  private baseURL = 'https://api.mouser.com/api/v1';

  async initialize(config: ToolConfig): Promise<void> {
    this.apiKey = config.mouser_api_key;
  }
}

Part Search

interface MouserSearchParams {
  keyword: string;
  records?: number;
  startingRecord?: number;
  searchOptions?: string; // 'Rohs', 'InStock', etc.
}

class MouserAdapter {
  async search(params: MouserSearchParams): Promise<SearchResult[]> {
    const response = await fetch(
      `${this.baseURL}/search/keyword?apiKey=${this.apiKey}`,
      {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({
          SearchByKeywordRequest: {
            keyword: params.keyword,
            records: params.records || 25,
            startingRecord: params.startingRecord || 0,
            searchOptions: params.searchOptions || ''
          }
        })
      }
    );

    const data = await response.json();
    return this.mapParts(data.SearchResults.Parts);
  }

  private mapParts(parts: any[]): SearchResult[] {
    return parts.map(p => ({
      mpn: p.ManufacturerPartNumber,
      manufacturer: p.Manufacturer,
      description: p.Description,
      datasheet: p.DataSheetUrl,
      category: p.Category,
      pricing: p.PriceBreaks.map(pb => ({
        quantity: pb.Quantity,
        unitPrice: parseFloat(pb.Price.replace('$', '')),
        currency: pb.Currency
      })),
      stock: {
        quantity: p.AvailabilityInStock || 0,
        location: p.AvailabilityOnOrder > 0 ? 'On order' : 'In stock'
      },
      lifecycle: p.LifecycleStatus,
      rohs: p.RohsStatus === 'Compliant',
      leadTime: p.LeadTime
    }));
  }
}

Stock Check (Real-Time)

async checkStock(mpns: string[]): Promise<StockInfo[]> {
  const response = await fetch(
    `${this.baseURL}/search/partnumber?apiKey=${this.apiKey}`,
    {
      method: 'POST',
      headers: { 'Content-Type': 'application/json' },
      body: JSON.stringify({
        SearchByPartRequest: {
          mouserPartNumber: mpns.join('|'),
          partSearchOptions: 'Exact'
        }
      })
    }
  );

  const data = await response.json();

  return data.SearchResults.Parts.map(p => ({
    mpn: p.ManufacturerPartNumber,
    stock: p.AvailabilityInStock || 0,
    leadTime: p.LeadTime,
    available: p.AvailabilityInStock > 0,
    alternates: p.AlternatePackagings?.map(alt => alt.APMfrPN) || []
  }));
}

GitHub Actions Integration

Overview

Integrates with GitHub Actions CI/CD to automatically ingest test results, build artifacts, and link them to the digital thread.

Webhook Listener

# tools/github_actions.py
import hashlib
import hmac
import zipfile
from io import BytesIO
from xml.etree import ElementTree

import httpx
import structlog
from fastapi import APIRouter, Header, HTTPException, Request
from pydantic import BaseModel

from digital_twin.neo4j_client import neo4j

logger = structlog.get_logger()
router = APIRouter()


class GitHubActionsAdapter:
    """Integrates with GitHub Actions CI/CD for test result ingestion."""

    name = "github-actions"

    def __init__(self, webhook_secret: str, token: str, repo: str):
        self.webhook_secret = webhook_secret
        self.token = token
        self.repo = repo

    def verify_signature(self, payload: bytes, signature: str | None) -> bool:
        if not signature:
            return False
        expected = "sha256=" + hmac.new(
            self.webhook_secret.encode(), payload, hashlib.sha256
        ).hexdigest()
        return hmac.compare_digest(expected, signature)


# --- FastAPI webhook endpoint ---

adapter: GitHubActionsAdapter | None = None


@router.post("/webhooks/github")
async def github_webhook(
    request: Request,
    x_hub_signature_256: str | None = Header(None),
):
    body = await request.body()
    if not adapter or not adapter.verify_signature(body, x_hub_signature_256):
        raise HTTPException(status_code=401, detail="Invalid signature")

    event = await request.json()
    if event.get("action") == "completed":
        await on_workflow_complete(event)
    return {"status": "ok"}


async def on_workflow_complete(event: dict) -> None:
    workflow_run = event["workflow_run"]
    logger.info(
        "Workflow completed",
        name=workflow_run["name"],
        conclusion=workflow_run["conclusion"],
    )

    artifacts = await list_artifacts(workflow_run["id"])
    for artifact in artifacts:
        if "test-results" in artifact["name"]:
            data = await download_artifact(workflow_run["id"], artifact["name"])
            test_results = parse_test_results(data)
            await link_to_digital_thread(
                commit=workflow_run["head_sha"],
                workflow=workflow_run["name"],
                results=test_results,
                timestamp=workflow_run["updated_at"],
            )


async def download_artifact(run_id: int, artifact_name: str) -> bytes:
    async with httpx.AsyncClient() as client:
        resp = await client.get(
            f"https://api.github.com/repos/{adapter.repo}/actions/runs/{run_id}/artifacts",
            headers={
                "Authorization": f"Bearer {adapter.token}",
                "Accept": "application/vnd.github+json",
            },
        )
        resp.raise_for_status()
        artifacts = resp.json()["artifacts"]
        match = next(a for a in artifacts if a["name"] == artifact_name)

        dl = await client.get(
            match["archive_download_url"],
            headers={"Authorization": f"Bearer {adapter.token}"},
        )
        return dl.content


def parse_test_results(artifact_zip: bytes) -> list[dict]:
    """Parse JUnit XML from a zipped artifact."""
    with zipfile.ZipFile(BytesIO(artifact_zip)) as zf:
        xml_content = zf.read("test-results.xml")

    root = ElementTree.fromstring(xml_content)
    results = []
    for suite in root.findall(".//testsuite"):
        for tc in suite.findall("testcase"):
            failure_el = tc.find("failure")
            skipped_el = tc.find("skipped")
            status = "failed" if failure_el is not None else (
                "skipped" if skipped_el is not None else "passed"
            )
            results.append({
                "name": tc.get("name"),
                "class_name": tc.get("classname"),
                "time": float(tc.get("time", 0)),
                "status": status,
                "failure": failure_el.text if failure_el is not None else None,
            })
    return results


async def link_to_digital_thread(
    commit: str, workflow: str, results: list[dict], timestamp: str
) -> None:
    """Create TestExecution nodes linked to Requirements in Neo4j."""
    for result in results:
        await neo4j.run(
            """
            MATCH (req:Requirement {id: $requirement_id})
            CREATE (test:TestExecution {
                commit: $commit,
                workflow: $workflow,
                timestamp: $timestamp,
                status: $status
            })
            CREATE (req)-[:VERIFIED_BY]->(test)
            """,
            requirement_id=result.get("requirement_id"),
            commit=commit,
            workflow=workflow,
            timestamp=timestamp,
            status=result["status"],
        )

Neo4j Integration

Overview

Neo4j is the graph database powering MetaForge’s digital thread. It links requirements → design → BOM → tests → compliance evidence.

Why Graph DB: Traditional relational databases struggle with multi-hop traceability queries. Neo4j excels at “Which requirements are at risk due to component EOL?” style questions.

Graph Schema

interface Neo4jAdapter extends ToolAdapter {
  name: 'neo4j';

  // Node operations
  createNode(type: NodeType, properties: any): Promise<Node>;
  getNode(type: NodeType, id: string): Promise<Node>;

  // Relationship operations
  createRelationship(from: Node, to: Node, type: RelType): Promise<Relationship>;

  // Queries
  traceRequirement(req_id: string): Promise<TraceGraph>;
  findRisks(criteria: RiskCriteria): Promise<Risk[]>;
}

// Node types
type NodeType =
  | 'Requirement'
  | 'Component'
  | 'Design'
  | 'Test'
  | 'Build'
  | 'Compliance'
  | 'Issue';

// Relationship types
type RelType =
  | 'IMPLEMENTS'
  | 'USES'
  | 'VERIFIED_BY'
  | 'BLOCKS'
  | 'DEPENDS_ON'
  | 'COMPLIES_WITH';

Schema Definition

// Requirements
CREATE CONSTRAINT req_id IF NOT EXISTS
FOR (r:Requirement) REQUIRE r.id IS UNIQUE;

// Components
CREATE CONSTRAINT component_mpn IF NOT EXISTS
FOR (c:Component) REQUIRE c.mpn IS UNIQUE;

// Indexes for fast lookup
CREATE INDEX req_status IF NOT EXISTS
FOR (r:Requirement) ON (r.status);

CREATE INDEX component_lifecycle IF NOT EXISTS
FOR (c:Component) ON (c.lifecycle_status);

Example Queries

Traceability Query - “Trace REQ-001 from requirement to test”:

MATCH path = (req:Requirement {id: 'REQ-001'})-[:IMPLEMENTS]->(design:Design)
             -[:USES]->(component:Component)
             -[:VERIFIED_BY]->(test:Test)
RETURN path

Risk Query - “Which requirements are at risk due to EOL components?”:

MATCH (req:Requirement)-[:IMPLEMENTS]->(design)
      -[:USES]->(component:Component)
WHERE component.lifecycle_status IN ['NRND', 'Obsolete']
RETURN req.id AS requirement,
       component.mpn AS at_risk_component,
       component.lifecycle_status AS status,
       component.last_time_buy AS deadline
ORDER BY component.last_time_buy ASC

Coverage Query - “What % of requirements have tests?”:

MATCH (req:Requirement)
OPTIONAL MATCH (req)-[:VERIFIED_BY]->(test:Test)
WITH count(DISTINCT req) AS total_reqs,
     count(DISTINCT test) AS tested_reqs
RETURN (tested_reqs * 100.0 / total_reqs) AS test_coverage_percent

Adapter Implementation

import neo4j from 'neo4j-driver';

class Neo4jAdapter implements ToolAdapter {
  private driver: neo4j.Driver;
  private session: neo4j.Session;

  async initialize(config: ToolConfig): Promise<void> {
    this.driver = neo4j.driver(
      config.neo4j_uri || 'bolt://localhost:7687',
      neo4j.auth.basic(config.neo4j_user, config.neo4j_password)
    );

    this.session = this.driver.session();

    // Create schema
    await this.createSchema();
  }

  async createNode(type: NodeType, properties: any): Promise<Node> {
    const result = await this.session.run(
      `CREATE (n:${type} $props) RETURN n`,
      { props: properties }
    );

    return result.records[0].get('n').properties;
  }

  async traceRequirement(req_id: string): Promise<TraceGraph> {
    const result = await this.session.run(`
      MATCH path = (req:Requirement {id: $req_id})
                   -[:IMPLEMENTS*1..3]->()
      RETURN path
    `, { req_id });

    return this.buildGraph(result.records);
  }

  async findComponentRisks(): Promise<ComponentRisk[]> {
    const result = await this.session.run(`
      MATCH (req:Requirement)-[:IMPLEMENTS]->(design)
            -[:USES]->(component:Component)
      WHERE component.lifecycle_status IN ['NRND', 'Obsolete']
         OR component.stock < 100
         OR component.lead_time_weeks > 12
      RETURN req.id AS requirement,
             component.mpn AS component,
             component.lifecycle_status AS lifecycle,
             component.stock AS stock,
             component.lead_time_weeks AS lead_time
      ORDER BY component.lifecycle_status DESC, component.stock ASC
    `);

    return result.records.map(r => ({
      requirement_id: r.get('requirement'),
      component_mpn: r.get('component'),
      risk_type: this.classifyRisk(r),
      severity: this.calculateSeverity(r)
    }));
  }

  async cleanup(): Promise<void> {
    await this.session.close();
    await this.driver.close();
  }
}

Creating a Custom Adapter

Step 1: Define Interface

interface CustomToolAdapter extends ToolAdapter {
  name: 'custom-tool';

  // Custom operations
  customOperation(params: CustomParams): Promise<CustomResult>;
}

Step 2: Implement Detection

class CustomAdapter implements CustomToolAdapter {
  name = 'custom-tool';
  version = '1.0.0';

  async detect(): Promise<ToolInstall | null> {
    try {
      // Check if tool exists
      const result = await exec('custom-tool --version');

      if (result.exitCode !== 0) {
        return null;
      }

      return {
        found: true,
        version: this.parseVersion(result.stdout),
        path: await which('custom-tool')
      };
    } catch (error) {
      return null;
    }
  }
}

Step 3: Implement Operations

class CustomAdapter implements CustomToolAdapter {
  async execute(action: Action): Promise<Result> {
    switch (action.type) {
      case 'custom-operation':
        return await this.customOperation(action.params);

      default:
        throw new Error(`Unknown action: ${action.type}`);
    }
  }

  async customOperation(params: CustomParams): Promise<CustomResult> {
    // Execute tool
    const result = await exec(
      `custom-tool ${params.arg1} ${params.arg2}`
    );

    // Parse output
    return this.parseOutput(result.stdout);
  }

  private parseOutput(output: string): CustomResult {
    // Implement parsing logic
  }
}

Step 4: Add Error Handling

class CustomAdapter implements CustomToolAdapter {
  async execute(action: Action): Promise<Result> {
    try {
      return await this.executeInternal(action);
    } catch (error) {
      if (error instanceof ExecError) {
        throw new ToolError(
          `Tool execution failed: ${error.message}`,
          {
            exitCode: error.exitCode,
            stderr: error.stderr,
            stdout: error.stdout
          }
        );
      }
      throw error;
    }
  }
}

Testing Adapters

Unit Tests

import { describe, it, expect, vi } from 'vitest';
import { KiCadAdapter } from './kicad-adapter';

describe('KiCadAdapter', () => {
  it('detects KiCad installation', async () => {
    const adapter = new KiCadAdapter();
    const install = await adapter.detect();

    expect(install).toBeDefined();
    expect(install.found).toBe(true);
  });

  it('runs ERC successfully', async () => {
    const adapter = new KiCadAdapter();
    await adapter.initialize({});

    const result = await adapter.runERC('test.kicad_sch');

    expect(result.errors).toHaveLength(0);
    expect(result.passed).toBe(true);
  });
});

Mock Adapter for Testing

class MockKiCadAdapter implements KiCadAdapter {
  async detect(): Promise<ToolInstall> {
    return { found: true, version: '8.0.0', path: '/mock/kicad' };
  }

  async runERC(schematic: string): Promise<ERCResult> {
    return {
      errors: [],
      warnings: [],
      passed: true
    };
  }
}

Best Practices

1. Graceful Degradation

class Adapter implements ToolAdapter {
  async execute(action: Action): Promise<Result> {
    const install = await this.detect();

    if (!install) {
      throw new ToolNotFoundError(
        `${this.name} not found. Install from: ${this.installURL}`
      );
    }

    const capability = this.capabilities.find(c => c.name === action.type);

    if (!capability?.supported) {
      throw new UnsupportedOperationError(
        `Operation ${action.type} requires ${this.name} ${capability.requiredVersion}`
      );
    }

    return await this.executeInternal(action);
  }
}

2. Caching

class Adapter implements ToolAdapter {
  private cache = new Map<string, CacheEntry>();

  async execute(action: Action): Promise<Result> {
    const cacheKey = this.getCacheKey(action);

    if (this.cache.has(cacheKey)) {
      const entry = this.cache.get(cacheKey);
      if (!entry.isExpired()) {
        return entry.value;
      }
    }

    const result = await this.executeInternal(action);

    this.cache.set(cacheKey, {
      value: result,
      expiresAt: Date.now() + 5 * 60 * 1000 // 5 min
    });

    return result;
  }
}

3. Timeout Handling

async execute(action: Action): Promise<Result> {
  return await Promise.race([
    this.executeInternal(action),
    this.timeout(30000) // 30 second timeout
  ]);
}

private timeout(ms: number): Promise<never> {
  return new Promise((_, reject) => {
    setTimeout(() => reject(new TimeoutError(
      `Operation timed out after ${ms}ms`
    )), ms);
  });
}

Next Steps


← AgentsAPI Reference →