diff --git a/Signal.xcodeproj/project.pbxproj b/Signal.xcodeproj/project.pbxproj index 41d570d460..2882f3858a 100644 --- a/Signal.xcodeproj/project.pbxproj +++ b/Signal.xcodeproj/project.pbxproj @@ -594,6 +594,7 @@ 4C6E6C6924241C00009DE948 /* ConversationViewControllerTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C6E6C6824241C00009DE948 /* ConversationViewControllerTest.swift */; }; 4C751BE523FA0284002A8AF1 /* ContactSupportActionSheet.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C751BE423FA0284002A8AF1 /* ContactSupportActionSheet.swift */; }; 4C83AC4223C55D9C00D4F2E6 /* SignalBaseTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C83AC4123C55D9C00D4F2E6 /* SignalBaseTest.swift */; }; + C0D1E5012F0C000000000001 /* ThreadUtilSignalUITest.swift in Sources */ = {isa = PBXBuildFile; fileRef = C0D1E5002F0C000000000001 /* ThreadUtilSignalUITest.swift */; }; 4C8A6DFC22E5499300469AE7 /* MediaZoomAnimationController.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8A6DFB22E5499300469AE7 /* MediaZoomAnimationController.swift */; }; 4C8A6DFE22E54AFA00469AE7 /* MediaInteractiveDismiss.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C8A6DFD22E54AFA00469AE7 /* MediaInteractiveDismiss.swift */; }; 4C9D347B23679C25006A4307 /* ContactStreamTest.swift in Sources */ = {isa = PBXBuildFile; fileRef = 4C9D347923679C13006A4307 /* ContactStreamTest.swift */; }; @@ -4828,6 +4829,7 @@ 4C6F527B20FFE8400097DEEE /* SignalUBSan.supp */ = {isa = PBXFileReference; lastKnownFileType = text; path = SignalUBSan.supp; sourceTree = ""; }; 4C751BE423FA0284002A8AF1 /* ContactSupportActionSheet.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContactSupportActionSheet.swift; sourceTree = ""; }; 4C83AC4123C55D9C00D4F2E6 /* SignalBaseTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = SignalBaseTest.swift; sourceTree = ""; }; + C0D1E5002F0C000000000001 /* ThreadUtilSignalUITest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ThreadUtilSignalUITest.swift; sourceTree = ""; }; 4C8A6DFB22E5499300469AE7 /* MediaZoomAnimationController.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MediaZoomAnimationController.swift; sourceTree = ""; }; 4C8A6DFD22E54AFA00469AE7 /* MediaInteractiveDismiss.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = MediaInteractiveDismiss.swift; sourceTree = ""; }; 4C9D347923679C13006A4307 /* ContactStreamTest.swift */ = {isa = PBXFileReference; lastKnownFileType = sourcecode.swift; path = ContactStreamTest.swift; sourceTree = ""; }; @@ -11645,6 +11647,7 @@ 50791B1B2D037A7800D747F8 /* RecipientPickers */, 661278052996BA6700A1D5A1 /* Registration */, 040507132F80639B0078B769 /* RemoteReleaseNotes */, + C0D1E5022F0C000000000001 /* Sending */, 4C3EF8002109184A0007EBF7 /* SSKTests */, D97046082E81D5B60034C05D /* Storage */, E75DD3DC2810CD3500E32C36 /* subscriptions */, @@ -11668,6 +11671,14 @@ path = contact; sourceTree = ""; }; + C0D1E5022F0C000000000001 /* Sending */ = { + isa = PBXGroup; + children = ( + C0D1E5002F0C000000000001 /* ThreadUtilSignalUITest.swift */, + ); + path = Sending; + sourceTree = ""; + }; B660F69D1C29868000687D6E /* Supporting Files */ = { isa = PBXGroup; children = ( @@ -18874,6 +18885,7 @@ 4C3EF802210918740007EBF7 /* SSKProtoEnvelopeTest.swift in Sources */, 8803FF6628EF89B50023574A /* StorySharingTests.swift in Sources */, E75DD3E02810CDBD00E32C36 /* SubscriptionModelsTest.swift in Sources */, + C0D1E5012F0C000000000001 /* ThreadUtilSignalUITest.swift in Sources */, 5033D47329DCB3FF007FEADA /* UrlOpenerTest.swift in Sources */, 45A3579827DAAC6A0051CE8B /* UserProfileTest.swift in Sources */, 5042EAA3287F96FB00C9B19F /* VisibleBadgeResolverTest.swift in Sources */, diff --git a/Signal/ConversationView/ConversationInputToolbar.swift b/Signal/ConversationView/ConversationInputToolbar.swift index 505f1f69ce..164e612534 100644 --- a/Signal/ConversationView/ConversationInputToolbar.swift +++ b/Signal/ConversationView/ConversationInputToolbar.swift @@ -2001,6 +2001,12 @@ public class ConversationInputToolbar: UIView, QuotedReplyPreviewDelegate { return linkPreviewFetchState.linkPreviewDraftIfLoaded } + func consumeLinkPreviewDraftForSendingTask() -> Task? { + AssertIsOnMainThread() + + return linkPreviewFetchState.consumeLinkPreviewDraftForSendingTask() + } + private func updateInputLinkPreview() { AssertIsOnMainThread() diff --git a/Signal/ConversationView/ConversationViewController+ConversationInputToolbarDelegate.swift b/Signal/ConversationView/ConversationViewController+ConversationInputToolbarDelegate.swift index 54cc85a8c2..69312249e8 100644 --- a/Signal/ConversationView/ConversationViewController+ConversationInputToolbarDelegate.swift +++ b/Signal/ConversationView/ConversationViewController+ConversationInputToolbarDelegate.swift @@ -136,6 +136,7 @@ extension ConversationViewController: ConversationInputToolbarDelegate { // If its cleared, "change" it to nothing (clear it). quotedReplyEdit: inputToolbar.quotedReplyDraft == nil ? .change(()) : .keep, linkPreviewDraft: inputToolbar.linkPreviewDraft, + linkPreviewDraftForSending: inputToolbar.consumeLinkPreviewDraftForSendingTask(), editTarget: editTarget, persistenceCompletionHandler: { AssertIsOnMainThread() @@ -148,6 +149,7 @@ extension ConversationViewController: ConversationInputToolbarDelegate { thread: self.thread, quotedReplyDraft: inputToolbar.quotedReplyDraft, linkPreviewDraft: inputToolbar.linkPreviewDraft, + linkPreviewDraftForSending: inputToolbar.consumeLinkPreviewDraftForSendingTask(), persistenceCompletionHandler: { AssertIsOnMainThread() self.loadCoordinator.enqueueReload() diff --git a/Signal/test/Sending/ThreadUtilSignalUITest.swift b/Signal/test/Sending/ThreadUtilSignalUITest.swift new file mode 100644 index 0000000000..db13c99dfe --- /dev/null +++ b/Signal/test/Sending/ThreadUtilSignalUITest.swift @@ -0,0 +1,147 @@ +// +// Copyright 2026 Signal Messenger, LLC +// SPDX-License-Identifier: AGPL-3.0-only +// + +import LibSignalClient +import XCTest + +@testable import SignalServiceKit +@testable import SignalUI + +final class ThreadUtilSignalUITest: SignalBaseTest { + + private var thread: TSContactThread! + + @MainActor + override func setUp() { + super.setUp() + + ThreadUtil.enqueueSendQueue = SerialTaskQueue() + ThreadUtil.sendFinalizationQueue = SerialTaskQueue() + + write { tx in + (DependenciesBridge.shared.registrationStateChangeManager as! RegistrationStateChangeManagerImpl) + .registerForTests(localIdentifiers: .forUnitTests, tx: tx) + SSKPreferences.setAreIntentDonationsEnabled(false, transaction: tx) + + self.thread = TSContactThread.getOrCreateThread( + withContactAddress: SignalServiceAddress(Aci.randomForTesting()), + transaction: tx, + ) + } + } + + @MainActor + override func tearDown() { + ThreadUtil.enqueueSendQueue = SerialTaskQueue() + ThreadUtil.sendFinalizationQueue = SerialTaskQueue() + + super.tearDown() + } + + func testPendingLinkPreviewDoesNotBlockOutgoingRowPersistence() async throws { + let previewContinuation = AtomicValue?>(nil, lock: .init()) + let previewTask = Task { + await withCheckedContinuation { continuation in + previewContinuation.set(continuation) + } + } + let previewTaskIsWaiting = await waitUntil { previewContinuation.get() != nil } + XCTAssertTrue(previewTaskIsWaiting) + + let firstMessageDidPersist = AtomicValue(false, lock: .init()) + ThreadUtil.enqueueMessage( + body: MessageBody(text: "https://signal.org", ranges: .empty), + thread: thread, + linkPreviewDraftForSending: previewTask, + persistenceCompletionHandler: { firstMessageDidPersist.set(true) }, + ) + + let firstMessagePersisted = await waitUntil(firstMessageDidPersist.get) + XCTAssertTrue( + firstMessagePersisted, + "The outgoing row should be visible before the pending preview resolves.", + ) + if !firstMessagePersisted { + return + } + + var outgoingMessages = self.outgoingMessages() + XCTAssertEqual(outgoingMessages.count, 1) + XCTAssertNil(outgoingMessages.first?.linkPreview) + + let secondMessageDidPersist = AtomicValue(false, lock: .init()) + ThreadUtil.enqueueMessage( + body: MessageBody(text: "second message", ranges: .empty), + thread: thread, + persistenceCompletionHandler: { secondMessageDidPersist.set(true) }, + ) + + let secondMessagePersisted = await waitUntil(secondMessageDidPersist.get) + XCTAssertTrue( + secondMessagePersisted, + "A pending preview finalization should not block later outgoing rows from appearing.", + ) + if !secondMessagePersisted { + return + } + outgoingMessages = self.outgoingMessages() + XCTAssertEqual(outgoingMessages.count, 2) + XCTAssertFalse( + self.messageSenderJobMessageIds().contains(outgoingMessages[0].uniqueId), + "The pending-preview message should hold an in-memory ordering barrier until finalization, not a runnable durable sender job.", + ) + + previewContinuation.swap(nil)?.resume(returning: OWSLinkPreviewDraft( + url: try XCTUnwrap(URL(string: "https://signal.org")), + title: "Signal", + isForwarded: false, + )) + + try await ThreadUtil.enqueueSendQueue.enqueue(operation: {}).value + try await ThreadUtil.sendFinalizationQueue.enqueue(operation: {}).value + + outgoingMessages = self.outgoingMessages() + XCTAssertEqual(outgoingMessages.count, 2) + XCTAssertEqual(outgoingMessages.first?.linkPreview?.urlString, "https://signal.org") + XCTAssertEqual(outgoingMessages.first?.linkPreview?.title, "Signal") + } + + private func outgoingMessages() -> [TSOutgoingMessage] { + read { tx in + try! InteractionFinder(threadUniqueId: thread.uniqueId) + .fetchAllInteractions(rowIdFilter: .newest, limit: Int.max, tx: tx) + .compactMap { $0 as? TSOutgoingMessage } + .sorted { $0.timestamp < $1.timestamp } + } + } + + private func messageSenderJobMessageIds() -> [String] { + read { tx in + MessageSenderJobRecord.anyFetchAll(transaction: tx) + .sorted { ($0.id ?? 0) < ($1.id ?? 0) } + .compactMap { jobRecord -> String? in + guard case .persisted(let messageId, _) = jobRecord.messageType else { + return nil + } + return messageId + } + } + } + + private func waitUntil( + timeout: TimeInterval = 1, + _ predicate: @escaping () -> Bool, + ) async -> Bool { + let deadline = Date().addingTimeInterval(timeout) + while Date() < deadline { + if predicate() { + return true + } + try? await Task.sleep(nanoseconds: 10 * NSEC_PER_MSEC) + } + + return predicate() + } +} diff --git a/SignalServiceKit/Jobs/MessageSenderJobQueue.swift b/SignalServiceKit/Jobs/MessageSenderJobQueue.swift index 793bbd425c..741b250cbb 100644 --- a/SignalServiceKit/Jobs/MessageSenderJobQueue.swift +++ b/SignalServiceKit/Jobs/MessageSenderJobQueue.swift @@ -33,6 +33,12 @@ import LibSignalClient public class MessageSenderJobQueue { private var jobSerializer = CompletionSerializer() + public struct DeferredJob: Sendable { + fileprivate let uniqueId: String + fileprivate let isHighPriority: Bool + fileprivate let isInMemoryOnly: Bool + } + public init(appReadiness: AppReadiness) { appReadiness.runNowOrWhenAppDidBecomeReadyAsync { self.setUp() @@ -72,6 +78,110 @@ public class MessageSenderJobQueue { } } + public func addDeferred( + _ namespace: PromiseNamespace, + message: PreparedOutgoingMessage, + limitToCurrentProcessLifetime: Bool = false, + isHighPriority: Bool = false, + transaction: DBWriteTransaction, + ) -> (DeferredJob, Promise) { + let deferredJob = DeferredJob( + uniqueId: UUID().uuidString, + isHighPriority: isHighPriority, + isInMemoryOnly: limitToCurrentProcessLifetime, + ) + let promise = Promise { future in + // Mark as sending now so the UI updates immediately. + message.updateAllUnsentRecipientsAsSending(tx: transaction) + self.state.update { + $0.pendingJobs.append(.deferred(deferredJob)) + $0.jobFutures[deferredJob.uniqueId] = future + } + } + transaction.addSyncCompletion { + self.startPendingJobRecordsIfPossible() + } + + return (deferredJob, promise) + } + + public func startDeferredJob( + _ deferredJob: DeferredJob, + message: PreparedOutgoingMessage, + transaction: DBWriteTransaction, + ) throws { + guard self.state.get().pendingJobs.contains(where: { $0.deferredJobUniqueId == deferredJob.uniqueId }) else { + throw OWSAssertionError("Missing deferred message sender job.") + } + + let jobRecord: MessageSenderJobRecord + do { + jobRecord = try message.asMessageSenderJobRecord(isHighPriority: deferredJob.isHighPriority, tx: transaction) + } catch { + message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, tx: transaction) + self.rejectDeferredJob(deferredJob, error: error, transaction: transaction) + throw error + } + owsAssertDebug(jobRecord.status == .ready) + if deferredJob.isInMemoryOnly { + // Nothing to do. Just don't insert it into the database. + } else { + jobRecord.anyInsert(transaction: transaction) + } + + let job = Job(record: jobRecord, isInMemoryOnly: deferredJob.isInMemoryOnly) + var didReplaceDeferredJob = false + self.state.update { + guard + let pendingJobIndex = $0.pendingJobs.firstIndex(where: { + $0.deferredJobUniqueId == deferredJob.uniqueId + }) + else { + return + } + $0.pendingJobs[pendingJobIndex] = .job(job) + if let future = $0.jobFutures.removeValue(forKey: deferredJob.uniqueId) { + $0.jobFutures[jobRecord.uniqueId] = future + } + didReplaceDeferredJob = true + } + guard didReplaceDeferredJob else { + if !deferredJob.isInMemoryOnly { + jobRecord.anyRemove(transaction: transaction) + } + throw OWSAssertionError("Missing deferred message sender job.") + } + + transaction.addSyncCompletion { + self.startPendingJobRecordsIfPossible() + } + } + + public func cancelDeferredJob( + _ deferredJob: DeferredJob, + error: any Error, + transaction: DBWriteTransaction, + ) { + self.rejectDeferredJob(deferredJob, error: error, transaction: transaction) + } + + private func rejectDeferredJob( + _ deferredJob: DeferredJob, + error: any Error, + transaction: DBWriteTransaction, + ) { + transaction.addSyncCompletion { + let future = self.state.update { state in + state.pendingJobs.removeAll { + $0.deferredJobUniqueId == deferredJob.uniqueId + } + return state.jobFutures.removeValue(forKey: deferredJob.uniqueId) + } + future?.reject(error) + self.startPendingJobRecordsIfPossible() + } + } + private func add( message: PreparedOutgoingMessage, exclusiveToCurrentProcessIdentifier: Bool, @@ -97,7 +207,7 @@ public class MessageSenderJobQueue { } self.state.update { - $0.pendingJobs.append(Job(record: jobRecord, isInMemoryOnly: exclusiveToCurrentProcessIdentifier)) + $0.pendingJobs.append(.job(Job(record: jobRecord, isInMemoryOnly: exclusiveToCurrentProcessIdentifier))) if let future { $0.jobFutures[jobRecord.uniqueId] = future } @@ -116,6 +226,29 @@ public class MessageSenderJobQueue { let isInMemoryOnly: Bool } + private enum PendingJob { + case job(Job) + case deferred(DeferredJob) + + var job: Job? { + switch self { + case .job(let job): + return job + case .deferred: + return nil + } + } + + var deferredJobUniqueId: String? { + switch self { + case .job: + return nil + case .deferred(let deferredJob): + return deferredJob.uniqueId + } + } + } + /// A job that's been queued but hasn't started yet. private struct QueuedOperationState { let job: Job @@ -214,7 +347,7 @@ public class MessageSenderJobQueue { private struct State { var isLoaded = false - var pendingJobs = [Job]() + var pendingJobs = [PendingJob]() var isTransferringPendingJobs = false var queueStates = [QueueKey: QueueState]() var jobFutures = [String: Future]() @@ -277,8 +410,9 @@ public class MessageSenderJobQueue { pendingJobQueue.async { let pendingJobs = self.state.update { if $0.isLoaded { - let result = $0.pendingJobs - $0.pendingJobs = [] + let readyJobCount = $0.pendingJobs.prefix(while: { $0.job != nil }).count + let result = $0.pendingJobs.prefix(readyJobCount).compactMap(\.job) + $0.pendingJobs.removeFirst(readyJobCount) return result } $0.isTransferringPendingJobs = true @@ -342,12 +476,15 @@ public class MessageSenderJobQueue { let jobRecords = try await jobRecordFinder.loadRunnableJobs(updateRunnableJobRecord: { jobRecord, tx in self.didMarkAsReady(oldJobRecord: jobRecord, transaction: tx) }) - let jobRecordUniqueIds = Set(jobRecords.lazy.map(\.uniqueId)) self.state.update { - var newlyPendingJobs = $0.pendingJobs - newlyPendingJobs.removeAll(where: { jobRecordUniqueIds.contains($0.record.uniqueId) }) - $0.pendingJobs = jobRecords.map { Job(record: $0, isInMemoryOnly: false) } - $0.pendingJobs.append(contentsOf: newlyPendingJobs) + let pendingJobs = $0.pendingJobs + let pendingJobRecordUniqueIds = Set(pendingJobs.lazy.compactMap { $0.job?.record.uniqueId }) + $0.pendingJobs = Array( + jobRecords.lazy + .filter { !pendingJobRecordUniqueIds.contains($0.uniqueId) } + .map { .job(Job(record: $0, isInMemoryOnly: false)) }, + ) + $0.pendingJobs.append(contentsOf: pendingJobs) } } catch { owsFailDebug("Couldn't load existing message send jobs: \(error)") diff --git a/SignalServiceKit/Messages/OutgoingMessagePreparer/UnpreparedOutgoingMessage.swift b/SignalServiceKit/Messages/OutgoingMessagePreparer/UnpreparedOutgoingMessage.swift index bac9e218c6..2df12bc459 100644 --- a/SignalServiceKit/Messages/OutgoingMessagePreparer/UnpreparedOutgoingMessage.swift +++ b/SignalServiceKit/Messages/OutgoingMessagePreparer/UnpreparedOutgoingMessage.swift @@ -90,6 +90,71 @@ public class UnpreparedOutgoingMessage { return try self._prepare(tx: tx) } + public struct DeferredPersistedMessage: Sendable { + public let messageUniqueId: String + public let messageRowId: Int64 + } + + public func preparePersistableMessageForDeferredSend( + tx: DBWriteTransaction, + ) throws -> DeferredPersistedMessage { + switch messageType { + case .persistable(let message): + let preparedMessageType = try preparePersistableMessage(message, tx: tx) + guard case .persisted(let persistedMessage) = preparedMessageType else { + throw OWSAssertionError("Unexpected deferred message type.") + } + return DeferredPersistedMessage( + messageUniqueId: persistedMessage.message.uniqueId, + messageRowId: persistedMessage.rowId, + ) + case .editMessage, .story, .transient: + throw OWSAssertionError("Only persistable messages can be deferred.") + } + } + + public static func applyLinkPreviewDataSourceToDeferredMessage( + _ linkPreviewDataSource: LinkPreviewDataSource, + messageUniqueId: String, + messageRowId: Int64, + tx: DBWriteTransaction, + ) throws -> TSOutgoingMessage { + guard + let message = TSOutgoingMessage.fetchOutgoingMessageViaCache(uniqueId: messageUniqueId, transaction: tx), + message.sqliteRowId == messageRowId + else { + throw OWSAssertionError("Missing deferred message.") + } + guard + let thread = message.thread(tx: tx), + let threadRowId = thread.sqliteRowId + else { + throw OWSAssertionError("Deferred message missing thread.") + } + + let linkPreviewManager = DependenciesBridge.shared.linkPreviewManager + let validatedLinkPreview = try linkPreviewManager.validateDataSource(dataSource: linkPreviewDataSource, tx: tx) + message.update(with: validatedLinkPreview.preview, transaction: tx) + + if let imageDataSource = validatedLinkPreview.imageDataSource { + let attachmentID = try DependenciesBridge.shared.attachmentManager.createAttachmentStream( + from: OwnedAttachmentDataSource( + dataSource: imageDataSource, + owner: .messageLinkPreview(.init( + messageRowId: messageRowId, + receivedAtTimestamp: message.receivedAtTimestamp, + threadRowId: threadRowId, + isPastEditRevision: message.isPastEditRevision(), + )), + ), + tx: tx, + ) + Logger.info("Created deferred link preview attachment \(attachmentID) for outgoing message \(message.timestamp)") + } + + return message + } + public var messageTimestampForLogging: UInt64 { switch messageType { case .persistable(let message): diff --git a/SignalServiceKit/Util/ThreadUtil.swift b/SignalServiceKit/Util/ThreadUtil.swift index b3d8cb5880..16550033e2 100644 --- a/SignalServiceKit/Util/ThreadUtil.swift +++ b/SignalServiceKit/Util/ThreadUtil.swift @@ -16,6 +16,10 @@ public final class ThreadUtil { // same order in which they are enqueued. public static var enqueueSendQueue = SerialTaskQueue() + // A serial queue for work that must happen after an outgoing message is + // visible but before its sender job is enqueued. + public static var sendFinalizationQueue = SerialTaskQueue() + public static func enqueueSendAsyncWrite(_ block: @escaping (DBWriteTransaction) -> Void) { enqueueSendQueue.enqueue { await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { transaction in diff --git a/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift b/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift index ffc4384311..3dc72a0146 100644 --- a/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift +++ b/SignalServiceKit/tests/Network/MessageSendJobQueueTest.swift @@ -69,6 +69,64 @@ class MessageSenderJobQueueTest: SSKBaseTest { XCTAssertEqual(fakeMessageSender.sentMessages.map { $0.uniqueId }, messages.map { $0.uniqueId }) } + func test_deferredJobBlocksLaterJobsUntilStarted() async throws { + let jobQueue = MessageSenderJobQueue(appReadiness: AppReadinessMock()) + let (messages, deferredJob, promises) = try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in + let contactThread = ContactThreadFactory().create(transaction: tx) + let outgoingMessageFactory = OutgoingMessageFactory() + outgoingMessageFactory.threadCreator = { _ in contactThread } + let messages = (1...2).map { _ in outgoingMessageFactory.create(transaction: tx) } + let preparedMessages = try messages.map { + let jobRecord = try MessageSenderJobRecord( + persistedMessage: .init( + rowId: $0.sqliteRowId!, + message: $0, + ), + isHighPriority: false, + transaction: tx, + ) + return PreparedOutgoingMessage.restore(from: jobRecord, tx: tx)! + } + let (deferredJob, firstPromise) = jobQueue.addDeferred( + .promise, + message: preparedMessages[0], + transaction: tx, + ) + let secondPromise = jobQueue.add(.promise, message: preparedMessages[1], transaction: tx) + return (messages, deferredJob, [firstPromise, secondPromise]) + } + + XCTAssertEqual( + self.messageSenderJobMessageIds(), + [messages[1].uniqueId], + "Deferred jobs should block later jobs in memory without becoming runnable persisted jobs before finalization.", + ) + + fakeMessageSender.stubbedFailingErrors = [nil, nil] + jobQueue.setUp() + try await Task.sleep(nanoseconds: 50 * NSEC_PER_MSEC) + XCTAssertEqual(fakeMessageSender.sentMessages.map { $0.uniqueId }, []) + + try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { tx in + let message = try XCTUnwrap(TSOutgoingMessage.fetchOutgoingMessageViaCache( + uniqueId: messages[0].uniqueId, + transaction: tx, + )) + try jobQueue.startDeferredJob( + deferredJob, + message: PreparedOutgoingMessage.preprepared( + forResending: message, + messageRowId: try XCTUnwrap(message.sqliteRowId), + ), + transaction: tx, + ) + } + for promise in promises { + try await promise.awaitable() + } + XCTAssertEqual(fakeMessageSender.sentMessages.map { $0.uniqueId }, messages.map { $0.uniqueId }) + } + func test_sendingInvisibleMessage() async throws { let jobQueue = MessageSenderJobQueue(appReadiness: AppReadinessMock()) fakeMessageSender.stubbedFailingErrors = [nil] @@ -185,3 +243,18 @@ private extension MessageSenderJobRecord { return Self.anyFetch(uniqueId: uniqueId, transaction: transaction) } } + +private extension MessageSenderJobQueueTest { + func messageSenderJobMessageIds() -> [String] { + SSKEnvironment.shared.databaseStorageRef.read { tx in + MessageSenderJobRecord.anyFetchAll(transaction: tx) + .sorted { ($0.id ?? 0) < ($1.id ?? 0) } + .compactMap { jobRecord -> String? in + guard case .persisted(let messageId, _) = jobRecord.messageType else { + return nil + } + return messageId + } + } + } +} diff --git a/SignalUI/LinkPreview/LinkPreviewFetchState.swift b/SignalUI/LinkPreview/LinkPreviewFetchState.swift index 04140d4ab2..99e6ac0c0e 100644 --- a/SignalUI/LinkPreview/LinkPreviewFetchState.swift +++ b/SignalUI/LinkPreview/LinkPreviewFetchState.swift @@ -56,6 +56,7 @@ public class LinkPreviewFetchState { } private var fetchTask: Task? + private var fetchDraftTask: Task? /// Invoked when `currentState` is updated. public var onStateChange: (() -> Void)? @@ -86,6 +87,20 @@ public class LinkPreviewFetchState { } } + public func consumeLinkPreviewDraftForSendingTask() -> Task? { + switch currentState { + case .none, .failed: + return nil + case .loaded(let linkPreviewDraft): + return Task { linkPreviewDraft } + case .loading: + let fetchDraftTask = self.fetchDraftTask + self.fetchTask = nil + self.fetchDraftTask = nil + return fetchDraftTask + } + } + // MARK: - Fetching /// Updates the user-provided text that may contain a URL. @@ -117,6 +132,8 @@ public class LinkPreviewFetchState { self.fetchTask?.cancel() self.fetchTask = nil + self.fetchDraftTask?.cancel() + self.fetchDraftTask = nil guard let proposedUrl else { _currentState = (.none, nil) @@ -125,22 +142,28 @@ public class LinkPreviewFetchState { _currentState = (.loading, proposedUrl) - self.fetchTask = Task { @MainActor [weak self, linkPreviewFetcher] in + let fetchDraftTask: Task = Task { @MainActor [weak self, linkPreviewFetcher] in do { let linkPreviewDraft = try await linkPreviewFetcher.fetchLinkPreview(for: proposedUrl) - guard let self else { return } + guard let self else { return linkPreviewDraft } // Obsolete callback. - guard self.currentUrl == proposedUrl else { return } + guard self.currentUrl == proposedUrl else { return linkPreviewDraft } self._currentState = (.loaded(linkPreviewDraft), proposedUrl) + return linkPreviewDraft } catch { - guard let self else { return } + guard let self else { return nil } // Obsolete callback. - guard self.currentUrl == proposedUrl else { return } + guard self.currentUrl == proposedUrl else { return nil } self._currentState = (.failed(error), proposedUrl) + return nil } } + self.fetchDraftTask = fetchDraftTask + self.fetchTask = Task { + _ = await fetchDraftTask.value + } return self.fetchTask } diff --git a/SignalUI/LinkPreview/LinkPreviewFetchStateTest.swift b/SignalUI/LinkPreview/LinkPreviewFetchStateTest.swift index 36eb13833d..85f8dadc0d 100644 --- a/SignalUI/LinkPreview/LinkPreviewFetchStateTest.swift +++ b/SignalUI/LinkPreview/LinkPreviewFetchStateTest.swift @@ -165,6 +165,46 @@ class LinkPreviewFetchStateTest: XCTestCase { XCTAssertEqual(mockLinkPreviewFetcher.fetchedURLs, [validURL]) } + func testLinkPreviewDraftForSendingTaskSurvivesClearingCurrentText() async throws { + let linkPreviewFetchState = self.linkPreviewFetchState() + + let validURL = try XCTUnwrap(URL(string: "https://signal.org")) + let pendingFetchState = AtomicValue(PendingFetchState(expectedCount: 1), lock: .init()) + mockLinkPreviewFetcher.fetchLinkPreviewBlock = { fetchedURL in + return try await withCheckedThrowingContinuation { continuation in + pendingFetchState.update { + $0.deferredBlocks.append { + continuation.resume(returning: OWSLinkPreviewDraft(url: fetchedURL, title: "Website Title", isForwarded: false)) + } + $0.resolveIfReady() + } + } + } + + let updateTask = linkPreviewFetchState.update(.init(text: "https://signal.org", ranges: .empty)) + XCTAssert(linkPreviewFetchState.currentState.isLoading) + XCTAssertNil(linkPreviewFetchState.linkPreviewDraftIfLoaded) + + let linkPreviewDraftForSendingTask = try XCTUnwrap( + linkPreviewFetchState.consumeLinkPreviewDraftForSendingTask(), + ) + + linkPreviewFetchState.update(.init(text: "", ranges: .empty)) + XCTAssert(linkPreviewFetchState.currentState.isNone) + XCTAssertNil(linkPreviewFetchState.currentUrl) + + pendingFetchState.update { + $0.isReady = true + $0.resolveIfReady() + } + + await updateTask?.value + let linkPreviewDraft = await linkPreviewDraftForSendingTask.value + + XCTAssertEqual(linkPreviewDraft?.url, validURL) + XCTAssertEqual(mockLinkPreviewFetcher.fetchedURLs, [validURL]) + } + func testUpdateObsolete() async throws { let linkPreviewFetchState = self.linkPreviewFetchState() diff --git a/SignalUI/Sending/ThreadUtil+SignalUI.swift b/SignalUI/Sending/ThreadUtil+SignalUI.swift index 27a86cd716..b02084f9cc 100644 --- a/SignalUI/Sending/ThreadUtil+SignalUI.swift +++ b/SignalUI/Sending/ThreadUtil+SignalUI.swift @@ -16,12 +16,14 @@ extension ThreadUtil { thread: TSThread, quotedReplyDraft: DraftQuotedReplyModel? = nil, linkPreviewDraft: OWSLinkPreviewDraft? = nil, + linkPreviewDraftForSending: Task? = nil, persistenceCompletionHandler persistenceCompletion: PersistenceCompletion? = nil, ) { let messageTimestamp = MessageTimestampGenerator.sharedInstance.generateTimestamp() let benchEventId = sendMessageBenchEventStart(messageTimestamp: messageTimestamp) self.enqueueSendQueue.enqueue { + let pendingLinkPreviewDraftTask = linkPreviewDraft == nil ? linkPreviewDraftForSending : nil let unpreparedMessage: UnpreparedOutgoingMessage do { let messageBody = try await messageBody.mapAsync { @@ -56,12 +58,22 @@ extension ThreadUtil { return } - await Self.enqueueMessageSync( - unpreparedMessage, - benchEventId: benchEventId, - thread: thread, - persistenceCompletionHandler: persistenceCompletion, - ) + guard + let deferredSend = await Self.prepareMessageForDeferredSend( + unpreparedMessage, + persistenceCompletionHandler: persistenceCompletion, + ) + else { + return + } + + Self.sendFinalizationQueue.enqueue { + await Self.finalizeDeferredSend( + deferredSend, + pendingLinkPreviewDraftForSending: pendingLinkPreviewDraftTask, + benchEventId: benchEventId, + ) + } } } @@ -70,6 +82,7 @@ extension ThreadUtil { thread: TSThread, quotedReplyEdit: MessageEdits.Edit, linkPreviewDraft: OWSLinkPreviewDraft?, + linkPreviewDraftForSending: Task? = nil, editTarget: TSOutgoingMessage, persistenceCompletionHandler persistenceCompletion: PersistenceCompletion? = nil, ) { @@ -85,7 +98,11 @@ extension ThreadUtil { try await DependenciesBridge.shared.attachmentContentValidator .prepareOversizeTextIfNeeded($0) } ?? nil - let linkPreviewDataSource = try await linkPreviewDraft.mapAsync { + let resolvedLinkPreviewDraft = await Self.resolveLinkPreviewDraftForSending( + linkPreviewDraft: linkPreviewDraft, + linkPreviewDraftForSending: linkPreviewDraftForSending, + ) + let linkPreviewDataSource = try await resolvedLinkPreviewDraft.mapAsync { try await DependenciesBridge.shared.linkPreviewManager.buildDataSource(from: $0) } @@ -129,6 +146,18 @@ extension ThreadUtil { } } + private static func resolveLinkPreviewDraftForSending( + linkPreviewDraft: OWSLinkPreviewDraft?, + linkPreviewDraftForSending: Task?, + ) async -> OWSLinkPreviewDraft? { + guard let linkPreviewDraftForSending else { + return linkPreviewDraft + } + + // Preserve send/edit action order when a current link preview is still resolving. + return await linkPreviewDraftForSending.value + } + /// WARNING: MUST be called on enqueueSendQueue! private class func enqueueMessageSync( _ unpreparedMessage: UnpreparedOutgoingMessage, @@ -166,6 +195,135 @@ extension ThreadUtil { } } + /// WARNING: MUST be called on enqueueSendQueue! + private struct DeferredSend { + let message: UnpreparedOutgoingMessage.DeferredPersistedMessage + let job: MessageSenderJobQueue.DeferredJob + let promise: Promise + } + + /// WARNING: MUST be called on enqueueSendQueue! + private class func prepareMessageForDeferredSend( + _ unpreparedMessage: UnpreparedOutgoingMessage, + persistenceCompletionHandler persistenceCompletion: PersistenceCompletion? = nil, + ) async -> DeferredSend? { + await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { writeTransaction in + do { + let deferredMessage = try unpreparedMessage.preparePersistableMessageForDeferredSend(tx: writeTransaction) + guard + let message = TSOutgoingMessage.fetchOutgoingMessageViaCache( + uniqueId: deferredMessage.messageUniqueId, + transaction: writeTransaction, + ), + message.sqliteRowId == deferredMessage.messageRowId + else { + throw OWSAssertionError("Failed to enqueue deferred message sender job.") + } + let (deferredJob, promise) = SSKEnvironment.shared.messageSenderJobQueueRef.addDeferred( + .promise, + message: PreparedOutgoingMessage.preprepared( + forResending: message, + messageRowId: deferredMessage.messageRowId, + ), + transaction: writeTransaction, + ) + if let persistenceCompletion { + writeTransaction.addSyncCompletion { + Task { @MainActor in + persistenceCompletion() + } + } + } + return DeferredSend(message: deferredMessage, job: deferredJob, promise: promise) + } catch { + owsFailDebug("Failed to prepare deferred message") + return nil + } + } + } + + /// WARNING: MUST be called on sendFinalizationQueue! + private class func finalizeDeferredSend( + _ deferredSend: DeferredSend, + pendingLinkPreviewDraftForSending: Task?, + benchEventId: String, + ) async { + let deferredMessage = deferredSend.message + let linkPreviewDataSource: LinkPreviewDataSource? + do { + let linkPreviewDraft = await pendingLinkPreviewDraftForSending?.value + linkPreviewDataSource = try await linkPreviewDraft.mapAsync { + try await DependenciesBridge.shared.linkPreviewManager.buildDataSource(from: $0) + } + } catch { + owsFailDebug("Failed to build deferred link preview") + await Self.markDeferredMessageAsFailed(deferredSend, error: error) + return + } + + do { + try await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { writeTransaction in + let message: TSOutgoingMessage + if let linkPreviewDataSource { + message = try UnpreparedOutgoingMessage.applyLinkPreviewDataSourceToDeferredMessage( + linkPreviewDataSource, + messageUniqueId: deferredMessage.messageUniqueId, + messageRowId: deferredMessage.messageRowId, + tx: writeTransaction, + ) + } else { + guard + let fetchedMessage = TSOutgoingMessage.fetchOutgoingMessageViaCache( + uniqueId: deferredMessage.messageUniqueId, + transaction: writeTransaction, + ), + fetchedMessage.sqliteRowId == deferredMessage.messageRowId + else { + throw OWSAssertionError("Missing deferred message.") + } + message = fetchedMessage + } + try SSKEnvironment.shared.messageSenderJobQueueRef.startDeferredJob( + deferredSend.job, + message: PreparedOutgoingMessage.preprepared( + forResending: message, + messageRowId: deferredMessage.messageRowId, + ), + transaction: writeTransaction, + ) + } + + _ = deferredSend.promise.done(on: DispatchQueue.global()) { + BenchEventComplete(eventId: benchEventId) + } + } catch { + owsFailDebug("Failed to finalize deferred message") + await Self.markDeferredMessageAsFailed(deferredSend, error: error) + } + } + + private class func markDeferredMessageAsFailed( + _ deferredSend: DeferredSend, + error: any Error, + ) async { + let deferredMessage = deferredSend.message + await SSKEnvironment.shared.databaseStorageRef.awaitableWrite { writeTransaction in + if + let message = TSOutgoingMessage.fetchOutgoingMessageViaCache( + uniqueId: deferredMessage.messageUniqueId, + transaction: writeTransaction, + ) + { + message.updateWithAllSendingRecipientsMarkedAsFailed(error: error, transaction: writeTransaction) + } + SSKEnvironment.shared.messageSenderJobQueueRef.cancelDeferredJob( + deferredSend.job, + error: error, + transaction: writeTransaction, + ) + } + } + private static func sendMessageBenchEventStart(messageTimestamp: UInt64) -> String { let eventId = "sendMessageMarkedAsSent-\(messageTimestamp)" BenchEventStart( diff --git a/SignalUI/ViewControllers/TextApprovalViewController.swift b/SignalUI/ViewControllers/TextApprovalViewController.swift index 20c7666f4f..4564321b76 100644 --- a/SignalUI/ViewControllers/TextApprovalViewController.swift +++ b/SignalUI/ViewControllers/TextApprovalViewController.swift @@ -32,8 +32,18 @@ public class TextApprovalViewController: OWSViewController, BodyRangesTextViewDe private let textView = BodyRangesTextView() private let footerView = ApprovalFooterView() + private var approvalTask: Task? + private var isApproving = false { + didSet { + textView.isEditable = !isApproving + footerView.updateContents() + } + } private var approvalMode: ApprovalMode { + if isApproving { + return .loading + } guard let delegate else { return .send } @@ -55,6 +65,10 @@ public class TextApprovalViewController: OWSViewController, BodyRangesTextViewDe linkPreviewFetchState.onStateChange = { [weak self] in self?.updateLinkPreviewView() } } + deinit { + approvalTask?.cancel() + } + // MARK: - View Lifecycle override public func viewDidLoad() { @@ -73,7 +87,7 @@ public class TextApprovalViewController: OWSViewController, BodyRangesTextViewDe navigationItem.leftBarButtonItem = .cancelButton { [weak self] in guard let self else { return } - self.delegate?.textApprovalDidCancel(self) + self.cancelApproval() } let stackView = UIStackView(arrangedSubviews: [linkPreviewView, textView]) @@ -120,6 +134,13 @@ public class TextApprovalViewController: OWSViewController, BodyRangesTextViewDe footerView.isHidden = false } + private func cancelApproval() { + approvalTask?.cancel() + approvalTask = nil + isApproving = false + delegate?.textApprovalDidCancel(self) + } + override public func viewDidAppear(_ animated: Bool) { super.viewDidAppear(animated) @@ -207,8 +228,31 @@ public class TextApprovalViewController: OWSViewController, BodyRangesTextViewDe extension TextApprovalViewController: ApprovalFooterDelegate { public func approvalFooterDelegateDidRequestProceed(_ approvalFooterView: ApprovalFooterView) { - let linkPreviewDraft = linkPreviewFetchState.linkPreviewDraftIfLoaded - delegate?.textApproval(self, didApproveMessage: textView.messageBodyForSending, linkPreviewDraft: linkPreviewDraft) + guard !isApproving else { + return + } + + let messageBody = textView.messageBodyForSending + if let linkPreviewDraft = linkPreviewFetchState.linkPreviewDraftIfLoaded { + delegate?.textApproval(self, didApproveMessage: messageBody, linkPreviewDraft: linkPreviewDraft) + return + } + + guard let linkPreviewDraftForSendingTask = linkPreviewFetchState.consumeLinkPreviewDraftForSendingTask() else { + delegate?.textApproval(self, didApproveMessage: messageBody, linkPreviewDraft: nil) + return + } + + isApproving = true + approvalTask = Task { @MainActor [weak self] in + let linkPreviewDraft = await linkPreviewDraftForSendingTask.value + guard !Task.isCancelled, let self else { + return + } + self.approvalTask = nil + self.isApproving = false + self.delegate?.textApproval(self, didApproveMessage: messageBody, linkPreviewDraft: linkPreviewDraft) + } } public func approvalMode(_ approvalFooterView: ApprovalFooterView) -> ApprovalMode {