diff --git a/.gitignore b/.gitignore index 3ce54dae..e468abbb 100644 --- a/.gitignore +++ b/.gitignore @@ -220,4 +220,5 @@ certs **/tests/ **/test_docs/ -scripts/ \ No newline at end of file +scripts/ +.cursor \ No newline at end of file diff --git a/flexus_client_kit/ckit_automation.py b/flexus_client_kit/ckit_automation.py new file mode 100644 index 00000000..db38a0ca --- /dev/null +++ b/flexus_client_kit/ckit_automation.py @@ -0,0 +1,109 @@ +from __future__ import annotations + +import json +import logging +from pathlib import Path +from typing import Any + +import jsonschema + +from flexus_client_kit.ckit_automation_v1_schema_build import build_automation_v1_schema_document + +logger = logging.getLogger(__name__) + +# Eagerly built from the Discord automation catalog (integration triggers/actions) at import time. +# Tests and offline fixtures can override via set_automation_schema_dict() / set_automation_schema(). +_AUTOMATION_SCHEMA: dict = build_automation_v1_schema_document() + + +def set_automation_schema_dict(schema: dict) -> None: + global _AUTOMATION_SCHEMA + if not isinstance(schema, dict): + raise TypeError("set_automation_schema_dict expects dict") + _AUTOMATION_SCHEMA = schema + + +def set_automation_schema(schema_path: str) -> None: + """Load automation JSON Schema from disk. Raises if file is missing or invalid JSON.""" + global _AUTOMATION_SCHEMA + _AUTOMATION_SCHEMA = json.loads(Path(schema_path).read_text(encoding="utf-8")) + + +def extract_automation_published(persona_setup: dict) -> dict: + """ + Extract published automation config from persona_setup JSONB. + Returns empty dict if no published automations exist. + Published config lives inside persona_setup so changes trigger bot restart + via the existing subscription comparison in ckit_bot_exec.py. + + Prefer resolve_automation_rules() for new code; this function is kept for + legacy call-sites that have not yet been migrated. + """ + try: + result = persona_setup.get("automation_published", {}) + return result if isinstance(result, dict) else {} + except (AttributeError, TypeError) as e: + logger.error("extract_automation_published failed", exc_info=e) + return {} + + +def resolve_automation_rules(persona_setup: dict) -> dict: + """ + Resolve the published automation rules document from persona_setup JSONB. + + Migration-safe: prefers the new setup field automation_rules (stored as a + JSON string by automation_publish) and falls back to the legacy + automation_published dict for personas that were published before the storage + move. Returns an empty dict when neither field is present or valid. + """ + try: + if not isinstance(persona_setup, dict): + return {} + raw = persona_setup.get("automation_rules") + if isinstance(raw, str) and raw.strip(): + try: + parsed = json.loads(raw) + if isinstance(parsed, dict): + return parsed + except (json.JSONDecodeError, ValueError): + logger.warning("resolve_automation_rules: automation_rules field is not valid JSON, falling back to automation_published") + legacy = persona_setup.get("automation_published", {}) + return legacy if isinstance(legacy, dict) else {} + except (AttributeError, TypeError) as e: + logger.error("resolve_automation_rules failed", exc_info=e) + return {} + + +def extract_automation_draft(persona_automation_draft: Any) -> dict: + """ + Extract draft automation config from the separate persona_automation_draft column. + Draft lives in its own Prisma column to avoid triggering bot restarts on save. + Returns empty dict if column is NULL or invalid. + """ + try: + if persona_automation_draft is None: + return {} + if isinstance(persona_automation_draft, dict): + return persona_automation_draft + return {} + except (AttributeError, TypeError) as e: + logger.error("extract_automation_draft failed", exc_info=e) + return {} + + +def validate_automation_json(data: dict) -> list[str]: + """ + Validate an automation config dict against the v1 schema. + Returns list of error strings (empty = valid). + """ + errors = [] + try: + validator = jsonschema.Draft202012Validator(_AUTOMATION_SCHEMA) + for error in validator.iter_errors(data): + path = ".".join(str(p) for p in error.absolute_path) if error.absolute_path else "$" + errors.append("%s: %s" % (path, error.message)) + except jsonschema.SchemaError as e: + errors.append("schema error: %s" % e.message) + return errors + + diff --git a/flexus_client_kit/ckit_automation_actions.py b/flexus_client_kit/ckit_automation_actions.py new file mode 100644 index 00000000..0540452d --- /dev/null +++ b/flexus_client_kit/ckit_automation_actions.py @@ -0,0 +1,397 @@ +""" +Execute resolved automation actions for Discord community bots. + +The automation engine (ckit_automation_engine) produces flat action dicts with pre-resolved +_resolved_body / _resolved_channel_id. This module performs side effects only and returns per-action +results for logging. Execution context uses event_member: guild_id, user_id, and optional +discord_username from the inbound normalized Discord event (no persisted CRM rows). +""" + +from __future__ import annotations + +import logging +from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple + +import aiohttp + +from flexus_client_kit import ckit_person_domain + +logger = logging.getLogger(__name__) + +# Dispatcher: action type string -> async handler returning a normalized result dict. +ActionHandler = Callable[[dict, dict], Awaitable[dict]] + + +def _result_dict( + *, + ok: bool, + error: Optional[str] = None, + note: Optional[str] = None, + cancelled_count: Optional[int] = None, +) -> dict: + """ + Normalized per-action outcome merged into execute_actions output rows. + + ok/error are the contract; note carries dedupe hints where applicable. + """ + out: dict[str, Any] = {"ok": ok, "error": error} + if note is not None: + out["note"] = note + if cancelled_count is not None: + out["cancelled_count"] = cancelled_count + return out + + +def _guild_user_from_event_member(event_member: dict) -> Tuple[Optional[int], Optional[int]]: + """Read guild/user ids from the in-memory event_member dict.""" + try: + gid = event_member.get("guild_id") + uid = event_member.get("user_id") + if gid is None or uid is None: + return (None, None) + return (int(gid), int(uid)) + except (TypeError, ValueError): + return (None, None) + + +async def _do_send_dm(action: dict, ctx: dict) -> dict: + """Deliver a DM via connector.execute_action(send_dm).""" + try: + persona_id = str(ctx.get("persona_id") or "") + body_raw = action.get("_resolved_body") + body = body_raw if isinstance(body_raw, str) else "" + if not (body or "").strip(): + return _result_dict(ok=False, error="empty_body") + connector = ctx.get("connector") + if connector is not None: + em = ctx.get("event_member") + if not isinstance(em, dict): + return _result_dict(ok=False, error="bad_event_member") + uid_s = str(em.get("user_id", "") or "") + if not uid_s: + return _result_dict(ok=False, error="missing_user_id") + dm_params: dict = {"user_id": uid_s, "text": body} + sid = str(ctx.get("server_id") or "") + if sid: + dm_params["server_id"] = sid + result = await connector.execute_action("send_dm", dm_params) + return _result_dict(ok=result.ok, error=result.error) + logger.warning( + "send_dm: missing ChatConnector in ctx; persona_id=%s", + persona_id, + ) + return _result_dict(ok=False, error="no_connector") + except aiohttp.ClientError as e: + logger.warning("send_dm ClientError: %s %s", type(e).__name__, e) + return _result_dict(ok=False, error=type(e).__name__) + except (KeyError, TypeError) as e: + logger.error("send_dm context error", exc_info=e) + return _result_dict(ok=False, error=type(e).__name__) + + +async def _do_post_to_channel(action: dict, ctx: dict) -> dict: + """Post to a guild text channel via connector.execute_action.""" + try: + persona_id = str(ctx.get("persona_id") or "") + cid = action.get("_resolved_channel_id") + if cid is None: + return _result_dict(ok=False, error="no_channel_id") + try: + channel_id = int(cid) + except (TypeError, ValueError): + return _result_dict(ok=False, error="bad_channel_id") + body_raw = action.get("_resolved_body") + body = body_raw if isinstance(body_raw, str) else "" + if not (body or "").strip(): + return _result_dict(ok=False, error="empty_body") + connector = ctx.get("connector") + if connector is not None: + sid = str(ctx.get("server_id") or "") + payload = {"channel_id": str(channel_id), "text": body} + if sid: + payload["server_id"] = sid + result = await connector.execute_action("post_to_channel", payload) + return _result_dict(ok=result.ok, error=result.error) + logger.warning( + "post_to_channel: missing ChatConnector in ctx; persona_id=%s", + persona_id, + ) + return _result_dict(ok=False, error="no_connector") + except aiohttp.ClientError as e: + logger.warning("post_to_channel ClientError: %s %s", type(e).__name__, e) + return _result_dict(ok=False, error=type(e).__name__) + except (KeyError, TypeError, AttributeError) as e: + logger.error("post_to_channel context error", exc_info=e) + return _result_dict(ok=False, error=type(e).__name__) + + +async def _do_add_role(action: dict, ctx: dict) -> dict: + """Resolve role id and call connector add_role.""" + try: + rid = action.get("_resolved_role_id") + if rid is None: + return _result_dict(ok=False, error="no_role_id") + em = ctx.get("event_member") + if not isinstance(em, dict): + return _result_dict(ok=False, error="bad_event_member") + uid_s = str(em.get("user_id", "") or "") + if not uid_s: + return _result_dict(ok=False, error="missing_user_id") + connector = ctx.get("connector") + if connector is None: + return _result_dict(ok=False, error="no_connector") + sid = str(ctx.get("server_id") or "") + if not sid: + return _result_dict(ok=False, error="missing_server_id") + result = await connector.execute_action( + "add_role", + {"user_id": uid_s, "role_id": str(int(rid)), "server_id": sid}, + ) + return _result_dict(ok=result.ok, error=result.error) + except aiohttp.ClientError as e: + logger.warning("add_role ClientError: %s %s", type(e).__name__, e) + return _result_dict(ok=False, error=type(e).__name__) + except (KeyError, TypeError, ValueError) as e: + logger.error("add_role context error", exc_info=e) + return _result_dict(ok=False, error=type(e).__name__) + + +async def _do_remove_role(action: dict, ctx: dict) -> dict: + """Resolve role id and call connector remove_role.""" + try: + rid = action.get("_resolved_role_id") + if rid is None: + return _result_dict(ok=False, error="no_role_id") + em = ctx.get("event_member") + if not isinstance(em, dict): + return _result_dict(ok=False, error="bad_event_member") + uid_s = str(em.get("user_id", "") or "") + if not uid_s: + return _result_dict(ok=False, error="missing_user_id") + connector = ctx.get("connector") + if connector is None: + return _result_dict(ok=False, error="no_connector") + sid = str(ctx.get("server_id") or "") + if not sid: + return _result_dict(ok=False, error="missing_server_id") + result = await connector.execute_action( + "remove_role", + {"user_id": uid_s, "role_id": str(int(rid)), "server_id": sid}, + ) + return _result_dict(ok=result.ok, error=result.error) + except aiohttp.ClientError as e: + logger.warning("remove_role ClientError: %s %s", type(e).__name__, e) + return _result_dict(ok=False, error=type(e).__name__) + except (KeyError, TypeError, ValueError) as e: + logger.error("remove_role context error", exc_info=e) + return _result_dict(ok=False, error=type(e).__name__) + + +async def _do_kick(action: dict, ctx: dict) -> dict: + """Kick the member in context via connector.execute_action(kick).""" + try: + em = ctx.get("event_member") + if not isinstance(em, dict): + return _result_dict(ok=False, error="bad_event_member") + uid_s = str(em.get("user_id", "") or "") + if not uid_s: + return _result_dict(ok=False, error="missing_user_id") + connector = ctx.get("connector") + if connector is None: + return _result_dict(ok=False, error="no_connector") + sid = str(ctx.get("server_id") or "") + if not sid: + return _result_dict(ok=False, error="missing_server_id") + reason_raw = action.get("_resolved_kick_reason") + reason = reason_raw if isinstance(reason_raw, str) else "" + result = await connector.execute_action( + "kick", + {"user_id": uid_s, "reason": reason, "server_id": sid}, + ) + return _result_dict(ok=result.ok, error=result.error) + except aiohttp.ClientError as e: + logger.warning("kick ClientError: %s %s", type(e).__name__, e) + return _result_dict(ok=False, error=type(e).__name__) + except (KeyError, TypeError, ValueError) as e: + logger.error("kick context error", exc_info=e) + return _result_dict(ok=False, error=type(e).__name__) + + +async def _do_call_gatekeeper_tool(action: dict, ctx: dict) -> dict: + """ + Apply a gatekeeper decision using workspace person-domain APIs only (GraphQL). + + Maps accept / reject / request_info to application status updates. Does not write Mongo. + """ + try: + tool_name = action.get("tool_name") + if tool_name not in ("accept", "reject", "request_info"): + return _result_dict(ok=False, error="bad_tool_name") + + em = ctx.get("event_member") + if not isinstance(em, dict): + return _result_dict(ok=False, error="bad_event_member") + + guild_id, user_id = _guild_user_from_event_member(em) + if guild_id is None or user_id is None: + return _result_dict(ok=False, error="missing_guild_or_user") + + if tool_name == "accept": + app_status = "DECIDED" + app_decision = "APPROVED" + elif tool_name == "reject": + app_status = "DECIDED" + app_decision = "REJECTED" + else: + app_status = "REVIEWING" + app_decision = "" + + reason_template = action.get("reason_template") + details: Optional[dict] = {"reason_template": reason_template} if isinstance(reason_template, str) and reason_template else None + + fclient = ctx.get("fclient") + ws_id = ctx.get("ws_id") or "" + if fclient is None or not ws_id: + logger.warning( + "call_gatekeeper_tool: fclient or ws_id missing from ctx, skipping person domain sync " + "persona_id=%s tool=%s guild=%s user=%s", + ctx.get("persona_id"), + tool_name, + guild_id, + user_id, + ) + return _result_dict(ok=False, error="missing_workspace_context") + + username = str(em.get("discord_username") or user_id) + discord_user_id = str(user_id) + person_id = await ckit_person_domain.ensure_person_for_discord_user( + fclient, + ws_id, + discord_user_id, + username, + ) + if not person_id: + return _result_dict(ok=False, error="person_unresolved") + + existing_app = await ckit_person_domain.application_find_latest( + fclient, + ws_id, + person_id, + ) + if existing_app: + app_id: Optional[str] = existing_app["application_id"] + else: + app_id = await ckit_person_domain.application_create_pending( + fclient, + ws_id, + person_id, + source="discord_bot", + platform="discord", + payload={"guild_id": str(guild_id), "discord_user_id": discord_user_id}, + ) + if not app_id: + return _result_dict(ok=False, error="no_application_id") + + await ckit_person_domain.application_apply_decision( + fclient, + app_id, + app_status, + app_decision, + details, + ) + return _result_dict(ok=True, error=None) + except aiohttp.ClientError as e: + logger.warning("call_gatekeeper_tool ClientError: %s %s", type(e).__name__, e) + return _result_dict(ok=False, error=type(e).__name__) + except (TypeError, AttributeError, KeyError) as e: + logger.error("call_gatekeeper_tool unexpected error", exc_info=e) + return _result_dict(ok=False, error=type(e).__name__) + + +# Maps automation action.type to handler coroutine. +_ACTION_DISPATCH: Dict[str, ActionHandler] = { + "send_dm": _do_send_dm, + "post_to_channel": _do_post_to_channel, + "add_role": _do_add_role, + "remove_role": _do_remove_role, + "kick": _do_kick, + "call_gatekeeper_tool": _do_call_gatekeeper_tool, +} + + +async def execute_actions(actions: List[dict], ctx: dict) -> List[dict]: + """ + Run resolved actions in order; collect logging rows. One failing action does not stop the rest. + """ + try: + results: List[dict] = [] + if not isinstance(actions, list): + logger.error("execute_actions: actions must be a list") + return [] + for action in actions: + if not isinstance(action, dict): + logger.warning("execute_actions: skip non-dict action") + continue + action_type = action.get("type") + rule_id = str(action.get("rule_id") or "") + if not isinstance(action_type, str) or not action_type: + results.append( + { + "action_type": "", + "rule_id": rule_id, + "ok": False, + "error": "missing_type", + }, + ) + continue + handler = _ACTION_DISPATCH.get(action_type) + if handler is None: + logger.warning("execute_actions: unknown action type %s", action_type) + results.append( + { + "action_type": action_type, + "rule_id": rule_id, + "ok": False, + "error": "unknown_action_type", + }, + ) + continue + try: + partial = await handler(action, ctx) + except aiohttp.ClientError as e: + logger.warning("execute_actions handler ClientError: %s", e) + results.append( + { + "action_type": action_type, + "rule_id": rule_id, + "ok": False, + "error": type(e).__name__, + }, + ) + continue + except (TypeError, KeyError, ValueError) as e: + logger.error("execute_actions handler data error", exc_info=e) + results.append( + { + "action_type": action_type, + "rule_id": rule_id, + "ok": False, + "error": type(e).__name__, + }, + ) + continue + row = { + "action_type": action_type, + "rule_id": rule_id, + "ok": bool(partial.get("ok")), + "error": partial.get("error"), + } + if "note" in partial: + row["note"] = partial["note"] + if "cancelled_count" in partial: + row["cancelled_count"] = partial["cancelled_count"] + results.append(row) + return results + except (TypeError, KeyError) as e: + logger.error("execute_actions fatal input error", exc_info=e) + return [] diff --git a/flexus_client_kit/ckit_automation_catalog.py b/flexus_client_kit/ckit_automation_catalog.py new file mode 100644 index 00000000..1f13b06b --- /dev/null +++ b/flexus_client_kit/ckit_automation_catalog.py @@ -0,0 +1,53 @@ +# Automation catalog descriptors: persisted-rule semantics and trigger/action metadata for schema assembly. +# Runtime connector contracts (NormalizedEvent, ChatConnector) live in ckit_connector. + +from __future__ import annotations + +import dataclasses +from typing import Any + + +@dataclasses.dataclass(frozen=True) +class SemanticContract: + """ + Canonical runtime semantics for one trigger or action: what authors persist, what the + executor fills, and hard guarantees from engine + executor code (single source with descriptors). + """ + + operator_summary: str + rule_author_configures: tuple[str, ...] = () + platform_fills_automatically: tuple[str, ...] = () + runtime_guarantees: tuple[str, ...] = () + operator_must_not_set: tuple[str, ...] = () + + +def semantic_contract_to_dict(contract: SemanticContract | None) -> dict[str, Any] | None: + if contract is None: + return None + return { + "operator_summary": contract.operator_summary, + "rule_author_configures": list(contract.rule_author_configures), + "platform_fills_automatically": list(contract.platform_fills_automatically), + "runtime_guarantees": list(contract.runtime_guarantees), + "operator_must_not_set": list(contract.operator_must_not_set), + } + + +@dataclasses.dataclass +class TriggerDescriptor: + type: str + label: str + description: str + payload_schema: dict + semantic_contract: SemanticContract | None = None + automation_schema_def: dict | None = None + + +@dataclasses.dataclass +class ActionDescriptor: + type: str + label: str + description: str + parameter_schema: dict + semantic_contract: SemanticContract | None = None + automation_schema_def: dict | None = None diff --git a/flexus_client_kit/ckit_automation_engine.py b/flexus_client_kit/ckit_automation_engine.py new file mode 100644 index 00000000..020ed1a8 --- /dev/null +++ b/flexus_client_kit/ckit_automation_engine.py @@ -0,0 +1,387 @@ +""" +Pure automation rule engine: dict in, dict out. No Discord, Mongo, or async. + +Community bots pass an in-memory member/event snapshot (fields from the current Discord event) +for condition checks and template resolution before the executor applies side effects. +""" + +from __future__ import annotations + +import copy +import logging +import re +import time + +from flexus_client_kit import ckit_automation + +logger = logging.getLogger(__name__) + +# Regex for {placeholder} tokens in templates; word chars only (schema field names). +_PLACEHOLDER_RE = re.compile(r"\{(\w+)\}") + +# Action types that may carry a message body via template or template_field (executor reads _resolved_body). +_BODY_ACTION_TYPES = frozenset({"send_dm", "post_to_channel"}) + +_ROLE_ACTION_TYPES = frozenset({"add_role", "remove_role"}) + + +def _safe_float_pair(field_value, operand) -> tuple[float, float] | None: + """ + Parse both sides to float for numeric comparisons. Returns None on any failure + so callers can treat the condition as failed (fail-safe, no crash on bad data). + """ + try: + return (float(field_value), float(operand)) + except (TypeError, ValueError): + return None + + +def _single_condition_ok(condition: dict, member: dict) -> bool: + """ + Evaluate one condition dict against member. Caller ensures condition is a dict. + Unknown op logs a warning and yields False (blocks the rule). + """ + op = condition.get("op") + field_name = condition.get("field") + if not isinstance(field_name, str): + return False + field_value = member.get(field_name) + + if op == "eq": + return field_value == condition["value"] + if op == "neq": + return field_value != condition["value"] + if op == "gt": + if field_value is None: + return False + pair = _safe_float_pair(field_value, condition["value"]) + return pair is not None and pair[0] > pair[1] + if op == "lt": + if field_value is None: + return False + pair = _safe_float_pair(field_value, condition["value"]) + return pair is not None and pair[0] < pair[1] + if op == "is_set": + return field_value is not None + if op == "is_not_set": + return field_value is None + if op == "elapsed_gt": + if field_value is None: + return False + pair = _safe_float_pair(field_value, condition["value"]) + return pair is not None and (time.time() - pair[0]) > pair[1] + if op == "elapsed_lt": + if field_value is None: + return False + pair = _safe_float_pair(field_value, condition["value"]) + return pair is not None and (time.time() - pair[0]) < pair[1] + + logger.warning("evaluate_conditions: unknown op %r (fail-safe False)", op) + return False + + +def load_rules(persona_setup: dict) -> list[dict]: + """ + Load enabled automation rules from persona_setup. Delegates to + resolve_automation_rules which prefers the new automation_rules setup field + and falls back to the legacy automation_published key for backward + compatibility. Returns [] if missing or invalid. + + Rules with enabled=False are excluded; rules without an enabled field + (legacy documents) are treated as enabled for backward compatibility. + """ + try: + published = ckit_automation.resolve_automation_rules(persona_setup) + rules_raw = published.get("rules", []) + if not isinstance(rules_raw, list): + return [] + return [r for r in rules_raw if isinstance(r, dict) and r.get("enabled", True) is not False] + except (KeyError, TypeError, ValueError) as e: + logger.error("load_rules failed", exc_info=e) + return [] + + +def match_trigger(event_type: str, event_data: dict, rule: dict, setup: dict) -> bool: + """ + Return True if this rule's trigger matches the synthetic event_type and payload. + Unknown event_type -> False. Malformed rule/trigger -> False. + """ + try: + trigger = rule.get("trigger") + if not isinstance(trigger, dict): + return False + ttype = trigger.get("type") + + if event_type == "member_joined": + return ttype == "member_joined" + + if event_type == "member_removed": + return ttype == "member_removed" + + if event_type == "message_in_channel": + if ttype != "message_in_channel": + return False + ref = trigger.get("channel_id_field") + if not isinstance(ref, str): + return False + resolved = resolve_channel_id(ref, setup) + return event_data.get("channel_id") == resolved + + return False + except (KeyError, TypeError, ValueError) as e: + logger.error("match_trigger failed", exc_info=e) + return False + + +def evaluate_conditions(conditions: list[dict], member: dict) -> bool: + """ + AND all conditions; empty list is True. Reads fields from the member/event snapshot dict. + """ + try: + if not conditions: + return True + if not isinstance(conditions, list): + return False + if not isinstance(member, dict): + return False + for cond in conditions: + if not isinstance(cond, dict): + return False + if not _single_condition_ok(cond, member): + return False + return True + except (KeyError, TypeError, ValueError) as e: + logger.error("evaluate_conditions failed", exc_info=e) + return False + + +def resolve_template(template: str, member: dict, setup: dict) -> str: + """ + Replace {name} placeholders: special keys now, mention; else member then setup. + Unknown or unset placeholders stay literal in the output string. + """ + try: + if not isinstance(template, str): + logger.warning( + "resolve_template expected str, got %s", + type(template).__name__, + ) + return "" + + if not isinstance(member, dict): + member = {} + if not isinstance(setup, dict): + setup = {} + + def repl(match) -> str: + name = match.group(1) + if name == "now": + return str(int(time.time())) + if name == "mention": + uid = member.get("user_id") + if uid is None: + return match.group(0) + fmt_fn = setup.get("_format_mention") + if callable(fmt_fn): + return fmt_fn(str(uid)) + return "<@%s>" % uid + if name in member: + v = member[name] + if v is not None: + return str(v) + if name in setup: + v = setup[name] + if v is not None: + return str(v) + return match.group(0) + + return _PLACEHOLDER_RE.sub(repl, template) + except (KeyError, TypeError, ValueError) as e: + logger.error("resolve_template failed", exc_info=e) + return template if isinstance(template, str) else "" + + +def resolve_channel_id(field_ref: str, setup: dict) -> int | None: + """ + Resolve #snowflake literal or setup key to int channel id. None if invalid or missing. + """ + try: + if not isinstance(field_ref, str) or not field_ref: + return None + if not isinstance(setup, dict): + setup = {} + if field_ref.isdigit(): + return int(field_ref) + if field_ref.startswith("#"): + return int(field_ref[1:]) + raw = setup.get(field_ref) + if raw is None: + return None + return int(raw) + except (KeyError, TypeError, ValueError) as e: + logger.error("resolve_channel_id failed for %r", field_ref, exc_info=e) + return None + + +def resolve_role_id(field_ref: str, setup: dict) -> int | None: + """Same resolution rules as resolve_channel_id (setup key, digits, or #snowflake).""" + return resolve_channel_id(field_ref, setup) + + +def _resolve_body_fields(action: dict, member: dict, setup: dict) -> None: + """ + Mutates action copy: sets _resolved_body for send_dm / post_to_channel from + template_field (setup indirection) or inline template. + """ + atype = action.get("type") + if atype not in _BODY_ACTION_TYPES: + return + if "template_field" in action: + key = action["template_field"] + raw = setup.get(key, "") if isinstance(setup, dict) else "" + if raw is None: + raw = "" + if not isinstance(raw, str): + raw = str(raw) + action["_resolved_body"] = resolve_template(raw, member, setup) + elif "template" in action: + tpl = action["template"] + if not isinstance(tpl, str): + tpl = str(tpl) + action["_resolved_body"] = resolve_template(tpl, member, setup) + + +def _resolve_channel_field(action: dict, setup: dict) -> None: + """Mutates action copy: _resolved_channel_id from channel_id_field if present.""" + if "channel_id_field" not in action: + return + ref = action["channel_id_field"] + if isinstance(ref, str): + action["_resolved_channel_id"] = resolve_channel_id(ref, setup) + else: + action["_resolved_channel_id"] = None + + +def _resolve_role_field(action: dict, setup: dict) -> None: + if action.get("type") not in _ROLE_ACTION_TYPES: + return + ref = action.get("role_id_field") + if isinstance(ref, str): + action["_resolved_role_id"] = resolve_role_id(ref, setup) + else: + action["_resolved_role_id"] = None + + +def _resolve_kick_reason(action: dict, member: dict, setup: dict) -> None: + if action.get("type") != "kick": + return + raw = action.get("reason") + if not isinstance(raw, str) or not (raw or "").strip(): + action["_resolved_kick_reason"] = "" + return + action["_resolved_kick_reason"] = resolve_template(raw, member, setup) + + +def resolve_actions(actions: list[dict], member: dict, setup: dict) -> list[dict]: + """ + Deep-copy each action and fill executor-facing fields: _resolved_body, + _resolved_channel_id, _resolved_role_id, _resolved_kick_reason. + """ + try: + if not isinstance(actions, list): + return [] + if not isinstance(member, dict): + member = {} + if not isinstance(setup, dict): + setup = {} + out = [] + for act in actions: + if not isinstance(act, dict): + continue + cloned = copy.deepcopy(act) + _resolve_body_fields(cloned, member, setup) + _resolve_channel_field(cloned, setup) + _resolve_role_field(cloned, setup) + _resolve_kick_reason(cloned, member, setup) + out.append(cloned) + return out + except (KeyError, TypeError, ValueError) as e: + logger.error("resolve_actions failed", exc_info=e) + return [] + + +def _execute_flat_rule(rule: dict, member: dict, setup: dict, result: list[dict]) -> None: + """Old-style rule: single conditions+actions block.""" + conds = rule.get("conditions", []) + if not evaluate_conditions(conds, member): + return + acts = rule.get("actions", []) + if not isinstance(acts, list): + return + resolved = resolve_actions(acts, member, setup) + rid = rule.get("rule_id", "") + for a in resolved: + a["rule_id"] = rid + result.extend(resolved) + + +def _execute_branched_rule(rule: dict, member: dict, setup: dict, result: list[dict]) -> None: + """Branched rule: first branch whose conditions all pass wins, rest are skipped.""" + branches = rule.get("branches", []) + if not isinstance(branches, list): + return + rid = rule.get("rule_id", "") + for branch in branches: + if not isinstance(branch, dict): + continue + conds = branch.get("conditions", []) + if not evaluate_conditions(conds, member): + continue + acts = branch.get("actions", []) + if not isinstance(acts, list): + continue + resolved = resolve_actions(acts, member, setup) + for a in resolved: + a["rule_id"] = rid + result.extend(resolved) + return + + +def process_event( + event_type: str, + event_data: dict, + rules: list[dict], + member: dict, + setup: dict, +) -> list[dict]: + """ + Run all rules: for each, if trigger matches, evaluate conditions/branches and + append resolved actions. Supports both flat rules (conditions+actions) and + branched rules (branches array, first matching branch wins). + Returns a flat list of action dicts ready for the executor. + """ + try: + if not isinstance(event_data, dict): + event_data = {} + if not isinstance(rules, list): + return [] + if not isinstance(member, dict): + member = {} + if not isinstance(setup, dict): + setup = {} + result = [] + for rule in rules: + if not isinstance(rule, dict): + continue + if not match_trigger(event_type, event_data, rule, setup): + continue + if "branches" in rule: + _execute_branched_rule(rule, member, setup, result) + else: + _execute_flat_rule(rule, member, setup, result) + return result + except (KeyError, TypeError, ValueError) as e: + logger.error("process_event failed", exc_info=e) + return [] + + diff --git a/flexus_client_kit/ckit_automation_schema_defs.py b/flexus_client_kit/ckit_automation_schema_defs.py new file mode 100644 index 00000000..5a92379c --- /dev/null +++ b/flexus_client_kit/ckit_automation_schema_defs.py @@ -0,0 +1,49 @@ +""" +Product-level and shared JSON Schema fragments for automation_schema_version 1. + +Discord-specific trigger/action shapes live in ckit_discord_automation_schema_defs and are +merged into the full document by ckit_automation_v1_schema_build. +""" + +from __future__ import annotations + +# Product-level trigger (not tied to any integration platform). +SCHEMA_TRIGGER_MANUAL_CAMPAIGN_PRODUCT = { + "type": "object", + "required": ["type"], + "additionalProperties": False, + "properties": { + "type": {"const": "manual_campaign"}, + "segment_ref": { + "type": "string", + "description": "Optional reference to a saved segment definition or filter id. Omit for 'all members'.", + }, + }, + "description": ( + "Not auto-triggered. Operator initiates from UI ('send now' or 'schedule at'). " + "Segment filtering happens before per-member condition evaluation." + ), +} + +# Product-level action (not tied to any integration platform). +SCHEMA_ACTION_CALL_GATEKEEPER_PRODUCT = { + "type": "object", + "required": ["type", "tool_name"], + "additionalProperties": False, + "properties": { + "type": {"const": "call_gatekeeper_tool"}, + "tool_name": { + "type": "string", + "enum": ["accept", "reject", "request_info"], + "description": "Gatekeeper decision tool to invoke.", + }, + "reason_template": { + "type": "string", + "description": "Optional message/reason with {field} placeholders.", + }, + }, + "description": ( + "Invoke a gatekeeper decision. Typically used from expert-driven flows; " + "workspace person-domain application state is updated, not local bot CRM." + ), +} diff --git a/flexus_client_kit/ckit_automation_v1_schema_build.py b/flexus_client_kit/ckit_automation_v1_schema_build.py new file mode 100644 index 00000000..bc81f8af --- /dev/null +++ b/flexus_client_kit/ckit_automation_v1_schema_build.py @@ -0,0 +1,303 @@ +""" +Build automation_schema_version 1 JSON Schema from a neutral integration catalog +(triggers + actions) plus product-only trigger/action defs. Single compile-time +assembly surface: backend validation and assist contracts derive from the same document. + +Discord is the default/reference integration; pass triggers/actions explicitly to build +a schema for a different integration. +""" + +from __future__ import annotations + +import copy +from typing import Any + +from flexus_client_kit.ckit_connector_discord_catalog import DISCORD_ACTIONS, DISCORD_TRIGGERS +from flexus_client_kit.ckit_automation_schema_defs import ( + SCHEMA_ACTION_CALL_GATEKEEPER_PRODUCT, + SCHEMA_TRIGGER_MANUAL_CAMPAIGN_PRODUCT, +) + + +def _def_name_trigger(type_id: str) -> str: + return "trigger_%s" % type_id + + +def _def_name_action(type_id: str) -> str: + return "action_%s" % type_id + + +def _persisted_keys_from_schema_object(fragment: dict) -> frozenset[str]: + props = fragment.get("properties") + if not isinstance(props, dict): + return frozenset() + return frozenset(props.keys()) + + +def automation_persisted_trigger_property_keys( + triggers: list | None = None, +) -> dict[str, frozenset[str]]: + """Return {type -> frozenset of persisted property names} for each trigger. + + Defaults to the Discord integration catalog. Pass ``triggers`` explicitly + to use a different integration's catalog. + """ + if triggers is None: + triggers = DISCORD_TRIGGERS + out: dict[str, frozenset[str]] = {} + for t in triggers: + d = t.automation_schema_def + if d is None: + raise RuntimeError("trigger %r missing automation_schema_def" % t.type) + out[t.type] = _persisted_keys_from_schema_object(d) + out["manual_campaign"] = _persisted_keys_from_schema_object(SCHEMA_TRIGGER_MANUAL_CAMPAIGN_PRODUCT) + return out + + +def automation_persisted_action_property_keys( + actions: list | None = None, +) -> dict[str, frozenset[str]]: + """Return {type -> frozenset of persisted property names} for each action. + + Defaults to the Discord integration catalog. Pass ``actions`` explicitly + to use a different integration's catalog. + """ + if actions is None: + actions = DISCORD_ACTIONS + out: dict[str, frozenset[str]] = {} + for a in actions: + d = a.automation_schema_def + if d is None: + raise RuntimeError("action %r missing automation_schema_def" % a.type) + out[a.type] = _persisted_keys_from_schema_object(d) + out["call_gatekeeper_tool"] = _persisted_keys_from_schema_object(SCHEMA_ACTION_CALL_GATEKEEPER_PRODUCT) + return out + + +def schema_trigger_types_ordered(triggers: list | None = None) -> tuple[str, ...]: + keys = sorted(automation_persisted_trigger_property_keys(triggers).keys()) + return tuple(keys) + + +def schema_action_types_ordered(actions: list | None = None) -> tuple[str, ...]: + keys = sorted(automation_persisted_action_property_keys(actions).keys()) + return tuple(keys) + + +_STATIC_DEFS: dict[str, Any] = { + "rule": { + "type": "object", + "required": ["rule_id", "enabled", "trigger"], + "additionalProperties": False, + "properties": { + "rule_id": { + "type": "string", + "minLength": 1, + "description": ( + "Stable, human-readable id (e.g. 'intro-reminder-48h'). Unique within a config blob. " + "Used in job dedup keys and logs." + ), + }, + "enabled": { + "type": "boolean", + "description": "Master switch. Disabled rules are stored but never evaluated.", + }, + "description": { + "type": "string", + "description": "Optional human-readable note shown in UI.", + }, + "trigger": {"$ref": "#/$defs/trigger"}, + "conditions": { + "type": "array", + "items": {"$ref": "#/$defs/condition"}, + "default": [], + "description": ( + "All conditions are AND-ed. Empty array = unconditional (trigger alone is sufficient). " + "Used in flat (non-branched) rules." + ), + }, + "actions": { + "type": "array", + "items": {"$ref": "#/$defs/action"}, + "minItems": 1, + "description": ( + "Executed sequentially when trigger fires and all conditions pass. Used in flat (non-branched) rules." + ), + }, + "branches": { + "type": "array", + "items": {"$ref": "#/$defs/branch"}, + "minItems": 1, + "description": ( + "If/elif/else logic: first branch whose conditions all pass is executed, rest are skipped. " + "Mutually exclusive with top-level conditions+actions." + ), + }, + }, + "oneOf": [ + {"required": ["actions"]}, + {"required": ["branches"]}, + ], + }, + "branch": { + "type": "object", + "required": ["actions"], + "additionalProperties": False, + "properties": { + "label": { + "type": "string", + "description": "Optional UI label for this branch (e.g. 'English speakers', 'Default').", + }, + "conditions": { + "type": "array", + "items": {"$ref": "#/$defs/condition"}, + "default": [], + "description": ( + "AND-ed conditions for this branch. Empty array = 'otherwise' (always matches, use as last branch)." + ), + }, + "actions": { + "type": "array", + "items": {"$ref": "#/$defs/action"}, + "minItems": 1, + "description": "Actions to execute when this branch's conditions pass.", + }, + }, + "description": "One branch in an if/elif/else chain. First matching branch wins.", + }, + "condition": { + "type": "object", + "required": ["field", "op"], + "additionalProperties": False, + "properties": { + "field": { + "type": "string", + "minLength": 1, + "description": "Member field path to evaluate in automation context (dot path).", + }, + "op": { + "type": "string", + "enum": ["eq", "neq", "gt", "lt", "is_set", "is_not_set", "elapsed_gt", "elapsed_lt"], + "description": ( + "Comparison operator. elapsed_gt/elapsed_lt: 'now - field_value > value_seconds' / '< value_seconds'. " + "is_set/is_not_set: field is non-null / null (value ignored)." + ), + }, + "value": { + "description": ( + "Comparison operand. Type depends on op: number for gt/lt/elapsed_*, string or number for eq/neq, " + "ignored for is_set/is_not_set." + ), + }, + }, + "description": "Single boolean predicate on a member field. All conditions in a rule are AND-ed.", + }, +} + + +def build_automation_v1_schema_document( + triggers: list | None = None, + actions: list | None = None, +) -> dict[str, Any]: + """ + Assemble the full automation v1 JSON Schema document. + + ``triggers`` and ``actions`` are lists of TriggerDescriptor / ActionDescriptor from an + integration automation catalog. Defaults to the Discord integration. Pass different + lists to produce a schema for another integration. + """ + if triggers is None: + triggers = DISCORD_TRIGGERS + if actions is None: + actions = DISCORD_ACTIONS + + defs: dict[str, Any] = dict(_STATIC_DEFS) + + trigger_refs: list[dict[str, str]] = [] + for t in sorted(triggers, key=lambda x: x.type): + frag = t.automation_schema_def + if frag is None: + raise RuntimeError("trigger %s: automation_schema_def required" % t.type) + name = _def_name_trigger(t.type) + defs[name] = copy.deepcopy(frag) + trigger_refs.append({"$ref": "#/$defs/%s" % name}) + + defs["trigger_manual_campaign"] = copy.deepcopy(SCHEMA_TRIGGER_MANUAL_CAMPAIGN_PRODUCT) + trigger_refs.append({"$ref": "#/$defs/trigger_manual_campaign"}) + trigger_refs.sort(key=lambda r: r["$ref"]) + + defs["trigger"] = { + "type": "object", + "required": ["type"], + "description": "Discriminated union on 'type'. Each type carries its own required payload fields.", + "oneOf": trigger_refs, + } + + action_refs: list[dict[str, str]] = [] + for a in sorted(actions, key=lambda x: x.type): + frag = a.automation_schema_def + if frag is None: + raise RuntimeError("action %s: automation_schema_def required" % a.type) + name = _def_name_action(a.type) + defs[name] = copy.deepcopy(frag) + action_refs.append({"$ref": "#/$defs/%s" % name}) + + defs["action_call_gatekeeper_tool"] = copy.deepcopy(SCHEMA_ACTION_CALL_GATEKEEPER_PRODUCT) + action_refs.append({"$ref": "#/$defs/action_call_gatekeeper_tool"}) + action_refs.sort(key=lambda r: r["$ref"]) + + defs["action"] = { + "type": "object", + "required": ["type"], + "description": "Discriminated union on 'type'.", + "oneOf": action_refs, + } + + return { + "$schema": "https://json-schema.org/draft/2020-12/schema", + "$id": "flexus-automation-v1", + "title": "Flexus automation rules v1", + "description": ( + "Machine-readable contract for community bot automation. Built from the integration automation " + "catalog (triggers + actions) plus product-level triggers/actions. " + "Validate with jsonschema or check-jsonschema." + ), + "type": "object", + "required": ["automation_schema_version", "rules"], + "additionalProperties": False, + "properties": { + "automation_schema_version": { + "const": 1, + "description": "Schema version for forward-compatible migrations.", + }, + "rules": { + "type": "array", + "items": {"$ref": "#/$defs/rule"}, + "description": ( + "Ordered list of automation rules. Evaluation order matters only for actions that mutate state " + "consumed by later rules in the same event cycle." + ), + }, + }, + "$defs": defs, + } + + +if __name__ == "__main__": + import argparse + import json + from pathlib import Path + + p = argparse.ArgumentParser(description="Write automation v1 JSON Schema built from the Discord automation catalog.") + p.add_argument( + "--write", + metavar="PATH", + help="If set, write schema JSON to this path (UTF-8, indent 4).", + ) + args = p.parse_args() + # Default to Discord integration for the CLI tool. + doc = build_automation_v1_schema_document(triggers=DISCORD_TRIGGERS, actions=DISCORD_ACTIONS) + if args.write: + Path(args.write).write_text(json.dumps(doc, indent=4, ensure_ascii=False) + "\n", encoding="utf-8") + else: + print(json.dumps(doc, indent=2, ensure_ascii=False)) diff --git a/flexus_client_kit/ckit_bot_exec.py b/flexus_client_kit/ckit_bot_exec.py index ef040b75..01c61da2 100644 --- a/flexus_client_kit/ckit_bot_exec.py +++ b/flexus_client_kit/ckit_bot_exec.py @@ -317,6 +317,8 @@ async def crash_boom_bang(fclient: ckit_client.FlexusClient, rcx: RobotContext, rcx.messengers.clear() # new loop will populate this with new auth continue except RestartBecauseSettingsChanged: + # Subscription handler already created a replacement bot instance with updated setup + # (lines ~496-504), so this old task must exit, not loop again with stale _restart_requested logger.info("%s restart requested (settings changed)", rcx.persona.persona_id) await rcx.wait_for_bg_tasks(timeout=30.0) await _close_messengers(rcx) diff --git a/flexus_client_kit/ckit_bot_install.py b/flexus_client_kit/ckit_bot_install.py index 758cd431..2fd1df03 100644 --- a/flexus_client_kit/ckit_bot_install.py +++ b/flexus_client_kit/ckit_bot_install.py @@ -88,6 +88,7 @@ async def marketplace_upsert_dev_bot( marketable_auth_supported: List[str] = [], marketable_auth_scopes: Optional[Dict[str, List[str]]] = None, marketable_features: List[str] = [], + marketable_rules_toolkit: Optional[Any] = None, add_integrations_into_expert_system_prompt: Optional[List[ckit_integrations_db.IntegrationRecord]] = None, ) -> FBotInstallOutput: assert ws_id, "Set FLEXUS_WORKSPACE environment variable to your workspace ID" @@ -178,7 +179,7 @@ async def marketplace_upsert_dev_bot( expert_dict["fexp_name"] = f"{marketable_name}_{expert_name}" experts_input.append(expert_dict) - mutation = gql.gql(f"""mutation InstallBot($ws: String!, $name: String!, $ver: String!, $title1: String!, $title2: String!, $author: String!, $accent_color: String!, $occupation: String!, $desc: String!, $typical_group: String!, $repo: String!, $run: String!, $setup: String!, $featured: [FFeaturedActionInput!]!, $intro: String!, $model_expensive: String!, $model_cheap: String!, $daily: Int!, $inbox: Int!, $experts: [FMarketplaceExpertInput!]!, $schedule: String!, $big: String!, $small: String!, $tags: [String!]!, $forms: String, $required_policydocs: [String!]!, $auth_needed: [String!]!, $auth_supported: [String!]!, $auth_scopes: String, $max_inprogress: Int!, $features: [String!]!) {{ + mutation = gql.gql(f"""mutation InstallBot($ws: String!, $name: String!, $ver: String!, $title1: String!, $title2: String!, $author: String!, $accent_color: String!, $occupation: String!, $desc: String!, $typical_group: String!, $repo: String!, $run: String!, $setup: String!, $featured: [FFeaturedActionInput!]!, $intro: String!, $model_expensive: String!, $model_cheap: String!, $daily: Int!, $inbox: Int!, $experts: [FMarketplaceExpertInput!]!, $schedule: String!, $big: String!, $small: String!, $tags: [String!]!, $forms: String, $required_policydocs: [String!]!, $auth_needed: [String!]!, $auth_supported: [String!]!, $auth_scopes: String, $max_inprogress: Int!, $features: [String!]!, $rules_toolkit: String) {{ marketplace_upsert_dev_bot( ws_id: $ws, marketable_name: $name, @@ -210,7 +211,8 @@ async def marketplace_upsert_dev_bot( marketable_auth_supported: $auth_supported, marketable_auth_scopes: $auth_scopes, marketable_max_inprogress: $max_inprogress, - marketable_features: $features + marketable_features: $features, + marketable_rules_toolkit: $rules_toolkit ) {{ {gql_utils.gql_fields(FBotInstallOutput)} }} @@ -247,6 +249,7 @@ async def marketplace_upsert_dev_bot( "auth_scopes": json.dumps(marketable_auth_scopes) if marketable_auth_scopes else None, "max_inprogress": marketable_max_inprogress, "features": marketable_features, + "rules_toolkit": json.dumps(marketable_rules_toolkit) if marketable_rules_toolkit is not None else None, } http = await client.use_http_on_behalf("", "") async with http as h: diff --git a/flexus_client_kit/ckit_bot_query.py b/flexus_client_kit/ckit_bot_query.py index 5aef2ba6..202cb568 100644 --- a/flexus_client_kit/ckit_bot_query.py +++ b/flexus_client_kit/ckit_bot_query.py @@ -28,6 +28,7 @@ class FPersonaOutput: marketable_radix: Optional[int] = None marketable_auth_needed: Optional[List[str]] = None marketable_auth_supported: Optional[List[str]] = None + persona_external_addresses: Optional[List[str]] = None @dataclass diff --git a/flexus_client_kit/ckit_connector.py b/flexus_client_kit/ckit_connector.py new file mode 100644 index 00000000..33ec846e --- /dev/null +++ b/flexus_client_kit/ckit_connector.py @@ -0,0 +1,79 @@ +# Abstract chat platform connector (unified bot plan U3): normalized events, actions, runtime ChatConnector API. +# Automation catalog descriptors (TriggerDescriptor, ActionDescriptor, SemanticContract) live in ckit_automation_catalog. + +from __future__ import annotations + +import abc +import dataclasses +import logging +from typing import Any, Awaitable, Callable + +from flexus_client_kit.ckit_automation_catalog import ActionDescriptor, TriggerDescriptor + +logger = logging.getLogger(__name__) + + +@dataclasses.dataclass +class NormalizedEvent: + source: str + server_id: str + channel_id: str + user_id: str + event_type: str + payload: dict + timestamp: float + + +@dataclasses.dataclass +class ActionResult: + ok: bool + error: str | None = None + data: dict | None = None + + +class ChatConnector(abc.ABC): + @property + @abc.abstractmethod + def platform(self) -> str: + ... + + @property + @abc.abstractmethod + def raw_client(self) -> Any: + ... + + @abc.abstractmethod + async def connect(self) -> None: + ... + + @abc.abstractmethod + async def disconnect(self) -> None: + ... + + @abc.abstractmethod + def supported_triggers(self) -> list[TriggerDescriptor]: + ... + + @abc.abstractmethod + def supported_actions(self) -> list[ActionDescriptor]: + ... + + @abc.abstractmethod + def on_event(self, callback: Callable[[NormalizedEvent], Awaitable[None]]) -> None: + ... + + @abc.abstractmethod + async def execute_action(self, action_type: str, params: dict) -> ActionResult: + ... + + @abc.abstractmethod + def format_mention(self, user_id: str) -> str: + ... + + @abc.abstractmethod + async def get_user_info(self, user_id: str, server_id: str = "") -> dict | None: + ... + + @abc.abstractmethod + async def get_channel(self, channel_id: str) -> dict | None: + ... diff --git a/flexus_client_kit/ckit_connector_discord.py b/flexus_client_kit/ckit_connector_discord.py new file mode 100644 index 00000000..0f7e65f7 --- /dev/null +++ b/flexus_client_kit/ckit_connector_discord.py @@ -0,0 +1,75 @@ +""" +Shared Discord helpers (no discord.py socket here): snowflake/setup/auth/logging for bots, connectors, fi_discord2. + +Automation v1 trigger/action catalogs and ``discord_automation_semantics_bundle()`` live in +``ckit_connector_discord_catalog``; runtime ``DiscordLocalConnector`` is in ``ckit_connector_discord_local``. +""" + +from __future__ import annotations + +import logging +from typing import Any, Dict, Optional + +# Same logger name as fi_discord2.IntegrationDiscord / community utilities so log +# lines keep the same logger and formatting when code moves between modules. +_discord_shared_logger = logging.getLogger("discord") + + +def parse_snowflake(raw: str) -> Optional[int]: + """ + Parse a bare decimal Discord snowflake string to int, or None if invalid. + + Accepts only stripped all-digit strings; used for setup ids and persona_external_addresses. + """ + if not raw or not isinstance(raw, str): + return None + s = raw.strip() + if not s or not s.isdigit(): + return None + return int(s) + + +def setup_truthy(raw: Any) -> bool: + """ + Coerce setup checkbox / string flags to bool (1, true, yes, on). + + Matches legacy community-bot semantics for disable_* and similar keys. + """ + if raw is True: + return True + if raw is False or raw is None: + return False + s = str(raw).strip().lower() + return s in ("1", "true", "yes", "on") + + +def discord_bot_api_key_from_external_auth(ext: Dict[str, Any]) -> str: + """ + Resolve Discord bot token from workspace external_auth (legacy OAuth payloads). + + Precedence: discord_manual, then discord; skips non-dict provider values with a warning. + """ + for provider_key in ("discord_manual", "discord"): + raw = ext.get(provider_key) + if raw is None: + continue + if not isinstance(raw, dict): + _discord_shared_logger.warning( + "discord_bot_api_key_from_external_auth: provider %r value is not a dict, skipping", + provider_key, + ) + continue + tok = (raw.get("api_key") or "").strip() + if tok: + return tok + return "" + + +def log_ctx(persona_id: str, guild_id: Optional[int], msg: str, *args: Any) -> None: + """ + Prefix structured Discord integration logs with persona and optional guild id. + + Format matches historical fi_discord2 community-bot lines: [%s guild=%s] + message. + """ + gid = str(guild_id) if guild_id is not None else "-" + _discord_shared_logger.info("[%s guild=%s] " + msg, persona_id, gid, *args) diff --git a/flexus_client_kit/ckit_connector_discord_catalog.py b/flexus_client_kit/ckit_connector_discord_catalog.py new file mode 100644 index 00000000..7ba44893 --- /dev/null +++ b/flexus_client_kit/ckit_connector_discord_catalog.py @@ -0,0 +1,306 @@ +""" +Discord automation v1 catalogs and assist semantics (JSON Schema fragments, trigger/action descriptors). + +No discord.py: ``DISCORD_TRIGGERS``, ``DISCORD_ACTIONS``, and ``discord_automation_semantics_bundle()`` +feed schema assembly and reviewer payloads. Shared Discord runtime helpers stay in ``ckit_connector_discord``. +""" + +from __future__ import annotations + +from typing import Any + +from flexus_client_kit.ckit_automation_catalog import ( + ActionDescriptor, + SemanticContract, + TriggerDescriptor, + semantic_contract_to_dict, +) +from flexus_client_kit.ckit_discord_automation_schema_defs import ( + SCHEMA_ACTION_ADD_ROLE, + SCHEMA_ACTION_KICK, + SCHEMA_ACTION_POST_TO_CHANNEL, + SCHEMA_ACTION_REMOVE_ROLE, + SCHEMA_ACTION_SEND_DM, + SCHEMA_TRIGGER_MEMBER_JOINED, + SCHEMA_TRIGGER_MEMBER_REMOVED, + SCHEMA_TRIGGER_MESSAGE_IN_CHANNEL, +) + +DISCORD_TRIGGERS: list[TriggerDescriptor] = [ + TriggerDescriptor( + type="member_joined", + label="Member joined", + description="Fires when a new member joins the server", + payload_schema={ + "type": "object", + "properties": { + "guild_id": {"type": "integer"}, + "user_id": {"type": "integer"}, + "username": {"type": "string"}, + }, + }, + semantic_contract=SemanticContract( + operator_summary="Runs when someone joins a Discord server the bot can access.", + rule_author_configures=("trigger.type member_joined only (no extra trigger fields in saved JSON).",), + platform_fills_automatically=( + "When someone joins, the normalized event carries the server id and member id; the payload includes " + "guild id, user id, and display name.", + "The worker builds in-memory event context (ids + username) for templates and actions; no persisted profile.", + ), + runtime_guarantees=( + "A rule matches when its trigger type is member_joined and the incoming event is a join for that server.", + "Conditions and actions apply to the current member in scope.", + ), + operator_must_not_set=("Trigger payload keys in persisted rules (automation_v1 has none beyond type).",), + ), + automation_schema_def=SCHEMA_TRIGGER_MEMBER_JOINED, + ), + TriggerDescriptor( + type="message_in_channel", + label="Message in channel", + description="Fires when a message is posted in a watched channel", + payload_schema={ + "type": "object", + "properties": { + "channel_id": {"type": "integer"}, + "guild_id": {"type": "integer"}, + "user_id": {"type": "integer"}, + "content": {"type": "string"}, + "message_id": {"type": "string"}, + }, + }, + semantic_contract=SemanticContract( + operator_summary="Runs when a human posts in one configured channel.", + rule_author_configures=("trigger.channel_id_field referencing a setup key, bare numeric id, or #snowflake literal.",), + platform_fills_automatically=( + "The channel reference from the bot setup is resolved to a numeric channel id for matching.", + "The event payload includes channel, server, author, message text, and message id from the live message.", + ), + runtime_guarantees=( + "Matching requires the event channel id to equal the resolved channel id from your setup; " + "a missing or unresolvable channel reference yields no match.", + ), + operator_must_not_set=("Hard-coded channel ids inside trigger except via channel_id_field string form.",), + ), + automation_schema_def=SCHEMA_TRIGGER_MESSAGE_IN_CHANNEL, + ), + TriggerDescriptor( + type="member_removed", + label="Member left/kicked", + description="Fires when a member leaves or is removed from the server", + payload_schema={ + "type": "object", + "properties": { + "guild_id": {"type": "integer"}, + "user_id": {"type": "integer"}, + "username": {"type": "string"}, + }, + }, + semantic_contract=SemanticContract( + operator_summary="Runs when someone leaves the server or is kicked (same Discord event).", + rule_author_configures=("trigger.type member_removed only (no extra trigger fields in saved JSON).",), + platform_fills_automatically=( + "When someone leaves, the normalized event carries the member id, server id, and display name from Discord.", + ), + runtime_guarantees=( + "A rule matches when its trigger type is member_removed and the event is a leave or kick for that member.", + ), + operator_must_not_set=("Extra trigger payload keys beyond type in persisted rules.",), + ), + automation_schema_def=SCHEMA_TRIGGER_MEMBER_REMOVED, + ), +] + + +DISCORD_ACTIONS: list[ActionDescriptor] = [ + ActionDescriptor( + type="send_dm", + label="Send DM", + description="Send a direct message to a user", + parameter_schema={ + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "text": {"type": "string"}, + }, + "required": ["user_id", "text"], + }, + semantic_contract=SemanticContract( + operator_summary="Send a private message to the member in context (saved automation) or to explicit ids (programmatic API).", + rule_author_configures=( + "Persisted automation: exactly one of template or template_field for the body (automation_v1).", + "Programmatic call: user_id and text parameters as in parameter_schema.", + ), + platform_fills_automatically=( + "The engine resolves the message body from the template or from a setup-backed field, then delivers it.", + "For saved rules, the recipient is the user id from the triggering event; the action does not supply a separate recipient id.", + ), + runtime_guarantees=( + "If the body is empty after resolution, delivery fails with empty_body.", + "Direct-message delivery uses the resolved recipient id and body text only.", + ), + operator_must_not_set=( + "Persisted rule: user_id field on the action (not in schema); recipient is always the member in context.", + ), + ), + automation_schema_def=SCHEMA_ACTION_SEND_DM, + ), + ActionDescriptor( + type="post_to_channel", + label="Post to channel", + description="Post a message in a text channel", + parameter_schema={ + "type": "object", + "properties": { + "channel_id": {"type": "string"}, + "text": {"type": "string"}, + "server_id": {"type": "string"}, + }, + "required": ["channel_id", "text"], + }, + semantic_contract=SemanticContract( + operator_summary="Post a message into a chosen channel; guild scope comes from execution context when using the live integration.", + rule_author_configures=( + "Persisted automation: channel_id_field and template (engine also resolves template_field like send_dm).", + "Programmatic call: channel_id text required; optional server_id disambiguates allowed guild when posting.", + ), + platform_fills_automatically=( + "The engine resolves the channel id from the field reference and resolves the message body like direct messages.", + "When posting, the current server scope is passed so delivery can reject unauthorized guilds.", + ), + runtime_guarantees=( + "Posting loads the channel, requires a text channel, and the bot must be allowed in that server or delivery returns guild_not_allowed.", + ), + operator_must_not_set=( + "Persisted rule: server_id on the action; guild is implied by the event or job execution scope.", + ), + ), + automation_schema_def=SCHEMA_ACTION_POST_TO_CHANNEL, + ), + ActionDescriptor( + type="add_role", + label="Add role", + description="Add a Discord role to the member", + parameter_schema={ + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "role_id": {"type": "string"}, + "server_id": {"type": "string"}, + }, + "required": ["user_id", "role_id"], + }, + semantic_contract=SemanticContract( + operator_summary="Add a Discord role to the member the rule is running for.", + rule_author_configures=("role_id_field naming a setup key, literal role id, or #snowflake (same as channel_id_field).",), + platform_fills_automatically=( + "The engine resolves the role id from the field reference; the executor passes the member id and server scope for delivery.", + ), + runtime_guarantees=( + "The member and role must exist in the server; otherwise delivery fails with member_or_role_not_found.", + ), + operator_must_not_set=("user_id, server_id, guild_id, role_id on persisted action; use role_id_field only.",), + ), + automation_schema_def=SCHEMA_ACTION_ADD_ROLE, + ), + ActionDescriptor( + type="remove_role", + label="Remove role", + description="Remove a Discord role from the member", + parameter_schema={ + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "role_id": {"type": "string"}, + "server_id": {"type": "string"}, + }, + "required": ["user_id", "role_id"], + }, + semantic_contract=SemanticContract( + operator_summary="Remove a Discord role from the member the rule is running for.", + rule_author_configures=("role_id_field like add_role.",), + platform_fills_automatically=("Same role resolution and member/server scope as add_role.",), + runtime_guarantees=("Same as add_role for guild and member resolution.",), + operator_must_not_set=("user_id, server_id, guild_id, role_id on persisted action; use role_id_field only.",), + ), + automation_schema_def=SCHEMA_ACTION_REMOVE_ROLE, + ), + ActionDescriptor( + type="kick", + label="Kick member", + description="Kick the member from the server", + parameter_schema={ + "type": "object", + "properties": { + "user_id": {"type": "string"}, + "reason": {"type": "string"}, + "server_id": {"type": "string"}, + }, + "required": ["user_id"], + }, + semantic_contract=SemanticContract( + operator_summary="Kick the member the rule is running for from the current server.", + rule_author_configures=("Optional reason string; supports {field} placeholders like message templates.",), + platform_fills_automatically=( + "The platform uses the current member and server in scope and resolves the reason text before delivery.", + ), + runtime_guarantees=( + "Kick requires the member to be in the guild before kick; fails if the member already left.", + ), + operator_must_not_set=("user_id, server_id, guild_id on persisted action.",), + ), + automation_schema_def=SCHEMA_ACTION_KICK, + ), +] + + +_DISCORD_AUTOMATION_CROSS_CUTTING: dict[str, dict[str, Any]] = { + "resolve_channel_id": semantic_contract_to_dict( + SemanticContract( + operator_summary="Turns channel_id_field strings into integer Discord channel ids for matching and posting.", + rule_author_configures=( + "channel_id_field on trigger message_in_channel or action post_to_channel.", + ), + platform_fills_automatically=( + "All-decimal string parses as int; #suffix parses suffix as int; else setup[key] coerced with int().", + ), + runtime_guarantees=( + "Invalid channel references yield no resolved id; trigger matching and posting fail closed.", + ), + operator_must_not_set=(), + ), + ), + "resolve_template": semantic_contract_to_dict( + SemanticContract( + operator_summary="Substitutes braced tokens in message templates for DM and channel posts.", + rule_author_configures=( + "template string and/or template_field referencing setup; send_dm and post_to_channel in automation.", + ), + platform_fills_automatically=( + "{now} -> unix seconds; {mention} -> formatted mention from member user_id; " + "other names from member then setup; unknown tokens left unchanged.", + ), + runtime_guarantees=( + "Body resolution runs for direct message and channel post actions only; other actions do not use it.", + ), + operator_must_not_set=(), + ), + ), +} + + +def discord_automation_semantics_bundle() -> dict[str, Any]: + """ + Flatten trigger/action semantic contracts plus cross-cutting helpers for assist payloads. + + Returns a JSON-serializable dict (semantic_schema_version, triggers, actions, cross_cutting). + """ + try: + return { + "semantic_schema_version": 1, + "triggers": {t.type: semantic_contract_to_dict(t.semantic_contract) for t in DISCORD_TRIGGERS}, + "actions": {a.type: semantic_contract_to_dict(a.semantic_contract) for a in DISCORD_ACTIONS}, + "cross_cutting": dict(_DISCORD_AUTOMATION_CROSS_CUTTING), + } + except (TypeError, KeyError, AttributeError) as e: + raise RuntimeError("discord_automation_semantics_bundle: failed to assemble semantics dict") from e diff --git a/flexus_client_kit/ckit_connector_discord_local.py b/flexus_client_kit/ckit_connector_discord_local.py new file mode 100644 index 00000000..990140f5 --- /dev/null +++ b/flexus_client_kit/ckit_connector_discord_local.py @@ -0,0 +1,184 @@ +""" +In-process Discord ``ChatConnector`` for the discord_bot worker: one discord.py client per +process, events as ``NormalizedEvent`` via ``bind_discord_gateway_client``, actions via +``discord_run_platform_action``. Guild allowlist is enforced on both ingress and +``resolve_guild`` so behavior matches the former gateway emessage path. +""" + +from __future__ import annotations + +import asyncio +import logging +from collections.abc import Awaitable, Callable, Iterable +from typing import Any + +import discord +from discord.errors import DiscordException + +from flexus_client_kit.ckit_automation_catalog import ActionDescriptor, TriggerDescriptor +from flexus_client_kit.ckit_connector import ActionResult, ChatConnector, NormalizedEvent +from flexus_client_kit.ckit_connector_discord_catalog import DISCORD_ACTIONS, DISCORD_TRIGGERS +from flexus_client_kit.integrations.fi_discord2 import ( + bind_discord_gateway_client, + close_discord_client, + discord_run_platform_action, + start_discord_client, +) + +logger = logging.getLogger(__name__) + + +class DiscordLocalConnector(ChatConnector): + """ + Live discord.py client in the bot process: same normalized events and actions as the + gateway service, without Redis or ``on_emessage("DISCORD")``. + """ + + def __init__( + self, + token: str, + persona_id: str, + *, + initial_guild_ids: set[int] | None = None, + ) -> None: + self._token = (token or "").strip() + self._persona_id = persona_id + self._allowed_guild_ids: set[int] = {int(x) for x in (initial_guild_ids or set())} + self._client: discord.Client | None = None + self._runner_task: asyncio.Task[None] | None = None + self._event_callback: Callable[[NormalizedEvent], Awaitable[None]] | None = None + self._connected = False + + @property + def platform(self) -> str: + return "discord" + + @property + def raw_client(self) -> Any: + return self._client + + @property + def allowed_guild_ids(self) -> frozenset[int]: + return frozenset(self._allowed_guild_ids) + + async def set_allowed_guild_ids(self, ids: Iterable[int]) -> None: + self._allowed_guild_ids = {int(x) for x in ids} + + async def update_guild_ids(self, ids: Iterable[int]) -> None: + await self.set_allowed_guild_ids(ids) + + def supported_triggers(self) -> list[TriggerDescriptor]: + return DISCORD_TRIGGERS + + def supported_actions(self) -> list[ActionDescriptor]: + return DISCORD_ACTIONS + + def on_event(self, callback: Callable[[NormalizedEvent], Awaitable[None]]) -> None: + self._event_callback = callback + + def format_mention(self, user_id: str) -> str: + return "<@%s>" % (user_id,) + + def _resolve_guild(self, gid: int) -> discord.Guild | None: + try: + if not self._allowed_guild_ids or gid not in self._allowed_guild_ids: + return None + if self._client is None: + return None + g = self._client.get_guild(gid) + return g + except (TypeError, ValueError, AttributeError) as e: + logger.warning("resolve_guild: %s %s", type(e).__name__, e) + return None + + async def connect(self) -> None: + try: + if not self._token: + raise ValueError("DiscordLocalConnector: empty token") + + def register(client: discord.Client) -> None: + async def emit(ev: NormalizedEvent) -> None: + try: + gid = int(ev.server_id) + except (TypeError, ValueError) as e: + logger.warning( + "%s discord emit skip (bad server_id): %s %s", + self._persona_id, + type(e).__name__, + e, + ) + return + allowed = self._allowed_guild_ids + if not allowed or gid not in allowed: + return + cb = self._event_callback + if cb is None: + return + await cb(ev) + + bind_discord_gateway_client(client, emit) + + self._client, self._runner_task = await start_discord_client( + self._token, + self._persona_id, + register, + ) + self._connected = True + except (ValueError, DiscordException, RuntimeError, OSError) as e: + logger.error("DiscordLocalConnector connect failed: %s %s", type(e).__name__, e) + raise + + async def disconnect(self) -> None: + try: + await close_discord_client(self._client, self._runner_task) + except asyncio.CancelledError: + raise + except (DiscordException, RuntimeError) as e: + logger.warning("DiscordLocalConnector disconnect: %s %s", type(e).__name__, e) + finally: + self._client = None + self._runner_task = None + self._connected = False + + async def execute_action(self, action_type: str, params: dict) -> ActionResult: + try: + if not self._connected or self._client is None: + return ActionResult(ok=False, error="not_connected") + return await discord_run_platform_action( + self._client, + self._persona_id, + action_type, + params, + resolve_guild=self._resolve_guild, + ) + except DiscordException as e: + logger.warning( + "DiscordLocalConnector execute_action %s: %s %s", + action_type, + type(e).__name__, + e, + ) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + + async def get_user_info(self, user_id: str, server_id: str = "") -> dict | None: + try: + r = await self.execute_action( + "get_user_info", + {"user_id": str(user_id), "server_id": str(server_id or "")}, + ) + if not r.ok or not r.data: + return None + return dict(r.data) + except (TypeError, ValueError, KeyError) as e: + logger.warning("get_user_info: %s %s", type(e).__name__, e) + return None + + async def get_channel(self, channel_id: str) -> dict | None: + try: + r = await self.execute_action("get_channel", {"channel_id": str(channel_id)}) + if not r.ok or not r.data: + return None + return dict(r.data) + except (TypeError, ValueError, KeyError) as e: + logger.warning("get_channel: %s %s", type(e).__name__, e) + return None diff --git a/flexus_client_kit/ckit_discord_automation_schema_defs.py b/flexus_client_kit/ckit_discord_automation_schema_defs.py new file mode 100644 index 00000000..8868fecc --- /dev/null +++ b/flexus_client_kit/ckit_discord_automation_schema_defs.py @@ -0,0 +1,147 @@ +""" +Discord-specific automation trigger/action JSON Schema fragments (automation_schema_version 1). + +Schemas for guild-scoped triggers and Discord actions live here; product-level definitions are in +ckit_automation_schema_defs. The full document is assembled by ckit_automation_v1_schema_build. +""" + +from __future__ import annotations + +SCHEMA_TRIGGER_MEMBER_JOINED = { + "type": "object", + "required": ["type"], + "additionalProperties": False, + "properties": { + "type": {"const": "member_joined"}, + }, + "description": ( + "Fires on Discord on_member_join (or equivalent member record creation for backfill). " + "No extra trigger fields; context is the joining user's ids and username from the event." + ), +} + +SCHEMA_TRIGGER_MEMBER_REMOVED = { + "type": "object", + "required": ["type"], + "additionalProperties": False, + "properties": { + "type": {"const": "member_removed"}, + }, + "description": ( + "Fires when a member leaves or is removed from the server. No extra saved fields; " + "guild and member context come from the leave/kick event." + ), +} + +SCHEMA_TRIGGER_MESSAGE_IN_CHANNEL = { + "type": "object", + "required": ["type", "channel_id_field"], + "additionalProperties": False, + "properties": { + "type": {"const": "message_in_channel"}, + "channel_id_field": { + "type": "string", + "minLength": 1, + "description": ( + "Reference to a setup field name containing the target channel snowflake " + "(e.g. 'intro_channel_id'), or a literal snowflake string prefixed with '#' " + "(e.g. '#1234567890')." + ), + }, + }, + "description": ( + "Fires when any guild member posts a message in the specified channel. " + "Only messages in the configured channel are delivered as events to the bot." + ), +} + +SCHEMA_ACTION_SEND_DM = { + "type": "object", + "required": ["type"], + "additionalProperties": False, + "properties": { + "type": {"const": "send_dm"}, + "template": { + "type": "string", + "description": "Inline message body. Supports {field_name} placeholders resolved from the event member snapshot and setup fields.", + }, + "template_field": { + "type": "string", + "description": ( + "Alternative: name of a setup field containing the message body. " + "Mutually preferred over 'template' when operator should edit copy in Setup UI." + ), + }, + }, + "description": "Send a DM to the member. Exactly one of 'template' or 'template_field' should be provided.", +} + +SCHEMA_ACTION_POST_TO_CHANNEL = { + "type": "object", + "required": ["type", "channel_id_field", "template"], + "additionalProperties": False, + "properties": { + "type": {"const": "post_to_channel"}, + "channel_id_field": { + "type": "string", + "description": "Setup field name or literal '#snowflake' for the target channel.", + }, + "template": { + "type": "string", + "description": "Message body with {field_name} placeholders.", + }, + }, + "description": "Post a message to a guild channel.", +} + +SCHEMA_ACTION_ADD_ROLE = { + "type": "object", + "required": ["type", "role_id_field"], + "additionalProperties": False, + "properties": { + "type": {"const": "add_role"}, + "role_id_field": { + "type": "string", + "minLength": 1, + "description": ( + "Setup field name holding the role snowflake, or a literal id (digits) or '#snowflake', " + "same resolution rules as channel_id_field. Member and server come from automation context." + ), + }, + }, + "description": "Add a Discord role to the member in context for the current server.", +} + +SCHEMA_ACTION_REMOVE_ROLE = { + "type": "object", + "required": ["type", "role_id_field"], + "additionalProperties": False, + "properties": { + "type": {"const": "remove_role"}, + "role_id_field": { + "type": "string", + "minLength": 1, + "description": ( + "Setup field name holding the role snowflake, or literal id / '#snowflake'. " + "Member and server come from automation context." + ), + }, + }, + "description": "Remove a Discord role from the member in context for the current server.", +} + +SCHEMA_ACTION_KICK = { + "type": "object", + "required": ["type"], + "additionalProperties": False, + "properties": { + "type": {"const": "kick"}, + "reason": { + "type": "string", + "description": ( + "Optional audit reason shown in Discord. Supports {field_name} placeholders like message templates." + ), + }, + }, + "description": "Kick the member in context from the current server. Guild and user ids are filled by the runtime.", +} diff --git a/flexus_client_kit/ckit_job_queue.py b/flexus_client_kit/ckit_job_queue.py new file mode 100644 index 00000000..20d12df5 --- /dev/null +++ b/flexus_client_kit/ckit_job_queue.py @@ -0,0 +1,60 @@ +from __future__ import annotations + +import logging +import time +from typing import Any, Awaitable, Callable, Dict + +logger = logging.getLogger(__name__) + + +def _log_ctx(persona_id: str, guild_id: Any, msg: str, *args: Any) -> None: + gid = str(guild_id) if guild_id is not None else "-" + logger.info("[%s guild=%s] " + msg, persona_id, gid, *args) + +COL_JOBS = "dc_community_jobs" + +JobHandler = Callable[[Dict[str, Any]], Awaitable[None]] + + +async def enqueue_job( + db: Any, + kind: str, + run_at_ts: float, + payload: Dict[str, Any], +) -> None: + coll = db[COL_JOBS] + await coll.insert_one( + { + "kind": kind, + "run_at": float(run_at_ts), + "payload": payload, + "done": False, + "created_ts": time.time(), + }, + ) + + +async def drain_due_jobs( + db: Any, + persona_id: str, + handlers: Dict[str, JobHandler], + limit: int = 50, +) -> int: + coll = db[COL_JOBS] + now = time.time() + count = 0 + cursor = coll.find({"done": False, "run_at": {"$lte": now}}).sort("run_at", 1).limit(limit) + async for doc in cursor: + kind = doc.get("kind") or "" + handler = handlers.get(kind) + if not handler: + await coll.update_one({"_id": doc["_id"]}, {"$set": {"done": True, "error": "no_handler"}}) + continue + payload = doc.get("payload") or {} + try: + await handler(payload) + except (TypeError, ValueError, KeyError) as e: + _log_ctx(persona_id, payload.get("guild_id"), "job %s data error: %s %s", kind, type(e).__name__, e) + await coll.update_one({"_id": doc["_id"]}, {"$set": {"done": True, "finished_ts": time.time()}}) + count += 1 + return count diff --git a/flexus_client_kit/ckit_messages.py b/flexus_client_kit/ckit_messages.py index 6c063cd4..bbf1a5f5 100644 --- a/flexus_client_kit/ckit_messages.py +++ b/flexus_client_kit/ckit_messages.py @@ -1,9 +1,23 @@ +""" +Thread YAML helpers for bot exec / scenarios, plus Discord discovery message persistence (dc_messages collection). +""" + +from __future__ import annotations + import json +import logging +from typing import Any, Optional, TypeVar + import yaml -from typing import Optional, TypeVar +from pymongo.errors import PyMongoError _M = TypeVar("_M") +logger = logging.getLogger(__name__) + +# Mongo collection name for ingested platform messages (Discord discovery). +COL_MESSAGES = "dc_messages" + def linearize_thread_messages(messages: list[_M], target_alt: int, target_num: int) -> list[_M]: by_key = {(m.ftm_alt, m.ftm_num): m for m in messages} @@ -101,3 +115,77 @@ def fmessages_to_yaml(messages: list, *, limits: Optional[dict[str, int]] = None m["call_id"] = msg.ftm_call_id out.append(m) return yaml_dump_with_multiline({"messages": out}) + + +async def ensure_message_indexes(db: Any) -> None: + try: + coll = db[COL_MESSAGES] + await coll.create_index( + [("server_id", 1), ("channel_id", 1), ("timestamp", 1)], + unique=False, + ) + except PyMongoError as e: + logger.error("ensure_message_indexes: MongoDB index creation failed", exc_info=e) + raise + + +async def store_message( + db: Any, + *, + server_id: str, + channel_id: str, + user_id: str, + platform: str, + content: str, + timestamp: float, + message_id: str, +) -> None: + try: + coll = db[COL_MESSAGES] + doc = { + "server_id": server_id, + "channel_id": channel_id, + "user_id": user_id, + "platform": platform, + "content": content, + "timestamp": timestamp, + "message_id": message_id, + } + await coll.insert_one(doc) + except PyMongoError as e: + logger.error( + "store_message: server_id=%s channel_id=%s message_id=%s failed", + server_id, + channel_id, + message_id, + exc_info=e, + ) + raise + + +async def get_channel_messages( + db: Any, + server_id: str, + channel_id: str, + *, + limit: int = 100, + before_ts: float | None = None, +) -> list[dict]: + try: + coll = db[COL_MESSAGES] + flt: dict[str, Any] = { + "server_id": server_id, + "channel_id": channel_id, + } + if before_ts is not None: + flt["timestamp"] = {"$lt": before_ts} + cursor = coll.find(flt).sort("timestamp", -1).limit(limit) + return await cursor.to_list(length=limit) + except PyMongoError as e: + logger.error( + "get_channel_messages: server_id=%s channel_id=%s failed", + server_id, + channel_id, + exc_info=e, + ) + raise diff --git a/flexus_client_kit/ckit_person_domain.py b/flexus_client_kit/ckit_person_domain.py new file mode 100644 index 00000000..a9599c9b --- /dev/null +++ b/flexus_client_kit/ckit_person_domain.py @@ -0,0 +1,467 @@ +""" +Client-kit GraphQL helpers for the person-centric foundation (U4.5). + +Wraps the PersonDomainQuery/PersonDomainMutation GQL API so bot runtime code +can resolve, create, and link persons and applications without direct DB access. + +Public API (all are best-effort: log warnings on error, return None/False): + ensure_person_for_discord_user - resolve or create person + link discord identity + application_find_latest - fetch the most recent application for a person + application_create_pending - create a new PENDING application + application_apply_decision - update status/decision on an existing application + +Style follows existing client-kit GraphQL helper modules: module-level gql constants, module-level async functions, +TransportQueryError + (TypeError, KeyError, ValueError) as the caught exception set. +""" + +from __future__ import annotations + +import json +import logging +from typing import Optional + +import gql +import gql.transport.exceptions + +from flexus_client_kit import ckit_client +from flexus_client_kit import ckit_erp +from flexus_client_kit import erp_schema + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# GQL documents +# --------------------------------------------------------------------------- + +_GQL_PERSON_BY_IDENTITY = gql.gql( + """query PersonByPlatformIdentityRuntime( + $ws_id: String! + $platform: String! + $user_id: String! + ) { + person_by_platform_identity( + ws_id: $ws_id + identity_platform: $platform + identity_external_user_id: $user_id + ) { + person_id + identity_id + } + }""", +) + +_GQL_PERSON_CREATE = gql.gql( + """mutation PersonCreateRuntime($ws_id: String!, $label: String!) { + person_create(input: { ws_id: $ws_id, person_label: $label }) { + person_id + } + }""", +) + +_GQL_IDENTITY_UPSERT = gql.gql( + """mutation PersonUpsertIdentityRuntime( + $person_id: String! + $platform: String! + $user_id: String! + $endpoint: String! + ) { + person_upsert_identity(input: { + person_id: $person_id + identity_platform: $platform + identity_external_user_id: $user_id + identity_external_endpoint: $endpoint + }) { + identity_id + } + }""", +) + +_GQL_APPLICATION_LIST = gql.gql( + """query ApplicationListLatestRuntime($ws_id: String!, $person_id: String!) { + application_list(ws_id: $ws_id, person_id: $person_id, limit: 1) { + application_id + application_status + application_decision + } + }""", +) + +_GQL_APPLICATION_CREATE = gql.gql( + """mutation ApplicationCreateRuntime( + $ws_id: String! + $person_id: String + $status: String! + $source: String! + $platform: String! + $payload: String + $details: String + ) { + application_create(input: { + ws_id: $ws_id + person_id: $person_id + application_status: $status + application_source: $source + application_platform: $platform + application_payload: $payload + application_details: $details + }) { + application_id + application_status + } + }""", +) + +_GQL_APPLICATION_UPDATE = gql.gql( + """mutation ApplicationUpdateRuntime( + $application_id: String! + $status: String + $decision: String + $details: String + ) { + application_update(input: { + application_id: $application_id + application_status: $status + application_decision: $decision + application_details: $details + }) { + application_id + application_status + application_decision + } + }""", +) + + +# --------------------------------------------------------------------------- +# Public helpers +# --------------------------------------------------------------------------- + +async def ensure_person_for_discord_user( + fclient: ckit_client.FlexusClient, + ws_id: str, + discord_user_id: str, + username: str, +) -> Optional[str]: + """ + Resolve or create a canonical person for a Discord user and link the identity. + + Returns person_id on success, None on any error. + + Race note: two concurrent joins for the same Discord user could create two person + records momentarily; the identity unique constraint (ws_id, platform, user_id) means + the last upsert_identity wins, leaving the earlier person_id orphaned. This is + accepted as an extremely rare event in practice. + """ + try: + async with (await fclient.use_http()) as http: + r = await http.execute( + _GQL_PERSON_BY_IDENTITY, + variable_values={ + "ws_id": ws_id, + "platform": "discord", + "user_id": discord_user_id, + }, + ) + existing = r.get("person_by_platform_identity") if isinstance(r, dict) else None + if isinstance(existing, dict) and existing.get("person_id"): + return str(existing["person_id"]) + + # No existing identity: create person + link identity + label = username or ("discord:%s" % discord_user_id) + async with (await fclient.use_http()) as http: + rc = await http.execute( + _GQL_PERSON_CREATE, + variable_values={"ws_id": ws_id, "label": label}, + ) + person_row = rc.get("person_create") if isinstance(rc, dict) else None + if not isinstance(person_row, dict) or not person_row.get("person_id"): + logger.warning( + "ensure_person_for_discord_user: person_create returned no person_id ws=%s uid=%s", + ws_id, + discord_user_id, + ) + return None + person_id = str(person_row["person_id"]) + + async with (await fclient.use_http()) as http: + await http.execute( + _GQL_IDENTITY_UPSERT, + variable_values={ + "person_id": person_id, + "platform": "discord", + "user_id": discord_user_id, + "endpoint": "", + }, + ) + logger.info( + "ensure_person_for_discord_user created person=%s ws=%s discord_uid=%s", + person_id, + ws_id, + discord_user_id, + ) + return person_id + except gql.transport.exceptions.TransportQueryError as e: + logger.warning( + "ensure_person_for_discord_user GQL error ws=%s uid=%s: %s %s", + ws_id, + discord_user_id, + type(e).__name__, + e, + ) + return None + except (TypeError, KeyError, ValueError) as e: + logger.warning( + "ensure_person_for_discord_user parse error ws=%s uid=%s: %s %s", + ws_id, + discord_user_id, + type(e).__name__, + e, + ) + return None + + +async def application_find_latest( + fclient: ckit_client.FlexusClient, + ws_id: str, + person_id: str, +) -> Optional[dict]: + """ + Return the most recent application dict for a person, or None if not found or on error. + + Dict keys: application_id, application_status, application_decision. + """ + try: + async with (await fclient.use_http()) as http: + r = await http.execute( + _GQL_APPLICATION_LIST, + variable_values={"ws_id": ws_id, "person_id": person_id}, + ) + rows = r.get("application_list") if isinstance(r, dict) else None + if not isinstance(rows, list) or not rows: + return None + row = rows[0] + if not isinstance(row, dict) or not row.get("application_id"): + return None + return { + "application_id": str(row["application_id"]), + "application_status": str(row.get("application_status") or ""), + "application_decision": str(row.get("application_decision") or ""), + } + except gql.transport.exceptions.TransportQueryError as e: + logger.warning( + "application_find_latest GQL error ws=%s person=%s: %s %s", + ws_id, + person_id, + type(e).__name__, + e, + ) + return None + except (TypeError, KeyError, ValueError) as e: + logger.warning( + "application_find_latest parse error ws=%s person=%s: %s %s", + ws_id, + person_id, + type(e).__name__, + e, + ) + return None + + +async def application_create_pending( + fclient: ckit_client.FlexusClient, + ws_id: str, + person_id: str, + *, + source: str = "discord_bot", + platform: str = "discord", + payload: Optional[dict] = None, +) -> Optional[str]: + """ + Create a new PENDING application for a person. Returns application_id or None on error. + + Used on member_joined to register a durable application record before any gatekeeper decision. + """ + try: + payload_json = json.dumps(payload) if payload is not None else None + async with (await fclient.use_http()) as http: + r = await http.execute( + _GQL_APPLICATION_CREATE, + variable_values={ + "ws_id": ws_id, + "person_id": person_id, + "status": "PENDING", + "source": source, + "platform": platform, + "payload": payload_json, + "details": None, + }, + ) + row = r.get("application_create") if isinstance(r, dict) else None + if not isinstance(row, dict) or not row.get("application_id"): + logger.warning( + "application_create_pending: no application_id returned ws=%s person=%s", + ws_id, + person_id, + ) + return None + app_id = str(row["application_id"]) + logger.info( + "application_create_pending app=%s ws=%s person=%s", + app_id, + ws_id, + person_id, + ) + return app_id + except gql.transport.exceptions.TransportQueryError as e: + logger.warning( + "application_create_pending GQL error ws=%s person=%s: %s %s", + ws_id, + person_id, + type(e).__name__, + e, + ) + return None + except (TypeError, KeyError, ValueError) as e: + logger.warning( + "application_create_pending parse error ws=%s person=%s: %s %s", + ws_id, + person_id, + type(e).__name__, + e, + ) + return None + + +async def ensure_discord_contact( + fclient: ckit_client.FlexusClient, + ws_id: str, + discord_user_id: str, + display_name: str, +) -> Optional[str]: + """ + Find or create a crm_contact keyed by contact_platform_ids.discord == discord_user_id. + + Returns contact_id on success, None on error. + Idempotent: existing contacts are returned as-is without modification. + Called both on member_joined (future joins) and during bootstrap (existing members). + """ + try: + rows = await ckit_erp.erp_table_data( + await fclient.use_http(), + "crm_contact", + ws_id, + erp_schema.CrmContact, + filters="contact_platform_ids->discord:=:%s" % discord_user_id, + limit=1, + ) + if rows: + return str(rows[0].contact_id) + + name = (display_name or ("discord:%s" % discord_user_id)).strip() + parts = name.split(" ", 1) + first_name = parts[0] + last_name = parts[1] if len(parts) > 1 else "" + + new_id = await ckit_erp.erp_record_create( + await fclient.use_http(), + "crm_contact", + ws_id, + { + "ws_id": ws_id, + "contact_first_name": first_name, + "contact_last_name": last_name, + "contact_platform_ids": {"discord": discord_user_id}, + }, + ) + contact_id = str(new_id) if new_id else "" + if contact_id: + logger.info( + "ensure_discord_contact created contact=%s ws=%s discord_uid=%s", + contact_id, + ws_id, + discord_user_id, + ) + return contact_id + logger.warning( + "ensure_discord_contact: create returned no id ws=%s discord_uid=%s", + ws_id, + discord_user_id, + ) + return None + except gql.transport.exceptions.TransportQueryError as e: + logger.warning( + "ensure_discord_contact GQL error ws=%s uid=%s: %s %s", + ws_id, + discord_user_id, + type(e).__name__, + e, + ) + return None + except (TypeError, KeyError, ValueError) as e: + logger.warning( + "ensure_discord_contact parse error ws=%s uid=%s: %s %s", + ws_id, + discord_user_id, + type(e).__name__, + e, + ) + return None + + +async def application_apply_decision( + fclient: ckit_client.FlexusClient, + application_id: str, + app_status: str, + app_decision: str, + details: Optional[dict] = None, +) -> bool: + """ + Update an application's status, decision, and optional details. Returns True on success. + + app_status values: PENDING / REVIEWING / DECIDED / CLOSED. + app_decision values: APPROVED / REJECTED / WAITLISTED / "" (empty clears decision). + details dict is serialised to JSON and stored in application_details. + """ + try: + details_json = json.dumps(details) if details is not None else None + async with (await fclient.use_http()) as http: + r = await http.execute( + _GQL_APPLICATION_UPDATE, + variable_values={ + "application_id": application_id, + "status": app_status, + "decision": app_decision if app_decision else None, + "details": details_json, + }, + ) + row = r.get("application_update") if isinstance(r, dict) else None + if not isinstance(row, dict) or not row.get("application_id"): + logger.warning( + "application_apply_decision: no application_id in response app=%s status=%s", + application_id, + app_status, + ) + return False + logger.info( + "application_apply_decision app=%s status=%s decision=%s", + application_id, + app_status, + app_decision, + ) + return True + except gql.transport.exceptions.TransportQueryError as e: + logger.warning( + "application_apply_decision GQL error app=%s: %s %s", + application_id, + type(e).__name__, + e, + ) + return False + except (TypeError, KeyError, ValueError) as e: + logger.warning( + "application_apply_decision parse error app=%s: %s %s", + application_id, + type(e).__name__, + e, + ) + return False diff --git a/flexus_client_kit/integrations/fi_discord2.py b/flexus_client_kit/integrations/fi_discord2.py index 6f607ff5..1904852d 100644 --- a/flexus_client_kit/integrations/fi_discord2.py +++ b/flexus_client_kit/integrations/fi_discord2.py @@ -5,6 +5,7 @@ import logging import os import random +import re import tempfile import time from collections import deque @@ -12,6 +13,8 @@ from dataclasses import dataclass, field from typing import Any, Awaitable, Callable, Dict, List, Optional, Tuple +import aiohttp + import discord import gql from discord import File @@ -25,10 +28,18 @@ ckit_bot_query, ckit_cloudtool, ckit_client, + ckit_job_queue, ckit_kanban, ckit_scenario, ckit_utils, ) +from flexus_client_kit.ckit_connector import ActionResult, NormalizedEvent +from flexus_client_kit.ckit_connector_discord import ( + discord_bot_api_key_from_external_auth, + log_ctx, + parse_snowflake, + setup_truthy, +) from flexus_client_kit.format_utils import format_cat_output from flexus_client_kit.integrations import fi_messenger from flexus_client_kit.integrations.fi_mongo_store import download_file, validate_path @@ -123,20 +134,6 @@ def _parse_channel_reference(ref: str) -> Tuple[Optional[str], Optional[str]]: return channel or None, thread or None -def discord_bot_api_key_from_external_auth(ext: Dict[str, Any]) -> str: - for provider_key in ("discord_manual", "discord"): - raw = ext.get(provider_key) - if raw is None: - continue - if not isinstance(raw, dict): - logger.warning("discord_bot_api_key_from_external_auth: provider %r value is not a dict, skipping", provider_key) - continue - tok = (raw.get("api_key") or "").strip() - if tok: - return tok - return "" - - class IntegrationDiscord(fi_messenger.FlexusMessenger): platform_name = "discord" emessage_type = "DISCORD" @@ -755,6 +752,15 @@ async def post_into_captured_thread_as_user(self, activity: ActivityDiscord) -> return False parts = fi_messenger.compact_message_parts(parts) + provenance = { + "source": "discord", + "discord_author_id": str(activity.message_author_id), + "discord_author_name": activity.message_author_name, + "discord_channel_id": str(activity.channel_id), + "discord_message_id": activity.message_id, + "is_dm": activity.is_dm, + } + http = await self.fclient.use_http_on_behalf(self.rcx.persona.persona_id, "") logger.info("captured_thread_post searchable=%s msg=%s", searchable, text[:200]) try: @@ -764,6 +770,7 @@ async def post_into_captured_thread_as_user(self, activity: ActivityDiscord) -> searchable, parts, only_to_expert=self.outside_messages_fexp_name, + ftm_provenance=provenance, thread_too_old_s=30*86400 if activity.thread_id else 300, ) except gql.transport.exceptions.TransportQueryError as e: # type: ignore[attr-defined] @@ -872,3 +879,647 @@ def _format_assistant_message(self, content: Any) -> str: if isinstance(parsed, dict): return parsed.get("m_content", str(parsed)) return str(parsed) + + +# --------------------------------------------------------------------------- +# Community-bot utilities (consolidated from fi_discord_community) +# --------------------------------------------------------------------------- + +COL_ONBOARDING = "dc_onboarding_state" +COL_MOD_EVENTS = "dc_mod_events" +COL_ACTIVITY = "dc_member_activity" +COL_FAQ_RATE = "dc_faq_rate" +COL_MOD_RATELIMIT = "dc_mod_ratelimit_window" + +COL_JOBS = ckit_job_queue.COL_JOBS +JobHandler = ckit_job_queue.JobHandler + + +def build_intents() -> discord.Intents: + intents = discord.Intents.default() + intents.message_content = True + intents.members = True + intents.guilds = True + intents.dm_messages = True + intents.guild_messages = True + intents.guild_reactions = True + return intents + + +def guild_matches(guild: Optional[discord.Guild], want_id: Optional[int]) -> bool: + if want_id is None: + return True + if guild is None: + return False + return int(guild.id) == int(want_id) + + +def truncate_message(text: str, limit: int = 2000) -> str: + if len(text) <= limit: + return text + return text[: limit - 20] + "\n...(truncated)" + + +async def safe_send( + channel: discord.abc.Messageable, + persona_id: str, + content: str, +) -> Optional[discord.Message]: + t = truncate_message(content) + g = getattr(channel, "guild", None) + gid = int(g.id) if g is not None else None + delay = 1.0 + for attempt in range(5): + try: + return await channel.send(t) + except discord.errors.HTTPException as e: + if e.status == 429 and attempt < 4: + ra = getattr(e, "retry_after", None) + wait = float(ra) if ra is not None else delay + wait = max(0.5, min(wait, 30.0)) + log_ctx(persona_id, gid, "safe_send 429 backoff %.1fs", wait) + await asyncio.sleep(wait) + delay = min(delay * 2.0, 16.0) + continue + log_ctx(persona_id, gid, "safe_send HTTP %s", e.status) + return None + except DiscordException as e: + log_ctx(persona_id, gid, "safe_send failed: %s %s", type(e).__name__, e) + return None + except aiohttp.ClientError as e: + log_ctx(persona_id, gid, "safe_send network: %s %s", type(e).__name__, e) + return None + return None + + +async def safe_dm( + client: discord.Client, + user: discord.abc.User, + persona_id: str, + content: str, +) -> bool: + try: + ch = user.dm_channel or await user.create_dm() + except DiscordException as e: + log_ctx(persona_id, None, "create_dm failed for user=%s: %s %s", getattr(user, "id", "?"), type(e).__name__, e) + return False + except aiohttp.ClientError as e: + log_ctx( + persona_id, + None, + "create_dm network for user=%s: %s %s", + getattr(user, "id", "?"), + type(e).__name__, + e, + ) + return False + m = await safe_send(ch, persona_id, content) + return m is not None + + +def compile_url_patterns(lines: str) -> List[re.Pattern[str]]: + out: List[re.Pattern[str]] = [] + for line in (lines or "").splitlines(): + pat = line.strip() + if not pat: + continue + try: + out.append(re.compile(pat, re.I)) + except re.error: + logger.warning("bad url regex ignored: %r", pat[:80]) + return out + + +DISCORD_INVITE_RE = re.compile( + r"(discord\.gg/|discordapp\.com/invite/|discord\.com/invite/)[a-zA-Z0-9_-]+", + re.I, +) + + +def message_has_invite(content: str) -> bool: + return bool(DISCORD_INVITE_RE.search(content or "")) + + +def match_blocked_url(content: str, patterns: List[re.Pattern[str]]) -> bool: + for p in patterns: + if p.search(content or ""): + return True + return False + + +async def start_discord_client( + token: str, + persona_id: str, + register: Callable[[discord.Client], None], +) -> Tuple[discord.Client, asyncio.Task]: + client = discord.Client(intents=build_intents()) + register(client) + + async def _runner() -> None: + try: + await client.start(token) + except asyncio.CancelledError: + raise + except DiscordException as e: + logger.error("[%s] discord client died: %s %s", persona_id, type(e).__name__, e) + + t = asyncio.create_task(_runner()) + return client, t + + +async def close_discord_client(client: Optional[discord.Client], task: Optional[asyncio.Task]) -> None: + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + if client and not client.is_closed(): + await client.close() + + +async def discord_run_platform_action( + client: discord.Client, + persona_id: str, + action_type: str, + params: dict, + *, + resolve_guild: Callable[[int], discord.Guild | None], +) -> ActionResult: + """ + Execute Discord-side platform actions for connectors and the backend gateway service. + + Maps action_type to fetch_user, safe_dm, safe_send, role/kick operations, and introspection + helpers (get_user_info, get_channel). Lives in fi_discord2 so runtime Discord I/O stays in one + integration module (Architecture: gateway + DiscordConnector delegate here). + """ + if action_type == "send_dm": + try: + uid = int(params["user_id"]) + text = str(params["text"]) + except (TypeError, ValueError, KeyError): + return ActionResult(ok=False, error="bad_params") + try: + user = await client.fetch_user(uid) + except DiscordException as e: + log_ctx(persona_id, None, "send_dm fetch_user: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + except aiohttp.ClientError as e: + log_ctx(persona_id, None, "send_dm fetch_user network: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + try: + ok = await safe_dm(client, user, persona_id, text) + return ActionResult(ok=ok) + except DiscordException as e: + log_ctx(persona_id, None, "send_dm: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + except aiohttp.ClientError as e: + log_ctx(persona_id, None, "send_dm network: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + + if action_type == "post_to_channel": + try: + cid = int(params["channel_id"]) + text = str(params["text"]) + except (TypeError, ValueError, KeyError): + return ActionResult(ok=False, error="bad_params") + ch: discord.abc.GuildChannel | discord.Thread | discord.abc.PrivateChannel | None = None + try: + ch = client.get_channel(cid) + if not isinstance(ch, discord.TextChannel): + return ActionResult(ok=False, error="channel_not_found") + gch = ch.guild + if gch is None or resolve_guild(int(gch.id)) is None: + return ActionResult(ok=False, error="guild_not_allowed") + msg = await safe_send(ch, persona_id, text) + if msg is None: + return ActionResult(ok=False, error="safe_send_failed") + return ActionResult( + ok=True, + data={"message_id": str(msg.id), "channel_id": str(ch.id)}, + ) + except DiscordException as e: + lg = None + if isinstance(ch, discord.TextChannel) and ch.guild is not None: + lg = int(ch.guild.id) + log_ctx(persona_id, lg, "post_to_channel: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + except aiohttp.ClientError as e: + log_ctx(persona_id, None, "post_to_channel network: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + + if action_type == "get_user_info": + try: + uid = int(params["user_id"]) + except (TypeError, ValueError, KeyError): + return ActionResult(ok=False, error="bad_params") + raw_sid = params.get("server_id") or params.get("guild_id") or "" + if str(raw_sid).strip(): + try: + gid = int(raw_sid) + except (TypeError, ValueError): + return ActionResult(ok=False, error="bad_server_id") + g = resolve_guild(gid) + if g is None: + return ActionResult(ok=False, error="guild_not_found") + member = g.get_member(uid) + if member is None: + try: + member = await g.fetch_member(uid) + except DiscordException: + member = None + if member is None: + return ActionResult(ok=False, error="member_not_found") + role_ids = [str(r.id) for r in member.roles] + return ActionResult( + ok=True, + data={ + "user_id": str(member.id), + "display_name": member.display_name, + "role_ids": role_ids, + }, + ) + for guild in client.guilds: + member = guild.get_member(uid) + if member is not None: + role_ids = [str(r.id) for r in member.roles] + return ActionResult( + ok=True, + data={ + "user_id": str(member.id), + "display_name": member.display_name, + "role_ids": role_ids, + }, + ) + return ActionResult(ok=False, error="member_not_found") + + if action_type == "get_channel": + try: + cid = int(params["channel_id"]) + except (TypeError, ValueError, KeyError): + return ActionResult(ok=False, error="bad_params") + ch = client.get_channel(cid) + if ch is None: + return ActionResult(ok=False, error="channel_not_found") + gch = getattr(ch, "guild", None) + if gch is None: + return ActionResult(ok=False, error="not_guild_channel") + if resolve_guild(int(gch.id)) is None: + return ActionResult(ok=False, error="guild_not_allowed") + nm = getattr(ch, "name", None) or "" + data: Dict[str, Any] = { + "channel_id": str(ch.id), + "name": nm, + "type": str(ch.type), + "guild_id": str(gch.id), + } + me = gch.me + if me is not None and hasattr(ch, "permissions_for"): + pr = ch.permissions_for(me) + data["view_channel"] = pr.view_channel + data["send_messages"] = pr.send_messages + data["read_message_history"] = pr.read_message_history + data["manage_messages"] = pr.manage_messages + return ActionResult(ok=True, data=data) + + g: discord.Guild | None = None + if action_type in ("add_role", "remove_role", "kick"): + raw = params.get("server_id") or params.get("guild_id") or "" + if raw is None or str(raw).strip() == "": + return ActionResult(ok=False, error="missing_server_id") + try: + gid = int(raw) + except (TypeError, ValueError): + return ActionResult(ok=False, error="bad_params") + g = resolve_guild(gid) + if g is None: + return ActionResult(ok=False, error="guild_not_found") + + if action_type in ("add_role", "remove_role"): + try: + uid = int(params["user_id"]) + rid = int(params["role_id"]) + except (TypeError, ValueError, KeyError): + return ActionResult(ok=False, error="bad_params") + try: + member = g.get_member(uid) + role = g.get_role(rid) + if member is None or role is None: + return ActionResult(ok=False, error="member_or_role_not_found") + if action_type == "add_role": + await member.add_roles(role) + else: + await member.remove_roles(role) + return ActionResult(ok=True) + except DiscordException as e: + log_ctx(persona_id, g.id, "%s: %s %s", action_type, type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + except aiohttp.ClientError as e: + log_ctx(persona_id, g.id, "%s network: %s %s", action_type, type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + + if action_type == "kick": + try: + uid = int(params["user_id"]) + except (TypeError, ValueError, KeyError): + return ActionResult(ok=False, error="bad_params") + reason = str(params.get("reason") or "") + try: + member = g.get_member(uid) + if member is None: + return ActionResult(ok=False, error="member_not_found") + await member.kick(reason=reason or None) + return ActionResult(ok=True) + except DiscordException as e: + log_ctx(persona_id, g.id, "kick: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + except aiohttp.ClientError as e: + log_ctx(persona_id, g.id, "kick network: %s %s", type(e).__name__, e) + return ActionResult(ok=False, error="%s: %s" % (type(e).__name__, e)) + + return ActionResult(ok=False, error="unknown_action_type") + + +def gateway_reaction_emoji_key(emoji: Any) -> str: + """ + Serialize a discord partial emoji to the same string form as reaction_roles_json bindings + (unicode name, or 'name:id' for custom emoji). Used by gateway-emitted reaction events. + """ + try: + eid = getattr(emoji, "id", None) + if eid: + name = getattr(emoji, "name", None) or "" + return "%s:%s" % (name, eid) + return str(getattr(emoji, "name", None) or "") + except (TypeError, AttributeError): + return "" + + +def bind_discord_gateway_client( + client: discord.Client, + emit: Callable[[NormalizedEvent], Awaitable[None]], +) -> None: + """ + Attach Discord event handlers to ``client`` that emit ``NormalizedEvent`` (same shape as gateway wire). + + Used by ``DiscordLocalConnector`` in the discord_bot worker to keep runtime event shape stable. + """ + + @client.event + async def on_ready() -> None: + for g in list(client.guilds): + await _emit_gateway_server_connected(g, emit) + + @client.event + async def on_member_join(member: discord.Member) -> None: + if member.bot: + return + await emit( + NormalizedEvent( + source="discord", + server_id=str(member.guild.id), + channel_id="", + user_id=str(member.id), + event_type="member_joined", + payload={ + "username": str(member), + "guild_id": int(member.guild.id), + "user_id": int(member.id), + }, + timestamp=time.time(), + ), + ) + + @client.event + async def on_message(message: discord.Message) -> None: + if message.author.bot: + return + if not message.guild: + return + if isinstance(message.channel, discord.DMChannel): + return + await emit( + NormalizedEvent( + source="discord", + server_id=str(message.guild.id), + channel_id=str(message.channel.id), + user_id=str(message.author.id), + event_type="message_in_channel", + payload={ + "content": message.content or "", + "channel_id": int(message.channel.id), + "guild_id": int(message.guild.id), + "user_id": int(message.author.id), + "message_id": str(message.id), + }, + timestamp=time.time(), + ), + ) + + @client.event + async def on_member_remove(member: discord.Member) -> None: + if member.bot: + return + await emit( + NormalizedEvent( + source="discord", + server_id=str(member.guild.id), + channel_id="", + user_id=str(member.id), + event_type="member_removed", + payload={ + "username": str(member), + "guild_id": int(member.guild.id), + "user_id": int(member.id), + }, + timestamp=time.time(), + ), + ) + + @client.event + async def on_guild_remove(guild: discord.Guild) -> None: + await emit( + NormalizedEvent( + source="discord", + server_id=str(guild.id), + channel_id="", + user_id="", + event_type="server_disconnected", + payload={"guild_id": int(guild.id)}, + timestamp=time.time(), + ), + ) + + @client.event + async def on_guild_join(guild: discord.Guild) -> None: + await _emit_gateway_server_connected(guild, emit) + + @client.event + async def on_guild_available(guild: discord.Guild) -> None: + await _emit_gateway_server_connected(guild, emit) + + @client.event + async def on_raw_reaction_add(payload: discord.RawReactionActionEvent) -> None: + """ + Forward reaction adds to worker bots (reaction role bindings) via the same DISCORD + emessage path as messages; no worker-side Discord socket required. + """ + try: + if payload.guild_id is None: + return + bot_user = client.user + if bot_user is not None and int(payload.user_id) == int(bot_user.id): + return + await emit( + NormalizedEvent( + source="discord", + server_id=str(payload.guild_id), + channel_id=str(payload.channel_id), + user_id=str(payload.user_id), + event_type="reaction_added", + payload={ + "guild_id": int(payload.guild_id), + "channel_id": int(payload.channel_id), + "user_id": int(payload.user_id), + "message_id": str(payload.message_id), + "emoji": gateway_reaction_emoji_key(payload.emoji), + }, + timestamp=time.time(), + ), + ) + except (TypeError, ValueError, AttributeError, DiscordException) as e: + log_ctx("gateway", int(payload.guild_id or 0) or None, "on_raw_reaction_add emit: %s %s", type(e).__name__, e) + + @client.event + async def on_raw_reaction_remove(payload: discord.RawReactionActionEvent) -> None: + """Mirror of on_raw_reaction_add for removing roles when a reaction is cleared.""" + try: + if payload.guild_id is None: + return + bot_user = client.user + if bot_user is not None and int(payload.user_id) == int(bot_user.id): + return + await emit( + NormalizedEvent( + source="discord", + server_id=str(payload.guild_id), + channel_id=str(payload.channel_id), + user_id=str(payload.user_id), + event_type="reaction_removed", + payload={ + "guild_id": int(payload.guild_id), + "channel_id": int(payload.channel_id), + "user_id": int(payload.user_id), + "message_id": str(payload.message_id), + "emoji": gateway_reaction_emoji_key(payload.emoji), + }, + timestamp=time.time(), + ), + ) + except (TypeError, ValueError, AttributeError, DiscordException) as e: + log_ctx("gateway", int(payload.guild_id or 0) or None, "on_raw_reaction_remove emit: %s %s", type(e).__name__, e) + + +async def _emit_gateway_server_connected( + g: discord.Guild, + emit: Callable[[NormalizedEvent], Awaitable[None]], +) -> None: + """Emit server_connected with member count for gateway ingress routing.""" + mc = getattr(g, "member_count", None) + if mc is None: + mc = 0 + await emit( + NormalizedEvent( + source="discord", + server_id=str(g.id), + channel_id="", + user_id="", + event_type="server_connected", + payload={ + "guild_id": int(g.id), + "guild_name": g.name or "", + "approx_member_count": int(mc), + }, + timestamp=time.time(), + ), + ) + + +def _perm_gaps_basic(perms: discord.Permissions) -> List[str]: + miss: List[str] = [] + if not perms.view_channel: + miss.append("view_channel") + if not perms.send_messages: + miss.append("send_messages") + if not perms.read_message_history: + miss.append("read_message_history") + return miss + + +def _perm_gaps_mod(perms: discord.Permissions) -> List[str]: + miss = _perm_gaps_basic(perms) + if not perms.manage_messages: + miss.append("manage_messages") + return miss + + +def preflight_text_channels( + guild: discord.Guild, + bot_user: discord.ClientUser, + persona_id: str, + bot_label: str, + channels: Dict[str, Tuple[Optional[int], str]], + *, + warn_manage_roles: bool = False, +) -> None: + me = guild.get_member(bot_user.id) + if not me: + log_ctx(persona_id, guild.id, "preflight %s: bot not in guild member cache", bot_label) + return + for label, (cid, level) in channels.items(): + if not cid: + continue + ch = guild.get_channel(int(cid)) + if not isinstance(ch, discord.TextChannel): + log_ctx(persona_id, guild.id, "preflight %s: %s id=%s missing or not text", bot_label, label, cid) + continue + perms = ch.permissions_for(me) + if level == "mod": + miss = _perm_gaps_mod(perms) + else: + miss = _perm_gaps_basic(perms) + if miss: + log_ctx( + persona_id, + guild.id, + "preflight %s: %s ch=%s missing %s", + bot_label, + label, + cid, + ",".join(miss), + ) + if warn_manage_roles and not me.guild_permissions.manage_roles: + log_ctx( + persona_id, + guild.id, + "preflight %s: guild.manage_roles false (assign roles only below bot role)", + bot_label, + ) + + +async def enqueue_job( + db: Any, + kind: str, + run_at_ts: float, + payload: Dict[str, Any], +) -> None: + return await ckit_job_queue.enqueue_job(db, kind, run_at_ts, payload) + + +async def drain_due_jobs( + db: Any, + persona_id: str, + handlers: Dict[str, JobHandler], + limit: int = 50, +) -> int: + return await ckit_job_queue.drain_due_jobs(db, persona_id, handlers, limit=limit) diff --git a/flexus_client_kit/setup_schema_schema.json b/flexus_client_kit/setup_schema_schema.json index fbb3cbc4..43a71c4d 100644 --- a/flexus_client_kit/setup_schema_schema.json +++ b/flexus_client_kit/setup_schema_schema.json @@ -16,15 +16,18 @@ "type": "string", "title": "Input Type", "enum": ["string_short", "string_long", "string_multiline", "bool", "int", "float"], - "description": "UI input control type. string_short: single-line text, string_long: wider single-line text, string_multiline: textarea, bool: checkbox, int/float: numeric input." + "description": "UI input control type. string_short: single-line text, string_long: wider single-line text, string_multiline: textarea, bool: toggle, int/float: numeric input." }, "bs_default": { "title": "Default Value", "oneOf": [ { "type": "string" }, + { "type": "boolean" }, { "type": "integer" }, - { "type": "number" }, - { "type": "boolean" } + { + "type": "number", + "not": { "multipleOf": 1 } + } ], "description": "Default value used when the admin has not provided an override in the setup dialog. Must match bs_type." }, diff --git a/flexus_simple_bots/admonster/admonster_install.py b/flexus_simple_bots/admonster/admonster_install.py index b1778d18..0e233c19 100644 --- a/flexus_simple_bots/admonster/admonster_install.py +++ b/flexus_simple_bots/admonster/admonster_install.py @@ -20,7 +20,7 @@ ("setup", ckit_bot_install.FMarketplaceExpertInput( fexp_system_prompt=admonster_prompts.admonster_setup, fexp_python_kernel="", - fexp_allow_tools=",".join(sorted(set(ckit_cloudtool.CLOUDTOOLS_ADVANCED) | TOOL_NAMESET)), + fexp_allow_tools=",".join(sorted(set(ckit_cloudtool.CLOUDTOOLS_QUITE_A_LOT) | TOOL_NAMESET)), fexp_nature="NATURE_INTERACTIVE", fexp_inactivity_timeout=0, fexp_description="Helps users configure Facebook OAuth connections and ad account settings, plus LinkedIn advertising credentials.", diff --git a/flexus_simple_bots/clerkwing/clerkwing_install.py b/flexus_simple_bots/clerkwing/clerkwing_install.py index b45d34a0..f7bfdfc2 100644 --- a/flexus_simple_bots/clerkwing/clerkwing_install.py +++ b/flexus_simple_bots/clerkwing/clerkwing_install.py @@ -36,7 +36,7 @@ ("setup", ckit_bot_install.FMarketplaceExpertInput( fexp_system_prompt=clerkwing_prompts.clerkwing_setup, fexp_python_kernel="", - fexp_allow_tools=",".join(TOOL_NAMESET | ckit_cloudtool.CLOUDTOOLS_ADVANCED), + fexp_allow_tools=",".join(TOOL_NAMESET | ckit_cloudtool.CLOUDTOOLS_QUITE_A_LOT), fexp_nature="NATURE_INTERACTIVE", fexp_description="Configuration assistant for setting up Gmail, Google Calendar, and Jira OAuth connections.", )), diff --git a/flexus_simple_bots/discord_bot/README.md b/flexus_simple_bots/discord_bot/README.md new file mode 100644 index 00000000..39065d2c --- /dev/null +++ b/flexus_simple_bots/discord_bot/README.md @@ -0,0 +1,5 @@ +# Discord Bot + +Welcome DMs, optional public welcome, delayed follow-up when the member has not posted in the guild, start-here checklist post, reaction roles, and `!announce` for moderators. + +Requires **`FLEXUS_DISCORD_BOT_TOKEN`** env var (or legacy `discord_manual` bot token via persona Integrations) and **`dc_guild_id`** in **Setup**. Does not use `fi_discord2` (Karen unchanged). diff --git a/flexus_simple_bots/discord_bot/__init__.py b/flexus_simple_bots/discord_bot/__init__.py new file mode 100644 index 00000000..8b137891 --- /dev/null +++ b/flexus_simple_bots/discord_bot/__init__.py @@ -0,0 +1 @@ + diff --git a/flexus_simple_bots/discord_bot/default__s1.yaml b/flexus_simple_bots/discord_bot/default__s1.yaml new file mode 100644 index 00000000..c40b7dc5 --- /dev/null +++ b/flexus_simple_bots/discord_bot/default__s1.yaml @@ -0,0 +1,9 @@ +messages: +- role: user + content: What does this bot do on Discord? +- role: assistant + content: >- + On the Discord server it sends welcome DMs (unless disabled), optional public welcome posts, + a start-here checklist, delayed follow-up DMs via a job queue, reaction-based role assignment, + and mod-only !announce with optional role pings. All channel and role IDs come from setup. +persona_marketable_name: discord_bot diff --git a/flexus_simple_bots/discord_bot/discord_bot-1024x1536.webp b/flexus_simple_bots/discord_bot/discord_bot-1024x1536.webp new file mode 100644 index 00000000..7c3aa91e Binary files /dev/null and b/flexus_simple_bots/discord_bot/discord_bot-1024x1536.webp differ diff --git a/flexus_simple_bots/discord_bot/discord_bot-256x256.webp b/flexus_simple_bots/discord_bot/discord_bot-256x256.webp new file mode 100644 index 00000000..90496b6d Binary files /dev/null and b/flexus_simple_bots/discord_bot/discord_bot-256x256.webp differ diff --git a/flexus_simple_bots/discord_bot/discord_bot.py b/flexus_simple_bots/discord_bot/discord_bot.py new file mode 100644 index 00000000..519842a2 --- /dev/null +++ b/flexus_simple_bots/discord_bot/discord_bot.py @@ -0,0 +1,601 @@ +import asyncio +import json +import logging +import os +from typing import Any, Dict, List +from pymongo import AsyncMongoClient +from pymongo.errors import PyMongoError + +from flexus_client_kit import ckit_automation_actions +from flexus_client_kit import ckit_automation_engine +from flexus_client_kit import ckit_bot_exec +from flexus_client_kit import ckit_client +from flexus_client_kit import ckit_messages +from flexus_client_kit import ckit_mongo +from flexus_client_kit import ckit_shutdown +from flexus_client_kit.ckit_connector import ChatConnector, NormalizedEvent +from flexus_client_kit.ckit_connector_discord import ( + discord_bot_api_key_from_external_auth, + log_ctx, + parse_snowflake, + setup_truthy, +) +from flexus_client_kit.ckit_connector_discord_local import DiscordLocalConnector +from flexus_simple_bots.discord_bot import discord_bot_install +from flexus_simple_bots.version_common import SIMPLE_BOTS_COMMON_VERSION + +logger = logging.getLogger("discord_bot") + + +async def _warn_discord_channel_acl( + connector: ChatConnector, + persona_id: str, + purpose_label: str, + channel_id: int, +) -> None: + """Log when the connector cannot see a channel or the bot lacks common text permissions.""" + try: + info = await connector.get_channel(str(channel_id)) + if info is None: + logger.warning( + "%s channel preflight [%s]: channel_id=%s not reachable " + "(missing, not a guild channel, or guild not allowlisted)", + persona_id, + purpose_label, + channel_id, + ) + return + missing = [ + k + for k in ( + "view_channel", + "send_messages", + "read_message_history", + "manage_messages", + ) + if k in info and info[k] is False + ] + if not missing: + return + logger.warning( + "%s channel preflight [%s]: channel_id=%s guild_id=%s name=%r missing permissions: %s", + persona_id, + purpose_label, + info.get("channel_id", str(channel_id)), + info.get("guild_id"), + info.get("name", ""), + ",".join(missing), + ) + except (TypeError, ValueError, AttributeError) as e: + logger.warning( + "%s channel preflight [%s] failed: %s %s", + persona_id, + purpose_label, + type(e).__name__, + e, + ) + + +async def _discord_channel_acl_preflight( + connector: ChatConnector, + persona_id: str, + watched_channel_ids: set[int], + setup: Dict[str, Any], +) -> None: + """Best-effort permission warnings for watched channels and checklist/welcome targets.""" + try: + for cid in sorted(watched_channel_ids): + await _warn_discord_channel_acl(connector, persona_id, "watched message_in_channel", cid) + checklist_cid = parse_snowflake(setup.get("checklist_channel_id", "")) + if checklist_cid and not setup_truthy(setup.get("disable_checklist_auto_post")): + await _warn_discord_channel_acl(connector, persona_id, "checklist_channel", checklist_cid) + welcome_cid = parse_snowflake(setup.get("welcome_channel_id", "")) + if welcome_cid: + await _warn_discord_channel_acl(connector, persona_id, "welcome_channel", welcome_cid) + except (TypeError, ValueError) as e: + logger.warning("%s acl preflight error: %s %s", persona_id, type(e).__name__, e) + + +BOT_NAME = "discord_bot" +BOT_VERSION = SIMPLE_BOTS_COMMON_VERSION + + +TOOLS: List[Any] = [] + + +def _discord_bot_hosted_bot_token() -> tuple[str, str | None]: + """Return (token, env var name) when FLEXUS_DISCORD_BOT_TOKEN is set.""" + try: + v = (os.environ.get("FLEXUS_DISCORD_BOT_TOKEN") or "").strip() + if v: + return v, "FLEXUS_DISCORD_BOT_TOKEN" + return "", None + except (TypeError, AttributeError): + return "", None + + +def _parse_bindings(raw: str) -> List[Dict[str, str]]: + """Parse reaction_roles_json: [{message_id, emoji, role_id}, ...].""" + try: + v = json.loads(raw or "[]") + except json.JSONDecodeError: + return [] + if not isinstance(v, list): + return [] + out: List[Dict[str, str]] = [] + for item in v: + if not isinstance(item, dict): + continue + mid = str(item.get("message_id", "")).strip() + emo = str(item.get("emoji", "")).strip() + rid = str(item.get("role_id", "")).strip() + if mid and emo and rid: + out.append({"message_id": mid, "emoji": emo, "role_id": rid}) + return out + + +def _role_ids_csv(s: str) -> List[int]: + """Parse comma-separated snowflakes from setup (e.g. mod roles).""" + out: List[int] = [] + for part in (s or "").split(","): + p = part.strip() + if p.isdigit(): + out.append(int(p)) + return out + + +def _guild_ids_from_persona(persona: Any, setup: Dict[str, Any]) -> set[int]: + """Guild allowlist from persona_external_addresses discord: or legacy dc_guild_id.""" + try: + addresses = getattr(persona, "persona_external_addresses", None) + if isinstance(addresses, list): + ids: set[int] = set() + for v in addresses: + if isinstance(v, str) and v.startswith("discord:"): + gid = parse_snowflake(v[len("discord:") :]) + if gid is not None: + ids.add(gid) + if ids: + return ids + legacy_gid = parse_snowflake(setup.get("dc_guild_id", "")) + if legacy_gid is not None: + return {legacy_gid} + return set() + except (TypeError, ValueError, AttributeError): + return set() + + +def _event_member_dict(guild_id: int, user_id: int, pl: Dict[str, Any]) -> Dict[str, Any]: + """ + Minimal member snapshot for the automation engine and actions (templates, role tools). + + Loaded from the normalized Discord payload only — nothing persisted on the worker. + """ + uname = pl.get("username", "") + if not isinstance(uname, str): + uname = "" + return { + "guild_id": guild_id, + "user_id": user_id, + "discord_username": uname, + } + + +async def _maybe_auto_post_checklist( + connector: ChatConnector, + setup: Dict[str, Any], + mongo_db: Any, + persona_id: str, + pl: Dict[str, Any], + allowed_guild_ids: frozenset[int], +) -> None: + """ + One-time checklist message per guild via connector post_to_channel + dc_onboarding_meta marker. + """ + try: + if setup_truthy(setup.get("disable_checklist_auto_post")): + return + gid = int(pl.get("guild_id", 0) or 0) + # Empty allowlist must not mean "all guilds" — deny checklist until persona lists guilds. + if not gid or not allowed_guild_ids or gid not in allowed_guild_ids: + return + cid = parse_snowflake(setup.get("checklist_channel_id", "")) + if not cid: + return + body = (setup.get("checklist_message_body") or "").strip() + if not body: + return + checklist_meta_coll = mongo_db["dc_onboarding_meta"] + meta_id = "checklist_posted:%s" % gid + doc = await checklist_meta_coll.find_one({"_id": meta_id}) + if doc and doc.get("posted"): + return + result = await connector.execute_action( + "post_to_channel", + { + "channel_id": str(cid), + "text": body, + "server_id": str(gid), + }, + ) + if not result.ok: + log_ctx(persona_id, gid, "checklist auto-post failed: %s", result.error or "unknown") + return + extra: Dict[str, Any] = {"posted": True, "channel_id": str(cid), "guild_id": str(gid)} + data = getattr(result, "data", None) + if isinstance(data, dict) and data.get("message_id"): + extra["message_id"] = data["message_id"] + await checklist_meta_coll.update_one({"_id": meta_id}, {"$set": extra}, upsert=True) + except PyMongoError as e: + log_ctx(persona_id, None, "checklist meta PyMongoError: %s %s", type(e).__name__, e) + except (TypeError, ValueError, KeyError) as e: + log_ctx(persona_id, None, "checklist auto-post error: %s %s", type(e).__name__, e) + + +async def _handle_reaction_binding_event( + connector: ChatConnector, + setup: Dict[str, Any], + persona_id: str, + event_type: str, + pl: Dict[str, Any], +) -> None: + """ + Apply reaction_roles_json using connector add_role/remove_role (reaction_* events from the live client). + """ + try: + if setup_truthy(setup.get("disable_reaction_roles")): + return + try: + gid = int(pl.get("guild_id", 0) or 0) + uid = int(pl.get("user_id", 0) or 0) + except (TypeError, ValueError): + return + if not gid or not uid: + return + mid = str(pl.get("message_id", "") or "") + key = str(pl.get("emoji", "") or "") + if not mid or not key: + return + bindings = _parse_bindings(setup.get("reaction_roles_json", "[]")) + for b in bindings: + if b["message_id"] != mid: + continue + if b["emoji"] != key: + continue + act = "add_role" if event_type == "reaction_added" else "remove_role" + result = await connector.execute_action( + act, + { + "user_id": str(uid), + "role_id": b["role_id"], + "server_id": str(gid), + }, + ) + if not result.ok: + log_ctx(persona_id, gid, "reaction role %s failed: %s", act, result.error or "unknown") + return + except (TypeError, ValueError, KeyError) as e: + log_ctx(persona_id, None, "reaction binding error: %s %s", type(e).__name__, e) + + +async def discord_bot_main_loop(fclient: ckit_client.FlexusClient, rcx: ckit_bot_exec.RobotContext) -> None: + """Discord community bot: in-process discord.py client; events via ChatConnector.on_event.""" + connector: ChatConnector | None = None + mongo: Any = None + persona_id_loop = rcx.persona.persona_id + try: + persona_setup_raw = rcx.persona.persona_setup or {} + setup = ckit_bot_exec.official_setup_mixing_procedure( + discord_bot_install.DISCORD_BOT_SETUP_SCHEMA, + persona_setup_raw, + ) + token, hosted_env = _discord_bot_hosted_bot_token() + if not token: + token = discord_bot_api_key_from_external_auth(rcx.external_auth) + hosted_env = "external_auth" if token else None + if not token: + logger.error( + "%s missing Discord bot token: set FLEXUS_DISCORD_BOT_TOKEN, " + "or provide legacy external_auth api_key (discord_manual / discord)", + rcx.persona.persona_id, + ) + while not ckit_shutdown.shutdown_event.is_set(): + await rcx.unpark_collected_events(sleep_if_no_work=30.0) + return + + logger.info( + "%s Discord runtime: in-process client (token source=%s, backend=%s).", + rcx.persona.persona_id, + hosted_env, + fclient.base_url_http, + ) + + mongo_conn_str = await ckit_mongo.mongo_fetch_creds(fclient, rcx.persona.persona_id) + mongo = AsyncMongoClient(mongo_conn_str) + mongo_db = mongo[persona_id_loop + "_db"] + + rules = ckit_automation_engine.load_rules(persona_setup_raw) + log_ctx(rcx.persona.persona_id, None, "loaded %d automation rules", len(rules)) + + watched_channel_ids: set[int] = set() + for r in rules: + trig = r.get("trigger", {}) + if trig.get("type") == "message_in_channel": + cid = ckit_automation_engine.resolve_channel_id(trig.get("channel_id_field", ""), setup) + if cid is not None: + watched_channel_ids.add(cid) + + workspace_id = rcx.persona.located_fgroup_id or "" + + if len(rules) == 0: + log_ctx(rcx.persona.persona_id, None, "no automation rules published, lifecycle automation inactive") + + mod_roles = set(_role_ids_csv(setup.get("mod_role_ids", ""))) + announce_pings = _role_ids_csv(setup.get("announce_ping_role_ids", "")) + + initial_guild_ids = _guild_ids_from_persona(rcx.persona, setup) + if not initial_guild_ids: + logger.warning( + "%s Discord guild allowlist is empty (no discord: addresses in persona_external_addresses " + "and no legacy dc_guild_id). Inbound events and checklist autopost are denied until configured.", + rcx.persona.persona_id, + ) + else: + log_ctx( + rcx.persona.persona_id, + None, + "allowed guild ids from persona_external_addresses: %s", + sorted(initial_guild_ids), + ) + + connector = DiscordLocalConnector( + token, + rcx.persona.persona_id, + initial_guild_ids=initial_guild_ids, + ) + + augmented_setup = dict(setup) + augmented_setup["_format_mention"] = connector.format_mention + + await ckit_messages.ensure_message_indexes(mongo_db) + + async def handle_normalized_event(event: NormalizedEvent) -> None: + """Dispatch one normalized Discord event (member, message, checklist, reactions).""" + try: + persona_id = rcx.persona.persona_id + pl = event.payload if isinstance(event.payload, dict) else {} + + if event.event_type == "server_connected": + await _maybe_auto_post_checklist( + connector, + setup, + mongo_db, + persona_id, + pl, + connector.allowed_guild_ids, + ) + return + + if event.event_type in ("server_disconnected",): + return + + if event.event_type in ("reaction_added", "reaction_removed"): + await _handle_reaction_binding_event(connector, setup, persona_id, event.event_type, pl) + return + + if event.event_type == "member_joined": + try: + gid = int(pl["guild_id"]) + uid = int(pl["user_id"]) + except (KeyError, TypeError, ValueError): + return + event_member = _event_member_dict(gid, uid, pl) + + if len(rules) == 0: + return + + ctx: Dict[str, Any] = { + "connector": connector, + "mongo_db": mongo_db, + "server_id": event.server_id, + "platform_user": await connector.get_user_info(event.user_id, server_id=event.server_id), + "event_member": event_member, + "persona_id": persona_id, + "setup": augmented_setup, + "fclient": fclient, + "ws_id": workspace_id, + } + actions = ckit_automation_engine.process_event( + "member_joined", + {"guild_id": gid, "user_id": uid}, + rules, + event_member, + augmented_setup, + ) + await ckit_automation_actions.execute_actions(actions, ctx) + return + + if event.event_type == "message_in_channel": + pl_msg = event.payload if isinstance(event.payload, dict) else {} + content = (pl_msg.get("content") or "").strip() + if content.lower().startswith("!announce ") and mod_roles: + try: + gid_ann = int(pl_msg.get("guild_id", 0) or 0) + except (TypeError, ValueError): + gid_ann = 0 + try: + uid_int = int(event.user_id) + except (TypeError, ValueError): + uid_int = 0 + if gid_ann and uid_int: + info = await connector.get_user_info(str(uid_int), server_id=str(gid_ann)) + if isinstance(info, dict): + raw_roles = info.get("role_ids") or [] + author_roles = set() + for x in raw_roles: + try: + author_roles.add(int(x)) + except (TypeError, ValueError): + continue + if author_roles.intersection(mod_roles): + rest = content[len("!announce ") :].strip() + if rest: + pings = " ".join("<@&%d>" % r for r in announce_pings) + text = "%s\n%s" % (pings, rest) if pings else rest + cid_str = str(pl_msg.get("channel_id", "")) + result = await connector.execute_action( + "post_to_channel", + { + "channel_id": cid_str, + "text": text, + "server_id": str(gid_ann), + }, + ) + if not result.ok: + log_ctx( + rcx.persona.persona_id, + gid_ann, + "!announce failed: %s", + result.error or "unknown", + ) + return + + if len(rules) == 0: + return + try: + gid = int(pl_msg["guild_id"]) + uid = int(pl_msg["user_id"]) + ch_id = int(pl_msg["channel_id"]) + except (KeyError, TypeError, ValueError): + return + + if ch_id in watched_channel_ids: + await ckit_messages.store_message( + mongo_db, + server_id=event.server_id, + channel_id=str(ch_id), + user_id=str(uid), + platform="discord", + content=pl_msg.get("content") or "", + timestamp=event.timestamp, + message_id=str(pl_msg.get("message_id") or ""), + ) + if ch_id not in watched_channel_ids: + return + + event_member = _event_member_dict(gid, uid, pl_msg) + ctx_msg: Dict[str, Any] = { + "connector": connector, + "mongo_db": mongo_db, + "server_id": event.server_id, + "platform_user": await connector.get_user_info(event.user_id, server_id=event.server_id), + "event_member": event_member, + "persona_id": persona_id, + "setup": augmented_setup, + "fclient": fclient, + "ws_id": workspace_id, + } + actions = ckit_automation_engine.process_event( + "message_in_channel", + {"guild_id": gid, "user_id": uid, "channel_id": ch_id}, + rules, + event_member, + augmented_setup, + ) + await ckit_automation_actions.execute_actions(actions, ctx_msg) + return + + if event.event_type == "member_removed": + if len(rules) == 0: + return + try: + gid = int(pl["guild_id"]) + uid = int(pl["user_id"]) + except (KeyError, TypeError, ValueError): + return + event_member = _event_member_dict(gid, uid, pl) + ctx_rm: Dict[str, Any] = { + "connector": connector, + "mongo_db": mongo_db, + "server_id": event.server_id, + "platform_user": await connector.get_user_info(event.user_id, server_id=event.server_id), + "event_member": event_member, + "persona_id": persona_id, + "setup": augmented_setup, + "fclient": fclient, + "ws_id": workspace_id, + } + actions_leave = ckit_automation_engine.process_event( + "member_removed", + {"guild_id": gid, "user_id": uid}, + rules, + event_member, + augmented_setup, + ) + await ckit_automation_actions.execute_actions(actions_leave, ctx_rm) + return + except PyMongoError as e: + gid_log = None + try: + payload = event.payload if isinstance(event.payload, dict) else {} + gid_log = int(payload.get("guild_id", 0) or 0) or None + except (TypeError, ValueError): + gid_log = None + log_ctx(rcx.persona.persona_id, gid_log, "normalized event PyMongoError: %s %s", type(e).__name__, e) + except (TypeError, KeyError, ValueError) as e: + gid_log = None + try: + payload = event.payload if isinstance(event.payload, dict) else {} + gid_log = int(payload.get("guild_id", 0) or 0) or None + except (TypeError, ValueError): + gid_log = None + log_ctx(rcx.persona.persona_id, gid_log, "normalized event data error: %s %s", type(e).__name__, e) + + connector.on_event(handle_normalized_event) + + await connector.connect() + + await _discord_channel_acl_preflight( + connector, + rcx.persona.persona_id, + watched_channel_ids, + setup, + ) + + while not ckit_shutdown.shutdown_event.is_set(): + await rcx.unpark_collected_events(sleep_if_no_work=5.0) + finally: + if connector is not None: + try: + await connector.disconnect() + except (RuntimeError, AttributeError) as e: + logger.warning("connector disconnect: %s %s", type(e).__name__, e) + if mongo is not None: + try: + await mongo.close() + except (RuntimeError, AttributeError, TypeError) as e: + logger.warning("mongo close: %s %s", type(e).__name__, e) + logger.info("%s exit", persona_id_loop) + + +def main() -> None: + """CLI entry: run bots for this process group with the discord_bot loop.""" + try: + scenario_fn = ckit_bot_exec.parse_bot_args() + fclient = ckit_client.FlexusClient(ckit_client.bot_service_name(BOT_NAME, BOT_VERSION), endpoint="/v1/jailed-bot") + asyncio.run( + ckit_bot_exec.run_bots_in_this_group( + fclient, + bot_main_loop=discord_bot_main_loop, + inprocess_tools=TOOLS, + scenario_fn=scenario_fn, + install_func=discord_bot_install.install, + ) + ) + except (RuntimeError, OSError) as e: + logger.error("main failed: %s %s", type(e).__name__, e) + raise + + +if __name__ == "__main__": + main() diff --git a/flexus_simple_bots/discord_bot/discord_bot_install.py b/flexus_simple_bots/discord_bot/discord_bot_install.py new file mode 100644 index 00000000..9be2075a --- /dev/null +++ b/flexus_simple_bots/discord_bot/discord_bot_install.py @@ -0,0 +1,73 @@ +import asyncio +import json +from pathlib import Path + +from flexus_client_kit import ckit_automation_v1_schema_build +from flexus_client_kit import ckit_bot_install +from flexus_client_kit import ckit_client +from flexus_client_kit import ckit_cloudtool +from flexus_simple_bots import prompts_common +from flexus_simple_bots.discord_bot import discord_bot_prompts +from flexus_simple_bots.version_common import SIMPLE_BOTS_COMMON_VERSION + + +# Bot package root: same folder as discord_bot.py, setup_schema.json, webp assets, README. +# Passed to marketplace_upsert_dev_bot as bot_dir so name/version/repo/run/pictures match other simple bots. +ROOT = Path(__file__).parent + +DISCORD_BOT_SETUP_SCHEMA = json.loads((ROOT / "setup_schema.json").read_text()) + +EXPERTS = [ + ( + "default", + ckit_bot_install.FMarketplaceExpertInput( + fexp_system_prompt=discord_bot_prompts.discord_bot_stub, + fexp_python_kernel="", + fexp_allow_tools=",".join(sorted(ckit_cloudtool.CLOUDTOOLS_QUITE_A_LOT)), + fexp_nature="NATURE_INTERACTIVE", + fexp_inactivity_timeout=3600, + fexp_description="Stub expert; Discord automation runs in the bot process.", + fexp_builtin_skills="[]", + ), + ), +] + + +async def install(client: ckit_client.FlexusClient): + """Upsert this dev bot in the marketplace; name/version/run/repo/images are derived inside marketplace_upsert_dev_bot from bot_dir.""" + # Import inside install so this module can load before discord_bot (discord_bot imports discord_bot_install at package load). + from flexus_simple_bots.discord_bot import discord_bot as _discord_bot + + r = await ckit_bot_install.marketplace_upsert_dev_bot( + client, + ws_id=client.ws_id, + bot_dir=ROOT, + marketable_accent_color="#5865F2", + marketable_title1="Discord Bot", + marketable_title2="Welcome flow, follow-up DMs, reaction roles, mod announcements.", + marketable_author="Flexus", + marketable_occupation="Community", + marketable_description=(ROOT / "README.md").read_text(), + marketable_typical_group="Community / Discord", + marketable_setup_default=DISCORD_BOT_SETUP_SCHEMA, + marketable_featured_actions=[ + {"feat_question": "What does this bot do on Discord?", "feat_expert": "default", "feat_depends_on_setup": []}, + ], + marketable_intro_message="I handle member welcome, delayed check-ins, reaction roles, and mod-only !announce on your Discord server.", + marketable_preferred_model_expensive="grok-4-1-fast-reasoning", + marketable_preferred_model_cheap="gpt-5.4-nano", + marketable_experts=[(n, e.filter_tools(_discord_bot.TOOLS)) for n, e in EXPERTS], + marketable_tags=["Discord", "Community"], + marketable_schedule=[prompts_common.SCHED_PICK_ONE_5M], + marketable_auth_supported=["discord"], + marketable_rules_toolkit=ckit_automation_v1_schema_build.build_automation_v1_schema_document(), + ) + return r.marketable_version + + +if __name__ == "__main__": + async def _main() -> None: + fclient = ckit_client.FlexusClient(ckit_client.bot_service_name("discord_bot", SIMPLE_BOTS_COMMON_VERSION), endpoint="/v1/jailed-bot") + await install(fclient) + + asyncio.run(_main()) diff --git a/flexus_simple_bots/discord_bot/discord_bot_prompts.py b/flexus_simple_bots/discord_bot/discord_bot_prompts.py new file mode 100644 index 00000000..604c572a --- /dev/null +++ b/flexus_simple_bots/discord_bot/discord_bot_prompts.py @@ -0,0 +1,4 @@ +discord_bot_stub = """ +You are the Discord Bot operator persona. Almost all work runs on Discord gateway events. +Use Flexus chat only for workspace setup questions; no Discord tools here. +""" diff --git a/flexus_simple_bots/discord_bot/setup_schema.json b/flexus_simple_bots/discord_bot/setup_schema.json new file mode 100644 index 00000000..c94bca25 --- /dev/null +++ b/flexus_simple_bots/discord_bot/setup_schema.json @@ -0,0 +1,119 @@ +[ + { + "bs_name": "dc_guild_id", + "bs_type": "string_short", + "bs_default": "", + "bs_group": "Discord", + "bs_order": 1, + "bs_importance": 1, + "bs_description": "Target guild snowflake ID (digits only). Bot ignores other guilds." + }, + { + "bs_name": "welcome_dm_body", + "bs_type": "string_multiline", + "bs_default": "Welcome to our server! Read #start-here and say hi in #introductions.", + "bs_group": "Welcome", + "bs_order": 1, + "bs_importance": 1, + "bs_description": "DM sent when a member joins (may fail if user blocks DMs)." + }, + { + "bs_name": "welcome_channel_id", + "bs_type": "string_short", + "bs_default": "", + "bs_group": "Welcome", + "bs_order": 2, + "bs_importance": 0, + "bs_description": "Optional channel ID for a public welcome message (empty to skip)." + }, + { + "bs_name": "intro_channel_id", + "bs_type": "string_short", + "bs_default": "", + "bs_group": "Intro Reminder", + "bs_order": 1, + "bs_importance": 1, + "bs_description": "Channel ID where members post their intro. Used by intro reminder automation to detect whether a member has introduced themselves." + }, + { + "bs_name": "followup_dm_body", + "bs_type": "string_multiline", + "bs_default": "Hi! Just checking in -- need help getting started? Reply in #help any time.", + "bs_group": "Intro Reminder", + "bs_order": 3, + "bs_importance": 1, + "bs_description": "Follow-up DM body when there was no guild text activity since join." + }, + { + "bs_name": "checklist_channel_id", + "bs_type": "string_short", + "bs_default": "", + "bs_group": "Welcome", + "bs_order": 4, + "bs_importance": 0, + "bs_description": "Channel ID where the bot posts the start-here checklist once (empty to skip)." + }, + { + "bs_name": "checklist_message_body", + "bs_type": "string_multiline", + "bs_default": "## Start here\n- [ ] Read rules\n- [ ] Introduce yourself\n- [ ] Pick roles below this message", + "bs_group": "Welcome", + "bs_order": 5, + "bs_importance": 0, + "bs_description": "Checklist content (markdown)." + }, + { + "bs_name": "reaction_roles_json", + "bs_type": "string_multiline", + "bs_default": "[]", + "bs_group": "Welcome", + "bs_order": 7, + "bs_importance": 0, + "bs_description": "JSON array: [{\"message_id\":\"\",\"emoji\":\"\",\"role_id\":\"\"}] for reaction roles (message must exist)." + }, + { + "bs_name": "mod_role_ids", + "bs_type": "string_long", + "bs_default": "", + "bs_group": "Announcements", + "bs_order": 1, + "bs_importance": 0, + "bs_description": "Comma-separated role IDs allowed to use !announce in guild." + }, + { + "bs_name": "announce_ping_role_ids", + "bs_type": "string_long", + "bs_default": "", + "bs_group": "Announcements", + "bs_order": 2, + "bs_importance": 0, + "bs_description": "Comma-separated role IDs pinged after each !announce line (optional)." + }, + { + "bs_name": "disable_checklist_auto_post", + "bs_type": "bool", + "bs_default": false, + "bs_group": "Flags", + "bs_order": 3, + "bs_importance": 0, + "bs_description": "If enabled, do not post the start-here checklist on startup." + }, + { + "bs_name": "disable_reaction_roles", + "bs_type": "bool", + "bs_default": false, + "bs_group": "Flags", + "bs_order": 4, + "bs_importance": 0, + "bs_description": "If enabled, ignore reaction add/remove for role assignment." + }, + { + "bs_name": "automation_rules", + "bs_type": "string_multiline", + "bs_default": "", + "bs_group": "Automation", + "bs_order": 1, + "bs_importance": 0, + "bs_description": "Published automation rules document (JSON). Managed by the automation editor; do not edit manually." + } +] diff --git a/flexus_simple_bots/lawyerrat/lawyerrat_install.py b/flexus_simple_bots/lawyerrat/lawyerrat_install.py index 1c1867eb..49b6011e 100644 --- a/flexus_simple_bots/lawyerrat/lawyerrat_install.py +++ b/flexus_simple_bots/lawyerrat/lawyerrat_install.py @@ -90,7 +90,7 @@ ("setup", ckit_bot_install.FMarketplaceExpertInput( fexp_system_prompt=lawyerrat_prompts.lawyerrat_setup, fexp_python_kernel=LAWYERRAT_DEFAULT_LARK, - fexp_allow_tools=",".join(TOOL_NAMESET | ckit_cloudtool.CLOUDTOOLS_ADVANCED), + fexp_allow_tools=",".join(TOOL_NAMESET | ckit_cloudtool.CLOUDTOOLS_QUITE_A_LOT), fexp_nature="NATURE_INTERACTIVE", fexp_description="Setup assistant for configuring legal specialty, formality, and jurisdiction.", )), diff --git a/flexus_simple_bots/version_common.py b/flexus_simple_bots/version_common.py new file mode 100644 index 00000000..73850314 --- /dev/null +++ b/flexus_simple_bots/version_common.py @@ -0,0 +1 @@ +SIMPLE_BOTS_COMMON_VERSION = "1.2.199" diff --git a/setup.py b/setup.py index 6e336666..11c1d366 100644 --- a/setup.py +++ b/setup.py @@ -61,6 +61,7 @@ def run(self): "xai-sdk", "mcp", "python-telegram-bot>=20.0", + "redis>=5", "google-ads", ], extras_require={