Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions backend/FwLite/FwLiteShared/Projects/LexboxProjectService.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System.Net.Http.Json;
using System.Runtime.CompilerServices;
using FwLiteShared.Auth;
using FwLiteShared.Events;
using FwLiteShared.Sync;
Expand Down Expand Up @@ -212,15 +213,59 @@ public void InvalidateProjectsCache(LexboxServer server)
cache.Remove(CacheKey(server));
}

private static readonly ConditionalWeakTable<HubConnection, HashSet<Guid>> _reconnectProjects = new();

public async Task ListenForProjectChanges(ProjectData projectData, CancellationToken stoppingToken)
{
if (!options.Value.TryGetServer(projectData, out var server)) return;
var lexboxConnection = await StartLexboxProjectChangeListener(server, stoppingToken);
if (lexboxConnection is null) return;
await lexboxConnection.SendAsync("ListenForProjectChanges", projectData.Id, stoppingToken);
var subscribedProjects = _reconnectProjects.GetValue(lexboxConnection, static conn =>
{
var projects = new HashSet<Guid>();
conn.Reconnected += async _ =>
{
Guid[] projectIds;
lock (projects)
{
projectIds = [.. projects];
}
foreach (var projectId in projectIds)
{
try
{
await conn.SendAsync(nameof(IProjectChangeHubServer.ListenForProjectChanges), projectId);
}
catch (Exception)
{
// Ensure one project's failing to reconnect doesn't block others from reconnecting
}
}
};
return projects;
});
lock (subscribedProjects)
{
subscribedProjects.Add(projectData.Id);
}
await lexboxConnection.SendAsync(nameof(IProjectChangeHubServer.ListenForProjectChanges), projectData.Id, stoppingToken);
}

private static string HubConnectionCacheKey(LexboxServer server) => $"LexboxProjectChangeListener|{server.Authority.Authority}";
private class InfiniteRetryPolicy : IRetryPolicy
{
public TimeSpan? NextRetryDelay(RetryContext retryContext)
{
return retryContext.PreviousRetryCount switch
{
0 => TimeSpan.Zero,
1 => TimeSpan.FromSeconds(2),
2 => TimeSpan.FromSeconds(10),
3 => TimeSpan.FromSeconds(30),
_ => TimeSpan.FromSeconds(60),
};
}
}

public async Task<HubConnection?> StartLexboxProjectChangeListener(LexboxServer server,
CancellationToken stoppingToken)
Expand All @@ -241,7 +286,7 @@ public async Task ListenForProjectChanges(ProjectData projectData, CancellationT
connection = new HubConnectionBuilder()
//todo bridge logging to the aspnet logger
.ConfigureLogging(logging => logging.AddConsole())
.WithAutomaticReconnect()
.WithAutomaticReconnect(new InfiniteRetryPolicy())
.WithUrl($"{server.Authority}api/hub/crdt/project-changes",
connectionOptions =>
{
Expand All @@ -256,7 +301,7 @@ public async Task ListenForProjectChanges(ProjectData projectData, CancellationT
.Build();

//it would be cleaner to pass the callback in to this method however it's not supposed to be generic, it should always trigger a sync
connection.On(nameof(IProjectChangeListener.OnProjectUpdated),
connection.On(nameof(IProjectChangeHubClient.OnProjectUpdated),
(Guid projectId, Guid? clientId) =>
{
logger.LogInformation("Received project update for {ProjectId}, triggering sync", projectId);
Expand All @@ -274,7 +319,9 @@ public async Task ListenForProjectChanges(ProjectData projectData, CancellationT
cache.Remove(HubConnectionCacheKey(server));
await connection.DisposeAsync();
};
cache.CreateEntry(HubConnectionCacheKey(server)).SetValue(connection).RegisterPostEvictionCallback(
// ICacheEntry value returned from CreateEntry must be disposed in order to be committed to the cache,
// so do not remove the "using var __" below that appears to be doing nothing.
using var __ = cache.CreateEntry(HubConnectionCacheKey(server)).SetValue(connection).RegisterPostEvictionCallback(
static (key, value, reason, state) =>
{
if (value is HubConnection con)
Expand Down
11 changes: 11 additions & 0 deletions backend/FwLite/MiniLcm/Push/IProjectChangeHub.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace MiniLcm.Push;

public interface IProjectChangeHubServer
{
Task ListenForProjectChanges(Guid projectId);
}

public interface IProjectChangeHubClient
{
Task OnProjectUpdated(Guid projectId, Guid? clientId);
}
6 changes: 0 additions & 6 deletions backend/FwLite/MiniLcm/Push/IProjectChangeListener.cs

This file was deleted.

2 changes: 1 addition & 1 deletion backend/LexBoxApi/Controllers/CrdtController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ namespace LexBoxApi.Controllers;
[ApiExplorerSettings(GroupName = LexBoxKernel.OpenApiPublicDocumentName)]
public class CrdtController(
LexBoxDbContext dbContext,
IHubContext<CrdtProjectChangeHub, IProjectChangeListener> hubContext,
IHubContext<CrdtProjectChangeHub, IProjectChangeHubClient> hubContext,
IPermissionService permissionService,
LoggedInContext loggedInContext,
ProjectService projectService,
Expand Down
3 changes: 1 addition & 2 deletions backend/LexBoxApi/Hub/CrdtProjectChangeHub.cs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
using LexBoxApi.Auth;
using LexCore.ServiceInterfaces;
using Microsoft.AspNetCore.SignalR;
using MiniLcm.Push;

namespace LexBoxApi.Hub;

public class CrdtProjectChangeHub(IPermissionService permissionService) : Hub<IProjectChangeListener>
public class CrdtProjectChangeHub(IPermissionService permissionService) : Hub<IProjectChangeHubClient>, IProjectChangeHubServer
{
public static string ProjectGroup(Guid projectId) => $"project-{projectId}";

Expand Down
Loading