Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,13 @@ SETTLEMENT_BATCH_SIZE=20
SETTLEMENT_BATCH_TIMEOUT=60000
WALRUS_PUBLISHER_URL=http://localhost:9002/v1/blobs

# Optional: Push aggregate inference usage to the same Supabase RPC/table as chat API
USAGE_SUPABASE_URL=https://your-project.supabase.co
USAGE_SUPABASE_SERVICE_ROLE_KEY=your-service-role-key
USAGE_SUPABASE_RPC=record_ohttp_usage
# Set to false to disable the Supabase sink while keeping Redis usage rollups
USAGE_SUPABASE_ENABLED=true

# Optional: Individual settlement queue/signing
DATA_SETTLEMENT_INDIVIDUAL_QUEUE_NAME=x402-settle-data-individual-queue-v1
DATA_INDIVIDUAL_WORKER_EVM_PRIVATE_KEY=0xDedicatedIndividualSettlementKey
Expand Down
32 changes: 32 additions & 0 deletions all_networks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import {
parseUint256Field,
normalizeHeaderValue,
PAYMENT_QUEUE_NAME,
parseInferenceUsageMetadata,
parseSettlementJobDataFromHeaders,
PORT,
settlementStatusFromBullState,
Expand All @@ -38,9 +39,11 @@ import {
type DataSettlementJobData,
type HeartbeatRelayRequest,
type IndividualDataSettlementJobData,
type InferenceUsageMetadata,
type PaymentSettlementJobData,
type SettlementApiJobResponse,
} from "./all_networks_types_helpers.js";
import { closeInferenceUsageTracker, getInferenceUsageStats } from "./all_networks_usage.js";
import {
type PaymentPayload,
type PaymentRequirements,
Expand Down Expand Up @@ -188,13 +191,15 @@ async function toJobResponse(
async function enqueuePaymentSettlementJob(args: {
paymentPayload: PaymentPayload;
paymentRequirements: PaymentRequirements;
usageMetadata?: InferenceUsageMetadata;
}): Promise<SettlementApiJobResponse> {
const paymentJobId = `payment-${randomUUID()}`;
const paymentJob = await paymentQueue.add(
"payment-settlement",
{
paymentPayload: args.paymentPayload,
paymentRequirements: args.paymentRequirements,
usageMetadata: args.usageMetadata,
},
{
jobId: paymentJobId,
Expand All @@ -203,6 +208,7 @@ async function enqueuePaymentSettlementJob(args: {

console.log("[api] Payment settlement job enqueued", {
jobId: paymentJobId,
hasUsageMetadata: Boolean(args.usageMetadata),
...summarizePaymentRequirements(args.paymentRequirements),
...summarizePaymentPayload(args.paymentPayload),
});
Expand Down Expand Up @@ -322,7 +328,17 @@ app.post("/settle", async (req, res) => {
});
}

let usageMetadata: InferenceUsageMetadata | undefined;
try {
usageMetadata = parseInferenceUsageMetadata(req.body.usageMetadata);
} catch (error) {
return res.status(400).json({
error: error instanceof Error ? error.message : "Invalid usageMetadata",
});
}

console.log("[api] /settle request received", {
hasUsageMetadata: Boolean(usageMetadata),
...summarizePaymentRequirements(paymentRequirements),
...summarizePaymentPayload(paymentPayload),
});
Expand All @@ -331,6 +347,7 @@ app.post("/settle", async (req, res) => {
const paymentJob = await enqueuePaymentSettlementJob({
paymentPayload,
paymentRequirements,
usageMetadata,
});
return res.status(202).json({ paymentJob });
} catch (error) {
Expand Down Expand Up @@ -535,6 +552,20 @@ app.get("/health", (_req, res) => {
});
});

app.get("/usage", async (req, res) => {
incrementMetric("api.request.count", ["route:/usage", "method:GET"]);
try {
const daysRaw = typeof req.query.days === "string" ? Number(req.query.days) : 30;
const days = Number.isInteger(daysRaw) ? Math.min(Math.max(daysRaw, 1), 90) : 30;
res.json(await getInferenceUsageStats(days));
} catch (error) {
console.error("[api] /usage request failed", summarizeError(error));
res.status(500).json({
error: error instanceof Error ? error.message : "Unknown error",
});
}
});
Comment thread
dixitaniket marked this conversation as resolved.

let httpServer: Server | null = null;
let isShuttingDown = false;

Expand Down Expand Up @@ -562,6 +593,7 @@ async function shutdown(signal: string): Promise<void> {
dataSettlementQueue.close(),
individualDataSettlementQueue.close(),
individualSettlementPreparer.close(),
closeInferenceUsageTracker(),
]);

clearTimeout(forcedExitTimer);
Expand Down
7 changes: 5 additions & 2 deletions all_networks_shared.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ const DATA_WORKER_TX_RECEIPT_TIMEOUT_MS = Number(
process.env.DATA_WORKER_TX_RECEIPT_TIMEOUT_MS || 120_000,
);
const BASE_MAINNET_RPC_URL = process.env.BASE_MAINNET_RPC_URL;
const PAYMENT_WORKER_BASE_MAX_FEE_GWEI = process.env.PAYMENT_WORKER_BASE_MAX_FEE_GWEI || "0.10";
const PAYMENT_WORKER_BASE_MAX_PRIORITY_FEE_GWEI =
process.env.PAYMENT_WORKER_BASE_MAX_PRIORITY_FEE_GWEI || "0.02";
const HEARTBEAT_RELAY_GAS_LIMIT = BigInt(process.env.HEARTBEAT_RELAY_GAS_LIMIT || "500000");
const HEARTBEAT_RELAY_TX_RECEIPT_TIMEOUT_MS = Number(
process.env.HEARTBEAT_RELAY_TX_RECEIPT_TIMEOUT_MS || 120_000,
Expand Down Expand Up @@ -1360,8 +1363,8 @@ export async function createFacilitator(): Promise<x402Facilitator> {
...args,
args: args.args || [],
gas: 5_000_000n,
maxFeePerGas: parseGwei("0.006"),
maxPriorityFeePerGas: parseGwei("0.005"),
maxFeePerGas: parseGwei(PAYMENT_WORKER_BASE_MAX_FEE_GWEI),
maxPriorityFeePerGas: parseGwei(PAYMENT_WORKER_BASE_MAX_PRIORITY_FEE_GWEI),
}),
sendTransaction: (args: { to: `0x${string}`; data: `0x${string}` }) =>
baseViemClient.sendTransaction({
Expand Down
116 changes: 115 additions & 1 deletion all_networks_types_helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,22 @@ export type SettlementHandlerResult = {
export type PaymentSettlementJobData = {
paymentPayload: PaymentPayload;
paymentRequirements: PaymentRequirements;
usageMetadata?: InferenceUsageMetadata;
};

export type InferenceUsageMetadata = {
sessionId?: string;
requestCount: number;
costOpg: string;
costUsd: number;
service?: string;
method?: string;
path?: string;
model?: string;
settlementType?: SettlementType | string;
network?: string;
asset?: string;
isStreaming?: boolean;
};

export type DataWorkerContext = {
Expand Down Expand Up @@ -197,6 +213,51 @@ function isRecord(value: unknown): value is Record<string, unknown> {
return typeof value === "object" && value !== null;
}

function getOptionalStringField(
record: Record<string, unknown>,
fieldNames: string[],
): string | undefined {
for (const name of fieldNames) {
const value = record[name];
if (typeof value === "string" && value.trim().length > 0) {
return value.trim();
}
}
return undefined;
}

function getOptionalNumberField(
record: Record<string, unknown>,
fieldNames: string[],
): number | undefined {
for (const name of fieldNames) {
const value = record[name];
if (typeof value === "number" && Number.isFinite(value)) {
return value;
}
if (typeof value === "string" && value.trim().length > 0) {
const parsed = Number(value);
if (Number.isFinite(parsed)) {
return parsed;
}
}
}
return undefined;
}

function getOptionalBooleanField(
record: Record<string, unknown>,
fieldNames: string[],
): boolean | undefined {
for (const name of fieldNames) {
const value = record[name];
if (typeof value === "boolean") {
return value;
}
}
return undefined;
}

export function getRequiredStringField(
record: Record<string, unknown>,
fieldNames: string[],
Expand Down Expand Up @@ -226,7 +287,7 @@ export function parseUint256Field(record: Record<string, unknown>, fieldNames: s
const raw = getRequiredUnknownField(record, fieldNames);

if (typeof raw === "number") {
if (!Number.isInteger(raw) || raw < 0) {
if (!Number.isSafeInteger(raw) || raw < 0) {
throw new Error(
`Invalid uint256 field. Expected non-negative integer for: ${fieldNames.join(", ")}`,
);
Expand Down Expand Up @@ -260,6 +321,59 @@ export function parseUint256Field(record: Record<string, unknown>, fieldNames: s
return trimmed;
}

function getOptionalUint256Field(
record: Record<string, unknown>,
fieldNames: string[],
label: string,
): string | undefined {
const fieldName = fieldNames.find(name => name in record);
if (!fieldName) {
return undefined;
}

try {
return parseUint256Field(record, [fieldName]);
} catch {
throw new Error(`${label} must be a non-negative integer string`);
}
}

export function parseInferenceUsageMetadata(value: unknown): InferenceUsageMetadata | undefined {
if (value === undefined || value === null) {
return undefined;
}
if (!isRecord(value)) {
throw new Error("usageMetadata must be a JSON object");
}

const requestCount = getOptionalNumberField(value, ["requestCount", "request_count"]) ?? 0;
const costUsd = getOptionalNumberField(value, ["costUsd", "cost_usd"]) ?? 0;
const costOpg =
getOptionalUint256Field(value, ["costOpg", "cost_opg"], "usageMetadata.costOpg") ?? "0";

if (!Number.isSafeInteger(requestCount) || requestCount < 0) {
throw new Error("usageMetadata.requestCount must be a non-negative integer");
}
if (!Number.isFinite(costUsd) || costUsd < 0) {
throw new Error("usageMetadata.costUsd must be a non-negative number");
}

return {
sessionId: getOptionalStringField(value, ["sessionId", "session_id"]),
requestCount,
costOpg,
costUsd,
service: getOptionalStringField(value, ["service"]),
method: getOptionalStringField(value, ["method"]),
path: getOptionalStringField(value, ["path", "route"]),
model: getOptionalStringField(value, ["model"]),
settlementType: getOptionalStringField(value, ["settlementType", "settlement_type"]),
network: getOptionalStringField(value, ["network"]),
asset: getOptionalStringField(value, ["asset"]),
isStreaming: getOptionalBooleanField(value, ["isStreaming", "is_streaming"]),
};
}

function parseEvmAddressField(
record: Record<string, unknown>,
fieldNames: string[],
Expand Down
Loading
Loading