Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/sqlserver-ef.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const string logRndLsnCommand =
from sys.dm_db_log_stats(db_id())
""";
```
<sup><a href='/src/Delta/DeltaExtensions_Sql.cs#L88-L96' title='Snippet source file'>snippet source</a> | <a href='#snippet-SqlServerTimeStampWithServerState' title='Start of snippet'>anchor</a></sup>
<sup><a href='/src/Delta/DeltaExtensions_Sql.cs#L89-L97' title='Snippet source file'>snippet source</a> | <a href='#snippet-SqlServerTimeStampWithServerState' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->


Expand Down
2 changes: 1 addition & 1 deletion docs/sqlserver.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ const string logRndLsnCommand =
from sys.dm_db_log_stats(db_id())
""";
```
<sup><a href='/src/Delta/DeltaExtensions_Sql.cs#L88-L96' title='Snippet source file'>snippet source</a> | <a href='#snippet-SqlServerTimeStampWithServerState' title='Start of snippet'>anchor</a></sup>
<sup><a href='/src/Delta/DeltaExtensions_Sql.cs#L89-L97' title='Snippet source file'>snippet source</a> | <a href='#snippet-SqlServerTimeStampWithServerState' title='Start of snippet'>anchor</a></sup>
<!-- endSnippet -->


Expand Down
30 changes: 16 additions & 14 deletions src/Delta/DeltaExtensions_Sql.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,24 @@ public static partial class DeltaExtensions
{
public static async Task<string> GetLastTimeStamp(this DbConnection connection, DbTransaction? transaction = null, Cancel cancel = default)
{
// Cache the Task (not the result) so concurrent callers share a single ResolveQuery call.
// Interlocked.CompareExchange ensures only the first caller's Task wins;
// all others await that same already-in-flight Task.
var resolved = queryTask;
if (resolved is null)
// Serialize ResolveQuery so at most one caller ever runs it.
// A CompareExchange-on-Task approach would still let losing callers
// execute ResolveQuery to completion against their own connection,
// racing the caller's subsequent Execute on the same connection.
var execute = query;
if (execute is null)
{
resolved = ResolveQuery(connection, transaction, cancel);
var original = Interlocked.CompareExchange(ref queryTask, resolved, null);
if (original is not null)
await queryLock.WaitAsync(cancel);
try
{
resolved = original;
execute = query ??= await ResolveQuery(connection, transaction, cancel);
}
finally
{
queryLock.Release();
}
}

// Awaiting an already-completed Task is essentially free — the runtime short-circuits it without
// touching the thread pool or allocating a state machine. same cost as reading a field.
var execute = await resolved;
return await Execute(connection, transaction, execute, cancel);
}

Expand Down Expand Up @@ -143,11 +144,12 @@ static async Task<bool> HasViewServerState(DbCommand command, Cancel cancel = de
return result == 1;
}

static Task<Func<DbCommand, Cancel, Task<string>>>? queryTask;
static Func<DbCommand, Cancel, Task<string>>? query;
static readonly SemaphoreSlim queryLock = new(1, 1);

internal static void Reset()
{
queryTask = null;
query = null;
timeStampCache = null;
}
}
53 changes: 42 additions & 11 deletions src/DeltaTests/ConcurrencyTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,60 @@ public async Task GetLastTimeStamp_ConcurrentAfterReset()
{
await using var database = await LocalDb();

// Reset to null so all concurrent callers enter ResolveQuery
DeltaExtensions.Reset();
// Several iterations: the race between a losing ResolveQuery task
// and the caller's own Execute on the same connection is timing
// dependent, so loop to make reproduction reliable.
for (var iteration = 0; iteration < 20; iteration++)
{
DeltaExtensions.Reset();
await RunConcurrent(database);
}
}

// Open separate connections so concurrent calls don't share a single non-thread-safe SqlConnection
const int concurrency = 20;
static async Task RunConcurrent(SqlDatabase database)
{
// Closed connections force Execute through its Open/Close branch —
// the production scenario (EF manages connection lifetime) and
// the precondition for the race on the static query field.
const int concurrency = 64;
var connections = new SqlConnection[concurrency];
for (var i = 0; i < concurrency; i++)
{
connections[i] = await database.OpenNewConnection();
connections[i] = new(database.ConnectionString);
}

try
{
// Synchronous barrier + dedicated threads so all callers really
// race into ResolveQuery together, rather than being staggered
// by threadpool dispatch.
using var barrier = new ManualResetEventSlim(false);
var tasks = connections
.Select(_ => _.GetLastTimeStamp())
.Select(connection => Task.Factory.StartNew(
async () =>
{
// ReSharper disable once AccessToDisposedClosure
barrier.Wait();
return await connection.GetLastTimeStamp();
},
Cancel.None,
TaskCreationOptions.LongRunning,
TaskScheduler.Default)
.Unwrap())
.ToArray();

var results = await Task.WhenAll(tasks);
await Task.Delay(50);
barrier.Set();

// All calls should return the same timestamp regardless of the race on the static query field
var distinct = results.Distinct().ToList();
That(distinct, Has.Count.EqualTo(1));
IsNotEmpty(distinct[0]);
// Task.WhenAll surfaces any exception from the race; the
// original bug manifested as "The connection is closed" thrown
// by ExecuteReaderAsync after an orphaned ResolveQuery closed
// the same connection mid-query.
var results = await Task.WhenAll(tasks);
foreach (var result in results)
{
IsNotEmpty(result);
}
}
finally
{
Expand Down
Loading