From 350ec0f67e669bda1b3f2cd507c047367ce3f710 Mon Sep 17 00:00:00 2001 From: Dhaval Patel Date: Wed, 17 Jun 2026 15:17:57 -0400 Subject: [PATCH] example workflow for FMSR Signed-off-by: Dhaval Patel --- src/servers/fmsr/failure_modes.yaml | 15 -- src/servers/fmsr/main.py | 364 +++++++++++++++++++++------- src/servers/iot/main.py | 177 +++++++++++++- 3 files changed, 455 insertions(+), 101 deletions(-) delete mode 100644 src/servers/fmsr/failure_modes.yaml diff --git a/src/servers/fmsr/failure_modes.yaml b/src/servers/fmsr/failure_modes.yaml deleted file mode 100644 index 16712b8ee..000000000 --- a/src/servers/fmsr/failure_modes.yaml +++ /dev/null @@ -1,15 +0,0 @@ -chiller: - - "Compressor Overheating: Failed due to Normal wear, overheating" - - "Heat Exchangers: Fans: Degraded motor or worn bearing due to Normal use" - - "Evaporator Water side fouling" - - "Condenser Water side fouling" - - "Condenser Improper water side flow rate" - - "Purge Unit Excessive purge" - - "Refrigerant Operated Control Valve Failed spring" - -ahu: - - "Pressure Regulators Diaphragm failure" - - "Steam Heating Coils Air side fouling" - - "Belts or sheaves Wear" - - "Improper switch position" - - "Solenoid Valves Bound due to hardened grease" diff --git a/src/servers/fmsr/main.py b/src/servers/fmsr/main.py index 8091dd148..342788526 100644 --- a/src/servers/fmsr/main.py +++ b/src/servers/fmsr/main.py @@ -1,16 +1,18 @@ """FMSR (Failure Mode and Sensor Reasoning) MCP Server. -Exposes two tools: - get_failure_modes – lists failure modes for an asset - get_failure_mode_sensor_mapping – returns bidirectional FM↔sensor relevancy mapping - -For chillers and AHUs get_failure_modes returns a curated hardcoded list. -For any other asset type the LLM is queried as a fallback. -The mapping tool always calls the LLM to determine per-pair relevancy. - -LLM backend is configured via the FMSR_MODEL_ID environment variable -(default: ``watsonx/meta-llama/llama-3-3-70b-instruct``). 
Any model string -supported by litellm works — the provider is encoded in the prefix. +Tools: + get_failure_modes – READ the (partial) failure-mode list for an asset from CouchDB + generate_failure_modes – GENERATE failure modes via the LLM (when DB is missing/partial) + add_failure_modes – WRITE: persist/augment a class's failure modes in CouchDB + generate_failure_mode_sensor_mapping – GENERATE the bidirectional FM↔sensor relevancy via the LLM + +Failure modes live in CouchDB (collection "failure_mode", doctype "failure_mode", one doc per asset +class, loaded from the manifest). Coverage is NOT exhaustive: docs carry `exhaustive: false`, so +get_failure_modes returns what is known and signals when generation is needed. Retrieval (get_*) and +generation (generate_*) are separate by design. + +LLM backend is configured via FMSR_MODEL_ID (default: watsonx/meta-llama/llama-3-3-70b-instruct). +CouchDB via COUCHDB_URL / FAILURE_MODE_DBNAME / CATALOG_DBNAME / COUCHDB_USERNAME / COUCHDB_PASSWORD. """ from __future__ import annotations @@ -18,12 +20,11 @@ import logging import os import re -from pathlib import Path -from typing import Dict, List, Union +from typing import Dict, List, Optional, Union from concurrent.futures import ThreadPoolExecutor, as_completed -import yaml +import couchdb3 from dotenv import load_dotenv from mcp.server.fastmcp import FastMCP from pydantic import BaseModel @@ -35,11 +36,35 @@ logger = logging.getLogger("fmsr-mcp-server") -# ── Hardcoded asset data ────────────────────────────────────────────────────── +# ── CouchDB stores ──────────────────────────────────────────────────────────── +# Under AssetOpsBench's loader, database name == collection key, so the FMSR data is two separate +# databases (loaded from the manifest, no seed): 'failure_mode' and 'catalog'. We open one handle +# each — there is no single 'fmsr' database.
+ +COUCHDB_URL = os.environ.get("COUCHDB_URL") +COUCHDB_USERNAME = os.environ.get("COUCHDB_USERNAME") +COUCHDB_PASSWORD = os.environ.get("COUCHDB_PASSWORD") +FAILURE_MODE_DBNAME = os.environ.get("FAILURE_MODE_DBNAME", "failure_mode") +CATALOG_DBNAME = os.environ.get("CATALOG_DBNAME", "catalog") + + +def _connect(dbname): + try: + h = couchdb3.Database(dbname, url=COUCHDB_URL, user=COUCHDB_USERNAME, password=COUCHDB_PASSWORD) + logger.info("Connected to CouchDB: %s", dbname) + return h + except Exception as e: # noqa: BLE001 + logger.error("Failed to connect to CouchDB '%s': %s", dbname, e) + return None + + +fm_db = _connect(FAILURE_MODE_DBNAME) # failure_mode docs +catalog_db = _connect(CATALOG_DBNAME) # catalog docs -_FAILURE_MODES_FILE = Path(__file__).parent / "failure_modes.yaml" -with _FAILURE_MODES_FILE.open() as _f: - _ASSET_FAILURE_MODES: dict[str, list[str]] = yaml.safe_load(_f) + +def _asset_key(asset_name: str) -> str: + """Normalise an asset name to a class key: strip digits, trim, lowercase ('Pump 1' -> 'pump').""" + return re.sub(r"\d+", "", asset_name or "").strip().lower() # ── Prompt templates ────────────────────────────────────────────────────────── @@ -51,6 +76,13 @@ "For example: \n\n1. foo\n\n2. bar\n\n3. baz" ) +_ASSET2FM_EXTEND_PROMPT = ( + "The asset {asset_name} already has these known failure modes:\n{known}\n\n" + "List ADDITIONAL failure modes for {asset_name} that are NOT already in the list above. " + "Your response should be a numbered list with each failure mode on a new line. " + "Please only list the failure mode name." 
+) + _RELEVANCY_PROMPT = ( "For the asset {asset_name}, if the failure {failure_mode} occurs, " "can sensor {sensor} help monitor or detect the failure for {asset_name}?\n" @@ -63,7 +95,6 @@ # ── Output parsers ──────────────────────────────────────────────────────────── def _parse_numbered_list(text: str) -> list[str]: - """Parse a numbered list response into a plain list of strings.""" items = [] for line in text.splitlines(): m = re.match(r"^\d+[\.\)]\s*(.+)", line.strip()) @@ -73,7 +104,6 @@ def _parse_numbered_list(text: str) -> list[str]: def _parse_relevancy(text: str) -> dict: - """Parse a 3-line relevancy response into {answer, reason, temporal_behavior}.""" lines = [ln for ln in text.strip().splitlines() if ln.strip()] if lines and lines[0].lower().startswith("yes"): answer = "Yes" @@ -90,47 +120,38 @@ def _parse_relevancy(text: str) -> dict: _DEFAULT_MODEL_ID = "watsonx/meta-llama/llama-3-3-70b-instruct" _MAX_RETRIES = 3 +_MODEL_ID = os.environ.get("FMSR_MODEL_ID", _DEFAULT_MODEL_ID) def _build_llm(): - from llm import make_backend + from llm import LiteLLMBackend - model_id = os.environ.get("FMSR_MODEL_ID", _DEFAULT_MODEL_ID) - if model_id.startswith("watsonx/"): + if _MODEL_ID.startswith("watsonx/"): missing = [v for v in ("WATSONX_APIKEY", "WATSONX_PROJECT_ID") if not os.environ.get(v)] if missing: raise RuntimeError(f"Missing env vars for WatsonX: {missing}") - elif model_id.startswith("tokenrouter/"): - missing = [v for v in ("TOKENROUTER_API_KEY", "TOKENROUTER_BASE_URL") if not os.environ.get(v)] - if missing: - raise RuntimeError(f"Missing env vars for TokenRouter: {missing}") else: missing = [v for v in ("LITELLM_API_KEY", "LITELLM_BASE_URL") if not os.environ.get(v)] if missing: raise RuntimeError(f"Missing env vars for LiteLLM: {missing}") - return make_backend(model_id) + return LiteLLMBackend(_MODEL_ID) try: _llm = _build_llm() _llm_available = True -except Exception as _e: - logger.warning("LLM unavailable (FMSR will use curated data only): 
%s", _e) +except Exception as _e: # noqa: BLE001 + logger.warning("LLM unavailable (generate_* tools disabled): %s", _e) _llm = None _llm_available = False -# ── LLM call helpers with retry ─────────────────────────────────────────────── - _asset2fm_cache: dict[str, list[str]] = {} def _call_asset2fm(asset_name: str) -> list[str]: - """Query the LLM for failure modes of an asset. Retries up to _MAX_RETRIES times. - Results are cached to avoid redundant LLM calls for the same asset.""" if asset_name in _asset2fm_cache: return _asset2fm_cache[asset_name] - prompt = _ASSET2FM_PROMPT.format(asset_name=asset_name) last_exc: Exception | None = None for _ in range(_MAX_RETRIES): @@ -138,21 +159,33 @@ def _call_asset2fm(asset_name: str) -> list[str]: result = _parse_numbered_list(_llm.generate(prompt)) _asset2fm_cache[asset_name] = result return result - except Exception as exc: + except Exception as exc: # noqa: BLE001 last_exc = exc raise last_exc -def _call_relevancy(asset_name: str, failure_mode: str, sensor: str) -> dict: - """Query the LLM for FM↔sensor relevancy. Retries up to _MAX_RETRIES times.""" - prompt = _RELEVANCY_PROMPT.format( - asset_name=asset_name, failure_mode=failure_mode, sensor=sensor +def _call_asset2fm_extend(asset_name: str, known: list[str]) -> list[str]: + """Ask the LLM for ADDITIONAL failure modes given the already-known ones. Retries up to + _MAX_RETRIES. 
Not cached (depends on the known list).""" + prompt = _ASSET2FM_EXTEND_PROMPT.format( + asset_name=asset_name, known="\n".join(f"- {k}" for k in known) ) last_exc: Exception | None = None + for _ in range(_MAX_RETRIES): + try: + return _parse_numbered_list(_llm.generate(prompt)) + except Exception as exc: # noqa: BLE001 + last_exc = exc + raise last_exc + + +def _call_relevancy(asset_name: str, failure_mode: str, sensor: str) -> dict: + prompt = _RELEVANCY_PROMPT.format(asset_name=asset_name, failure_mode=failure_mode, sensor=sensor) + last_exc: Exception | None = None for _ in range(_MAX_RETRIES): try: return _parse_relevancy(_llm.generate(prompt)) - except Exception as exc: + except Exception as exc: # noqa: BLE001 last_exc = exc raise last_exc @@ -166,6 +199,35 @@ class ErrorResult(BaseModel): class FailureModesResult(BaseModel): asset_name: str failure_modes: List[str] + exhaustive: bool = False # the stored list is not claimed to be complete + source: Optional[str] = None # provenance: ISO / curated / LLM:{model_id} + + +class GenerateFailureModesResult(BaseModel): + asset_name: str + known: List[str] # pre-existing modes used as context (from DB or provided) + generated: List[str] # newly generated modes NOT already in `known` + failure_modes: List[str] # known + generated (the extended list) + source: str # LLM:{model_id} + message: str + + +class AddFailureModesResult(BaseModel): + asset_class: str + added: List[str] # newly inserted (not previously present) + total: int # total after the write + exhaustive: bool + source: Optional[str] + message: str + + +class CatalogResult(BaseModel): + kind: str # "sensor" | "failure_mode" + scenario_id: Optional[str] # which scope was served (None = global default) + total: int + items: List[str] + source: Optional[str] + message: str class RelevancyEntry(BaseModel): @@ -192,48 +254,194 @@ class FailureModeSensorMappingResult(BaseModel): # ── FastMCP server ──────────────────────────────────────────────────────────── -mcp =
FastMCP("fmsr", instructions="Failure mode and sensor reasoning: get failure modes for assets and determine which sensors can detect each failure.") +mcp = FastMCP( + "fmsr", + instructions=( + "Failure mode and sensor reasoning. get_failure_modes READS a class's (possibly partial) " + "failure modes from CouchDB; generate_failure_modes GENERATES them via the LLM when the DB " + "is missing or incomplete (exhaustive=false); add_failure_modes WRITES them back; " + "generate_failure_mode_sensor_mapping GENERATES which sensors can detect each failure. " + "get_sensor_catalog / get_failure_mode_catalog READ the global (class-independent) reference " + "lists from the loaded catalog dataset (scenario-scoped if a scenario_id catalog was loaded); " + "if no catalog collection is loaded they report that none is available." + ), +) @mcp.tool(title="Get Failure Modes") def get_failure_modes(asset_name: str) -> Union[FailureModesResult, ErrorResult]: - """Returns a list of known failure modes for the given asset. - For chillers and AHUs returns a curated list. For other assets queries the LLM.""" - asset_key = re.sub(r"\d+", "", asset_name).strip().lower() - if not asset_key or asset_key == "none": + """READ the known failure modes for an asset from CouchDB (collection 'failure_mode'). The list + may be partial: check `exhaustive` — if false, call generate_failure_modes to supplement. + Does NOT call the LLM. 
Returns an error if the class is not in the DB.""" + key = _asset_key(asset_name) + if not key or key == "none": return ErrorResult(error="asset_name is required") - - if asset_key in _ASSET_FAILURE_MODES: + if not fm_db: + return ErrorResult(error="CouchDB not connected") + try: + res = fm_db.find({"doctype": "failure_mode", "asset_class": key}, limit=1) + docs = res["docs"] + if not docs: + return ErrorResult( + error=f"no failure_mode record for '{key}' in DB; try generate_failure_modes" + ) + d = docs[0] return FailureModesResult( asset_name=asset_name, - failure_modes=_ASSET_FAILURE_MODES[asset_key], + failure_modes=d.get("failure_modes", []), + exhaustive=d.get("exhaustive", False), + source=d.get("source"), ) + except Exception as exc: # noqa: BLE001 + logger.error("get_failure_modes failed: %s", exc) + return ErrorResult(error=str(exc)) + +def _known_failure_modes(asset_name: str) -> List[str]: + """Current (partial) failure modes stored for the asset's class, [] if none / no DB.""" + if not fm_db: + return [] + try: + r = fm_db.find({"doctype": "failure_mode", "asset_class": _asset_key(asset_name)}, limit=1) + return r["docs"][0].get("failure_modes", []) if r["docs"] else [] + except Exception: # noqa: BLE001 + return [] + + +@mcp.tool(title="Generate Failure Modes") +def generate_failure_modes( + asset_name: str, known: Optional[List[str]] = None +) -> Union[GenerateFailureModesResult, ErrorResult]: + """GENERATE failure modes for an asset via the LLM, EXTENDING the known (partial) list. The DB + list is usually not exhaustive, so this asks the LLM for ADDITIONAL modes beyond what is already + known. If `known` is omitted, the current DB list for the class is used as context; if there is + no known list, it generates from scratch. Generated modes already present in `known` are dropped. 
+ Nothing is persisted — call add_failure_modes to save the new ones.""" + if not asset_name: + return ErrorResult(error="asset_name is required") if not _llm_available: - return ErrorResult(error="LLM unavailable and asset not in local database") + return ErrorResult(error="LLM unavailable") + base = known if known is not None else _known_failure_modes(asset_name) + base = [k.strip() for k in base if k and k.strip()] + try: + raw = _call_asset2fm_extend(asset_name, base) if base else _call_asset2fm(asset_name) + seen = {k.lower() for k in base} + new: List[str] = [] + for g in raw: + if g and g.lower() not in seen: + seen.add(g.lower()) + new.append(g) + return GenerateFailureModesResult( + asset_name=asset_name, known=base, generated=new, failure_modes=base + new, + source=f"LLM:{_MODEL_ID}", + message=(f"generated {len(new)} new failure mode(s) extending {len(base)} known " + f"({len(base) + len(new)} total); call add_failure_modes to persist."), + ) + except Exception as exc: # noqa: BLE001 + logger.error("generate_failure_modes failed: %s", exc) + return ErrorResult(error=str(exc)) + +@mcp.tool(title="Add Failure Modes") +def add_failure_modes( + asset_class: str, + failure_modes: List[str], + exhaustive: bool = False, + source: Optional[str] = None, +) -> Union[AddFailureModesResult, ErrorResult]: + """WRITE: persist/augment the failure modes for a class in CouchDB. Merges with any existing list + (union, de-duplicated). Use to save generated or curated modes so future get_failure_modes calls + return them. 
Set exhaustive=true only if the list is now believed complete.""" + key = _asset_key(asset_class) + if not key: + return ErrorResult(error="asset_class is required") + if not failure_modes: + return ErrorResult(error="failure_modes list is required") + if not fm_db: + return ErrorResult(error="CouchDB not connected") + doc_id = f"fm:{key}" try: - result = _call_asset2fm(asset_name) - return FailureModesResult(asset_name=asset_name, failure_modes=result) - except Exception as exc: - logger.error("_call_asset2fm failed: %s", exc) + try: + doc = fm_db.get(doc_id) + except Exception: # noqa: BLE001 + doc = None + existing = set(doc.get("failure_modes", [])) if doc else set() + incoming = {fm.strip() for fm in failure_modes if fm and fm.strip()} + added = sorted(incoming - existing) + merged = sorted(existing | incoming) + new_source = source or (doc.get("source") if doc else None) or "user" + if doc: + doc["failure_modes"] = merged + doc["exhaustive"] = exhaustive + doc["source"] = new_source + fm_db.save(doc) + else: + fm_db.save({ + "_id": doc_id, "doctype": "failure_mode", "asset_class": key, + "failure_modes": merged, "exhaustive": exhaustive, "source": new_source, + }) + return AddFailureModesResult( + asset_class=key, added=added, total=len(merged), exhaustive=exhaustive, + source=new_source, + message=f"added {len(added)} new failure mode(s) to '{key}' ({len(merged)} total).", + ) + except Exception as exc: # noqa: BLE001 + logger.error("add_failure_modes failed: %s", exc) return ErrorResult(error=str(exc)) -@mcp.tool(title="Get Failure Mode Sensor Mapping") -def get_failure_mode_sensor_mapping( +def _read_catalog(kind: str, scenario_id: Optional[str]): + """Return the catalog doc for (kind): the scenario-scoped doc if scenario_id is given and exists, + otherwise the global default (scenario_id = null). 
None if absent / no DB.""" + if not catalog_db: + return None + if scenario_id: + r = catalog_db.find({"doctype": "catalog", "kind": kind, "scenario_id": scenario_id}, limit=1) + if r["docs"]: + return r["docs"][0] + r = catalog_db.find({"doctype": "catalog", "kind": kind, "scenario_id": None}, limit=1) + return r["docs"][0] if r["docs"] else None + + +def _catalog_result(kind: str, scenario_id: Optional[str]) -> Union[CatalogResult, ErrorResult]: + if not catalog_db: + return ErrorResult(error="CouchDB not connected") + doc = _read_catalog(kind, scenario_id) + if not doc: + return ErrorResult(error=f"no catalog information available for kind '{kind}'") + items = doc.get("items", []) + served = doc.get("scenario_id") + return CatalogResult( + kind=kind, scenario_id=served, total=len(items), items=items, source=doc.get("source"), + message=f"{len(items)} {kind} items " + f"({'scenario ' + served if served else 'global default'} catalog).", + ) + + +@mcp.tool(title="Get Sensor Catalog") +def get_sensor_catalog(scenario_id: Optional[str] = None) -> Union[CatalogResult, ErrorResult]: + """READ the catalog of all potential sensors / monitoring parameters (class-independent reference + list). If scenario_id is given and a scenario-scoped catalog exists, it is returned; otherwise the + global default. Useful as the candidate-sensor input to generate_failure_mode_sensor_mapping.""" + return _catalog_result("sensor", scenario_id) + + +@mcp.tool(title="Get Failure Mode Catalog") +def get_failure_mode_catalog(scenario_id: Optional[str] = None) -> Union[CatalogResult, ErrorResult]: + """READ the catalog of all potential failure modes (class-independent reference list). 
Scenario- + scoped if scenario_id matches a registered catalog, else the global default.""" + return _catalog_result("failure_mode", scenario_id) + + +@mcp.tool(title="Generate Failure Mode Sensor Mapping") +def generate_failure_mode_sensor_mapping( asset_name: str, failure_modes: List[str], sensors: List[str], ) -> Union[FailureModeSensorMappingResult, ErrorResult]: - """For each (failure_mode, sensor) pair determines whether the sensor can detect - the failure. Returns a bidirectional mapping (fm→sensors, sensor→fms) plus - the full per-pair relevancy details. - - Note: one LLM call is made per (failure_mode, sensor) pair sequentially. - Keep both lists small (e.g. ≤5 failure modes, ≤10 sensors) to avoid long - runtimes. For a chiller with 7 failure modes and 20+ sensors the call will - take several minutes.""" + """GENERATE, for each (failure_mode, sensor) pair, whether the sensor can detect the failure + (one LLM call per pair). Returns a bidirectional mapping (fm→sensors, sensor→fms) plus per-pair + relevancy details. Keep both lists small (e.g. 
≤5 failure modes, ≤10 sensors) to bound runtime.""" if not asset_name: return ErrorResult(error="asset_name is required") if not failure_modes: @@ -246,42 +454,30 @@ def get_failure_mode_sensor_mapping( full_relevancy: List[RelevancyEntry] = [] fm2sensor: Dict[str, List[str]] = {} sensor2fm: Dict[str, List[str]] = {} - try: pairs = [(s, fm) for s in sensors for fm in failure_modes] with ThreadPoolExecutor() as executor: futures = { - executor.submit(_call_relevancy, asset_name, fm, s): (s, fm) - for s, fm in pairs + executor.submit(_call_relevancy, asset_name, fm, s): (s, fm) for s, fm in pairs } for future in as_completed(futures): s, fm = futures[future] gen = future.result() - entry = RelevancyEntry( - asset_name=asset_name, - failure_mode=fm, - sensor=s, - relevancy_answer=gen["answer"], - relevancy_reason=gen["reason"], + full_relevancy.append(RelevancyEntry( + asset_name=asset_name, failure_mode=fm, sensor=s, + relevancy_answer=gen["answer"], relevancy_reason=gen["reason"], temporal_behavior=gen["temporal_behavior"], - ) - full_relevancy.append(entry) + )) if "yes" in gen["answer"].lower(): fm2sensor.setdefault(fm, []).append(s) sensor2fm.setdefault(s, []).append(fm) - except Exception as exc: + except Exception as exc: # noqa: BLE001 logger.error("_call_relevancy failed: %s", exc) return ErrorResult(error=str(exc)) return FailureModeSensorMappingResult( - metadata=MappingMetadata( - asset_name=asset_name, - failure_modes=failure_modes, - sensors=sensors, - ), - fm2sensor=fm2sensor, - sensor2fm=sensor2fm, - full_relevancy=full_relevancy, + metadata=MappingMetadata(asset_name=asset_name, failure_modes=failure_modes, sensors=sensors), + fm2sensor=fm2sensor, sensor2fm=sensor2fm, full_relevancy=full_relevancy, ) diff --git a/src/servers/iot/main.py b/src/servers/iot/main.py index 9e4732087..6a1c42e57 100644 --- a/src/servers/iot/main.py +++ b/src/servers/iot/main.py @@ -37,7 +37,32 @@ logger.error(f"Failed to connect to CouchDB: {e}") db = None -mcp = 
FastMCP("iot", instructions="IoT sensor data: browse sites, assets, sensors, and query historical readings from CouchDB.") +# The asset registry is loaded as its own collection (manifest key "asset"), and the loader makes +# database name == collection key — so it lives in the "asset" database, NOT in IOT_DBNAME. Open a +# second handle for it. Telemetry (assets/sensors/history) keeps using `db` (the iot readings DB). +ASSET_DBNAME = os.environ.get("ASSET_DBNAME", "asset") +try: + asset_db = couchdb3.Database( + ASSET_DBNAME, + url=COUCHDB_URL, + user=COUCHDB_USERNAME, + password=COUCHDB_PASSWORD, + ) + logger.info(f"Connected to CouchDB: {ASSET_DBNAME}") +except Exception as e: + logger.error(f"Failed to connect to asset registry DB: {e}") + asset_db = None + +mcp = FastMCP( + "iot", + instructions=( + "IoT sensor data + asset registry. Browse sites, assets, and sensors, read the asset " + "nameplate (registry), see which installed sensors are actually measured (streaming), and " + "query historical readings from CouchDB. NOTE: assets()/sensors() reflect TELEMETRY (what " + "streams = measured); get_asset()/asset_sensors()/registry_assets() reflect the REGISTRY " + "(what is installed, by name). Compare the two to find installed-but-not-streaming sensors." 
+ ), +) # Static site as per original requirement SITES = ["MAIN"] @@ -76,6 +101,35 @@ class HistoryResult(BaseModel): message: str +# ── Asset-registry result models (identity / nameplate + installed sensor names) ── +class AssetDetail(BaseModel): + site_name: str + asset_id: str + description: Optional[str] + assettype: Optional[str] + status: Optional[str] + location: Optional[str] + installdate: Optional[str] + vintage: Optional[str] + n_sensors: int + message: str + + +class AssetSensorsResult(BaseModel): + site_name: str + asset_id: str + total_sensors: int + sensors: List[str] + message: str + + +class RegistryAssetsResult(BaseModel): + site_name: str + total_assets: int + assets: List[Dict[str, Any]] + message: str + + _asset_list_cache: Optional[List[str]] = None @@ -131,6 +185,29 @@ def get_sensor_list(asset_id: str) -> List[str]: return [] +_asset_doc_cache: Dict[str, Dict[str, Any]] = {} + + +def get_asset_doc(asset_id: str) -> Optional[Dict[str, Any]]: + """Helper to fetch one asset-registry document (doctype 'asset', _id 'asset:') + by assetnum. Cached per asset_id. The registry holds identity/nameplate + the INSTALLED + sensor inventory, separate from the telemetry reading docs.""" + if asset_id in _asset_doc_cache: + return _asset_doc_cache[asset_id] + if not asset_db: + return None + try: + res = asset_db.find({"doctype": "asset", "assetnum": asset_id}, limit=1) + docs = res["docs"] + if not docs: + return None + _asset_doc_cache[asset_id] = docs[0] + return docs[0] + except Exception as e: + logger.error(f"Error fetching asset doc {asset_id}: {e}") + return None + + @mcp.tool(title="List Sites") def sites() -> SitesResult: """Retrieves a list of sites. 
Each site is represented by a name.""" @@ -154,7 +231,10 @@ def assets(site_name: str) -> Union[AssetsResult, ErrorResult]: @mcp.tool(title="List Sensors") def sensors(site_name: str, asset_id: str) -> Union[SensorsResult, ErrorResult]: - """Lists the sensors available for a specified asset at a given site.""" + """Lists the sensors available for a specified asset at a given site. + These are the MEASURED sensors — names discovered from the asset's telemetry documents, + i.e. points that actually stream to the historian. For the full INSTALLED inventory + (including sensors fitted but not streaming), use asset_sensors().""" if site_name not in SITES: return ErrorResult(error=f"unknown site {site_name}") @@ -171,6 +251,99 @@ def sensors(site_name: str, asset_id: str) -> Union[SensorsResult, ErrorResult]: ) +@mcp.tool(title="Get Asset") +def get_asset(site_name: str, asset_id: str) -> Union[AssetDetail, ErrorResult]: + """Return registry/nameplate details for one asset (Maximo MXASSET-aligned: description, + assettype, status, location, installdate, vintage) plus installed/measured sensor counts. + This is asset IDENTITY — distinct from the telemetry-derived assets() list.""" + if site_name not in SITES: + return ErrorResult(error=f"unknown site {site_name}") + doc = get_asset_doc(asset_id) + if not doc: + return ErrorResult(error=f"unknown asset_id {asset_id} in registry") + n = len(doc.get("sensors", [])) + return AssetDetail( + site_name=site_name, + asset_id=doc.get("assetnum", asset_id), + description=doc.get("description"), + assettype=doc.get("assettype"), + status=doc.get("status"), + location=doc.get("location"), + installdate=doc.get("installdate"), + vintage=doc.get("vintage"), + n_sensors=n, + message=( + f"asset {asset_id} is a {doc.get('assettype')} " + f"({doc.get('vintage')} vintage) at {doc.get('location')} with {n} installed sensors." 
+ ), + ) + + +@mcp.tool(title="List Asset Sensors") +def asset_sensors(site_name: str, asset_id: str) -> Union[AssetSensorsResult, ErrorResult]: + """List the INSTALLED sensors for an asset, by name (installed is assumed). This is the registry + inventory — distinct from sensors(), which lists only what actually streams (the MEASURED set). + Compare the two to find installed-but-not-streaming sensors.""" + if site_name not in SITES: + return ErrorResult(error=f"unknown site {site_name}") + doc = get_asset_doc(asset_id) + if not doc: + return ErrorResult(error=f"unknown asset_id {asset_id} in registry") + names = list(doc.get("sensors", [])) + return AssetSensorsResult( + site_name=site_name, + asset_id=asset_id, + total_sensors=len(names), + sensors=names, + message=f"{len(names)} sensors installed on {asset_id}: {', '.join(names)}.", + ) + + +@mcp.tool(title="List Registry Assets") +def registry_assets( + site_name: str, assettype: Optional[str] = None +) -> Union[RegistryAssetsResult, ErrorResult]: + """List assets from the registry with metadata (assettype, vintage, sensor count), optionally + filtered by assettype (e.g. 'PUMP'). 
Complements assets(), which returns bare ids derived from + telemetry.""" + if site_name not in SITES: + return ErrorResult(error=f"unknown site {site_name}") + if not asset_db: + return ErrorResult(error="CouchDB not connected") + try: + selector: Dict[str, Any] = {"doctype": "asset"} + if assettype: + selector["assettype"] = assettype + res = asset_db.find( + selector, + fields=["assetnum", "assettype", "vintage", "sensors"], + limit=100000, + ) + rows = sorted( + ( + { + "asset_id": d["assetnum"], + "assettype": d.get("assettype"), + "vintage": d.get("vintage"), + "n_sensors": len(d.get("sensors", [])), + } + for d in res["docs"] + ), + key=lambda r: r["asset_id"], + ) + return RegistryAssetsResult( + site_name=site_name, + total_assets=len(rows), + assets=rows, + message=f"found {len(rows)} registry assets" + + (f" of type '{assettype}'" if assettype else "") + + ".", + ) + except Exception as e: + logger.error(f"registry_assets failed: {e}") + return ErrorResult(error=str(e)) + + @mcp.tool(title="Get Sensor History") def history( site_name: str, asset_id: str, start: str, final: Optional[str] = None