Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions apps/backend/src/__tests__/fileCleanup.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@ vi.mock('@aws-sdk/client-s3', () => ({
const mockFindMany = vi.fn();
const mockUpdate = vi.fn();
const mockExecute = vi.fn();
const mockDelete = vi.fn();

vi.mock('../db/index.js', () => ({
db: {
query: { files: { findMany: mockFindMany } },
update: mockUpdate,
execute: mockExecute,
delete: mockDelete,
},
}));

Expand All @@ -35,6 +37,9 @@ vi.mock('../db/schema.js', () => ({
vi.mock('drizzle-orm', () => ({
isNotNull: vi.fn((col: unknown) => ({ col, isNotNull: true })),
isNull: vi.fn((col: unknown) => ({ col, isNull: true })),
and: vi.fn(),
eq: vi.fn(),
lt: vi.fn(),
sql: Object.assign(
vi.fn((strings: TemplateStringsArray, ...vals: unknown[]) => ({ strings, vals })),
{ raw: vi.fn() },
Expand All @@ -53,6 +58,7 @@ beforeEach(() => {
mockS3Send.mockResolvedValue(undefined);
mockUpdate.mockReturnValue({ set: mockSetFn });
mockSetFn.mockReturnValue({ where: mockWhereFn });
mockDelete.mockReturnValue({ where: mockWhereFn });
});

const { softDeleteFile, runHardDeletePass } = await import('../services/fileCleanup.js');
Expand All @@ -67,7 +73,8 @@ describe('#231 – softDeleteFile', () => {

describe('#231 – runHardDeletePass', () => {
it('skips files that still have live message references', async () => {
mockFindMany.mockResolvedValue([{ id: 'file-1', storageKey: 'key-1' }]);
mockFindMany.mockResolvedValueOnce([{ id: 'file-1', storageKey: 'key-1' }]) // first pass (candidates)
.mockResolvedValueOnce([]); // second pass (pendingCandidates)
mockExecute.mockResolvedValueOnce([{ '?column?': 1 }]); // live ref exists

await runHardDeletePass();
Expand All @@ -77,7 +84,8 @@ describe('#231 – runHardDeletePass', () => {
});

it('hard-deletes from S3 and marks hardDeletedAt when no live refs', async () => {
mockFindMany.mockResolvedValue([{ id: 'file-2', storageKey: 'key-2' }]);
mockFindMany.mockResolvedValueOnce([{ id: 'file-2', storageKey: 'key-2' }])
.mockResolvedValueOnce([]);
mockExecute.mockResolvedValueOnce([]); // no live refs

await runHardDeletePass();
Expand All @@ -87,7 +95,8 @@ describe('#231 – runHardDeletePass', () => {
});

it('does not mark hardDeletedAt when S3 delete throws (safe retry)', async () => {
mockFindMany.mockResolvedValue([{ id: 'file-3', storageKey: 'key-3' }]);
mockFindMany.mockResolvedValueOnce([{ id: 'file-3', storageKey: 'key-3' }])
.mockResolvedValueOnce([]);
mockExecute.mockResolvedValueOnce([]);
mockS3Send.mockRejectedValueOnce(new Error('NoSuchKey'));

Expand All @@ -97,14 +106,26 @@ describe('#231 – runHardDeletePass', () => {
});

it('processes multiple files in one pass', async () => {
mockFindMany.mockResolvedValue([
mockFindMany.mockResolvedValueOnce([
{ id: 'file-a', storageKey: 'key-a' },
{ id: 'file-b', storageKey: 'key-b' },
]);
]).mockResolvedValueOnce([]);
mockExecute.mockResolvedValue([]); // no live refs for either

await runHardDeletePass();

expect(mockS3Send).toHaveBeenCalledTimes(2);
});

it('deletes pending files older than 24 hours', async () => {
mockFindMany.mockResolvedValueOnce([]) // no soft-deleted candidates
.mockResolvedValueOnce([
{ id: 'pending-1', storageKey: 'pending-key-1' },
]);

await runHardDeletePass();

expect(mockS3Send).toHaveBeenCalledTimes(1);
expect(mockDelete).toHaveBeenCalled();
});
});
2 changes: 2 additions & 0 deletions apps/backend/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { usersRouter } from './routes/users.js';
import { treasuryRouter } from './routes/treasury.js';
import { uploadsRouter } from './routes/uploads.js';
import { filesRouter } from './routes/files.js';
import { uploadsRouter } from './routes/uploads.js';
import { pushRouter } from './routes/push.js';
import { syncRouter } from './routes/sync.js';
import { requireAuth, type AuthRequest } from './middleware/auth.js';
Expand Down Expand Up @@ -57,6 +58,7 @@ app.use('/users', usersRouter);
app.use('/treasury', treasuryRouter);
app.use('/uploads', uploadsRouter);
app.use('/files', filesRouter);
app.use('/uploads', uploadsRouter);
app.use('/push', pushRouter);
app.use('/sync', syncRouter);

Expand Down
21 changes: 21 additions & 0 deletions apps/backend/src/db/schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,27 @@ export const contentTypeEnum = pgEnum('content_type', [
'system',
]);

// ─── Files (#231) ─────────────────────────────────────────────────────────────
//
// Tracks S3 storage objects for file-type messages. Soft-deleted when all
// referencing messages are retracted; hard-deleted by the background cleanup job.

export const fileStatusEnum = pgEnum('file_status', ['pending', 'ready']);

export const files = pgTable('files', {
id: uuid('id').primaryKey().defaultRandom(),
storageKey: text('storage_key').notNull().unique(),
status: fileStatusEnum('status').notNull().default('pending'),
size: integer('size'),
sha256: text('sha256'),
deletedAt: timestamp('deleted_at'),
hardDeletedAt: timestamp('hard_deleted_at'),
createdAt: timestamp('created_at').notNull().defaultNow(),
});

export type File = typeof files.$inferSelect;
export type NewFile = typeof files.$inferInsert;

export const conversationMembers = pgTable('conversation_members', {
id: uuid('id').primaryKey().defaultRandom(),
conversationId: uuid('conversation_id')
Expand Down
9 changes: 9 additions & 0 deletions apps/backend/src/routes/files.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,15 @@ filesRouter.get('/:fileId', async (req: AuthRequest, res) => {
});

if (!message) {
res.status(404).json({ error: 'File not referenced by any message' });
return;
}

const file = await db.query.files.findFirst({
where: eq(files.id, fileId),
});

if (!file) {
// File may not yet be attached to a message (upload in progress) — deny.
res.status(404).json({ error: 'File not found' });
return;
Expand Down
13 changes: 12 additions & 1 deletion apps/backend/src/routes/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { Router } from 'express';
import type { IRouter } from 'express';
import { and, eq, inArray } from 'drizzle-orm';
import { db } from '../db/index.js';
import { conversationMembers, messages, messageEnvelopes, userDevices } from '../db/schema.js';
import { conversationMembers, messages, messageEnvelopes, userDevices, files } from '../db/schema.js';
import { softDeleteFile } from '../services/fileCleanup.js';
import { requireAuth, type AuthRequest } from '../middleware/auth.js';
import { validate } from '../middleware/validate.js';
Expand Down Expand Up @@ -61,6 +61,16 @@ messagesRouter.post('/', validate(SendMessageSchema), async (req: AuthRequest, r
return;
}

if (fileId) {
const fileRecord = await db.query.files.findFirst({
where: eq(files.id, fileId),
});
if (!fileRecord || fileRecord.status !== 'ready') {
res.status(400).json({ error: 'File is not ready or does not exist' });
return;
}
}

// ── persist ────────────────────────────────────────────────────────────────
const [message] = await db
.insert(messages)
Expand All @@ -71,6 +81,7 @@ messagesRouter.post('/', validate(SendMessageSchema), async (req: AuthRequest, r
senderDeviceId: deviceId ?? null,
contentType: contentType?.trim().toLowerCase() || 'text',
ciphertext: ciphertext || null,
fileId: fileId || null,
})
.returning();

Expand Down
50 changes: 41 additions & 9 deletions apps/backend/src/routes/uploads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,17 +104,49 @@ uploadsRouter.post('/:fileId/confirm', async (req: AuthRequest, res) => {
return;
}

if (file.uploaderId !== userId) {
res.status(403).json({ error: 'Only the uploader may confirm this file' });
if (file.status === 'ready') {
res.status(200).json({ message: 'File is already ready' });
return;
}

if (file.status !== 'pending') {
res.status(409).json({ error: `File is already ${file.status}` });
return;
try {
const headCommand = new HeadObjectCommand({
Bucket: bucketName,
Key: file.storageKey,
});
const headOutput = await s3.send(headCommand);

if (headOutput.ContentLength !== size) {
res.status(400).json({ error: 'Size mismatch' });
return;
}

if (sha256) {
if (headOutput.ChecksumSHA256 && headOutput.ChecksumSHA256 !== sha256) {
res.status(400).json({ error: 'Hash mismatch' });
return;
}
if (
headOutput.Metadata &&
headOutput.Metadata['sha256'] &&
headOutput.Metadata['sha256'] !== sha256
) {
res.status(400).json({ error: 'Hash mismatch' });
return;
}
}

await db
.update(files)
.set({ status: 'ready', size, sha256: sha256 || null })
.where(eq(files.id, fileId));

res.status(200).json({ message: 'File confirmed' });
} catch (error: any) {
if (error.name === 'NotFound' || error.$metadata?.httpStatusCode === 404) {
res.status(400).json({ error: 'File not found in storage. Ensure upload completed.' });
return;
}
res.status(500).json({ error: 'Failed to confirm upload' });
}

await db.update(files).set({ status: 'ready' }).where(eq(files.id, fileId));

res.json({ fileId, status: 'ready' });
});
19 changes: 18 additions & 1 deletion apps/backend/src/services/fileCleanup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
* delete, so a crash between steps is safe to retry.
*/
import { S3Client, DeleteObjectCommand } from '@aws-sdk/client-s3';
import { isNotNull, isNull, sql } from 'drizzle-orm';
import { isNotNull, isNull, sql, and, eq, lt } from 'drizzle-orm';
import { db } from '../db/index.js';
import { files } from '../db/schema.js';
import { reenableExpiredBackoffs } from './pushNotification.js';
Expand Down Expand Up @@ -67,6 +67,23 @@ export async function runHardDeletePass(): Promise<void> {
console.error(`[file-cleanup] failed to delete ${file.storageKey}:`, err);
}
}

// Garbage-collect unconfirmed pending files older than 24 hours
const stalePendingDate = new Date(Date.now() - 24 * 60 * 60 * 1000);
const pendingCandidates = await db.query.files.findMany({
where: (f) => and(eq(f.status, 'pending'), lt(f.createdAt, stalePendingDate)),
columns: { id: true, storageKey: true },
});

for (const file of pendingCandidates) {
try {
await s3.send(new DeleteObjectCommand({ Bucket: BUCKET, Key: file.storageKey }));
await db.delete(files).where(eq(files.id, file.id));
console.log(`[file-cleanup] deleted pending file s3://${BUCKET}/${file.storageKey}`);
} catch (err) {
console.error(`[file-cleanup] failed to delete pending file ${file.storageKey}:`, err);
}
}
}

let cleanupTimer: ReturnType<typeof setInterval> | null = null;
Expand Down
20 changes: 5 additions & 15 deletions apps/backend/src/socket/messaging.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,28 +85,18 @@ export function registerMessagingHandlers(io: Server, socket: AuthSocket): void
});

// ── send_message ───────────────────────────────────────────────────────────
// Payload: { conversationId, messageId, contentType, ciphertext, envelopes, ciphertextSha256? }
// Persists the message and broadcasts it to all room members.
//
// Integrity: when `ciphertextSha256` is present the server computes
// SHA-256 over the stored ciphertext and rejects the message on mismatch.
// This is a transport-corruption check; the AEAD tag inside the ciphertext
// remains the primary integrity mechanism for clients at decryption time.
socket.on(
'send_message',
async (payload: {
dispatcher.register('send_message', async (payload) => {
const { conversationId, messageId, content, contentType, ciphertext, envelopes, fileId } = payload as {
conversationId: string;
messageId?: string;
content?: string;
contentType?: string;
ciphertext?: string;
ciphertextSha256?: string;
envelopes?: Array<{ recipientDeviceId: string; ciphertext: string }>;
}) => {
const { conversationId, messageId, contentType, ciphertext, ciphertextSha256, envelopes } =
payload;
const { conversationId, messageId, content, contentType, ciphertext, envelopes } = payload;
const deviceId = socket.auth!.deviceId;
fileId?: string;
};
const deviceId = socket.auth!.deviceId;

// Clear active typing state as soon as the member attempts to send.
for (const [timerKey, timer] of typingTimers.entries()) {
Expand Down