From 0b82df586d0f883c395323709d78f219e99105bc Mon Sep 17 00:00:00 2001 From: Aniket Dixit Date: Tue, 9 Jun 2026 23:38:29 +0530 Subject: [PATCH 1/6] metrics supabase update --- README.md | 7 + all_networks.ts | 25 ++++ all_networks_types_helpers.ts | 101 ++++++++++++++ all_networks_usage.ts | 255 ++++++++++++++++++++++++++++++++++ payment_worker.ts | 11 ++ 5 files changed, 399 insertions(+) create mode 100644 all_networks_usage.ts diff --git a/README.md b/README.md index 792f546..bc054de 100644 --- a/README.md +++ b/README.md @@ -58,6 +58,13 @@ SETTLEMENT_BATCH_SIZE=20 SETTLEMENT_BATCH_TIMEOUT=60000 WALRUS_PUBLISHER_URL=http://localhost:9002/v1/blobs +# Optional: Push aggregate inference usage to the same Supabase RPC/table as chat API +USAGE_SUPABASE_URL=https://your-project.supabase.co +USAGE_SUPABASE_SERVICE_ROLE_KEY=your-service-role-key +USAGE_SUPABASE_RPC=record_ohttp_usage +# Set to false to disable the Supabase sink while keeping Redis usage rollups +USAGE_SUPABASE_ENABLED=true + # Optional: Individual settlement queue/signing DATA_SETTLEMENT_INDIVIDUAL_QUEUE_NAME=x402-settle-data-individual-queue-v1 DATA_INDIVIDUAL_WORKER_EVM_PRIVATE_KEY=0xDedicatedIndividualSettlementKey diff --git a/all_networks.ts b/all_networks.ts index 3ed318c..5226d0e 100644 --- a/all_networks.ts +++ b/all_networks.ts @@ -28,6 +28,7 @@ import { parseUint256Field, normalizeHeaderValue, PAYMENT_QUEUE_NAME, + parseInferenceUsageMetadata, parseSettlementJobDataFromHeaders, PORT, settlementStatusFromBullState, @@ -38,9 +39,11 @@ import { type DataSettlementJobData, type HeartbeatRelayRequest, type IndividualDataSettlementJobData, + type InferenceUsageMetadata, type PaymentSettlementJobData, type SettlementApiJobResponse, } from "./all_networks_types_helpers.js"; +import { closeInferenceUsageTracker, getInferenceUsageStats } from "./all_networks_usage.js"; import { type PaymentPayload, type PaymentRequirements, @@ -188,6 +191,7 @@ async function toJobResponse( async function enqueuePaymentSettlementJob(args: { paymentPayload: PaymentPayload; paymentRequirements: PaymentRequirements; + usageMetadata?: InferenceUsageMetadata; }): Promise { const paymentJobId = `payment-${randomUUID()}`; const paymentJob = await paymentQueue.add( @@ -195,6 +199,7 @@ async function enqueuePaymentSettlementJob(args: { { paymentPayload: args.paymentPayload, paymentRequirements: args.paymentRequirements, + usageMetadata: args.usageMetadata, }, { jobId: paymentJobId, @@ -203,6 +208,7 @@ async function enqueuePaymentSettlementJob(args: { console.log("[api] Payment settlement job enqueued", { jobId: paymentJobId, + hasUsageMetadata: Boolean(args.usageMetadata), ...summarizePaymentRequirements(args.paymentRequirements), ...summarizePaymentPayload(args.paymentPayload), }); @@ -322,7 +328,10 @@ app.post("/settle", async (req, res) => { }); } + const usageMetadata = parseInferenceUsageMetadata(req.body.usageMetadata); + console.log("[api] /settle request received", { + hasUsageMetadata: Boolean(usageMetadata), ...summarizePaymentRequirements(paymentRequirements), ...summarizePaymentPayload(paymentPayload), }); @@ -331,6 +340,7 @@ app.post("/settle", async (req, res) => { const paymentJob = await enqueuePaymentSettlementJob({ paymentPayload, paymentRequirements, + usageMetadata, }); return res.status(202).json({ paymentJob }); } catch (error) { @@ -535,6 +545,20 @@ app.get("/health", (_req, res) => { }); }); +app.get("/usage", async (req, res) => { + incrementMetric("api.request.count", ["route:/usage", "method:GET"]); + try { + const daysRaw = typeof req.query.days === "string" ? Number(req.query.days) : 30; + const days = Number.isInteger(daysRaw) ? Math.min(Math.max(daysRaw, 1), 90) : 30; + res.json(await getInferenceUsageStats(days)); + } catch (error) { + console.error("[api] /usage request failed", summarizeError(error)); + res.status(500).json({ + error: error instanceof Error ? error.message : "Unknown error", + }); + } +}); + let httpServer: Server | null = null; let isShuttingDown = false; @@ -562,6 +586,7 @@ async function shutdown(signal: string): Promise { dataSettlementQueue.close(), individualDataSettlementQueue.close(), individualSettlementPreparer.close(), + closeInferenceUsageTracker(), ]); clearTimeout(forcedExitTimer); diff --git a/all_networks_types_helpers.ts b/all_networks_types_helpers.ts index 17db595..d76866b 100644 --- a/all_networks_types_helpers.ts +++ b/all_networks_types_helpers.ts @@ -130,6 +130,22 @@ export type SettlementHandlerResult = { export type PaymentSettlementJobData = { paymentPayload: PaymentPayload; paymentRequirements: PaymentRequirements; + usageMetadata?: InferenceUsageMetadata; +}; + +export type InferenceUsageMetadata = { + sessionId?: string; + requestCount: number; + costOpg: string; + costUsd: number; + service?: string; + method?: string; + path?: string; + model?: string; + settlementType?: SettlementType | string; + network?: string; + asset?: string; + isStreaming?: boolean; }; export type DataWorkerContext = { @@ -197,6 +213,51 @@ function isRecord(value: unknown): value is Record { return typeof value === "object" && value !== null; } +function getOptionalStringField( + record: Record, + fieldNames: string[], +): string | undefined { + for (const name of fieldNames) { + const value = record[name]; + if (typeof value === "string" && value.trim().length > 0) { + return value.trim(); + } + } + return undefined; +} + +function getOptionalNumberField( + record: Record, + fieldNames: string[], +): number | undefined { + for (const name of fieldNames) { + const value = record[name]; + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value === "string" && value.trim().length > 0) { + const parsed = Number(value); + if (Number.isFinite(parsed)) { + return parsed; + } + } + } + return undefined; +} + +function getOptionalBooleanField( + record: Record, + fieldNames: string[], +): boolean | undefined { + for (const name of fieldNames) { + const value = record[name]; + if (typeof value === "boolean") { + return value; + } + } + return undefined; +} + export function getRequiredStringField( record: Record, fieldNames: string[], @@ -260,6 +321,46 @@ export function parseUint256Field(record: Record, fieldNames: s return trimmed; } +export function parseInferenceUsageMetadata(value: unknown): InferenceUsageMetadata | undefined { + if (value === undefined || value === null) { + return undefined; + } + if (!isRecord(value)) { + throw new Error("usageMetadata must be a JSON object"); + } + + const requestCount = getOptionalNumberField(value, ["requestCount", "request_count"]) ?? 0; + const costUsd = getOptionalNumberField(value, ["costUsd", "cost_usd"]) ?? 0; + const costOpg = + getOptionalStringField(value, ["costOpg", "cost_opg"]) ?? + String(getOptionalNumberField(value, ["costOpg", "cost_opg"]) ?? 0); + + if (!Number.isInteger(requestCount) || requestCount < 0) { + throw new Error("usageMetadata.requestCount must be a non-negative integer"); + } + if (!Number.isFinite(costUsd) || costUsd < 0) { + throw new Error("usageMetadata.costUsd must be a non-negative number"); + } + if (!/^[0-9]+$/.test(costOpg)) { + throw new Error("usageMetadata.costOpg must be a non-negative integer string"); + } + + return { + sessionId: getOptionalStringField(value, ["sessionId", "session_id"]), + requestCount, + costOpg, + costUsd, + service: getOptionalStringField(value, ["service"]), + method: getOptionalStringField(value, ["method"]), + path: getOptionalStringField(value, ["path", "route"]), + model: getOptionalStringField(value, ["model"]), + settlementType: getOptionalStringField(value, ["settlementType", "settlement_type"]), + network: getOptionalStringField(value, ["network"]), + asset: getOptionalStringField(value, ["asset"]), + isStreaming: getOptionalBooleanField(value, ["isStreaming", "is_streaming"]), + }; +} + function parseEvmAddressField( record: Record, fieldNames: string[], diff --git a/all_networks_usage.ts b/all_networks_usage.ts new file mode 100644 index 0000000..06cbf30 --- /dev/null +++ b/all_networks_usage.ts @@ -0,0 +1,255 @@ +import { createHash } from "node:crypto"; +import { Redis } from "ioredis"; +import { createBullMqConnection } from "./all_networks_shared.js"; +import { incrementMetric } from "./metrics.js"; +import { type InferenceUsageMetadata } from "./all_networks_types_helpers.js"; + +const USAGE_KEY_PREFIX = process.env.USAGE_REDIS_KEY_PREFIX || "x402:usage"; +const USAGE_DEDUPE_TTL_SECONDS = Number(process.env.USAGE_DEDUPE_TTL_SECONDS || 90 * 24 * 60 * 60); +const USAGE_OPG_DECIMALS = Number(process.env.USAGE_OPG_DECIMALS || 18); +const USAGE_SUPABASE_ENABLED = process.env.USAGE_SUPABASE_ENABLED !== "false"; +const USAGE_SUPABASE_URL = process.env.USAGE_SUPABASE_URL || process.env.SUPABASE_URL || ""; +const USAGE_SUPABASE_SERVICE_ROLE_KEY = + process.env.USAGE_SUPABASE_SERVICE_ROLE_KEY || + process.env.SUPABASE_SERVICE_ROLE_KEY || + process.env.SUPABASE_ANON_KEY || + ""; +const USAGE_SUPABASE_RPC = process.env.USAGE_SUPABASE_RPC || "record_ohttp_usage"; + +let usageRedis: Redis | null = null; +let supabaseConfigWarningLogged = false; + +function getUsageRedis(): Redis { + if (!usageRedis) { + usageRedis = new Redis(createBullMqConnection()); + } + return usageRedis; +} + +function utcDay(date = new Date()): string { + return date.toISOString().slice(0, 10); +} + +function usageDedupeKey(usage: InferenceUsageMetadata): string { + const source = + usage.sessionId || + createHash("sha256") + .update( + JSON.stringify({ + requestCount: usage.requestCount, + costOpg: usage.costOpg, + costUsd: usage.costUsd, + service: usage.service, + method: usage.method, + path: usage.path, + model: usage.model, + network: usage.network, + asset: usage.asset, + }), + ) + .digest("hex"); + + return `${USAGE_KEY_PREFIX}:session:${source}`; +} + +function usageSinkDedupeKey(usage: InferenceUsageMetadata, sink: "redis" | "supabase"): string { + return `${usageDedupeKey(usage)}:${sink}`; +} + +function tagsForUsage(usage: InferenceUsageMetadata): string[] { + return [ + usage.service ? `service:${usage.service}` : "service:unknown", + usage.path ? `path:${usage.path}` : "path:unknown", + usage.network ? `network:${usage.network}` : "network:unknown", + usage.asset ? `asset:${usage.asset}` : "asset:unknown", + ]; +} + +function opgAtomicToWholeUnits(costOpg: string): number { + const atomic = Number(costOpg); + if (!Number.isFinite(atomic)) { + return 0; + } + return atomic / 10 ** USAGE_OPG_DECIMALS; +} + +async function claimUsageSink( + redis: Redis, + usage: InferenceUsageMetadata, + sink: "redis" | "supabase", +): Promise { + const dedupeKey = usageSinkDedupeKey(usage, sink); + const claimed = await redis.set(dedupeKey, "1", "EX", USAGE_DEDUPE_TTL_SECONDS, "NX"); + if (claimed !== "OK") { + incrementMetric("inference_usage.duplicate.count", tagsForUsage(usage)); + return null; + } + return dedupeKey; +} + +async function recordInferenceUsageInRedis( + redis: Redis, + usage: InferenceUsageMetadata, + costOpg: number, +): Promise { + const dedupeKey = await claimUsageSink(redis, usage, "redis"); + if (!dedupeKey) { + return false; + } + const day = utcDay(); + const dailyKey = `${USAGE_KEY_PREFIX}:daily:${day}`; + const totalsKey = `${USAGE_KEY_PREFIX}:totals`; + + try { + const multi = redis.multi(); + multi.hincrby(dailyKey, "request_count", usage.requestCount); + multi.hincrbyfloat(dailyKey, "cost_opg", costOpg); + multi.hincrbyfloat(dailyKey, "cost_usd", usage.costUsd); + multi.hset(dailyKey, "day", day); + multi.hincrby(totalsKey, "request_count", usage.requestCount); + multi.hincrbyfloat(totalsKey, "cost_opg", costOpg); + multi.hincrbyfloat(totalsKey, "cost_usd", usage.costUsd); + await multi.exec(); + } catch (error) { + await redis.del(dedupeKey); + throw error; + } + + return true; +} + +function getSupabaseRpcUrl(): string | null { + if (!USAGE_SUPABASE_ENABLED) { + return null; + } + if (!USAGE_SUPABASE_URL || !USAGE_SUPABASE_SERVICE_ROLE_KEY) { + if (!supabaseConfigWarningLogged) { + supabaseConfigWarningLogged = true; + console.warn( + "[usage] Supabase usage sink disabled: set USAGE_SUPABASE_URL and USAGE_SUPABASE_SERVICE_ROLE_KEY.", + ); + } + return null; + } + return `${USAGE_SUPABASE_URL.replace(/\/$/, "")}/rest/v1/rpc/${USAGE_SUPABASE_RPC}`; +} + +async function recordInferenceUsageInSupabase( + redis: Redis, + usage: InferenceUsageMetadata, + costOpg: number, +): Promise { + const rpcUrl = getSupabaseRpcUrl(); + if (!rpcUrl) { + return false; + } + + const dedupeKey = await claimUsageSink(redis, usage, "supabase"); + if (!dedupeKey) { + return false; + } + + const body = { + p_request_count: usage.requestCount, + p_cost_usd: usage.costUsd, + p_cost_opg: costOpg, + }; + + try { + const response = await fetch(rpcUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + apikey: USAGE_SUPABASE_SERVICE_ROLE_KEY, + Authorization: `Bearer ${USAGE_SUPABASE_SERVICE_ROLE_KEY}`, + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + throw new Error(`Supabase usage RPC failed (${response.status}): ${await response.text()}`); + } + } catch (error) { + await redis.del(dedupeKey); + throw error; + } + + incrementMetric("inference_usage.supabase.recorded.count", tagsForUsage(usage)); + return true; +} + +export async function recordInferenceUsage( + usage: InferenceUsageMetadata | undefined, +): Promise { + if (!usage || (usage.requestCount === 0 && usage.costOpg === "0" && usage.costUsd === 0)) { + return false; + } + + const redis = getUsageRedis(); + const tags = tagsForUsage(usage); + const costOpg = opgAtomicToWholeUnits(usage.costOpg); + + const [redisRecorded, supabaseRecorded] = await Promise.all([ + recordInferenceUsageInRedis(redis, usage, costOpg), + recordInferenceUsageInSupabase(redis, usage, costOpg), + ]); + + incrementMetric("inference_usage.request.count", tags, usage.requestCount); + incrementMetric("inference_usage.cost_opg", tags, costOpg); + incrementMetric("inference_usage.cost_usd", tags, usage.costUsd); + + console.log("[usage] Recorded inference session usage", { + sessionId: usage.sessionId, + requestCount: usage.requestCount, + costOpg: usage.costOpg, + costUsd: usage.costUsd, + service: usage.service, + path: usage.path, + model: usage.model, + network: usage.network, + asset: usage.asset, + redisRecorded, + supabaseRecorded, + }); + + return redisRecorded || supabaseRecorded; +} + +export async function getInferenceUsageStats(days = 30): Promise<{ + totalRequests: number; + totalCostOpg: string; + totalCostUsd: number; + daily: Array<{ day: string; requestCount: number; costOpg: string; costUsd: number }>; +}> { + const redis = getUsageRedis(); + const totalRaw = await redis.hgetall(`${USAGE_KEY_PREFIX}:totals`); + const daily: Array<{ day: string; requestCount: number; costOpg: string; costUsd: number }> = []; + + for (let offset = days - 1; offset >= 0; offset -= 1) { + const date = new Date(); + date.setUTCDate(date.getUTCDate() - offset); + const day = utcDay(date); + const raw = await redis.hgetall(`${USAGE_KEY_PREFIX}:daily:${day}`); + daily.push({ + day, + requestCount: Number(raw.request_count || 0), + costOpg: raw.cost_opg || "0", + costUsd: Number(raw.cost_usd || 0), + }); + } + + return { + totalRequests: Number(totalRaw.request_count || 0), + totalCostOpg: totalRaw.cost_opg || "0", + totalCostUsd: Number(totalRaw.cost_usd || 0), + daily, + }; +} + +export async function closeInferenceUsageTracker(): Promise { + if (!usageRedis) { + return; + } + await usageRedis.quit(); + usageRedis = null; +} diff --git a/payment_worker.ts b/payment_worker.ts index d62c832..aab762b 100644 --- a/payment_worker.ts +++ b/payment_worker.ts @@ -6,6 +6,7 @@ import { } from "./logging.js"; import { incrementMetric } from "./metrics.js"; import { createBullMqConnection, createFacilitator } from "./all_networks_shared.js"; +import { closeInferenceUsageTracker, recordInferenceUsage } from "./all_networks_usage.js"; import { PAYMENT_QUEUE_NAME, SHUTDOWN_TIMEOUT_MS, @@ -44,6 +45,15 @@ const worker = new Worker( if (amountValue !== null) { incrementMetric("payment.settled.amount", tags, amountValue); } + + try { + await recordInferenceUsage(job.data.usageMetadata); + } catch (error) { + console.error("[payment-worker] Failed to record inference usage", { + jobId: job.id, + ...summarizeError(error), + }); + } } return result; @@ -83,6 +93,7 @@ async function shutdown(signal: string): Promise { forcedExitTimer.unref(); await worker.close(); + await closeInferenceUsageTracker(); clearTimeout(forcedExitTimer); process.exit(0); From 5fedc40bed917e1dff5d9e76ba171608034065aa Mon Sep 17 00:00:00 2001 From: Aniket Dixit Date: Wed, 10 Jun 2026 18:47:30 +0530 Subject: [PATCH 2/6] copilot fixes --- all_networks.ts | 9 ++++++- all_networks_types_helpers.ts | 27 ++++++++++++++----- all_networks_usage.ts | 49 ++++++++++++++++++++++++----------- payment_worker.ts | 10 ++++++- 4 files changed, 71 insertions(+), 24 deletions(-) diff --git a/all_networks.ts b/all_networks.ts index 5226d0e..fea3f74 100644 --- a/all_networks.ts +++ b/all_networks.ts @@ -328,7 +328,14 @@ app.post("/settle", async (req, res) => { }); } - const usageMetadata = parseInferenceUsageMetadata(req.body.usageMetadata); + let usageMetadata: InferenceUsageMetadata | undefined; + try { + usageMetadata = parseInferenceUsageMetadata(req.body.usageMetadata); + } catch (error) { + return res.status(400).json({ + error: error instanceof Error ? error.message : "Invalid usageMetadata", + }); + } console.log("[api] /settle request received", { hasUsageMetadata: Boolean(usageMetadata), diff --git a/all_networks_types_helpers.ts b/all_networks_types_helpers.ts index d76866b..a0ff6ce 100644 --- a/all_networks_types_helpers.ts +++ b/all_networks_types_helpers.ts @@ -287,7 +287,7 @@ export function parseUint256Field(record: Record, fieldNames: s const raw = getRequiredUnknownField(record, fieldNames); if (typeof raw === "number") { - if (!Number.isInteger(raw) || raw < 0) { + if (!Number.isSafeInteger(raw) || raw < 0) { throw new Error( `Invalid uint256 field. Expected non-negative integer for: ${fieldNames.join(", ")}`, ); @@ -321,6 +321,23 @@ export function parseUint256Field(record: Record, fieldNames: s return trimmed; } +function getOptionalUint256Field( + record: Record, + fieldNames: string[], + label: string, +): string | undefined { + const fieldName = fieldNames.find(name => name in record); + if (!fieldName) { + return undefined; + } + + try { + return parseUint256Field(record, [fieldName]); + } catch { + throw new Error(`${label} must be a non-negative integer string`); + } +} + export function parseInferenceUsageMetadata(value: unknown): InferenceUsageMetadata | undefined { if (value === undefined || value === null) { return undefined; @@ -332,18 +349,14 @@ export function parseInferenceUsageMetadata(value: unknown): InferenceUsageMetad const requestCount = getOptionalNumberField(value, ["requestCount", "request_count"]) ?? 0; const costUsd = getOptionalNumberField(value, ["costUsd", "cost_usd"]) ?? 0; const costOpg = - getOptionalStringField(value, ["costOpg", "cost_opg"]) ?? - String(getOptionalNumberField(value, ["costOpg", "cost_opg"]) ?? 0); + getOptionalUint256Field(value, ["costOpg", "cost_opg"], "usageMetadata.costOpg") ?? "0"; - if (!Number.isInteger(requestCount) || requestCount < 0) { + if (!Number.isSafeInteger(requestCount) || requestCount < 0) { throw new Error("usageMetadata.requestCount must be a non-negative integer"); } if (!Number.isFinite(costUsd) || costUsd < 0) { throw new Error("usageMetadata.costUsd must be a non-negative number"); } - if (!/^[0-9]+$/.test(costOpg)) { - throw new Error("usageMetadata.costOpg must be a non-negative integer string"); - } return { sessionId: getOptionalStringField(value, ["sessionId", "session_id"]), diff --git a/all_networks_usage.ts b/all_networks_usage.ts index 06cbf30..d47509c 100644 --- a/all_networks_usage.ts +++ b/all_networks_usage.ts @@ -10,10 +10,7 @@ const USAGE_OPG_DECIMALS = Number(process.env.USAGE_OPG_DECIMALS || 18); const USAGE_SUPABASE_ENABLED = process.env.USAGE_SUPABASE_ENABLED !== "false"; const USAGE_SUPABASE_URL = process.env.USAGE_SUPABASE_URL || process.env.SUPABASE_URL || ""; const USAGE_SUPABASE_SERVICE_ROLE_KEY = - process.env.USAGE_SUPABASE_SERVICE_ROLE_KEY || - process.env.SUPABASE_SERVICE_ROLE_KEY || - process.env.SUPABASE_ANON_KEY || - ""; + process.env.USAGE_SUPABASE_SERVICE_ROLE_KEY || process.env.SUPABASE_SERVICE_ROLE_KEY || ""; const USAGE_SUPABASE_RPC = process.env.USAGE_SUPABASE_RPC || "record_ohttp_usage"; let usageRedis: Redis | null = null; @@ -66,11 +63,15 @@ function tagsForUsage(usage: InferenceUsageMetadata): string[] { } function opgAtomicToWholeUnits(costOpg: string): number { - const atomic = Number(costOpg); - if (!Number.isFinite(atomic)) { + try { + const atomic = BigInt(costOpg); + const decimals = Math.max(0, Math.trunc(USAGE_OPG_DECIMALS)); + const scale = 10n ** BigInt(decimals); + const parsed = Number(atomic / scale) + Number(atomic % scale) / Number(scale); + return Number.isFinite(parsed) ? parsed : 0; + } catch { return 0; } - return atomic / 10 ** USAGE_OPG_DECIMALS; } async function claimUsageSink( @@ -223,21 +224,39 @@ export async function getInferenceUsageStats(days = 30): Promise<{ }> { const redis = getUsageRedis(); const totalRaw = await redis.hgetall(`${USAGE_KEY_PREFIX}:totals`); - const daily: Array<{ day: string; requestCount: number; costOpg: string; costUsd: number }> = []; + const dailyKeys: Array<{ day: string; key: string }> = []; for (let offset = days - 1; offset >= 0; offset -= 1) { const date = new Date(); date.setUTCDate(date.getUTCDate() - offset); const day = utcDay(date); - const raw = await redis.hgetall(`${USAGE_KEY_PREFIX}:daily:${day}`); - daily.push({ - day, - requestCount: Number(raw.request_count || 0), - costOpg: raw.cost_opg || "0", - costUsd: Number(raw.cost_usd || 0), - }); + dailyKeys.push({ day, key: `${USAGE_KEY_PREFIX}:daily:${day}` }); + } + + const pipeline = redis.pipeline(); + for (const { key } of dailyKeys) { + pipeline.hgetall(key); + } + const dailyResults = await pipeline.exec(); + if (!dailyResults) { + throw new Error("Redis usage stats pipeline failed"); } + const daily = dailyResults.map(([error, raw], index) => { + if (error) { + throw error; + } + + const day = dailyKeys[index]?.day ?? utcDay(); + const record = (raw ?? {}) as Record; + return { + day, + requestCount: Number(record.request_count || 0), + costOpg: record.cost_opg || "0", + costUsd: Number(record.cost_usd || 0), + }; + }); + return { totalRequests: Number(totalRaw.request_count || 0), totalCostOpg: totalRaw.cost_opg || "0", diff --git a/payment_worker.ts b/payment_worker.ts index aab762b..fe3e4f2 100644 --- a/payment_worker.ts +++ b/payment_worker.ts @@ -47,7 +47,15 @@ const worker = new Worker( } try { - await recordInferenceUsage(job.data.usageMetadata); + await recordInferenceUsage( + job.data.usageMetadata + ? { + ...job.data.usageMetadata, + sessionId: + job.data.usageMetadata.sessionId ?? (job.id ? String(job.id) : undefined), + } + : undefined, + ); } catch (error) { console.error("[payment-worker] Failed to record inference usage", { jobId: job.id, From d7b2366258798e39dd6f73b6322a3be96cc70d33 Mon Sep 17 00:00:00 2001 From: Aniket Dixit Date: Wed, 10 Jun 2026 19:02:21 +0530 Subject: [PATCH 3/6] fixes --- all_networks_usage.ts | 49 ++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 12 deletions(-) diff --git a/all_networks_usage.ts b/all_networks_usage.ts index d47509c..930c270 100644 --- a/all_networks_usage.ts +++ b/all_networks_usage.ts @@ -29,10 +29,9 @@ function utcDay(date = new Date()): string { function usageDedupeKey(usage: InferenceUsageMetadata): string { const source = - usage.sessionId || - createHash("sha256") - .update( - JSON.stringify({ + usage.sessionId !== undefined + ? `session:${usage.sessionId}` + : `usage:${JSON.stringify({ requestCount: usage.requestCount, costOpg: usage.costOpg, costUsd: usage.costUsd, @@ -42,11 +41,10 @@ function usageDedupeKey(usage: InferenceUsageMetadata): string { model: usage.model, network: usage.network, asset: usage.asset, - }), - ) - .digest("hex"); + })}`; + const digest = createHash("sha256").update(source).digest("hex"); - return `${USAGE_KEY_PREFIX}:session:${source}`; + return `${USAGE_KEY_PREFIX}:session:${digest}`; } function usageSinkDedupeKey(usage: InferenceUsageMetadata, sink: "redis" | "supabase"): string { @@ -62,6 +60,14 @@ function tagsForUsage(usage: InferenceUsageMetadata): string[] { ]; } +function isZeroAtomicOpg(costOpg: string): boolean { + try { + return BigInt(costOpg) === 0n; + } catch { + return true; + } +} + function opgAtomicToWholeUnits(costOpg: string): number { try { const atomic = BigInt(costOpg); @@ -107,6 +113,7 @@ async function recordInferenceUsageInRedis( multi.hincrbyfloat(dailyKey, "cost_opg", costOpg); multi.hincrbyfloat(dailyKey, "cost_usd", usage.costUsd); multi.hset(dailyKey, "day", day); + multi.expire(dailyKey, USAGE_DEDUPE_TTL_SECONDS); multi.hincrby(totalsKey, "request_count", usage.requestCount); multi.hincrbyfloat(totalsKey, "cost_opg", costOpg); multi.hincrbyfloat(totalsKey, "cost_usd", usage.costUsd); @@ -182,7 +189,10 @@ async function recordInferenceUsageInSupabase( export async function recordInferenceUsage( usage: InferenceUsageMetadata | undefined, ): Promise { - if (!usage || (usage.requestCount === 0 && usage.costOpg === "0" && usage.costUsd === 0)) { + if ( + !usage || + (usage.requestCount === 0 && isZeroAtomicOpg(usage.costOpg) && usage.costUsd === 0) + ) { return false; } @@ -190,17 +200,32 @@ export async function recordInferenceUsage( const tags = tagsForUsage(usage); const costOpg = opgAtomicToWholeUnits(usage.costOpg); - const [redisRecorded, supabaseRecorded] = await Promise.all([ + const [redisResult, supabaseResult] = await Promise.allSettled([ recordInferenceUsageInRedis(redis, usage, costOpg), recordInferenceUsageInSupabase(redis, usage, costOpg), ]); + if (redisResult.status === "rejected") { + console.error("[usage] Redis usage sink failed", { error: redisResult.reason }); + incrementMetric("inference_usage.redis.error.count", tags); + } + if (supabaseResult.status === "rejected") { + console.error("[usage] Supabase usage sink failed", { error: supabaseResult.reason }); + incrementMetric("inference_usage.supabase.error.count", tags); + } + const redisRecorded = redisResult.status === "fulfilled" && redisResult.value; + const supabaseRecorded = supabaseResult.status === "fulfilled" && supabaseResult.value; + const recorded = redisRecorded || supabaseRecorded; + + if (!recorded) { + return false; + } incrementMetric("inference_usage.request.count", tags, usage.requestCount); incrementMetric("inference_usage.cost_opg", tags, costOpg); incrementMetric("inference_usage.cost_usd", tags, usage.costUsd); console.log("[usage] Recorded inference session usage", { - sessionId: usage.sessionId, + hasSessionId: Boolean(usage.sessionId), requestCount: usage.requestCount, costOpg: usage.costOpg, costUsd: usage.costUsd, @@ -213,7 +238,7 @@ export async function recordInferenceUsage( supabaseRecorded, }); - return redisRecorded || supabaseRecorded; + return true; } export async function getInferenceUsageStats(days = 30): Promise<{ From 7e546fc108dc794c088a343f54efb1a23fc6f221 Mon Sep 17 00:00:00 2001 From: Aniket Dixit Date: Thu, 11 Jun 2026 15:54:15 +0530 Subject: [PATCH 4/6] session id --- all_networks_usage.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/all_networks_usage.ts b/all_networks_usage.ts index 930c270..f951927 100644 --- a/all_networks_usage.ts +++ b/all_networks_usage.ts @@ -226,6 +226,7 @@ export async function recordInferenceUsage( console.log("[usage] Recorded inference session usage", { hasSessionId: Boolean(usage.sessionId), + sessionId: usage.sessionId, requestCount: usage.requestCount, costOpg: usage.costOpg, costUsd: usage.costUsd, From 577359800facb1e67504cf3d5f8b6184f1d8e9a7 Mon Sep 17 00:00:00 2001 From: Aniket Dixit Date: Thu, 11 Jun 2026 19:55:23 +0530 Subject: [PATCH 5/6] app id --- all_networks_usage.ts | 149 +++++++++++++++++++++++++++++++++++++++--- payment_worker.ts | 1 + 2 files changed, 142 insertions(+), 8 deletions(-) diff --git a/all_networks_usage.ts b/all_networks_usage.ts index f951927..f6c40d0 100644 --- a/all_networks_usage.ts +++ b/all_networks_usage.ts @@ -1,5 +1,6 @@ import { createHash } from "node:crypto"; import { Redis } from "ioredis"; +import { getAddress, isAddress } from "viem"; import { createBullMqConnection } from "./all_networks_shared.js"; import { incrementMetric } from "./metrics.js"; import { type InferenceUsageMetadata } from "./all_networks_types_helpers.js"; @@ -12,10 +13,72 @@ const USAGE_SUPABASE_URL = process.env.USAGE_SUPABASE_URL || process.env.SUPABAS const USAGE_SUPABASE_SERVICE_ROLE_KEY = process.env.USAGE_SUPABASE_SERVICE_ROLE_KEY || process.env.SUPABASE_SERVICE_ROLE_KEY || ""; const USAGE_SUPABASE_RPC = process.env.USAGE_SUPABASE_RPC || "record_ohttp_usage"; +const USAGE_SUPABASE_PER_APP_RPC = + process.env.USAGE_SUPABASE_PER_APP_RPC || "record_ohttp_usage_per_app"; +const USAGE_DEFAULT_APP_ID = process.env.USAGE_DEFAULT_APP_ID || "other"; let usageRedis: Redis | null = null; let supabaseConfigWarningLogged = false; +function normalizeAddress(value: string | undefined): string | null { + const trimmed = value?.trim(); + if (!trimmed || !isAddress(trimmed)) { + return null; + } + return getAddress(trimmed).toLowerCase(); +} + +function setPayerServiceMapping( + map: Map, + address: string | undefined, + appId: string, +): void { + const normalized = normalizeAddress(address); + if (!normalized) { + if (address?.trim()) { + console.warn("[usage] Ignoring invalid payer wallet mapping", { appId }); + } + return; + } + map.set(normalized, appId); +} + +function loadPayerServiceMap(): Map { + const map = new Map(); + + setPayerServiceMapping( + map, + process.env.USAGE_CHAT_API_PAYER_WALLET || process.env.CHAT_API_PAYER_WALLET, + "opengradient-chat", + ); + setPayerServiceMapping( + map, + process.env.USAGE_BITQUANT_PAYER_WALLET || process.env.BITQUANT_PAYER_WALLET, + "bitquant", + ); + + const rawMap = process.env.USAGE_PAYER_SERVICE_MAP; + if (rawMap?.trim()) { + try { + const parsed = JSON.parse(rawMap) as unknown; + if (!parsed || typeof parsed !== "object" || Array.isArray(parsed)) { + throw new Error("expected object mapping payer wallet to app id"); + } + for (const [address, appId] of Object.entries(parsed as Record)) { + if (typeof appId === "string" && appId.trim()) { + setPayerServiceMapping(map, address, appId.trim()); + } + } + } catch (error) { + console.warn("[usage] Ignoring invalid USAGE_PAYER_SERVICE_MAP", { error }); + } + } + + return map; +} + +const USAGE_PAYER_SERVICE_MAP = loadPayerServiceMap(); + function getUsageRedis(): Redis { if (!usageRedis) { usageRedis = new Redis(createBullMqConnection()); @@ -47,7 +110,9 @@ function usageDedupeKey(usage: InferenceUsageMetadata): string { return `${USAGE_KEY_PREFIX}:session:${digest}`; } -function usageSinkDedupeKey(usage: InferenceUsageMetadata, sink: "redis" | "supabase"): string { +type UsageSink = "redis" | "supabase" | "supabase_per_app"; + +function usageSinkDedupeKey(usage: InferenceUsageMetadata, sink: UsageSink): string { return `${usageDedupeKey(usage)}:${sink}`; } @@ -83,7 +148,7 @@ function opgAtomicToWholeUnits(costOpg: string): number { async function claimUsageSink( redis: Redis, usage: InferenceUsageMetadata, - sink: "redis" | "supabase", + sink: UsageSink, ): Promise { const dedupeKey = usageSinkDedupeKey(usage, sink); const claimed = await redis.set(dedupeKey, "1", "EX", USAGE_DEDUPE_TTL_SECONDS, "NX"); @@ -126,7 +191,7 @@ async function recordInferenceUsageInRedis( return true; } -function getSupabaseRpcUrl(): string | null { +function getSupabaseRpcUrl(rpcName: string): string | null { if (!USAGE_SUPABASE_ENABLED) { return null; } @@ -139,7 +204,7 @@ function getSupabaseRpcUrl(): string | null { } return null; } - return `${USAGE_SUPABASE_URL.replace(/\/$/, "")}/rest/v1/rpc/${USAGE_SUPABASE_RPC}`; + return `${USAGE_SUPABASE_URL.replace(/\/$/, "")}/rest/v1/rpc/${rpcName}`; } async function recordInferenceUsageInSupabase( @@ -147,7 +212,7 @@ async function recordInferenceUsageInSupabase( usage: InferenceUsageMetadata, costOpg: number, ): Promise { - const rpcUrl = getSupabaseRpcUrl(); + const rpcUrl = getSupabaseRpcUrl(USAGE_SUPABASE_RPC); if (!rpcUrl) { return false; } @@ -186,9 +251,68 @@ async function recordInferenceUsageInSupabase( return true; } +async function recordInferenceUsagePerAppInSupabase( + redis: Redis, + usage: InferenceUsageMetadata, + costOpg: number, +): Promise { + const rpcUrl = getSupabaseRpcUrl(USAGE_SUPABASE_PER_APP_RPC); + if (!rpcUrl) { + return false; + } + + const dedupeKey = await claimUsageSink(redis, usage, "supabase_per_app"); + if (!dedupeKey) { + return false; + } + + const body = { + p_app_id: usage.service || USAGE_DEFAULT_APP_ID, + p_request_count: usage.requestCount, + p_cost_usd: usage.costUsd, + p_cost_opg: costOpg, + }; + + try { + const response = await fetch(rpcUrl, { + method: "POST", + headers: { + "Content-Type": "application/json", + apikey: USAGE_SUPABASE_SERVICE_ROLE_KEY, + Authorization: `Bearer ${USAGE_SUPABASE_SERVICE_ROLE_KEY}`, + }, + body: JSON.stringify(body), + }); + + if (!response.ok) { + throw new Error( + `Supabase per-app usage RPC failed (${response.status}): ${await response.text()}`, + ); + } + } catch (error) { + await redis.del(dedupeKey); + throw error; + } + + incrementMetric("inference_usage.supabase_per_app.recorded.count", tagsForUsage(usage)); + return true; +} + export async function recordInferenceUsage( - usage: InferenceUsageMetadata | undefined, + rawUsage: InferenceUsageMetadata | undefined, + payerAddress?: string, ): Promise { + const normalizedPayer = normalizeAddress(payerAddress); + const appId = normalizedPayer + ? (USAGE_PAYER_SERVICE_MAP.get(normalizedPayer) ?? USAGE_DEFAULT_APP_ID) + : USAGE_DEFAULT_APP_ID; + const usage = rawUsage + ? { + ...rawUsage, + service: appId, + } + : rawUsage; + if ( !usage || (usage.requestCount === 0 && isZeroAtomicOpg(usage.costOpg) && usage.costUsd === 0) @@ -200,9 +324,10 @@ export async function recordInferenceUsage( const tags = tagsForUsage(usage); const costOpg = opgAtomicToWholeUnits(usage.costOpg); - const [redisResult, supabaseResult] = await Promise.allSettled([ + const [redisResult, supabaseResult, perAppResult] = await Promise.allSettled([ recordInferenceUsageInRedis(redis, usage, costOpg), recordInferenceUsageInSupabase(redis, usage, costOpg), + recordInferenceUsagePerAppInSupabase(redis, usage, costOpg), ]); if (redisResult.status === "rejected") { console.error("[usage] Redis usage sink failed", { error: redisResult.reason }); @@ -212,9 +337,14 @@ export async function recordInferenceUsage( console.error("[usage] Supabase usage sink failed", { error: supabaseResult.reason }); incrementMetric("inference_usage.supabase.error.count", tags); } + if (perAppResult.status === "rejected") { + console.error("[usage] Supabase per-app usage sink failed", { error: perAppResult.reason }); + incrementMetric("inference_usage.supabase_per_app.error.count", tags); + } const redisRecorded = redisResult.status === "fulfilled" && redisResult.value; const supabaseRecorded = supabaseResult.status === "fulfilled" && supabaseResult.value; - const recorded = redisRecorded || supabaseRecorded; + const supabasePerAppRecorded = perAppResult.status === "fulfilled" && perAppResult.value; + const recorded = redisRecorded || supabaseRecorded || supabasePerAppRecorded; if (!recorded) { return false; @@ -230,6 +360,8 @@ export async function recordInferenceUsage( requestCount: usage.requestCount, costOpg: usage.costOpg, costUsd: usage.costUsd, + payerAddress: normalizedPayer, + appId, service: usage.service, path: usage.path, model: usage.model, @@ -237,6 +369,7 @@ export async function recordInferenceUsage( asset: usage.asset, redisRecorded, supabaseRecorded, + supabasePerAppRecorded, }); return true; diff --git a/payment_worker.ts b/payment_worker.ts index fe3e4f2..68e0dcb 100644 --- a/payment_worker.ts +++ b/payment_worker.ts @@ -55,6 +55,7 @@ const worker = new Worker( job.data.usageMetadata.sessionId ?? (job.id ? String(job.id) : undefined), } : undefined, + result.payer, ); } catch (error) { console.error("[payment-worker] Failed to record inference usage", { From 1af07384b56ac6db08f4acb060694ddae4d841e0 Mon Sep 17 00:00:00 2001 From: Aniket Dixit Date: Thu, 11 Jun 2026 20:56:48 +0530 Subject: [PATCH 6/6] gas increase check --- all_networks_shared.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/all_networks_shared.ts b/all_networks_shared.ts index 9013624..ff3f8ea 100644 --- a/all_networks_shared.ts +++ b/all_networks_shared.ts @@ -94,6 +94,9 @@ const DATA_WORKER_TX_RECEIPT_TIMEOUT_MS = Number( process.env.DATA_WORKER_TX_RECEIPT_TIMEOUT_MS || 120_000, ); const BASE_MAINNET_RPC_URL = process.env.BASE_MAINNET_RPC_URL; +const PAYMENT_WORKER_BASE_MAX_FEE_GWEI = process.env.PAYMENT_WORKER_BASE_MAX_FEE_GWEI || "0.10"; +const PAYMENT_WORKER_BASE_MAX_PRIORITY_FEE_GWEI = + process.env.PAYMENT_WORKER_BASE_MAX_PRIORITY_FEE_GWEI || "0.02"; const HEARTBEAT_RELAY_GAS_LIMIT = BigInt(process.env.HEARTBEAT_RELAY_GAS_LIMIT || "500000"); const HEARTBEAT_RELAY_TX_RECEIPT_TIMEOUT_MS = Number( process.env.HEARTBEAT_RELAY_TX_RECEIPT_TIMEOUT_MS || 120_000, @@ -1360,8 +1363,8 @@ export async function createFacilitator(): Promise { ...args, args: args.args || [], gas: 5_000_000n, - maxFeePerGas: parseGwei("0.006"), - maxPriorityFeePerGas: parseGwei("0.005"), + maxFeePerGas: parseGwei(PAYMENT_WORKER_BASE_MAX_FEE_GWEI), + maxPriorityFeePerGas: parseGwei(PAYMENT_WORKER_BASE_MAX_PRIORITY_FEE_GWEI), }), sendTransaction: (args: { to: `0x${string}`; data: `0x${string}` }) => baseViemClient.sendTransaction({