diff --git a/apps/backend/drizzle/0009_push_enabled.sql b/apps/backend/drizzle/0009_push_enabled.sql new file mode 100644 index 0000000..d0e8b08 --- /dev/null +++ b/apps/backend/drizzle/0009_push_enabled.sql @@ -0,0 +1 @@ +ALTER TABLE "user_devices" ADD COLUMN "push_enabled" boolean DEFAULT true NOT NULL; diff --git a/apps/backend/drizzle/meta/_journal.json b/apps/backend/drizzle/meta/_journal.json index 5cd9250..6e4c211 100644 --- a/apps/backend/drizzle/meta/_journal.json +++ b/apps/backend/drizzle/meta/_journal.json @@ -68,6 +68,8 @@ { "idx": 9, "version": "7", + "when": 1785000000000, + "tag": "0009_push_enabled", "when": 1783500000000, "tag": "0009_message_edits", "breakpoints": true diff --git a/apps/backend/package.json b/apps/backend/package.json index 8eab407..e367107 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -29,6 +29,7 @@ "@aws-sdk/s3-request-presigner": "^3.1075.0", "@socket.io/redis-adapter": "^8.3.0", "@stellar/stellar-sdk": "^15.1.0", + "@types/web-push": "^3.6.4", "cors": "^2.8.6", "dotenv": "^17.3.1", "drizzle-orm": "^0.45.2", diff --git a/apps/backend/src/db/schema.ts b/apps/backend/src/db/schema.ts index 0fdb295..2d80fbf 100644 --- a/apps/backend/src/db/schema.ts +++ b/apps/backend/src/db/schema.ts @@ -253,6 +253,7 @@ export const userDevices = pgTable( identityPublicKey: text('identity_public_key').notNull(), registrationId: integer('registration_id'), lastSeenAt: timestamp('last_seen_at'), + pushEnabled: boolean('push_enabled').notNull().default(true), revokedAt: timestamp('revoked_at'), createdAt: timestamp('created_at').notNull().defaultNow(), }, diff --git a/apps/backend/src/index.ts b/apps/backend/src/index.ts index 10246b5..0b0a152 100644 --- a/apps/backend/src/index.ts +++ b/apps/backend/src/index.ts @@ -3,6 +3,9 @@ import { Server } from 'socket.io'; import { createAdapter } from '@socket.io/redis-adapter'; import { createClient } from 'redis'; import dotenv from 'dotenv'; +import { eq, isNull, and } from 'drizzle-orm'; +import { db } from './db/index.js'; +import { conversationMembers, users, userDevices } from './db/schema.js'; import { eq, inArray } from 'drizzle-orm'; import { db } from './db/index.js'; import { conversationMembers, users } from './db/schema.js'; @@ -12,7 +15,7 @@ import { registerMessagingHandlers } from './socket/messaging.js'; import { app } from './app.js'; import { redis as appRedis } from './lib/redis.js'; import { setSocketServer } from './lib/socket.js'; -import { setOnline, setOffline, refreshPresence, isOnline } from './services/presence.js'; +import { setOnline, setOffline, refreshPresence, isOnline, deriveDevicePresence } from './services/presence.js'; import { cleanupStaleSockets, reconcileBoot, @@ -107,6 +110,8 @@ io.use(socketAuthMiddleware); io.on('connection', async (socket: AuthSocket) => { const userId = socket.auth!.userId; + const deviceId = socket.auth!.deviceId; + const identityPublicKey = socket.identityPublicKey; console.log('User connected:', userId, socket.id); socket.data['userId'] = userId; @@ -116,7 +121,25 @@ io.on('connection', async (socket: AuthSocket) => { registerDeviceSocket(deviceId, socket.id); // Start the server-side heartbeat watchdog (90 s timeout). - startHeartbeatTimer(socket, userId, deviceId, appRedis, io); + startHeartbeatTimer(socket, userId, deviceId, appRedis, io, identityPublicKey); + + // Update user_devices.lastSeenAt for device-based presence derivation. + if (identityPublicKey) { + try { + await db + .update(userDevices) + .set({ lastSeenAt: new Date() }) + .where( + and( + eq(userDevices.userId, userId), + eq(userDevices.identityPublicKey, identityPublicKey), + isNull(userDevices.revokedAt), + ), + ); + } catch { + // Non-critical update; ignore errors. + } + } // Per-socket middleware: intercept every incoming event before handlers. const EXCLUDED_EVENTS = new Set(['heartbeat']); @@ -236,6 +259,23 @@ io.on('connection', async (socket: AuthSocket) => { unregisterForBackpressure(socket); clearViolations(socket.id); + // Update user_devices.lastSeenAt on disconnect. + if (identityPublicKey) { + try { + await db + .update(userDevices) + .set({ lastSeenAt: new Date() }) + .where( + and( + eq(userDevices.userId, userId), + eq(userDevices.identityPublicKey, identityPublicKey), + isNull(userDevices.revokedAt), + ), + ); + } catch { + // Non-critical update; ignore errors. + } + } // During a gateway restart we must NOT wipe presence — surviving devices // re-assert via heartbeat and Redis TTLs. if ( @@ -271,9 +311,16 @@ io.on('connection', async (socket: AuthSocket) => { where: eq(conversationMembers.userId, userId), columns: { conversationId: true }, }); + + const { lastSeen } = await deriveDevicePresence(userId); + for (const m of memberships) { io.to(m.conversationId).emit('user_offline', { userId }); - io.to(m.conversationId).emit('presence_update', { userId, online: false }); + io.to(m.conversationId).emit('presence_update', { + userId, + online: false, + ...(lastSeen ? { lastSeen } : {}), + }); } await recordPresenceForCoMembers( userId, diff --git a/apps/backend/src/middleware/socketAuth.ts b/apps/backend/src/middleware/socketAuth.ts index b866b7c..99c5e2e 100644 --- a/apps/backend/src/middleware/socketAuth.ts +++ b/apps/backend/src/middleware/socketAuth.ts @@ -6,6 +6,7 @@ import { devices } from '../db/schema.js'; export interface AuthSocket extends Socket { auth?: JwtPayload; + identityPublicKey?: string; } export async function socketAuthMiddleware( @@ -40,5 +41,6 @@ export async function socketAuthMiddleware( } socket.auth = payload; + socket.identityPublicKey = device.identityPublicKey; next(); } diff --git a/apps/backend/src/routes/users.ts b/apps/backend/src/routes/users.ts index c40d949..2a5178b 100644 --- a/apps/backend/src/routes/users.ts +++ b/apps/backend/src/routes/users.ts @@ -5,7 +5,7 @@ import { db } from '../db/index.js'; import { users, wallets, devices, conversationMembers } from '../db/schema.js'; import { requireAuth, type AuthRequest } from '../middleware/auth.js'; import { redis } from '../lib/redis.js'; -import { isOnline } from '../services/presence.js'; +import { isOnline, deriveDevicePresence } from '../services/presence.js'; import { getSocketServer } from '../lib/socket.js'; export const usersRouter: RouterType = Router(); @@ -163,12 +163,22 @@ usersRouter.get('/:id/presence', async (req: AuthRequest, res) => { return; } - if (!redis) { + // Check Redis for active WS connections first. + if (redis) { + const online = await isOnline(redis, id); + if (online) { + res.json({ online: true }); + return; + } + } + + // Fall back to device-based presence from user_devices.lastSeenAt. + try { + const { online, lastSeen } = await deriveDevicePresence(id); + res.json({ online, ...(lastSeen ? { lastSeen } : {}) }); + } catch { res.json({ online: false }); - return; } - const online = await isOnline(redis, id); - res.json({ online }); } catch { res.status(404).json({ error: 'User not found' }); } diff --git a/apps/backend/src/services/heartbeat.ts b/apps/backend/src/services/heartbeat.ts index a786c4c..40d35d1 100644 --- a/apps/backend/src/services/heartbeat.ts +++ b/apps/backend/src/services/heartbeat.ts @@ -2,14 +2,9 @@ import type { Server } from 'socket.io'; import type { Redis } from 'ioredis'; import type { AuthSocket } from '../middleware/socketAuth.js'; import { db } from '../db/index.js'; -import { devices } from '../db/schema.js'; -import { eq } from 'drizzle-orm'; -import { - markDeviceOffline, - refreshPresence, - refreshPresenceSocket, - unregisterPresenceSocket, -} from './presence.js'; +import { devices, userDevices } from '../db/schema.js'; +import { eq, and, isNull } from 'drizzle-orm'; +import { refreshPresence, markDeviceOffline, refreshPresenceSocket, unregisterPresenceSocket } from './presence.js'; const HEARTBEAT_TIMEOUT_MS = 90_000; const LAST_SEEN_THROTTLE_MS = 30_000; @@ -23,6 +18,7 @@ export function startHeartbeatTimer( deviceId: string, redis: Redis | null, io: Server, + identityPublicKey?: string, ): void { const schedule = () => { clearTimeout(timers.get(socket.id)); @@ -79,6 +75,24 @@ export function startHeartbeatTimer( } catch { // Non-critical update; ignore errors. } + + // Update user_devices.lastSeenAt for device-based presence derivation. + if (identityPublicKey) { + try { + await db + .update(userDevices) + .set({ lastSeenAt: new Date() }) + .where( + and( + eq(userDevices.userId, userId), + eq(userDevices.identityPublicKey, identityPublicKey), + isNull(userDevices.revokedAt), + ), + ); + } catch { + // Non-critical update; ignore errors. + } + } } schedule(); diff --git a/apps/backend/src/services/presence.ts b/apps/backend/src/services/presence.ts index 6c712cd..c08034b 100644 --- a/apps/backend/src/services/presence.ts +++ b/apps/backend/src/services/presence.ts @@ -5,6 +5,17 @@ * device also has a small per-device key with its own TTL so heartbeat timeouts * can remove that device entry without forcing the whole user offline. * + * - On connect: add socketId to `presence:{userId}` set, set TTL 60s + * - On heartbeat: refresh TTL to 60s + * - On disconnect: remove socketId from set, if set empty → user_offline + * - GET /users/:id/presence → { online: boolean, lastSeen?: string } + * + * User presence is derived from device presence: a user is online when any + * non-expired device entry exists (Redis OR user_devices.lastSeenAt within + * the window). When offline, lastSeen reflects the most recent device activity. + * - On connect: upsert device entry in `presence:user:{userId}` and refresh TTL + * - On heartbeat: update lastSeen and refresh the device TTL + * - On disconnect/timeout: remove that device entry; if none remain → user offline * Socket IDs are tracked in Redis separately from device presence. Those * mappings let a freshly booted gateway rebuild Socket.IO room membership for * sockets that are still active on other gateway instances, without creating @@ -18,9 +29,9 @@ */ import type { Server } from 'socket.io'; import type { Redis } from 'ioredis'; -import { eq } from 'drizzle-orm'; +import { isNull, eq, and, gte, desc } from 'drizzle-orm'; import { db } from '../db/index.js'; -import { conversationMembers } from '../db/schema.js'; +import { userDevices, conversationMembers } from '../db/schema.js'; const PRESENCE_TTL = 90; // seconds const SOCKET_MAPPING_PREFIX = 'presence:sockets:'; @@ -222,6 +233,43 @@ export async function isOnline(redis: Redis, userId: string): Promise { return count > 0; } +const DEVICE_PRESENCE_WINDOW_MS = 90_000; + +/** + * Derive user presence from device presence: a user is considered online + * if any non-revoked device has a lastSeenAt within the presence window. + * When offline, returns the most recent lastSeenAt across all devices. + */ +export async function deriveDevicePresence( + userId: string, +): Promise<{ online: boolean; lastSeen: string | null }> { + const windowStart = new Date(Date.now() - DEVICE_PRESENCE_WINDOW_MS); + + const activeDevice = await db.query.userDevices.findFirst({ + where: and( + eq(userDevices.userId, userId), + isNull(userDevices.revokedAt), + gte(userDevices.lastSeenAt, windowStart), + ), + columns: { id: true }, + }); + + if (activeDevice) { + return { online: true, lastSeen: null }; + } + + const mostRecent = await db.query.userDevices.findFirst({ + where: and(eq(userDevices.userId, userId), isNull(userDevices.revokedAt)), + orderBy: desc(userDevices.lastSeenAt), + columns: { lastSeenAt: true }, + }); + + return { + online: false, + lastSeen: mostRecent?.lastSeenAt?.toISOString() ?? null, + }; +} + async function removeStaleSocketMapping( redis: Redis, userId: string, diff --git a/apps/backend/src/services/push.ts b/apps/backend/src/services/push.ts new file mode 100644 index 0000000..3569e64 --- /dev/null +++ b/apps/backend/src/services/push.ts @@ -0,0 +1,83 @@ +import webpush from 'web-push'; +import { eq, and, isNull } from 'drizzle-orm'; +import { db } from '../db/index.js'; +import { conversationMembers, pushSubscriptions, userDevices } from '../db/schema.js'; +import { redis } from '../lib/redis.js'; +import { isOnline } from './presence.js'; + +const VAPID_SUBJECT = process.env['VAPID_SUBJECT'] || 'mailto:admin@clicked.app'; + +if (process.env['VAPID_PUBLIC_KEY'] && process.env['VAPID_PRIVATE_KEY']) { + webpush.setVapidDetails( + VAPID_SUBJECT, + process.env['VAPID_PUBLIC_KEY'], + process.env['VAPID_PRIVATE_KEY'], + ); +} + +export interface PushContext { + conversationId: string; + messageId: string; + senderId: string; +} + +export async function sendPushForMessage(ctx: PushContext): Promise { + if (!process.env['VAPID_PUBLIC_KEY'] || !process.env['VAPID_PRIVATE_KEY']) { + return; + } + + try { + const allMembers = await db.query.conversationMembers.findMany({ + where: eq(conversationMembers.conversationId, ctx.conversationId), + columns: { userId: true, isMuted: true }, + }); + + for (const member of allMembers) { + if (member.userId === ctx.senderId) continue; + if (member.isMuted) continue; + + // Skip online users (active WS connection). + if (redis) { + const online = await isOnline(redis, member.userId); + if (online) continue; + } + + // Get non-revoked devices with push enabled. + const devices = await db.query.userDevices.findMany({ + where: and( + eq(userDevices.userId, member.userId), + eq(userDevices.pushEnabled, true), + isNull(userDevices.revokedAt), + ), + columns: { id: true }, + }); + + for (const device of devices) { + const sub = await db.query.pushSubscriptions.findFirst({ + where: eq(pushSubscriptions.deviceId, device.id), + columns: { endpoint: true, p256dh: true, auth: true }, + }); + + if (!sub) continue; + + try { + await webpush.sendNotification( + { + endpoint: sub.endpoint, + keys: { p256dh: sub.p256dh, auth: sub.auth }, + }, + JSON.stringify({ + type: 'new_message', + conversationId: ctx.conversationId, + messageId: ctx.messageId, + }), + ); + } catch { + // Push delivery failures are non-critical. + } + } + } + } catch { + // Push is best-effort; never let it break message delivery. + } +} diff --git a/apps/backend/src/socket/messaging.ts b/apps/backend/src/socket/messaging.ts index e5378a9..062134b 100644 --- a/apps/backend/src/socket/messaging.ts +++ b/apps/backend/src/socket/messaging.ts @@ -1,4 +1,5 @@ import type { Server } from 'socket.io'; +import { createHash } from 'node:crypto'; import { and, eq, lt, desc, sql, inArray } from 'drizzle-orm'; import { db } from '../db/index.js'; @@ -14,6 +15,7 @@ import type { AuthSocket } from '../middleware/socketAuth.js'; import { invalidateConversationCaches } from '../lib/conversationCache.js'; import { serializeMessage } from '../lib/messages.js'; import { redis } from '../lib/redis.js'; +import { sendPushForMessage } from '../services/push.js'; import { validateMessagePayload } from '../lib/validateMessagePayload.js'; import { dispatchOfflinePush, FILE_CONTENT_TYPES } from '../services/pushNotification.js'; import { deliverMessage } from '../services/deliveryPipeline.js'; @@ -65,25 +67,28 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void }); // ── send_message ─────────────────────────────────────────────────────────── - dispatcher.register('send_message', async (payload) => { - const { - conversationId, - messageId, - content, - contentType, - ciphertext, - envelopes, - fileId: payloadFileId, - } = payload as { + // Payload: { conversationId, messageId, contentType, ciphertext, envelopes, ciphertextSha256? } + // Persists the message and broadcasts it to all room members. + // + // Integrity: when `ciphertextSha256` is present the server computes + // SHA-256 over the stored ciphertext and rejects the message on mismatch. + // This is a transport-corruption check; the AEAD tag inside the ciphertext + // remains the primary integrity mechanism for clients at decryption time. + socket.on( + 'send_message', + async (payload: { conversationId: string; messageId?: string; content?: string; contentType?: string; ciphertext?: string; + ciphertextSha256?: string; envelopes?: Array<{ recipientDeviceId: string; ciphertext: string }>; - fileId?: string; - }; - const deviceId = socket.auth!.deviceId; + }) => { + const { conversationId, messageId, contentType, ciphertext, ciphertextSha256, envelopes } = + payload; + const { conversationId, messageId, content, contentType, ciphertext, envelopes } = payload; + const deviceId = socket.auth!.deviceId; // Clear active typing state as soon as the member attempts to send. for (const [timerKey, timer] of typingTimers.entries()) { @@ -249,6 +254,29 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void return; } + // Verify ciphertext integrity when a sha256 is provided. + if (ciphertextSha256 && ciphertext) { + const computed = createHash('sha256').update(ciphertext, 'utf8').digest('hex'); + if (computed !== ciphertextSha256) { + socket.emit('error', { + event: 'integrity_error', + message: 'Ciphertext sha256 mismatch', + }); + return; + } + } + + const [message] = await db + .insert(messages) + .values({ + id: messageId, + conversationId, + senderId: userId, + senderDeviceId: deviceId, + contentType: contentType || 'text/plain', + ciphertext: effectiveCiphertext, + }) + .returning(); const original = await db.query.messages.findFirst({ where: eq(messages.id, originalMessageId), }); @@ -442,12 +470,37 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void io.to(conversationId).emit('new_message', message); + // Emit a file_message event for file-type content so recipients + // know to fetch file bytes via GET /files/:id over HTTP. + const ct = contentType || 'text/plain'; + if ( + ct.startsWith('file/') || + ct === 'file' || + ct.startsWith('image/') || + ct.startsWith('video/') || + ct.startsWith('audio/') + ) { + io.to(conversationId).emit('file_message', { + messageId, + conversationId, + fileId: messageId, + }); + } + const members = await db.query.conversationMembers.findMany({ where: eq(conversationMembers.conversationId, conversationId), columns: { userId: true }, }); await invalidateConversationCaches(members.map((member) => member.userId)); + + // Dispatch push notifications to offline members who + // haven't muted the conversation and have push enabled. + sendPushForMessage({ + conversationId, + messageId, + senderId: userId, + }); }, ); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 2209e3f..5db0c73 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -41,6 +41,9 @@ importers: '@stellar/stellar-sdk': specifier: ^15.1.0 version: 15.1.0 + '@types/web-push': + specifier: ^3.6.4 + version: 3.6.4 cors: specifier: ^2.8.6 version: 2.8.6