Skip to content

Commit 3682b3f

Browse files
authored
Reset CTS in ReplicaSyncManager (#1408)
* Reset cts in ReplicaSyncManager
* Fix formatting
* Make DEBUG explicit for test
* Use lock to avoid race on dispose
* Wrap directive with DEBUG
* Bump version
1 parent c2cfd0b commit 3682b3f

File tree

4 files changed

+74
-7
lines changed

4 files changed

+74
-7
lines changed

Version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<!-- VersionPrefix property for builds and packages -->
33
<PropertyGroup>
4-
<VersionPrefix>1.0.84</VersionPrefix>
4+
<VersionPrefix>1.0.85</VersionPrefix>
55
</PropertyGroup>
66
</Project>

libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSyncManager.cs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ namespace Garnet.cluster
1313
internal sealed class ReplicationSyncManager
1414
{
1515
SingleWriterMultiReaderLock syncInProgress;
16-
readonly CancellationTokenSource cts;
16+
CancellationTokenSource cts;
1717
readonly TimeSpan replicaSyncTimeout;
1818
readonly ILogger logger;
1919

@@ -25,6 +25,8 @@ internal sealed class ReplicationSyncManager
2525

2626
public ClusterProvider ClusterProvider { get; }
2727

28+
SingleWriterMultiReaderLock disposed;
29+
2830
public ReplicationSyncManager(ClusterProvider clusterProvider, ILogger logger = null)
2931
{
3032
GetSessionStore = new ReplicaSyncSessionTaskStore(clusterProvider.storeWrapper, clusterProvider, logger);
@@ -38,8 +40,11 @@ public ReplicationSyncManager(ClusterProvider clusterProvider, ILogger logger =
3840

3941
public void Dispose()
4042
{
41-
cts.Cancel();
42-
cts.Dispose();
43+
// Take the disposed write lock (it is not released here) so the cts reset path, which needs a read lock, cannot run after dispose
44+
disposed.WriteLock();
45+
cts?.Cancel();
46+
cts?.Dispose();
47+
cts = null;
4348
syncInProgress.WriteLock();
4449
}
4550

@@ -313,6 +318,9 @@ async Task TakeStreamingCheckpoint()
313318
// Wait for stream sync to make some progress
314319
await Task.Delay(delay);
315320

321+
// Trigger exception to test reset cts mechanism
322+
ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.Replication_Diskless_Sync_Reset_Cts);
323+
316324
// Check if checkpoint has completed
317325
if (checkpointTask.IsCompleted)
318326
return await checkpointTask;
@@ -322,17 +330,35 @@ async Task TakeStreamingCheckpoint()
322330

323331
// Throw once the remaining timeout is zero or negative
324332
if (timeout.TotalSeconds <= 0)
325-
{
326-
cts.Cancel();
327333
throw new TimeoutException("Streaming snapshot checkpoint timed out");
328-
}
329334
}
330335
}
331336
catch (Exception ex)
332337
{
333338
logger?.LogError(ex, "{method} faulted", nameof(WaitOrDie));
334339
cts.Cancel();
335340
}
341+
342+
// At this point we failed through a timeout or any other exception
343+
// so try to reset token.
344+
// No race here because the only other cancellation happens in Dispose
345+
try
346+
{
347+
_ = await checkpointTask;
348+
}
349+
finally
350+
{
351+
var readLock = disposed.TryReadLock();
352+
if (readLock && !cts.TryReset())
353+
{
354+
cts.Dispose();
355+
cts = new();
356+
}
357+
358+
if (readLock)
359+
disposed.ReadUnlock();
360+
}
361+
336362
return (false, default);
337363
}
338364
}

libs/common/ExceptionInjectionType.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,5 +61,9 @@ public enum ExceptionInjectionType
6161
/// Replication InProgress during diskless replica attach sync operation
6262
/// </summary>
6363
Replication_InProgress_During_Diskless_Replica_Attach_Sync,
64+
/// <summary>
65+
/// Exception injected during diskless replication sync to test the cts reset mechanism
66+
/// </summary>
67+
Replication_Diskless_Sync_Reset_Cts
6468
}
6569
}

test/Garnet.test.cluster/ReplicationTests/ClusterReplicationDisklessSyncTests.cs

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,9 @@
33

44
using System;
55
using System.Collections.Generic;
6+
#if DEBUG
7+
using Garnet.common;
8+
#endif
69
using Microsoft.Extensions.Logging;
710
using NUnit.Framework;
811
using NUnit.Framework.Legacy;
@@ -395,5 +398,39 @@ public void ClusterDisklessSyncFailover([Values] bool disableObjects, [Values] b
395398
for (var replica = 1; replica < nodes_count; replica++)
396399
Validate(nOffsets[primary], nOffsets[replica], disableObjects);
397400
}
401+
402+
#if DEBUG
403+
[Test, Order(6)]
404+
[Category("REPLICATION")]
405+
public void ClusterDisklessSyncResetSyncManagerCts()
406+
{
407+
var nodes_count = 2;
408+
var primaryIndex = 0;
409+
var replicaOneIndex = 1;
410+
context.CreateInstances(nodes_count, enableAOF: true, useTLS: useTLS, enableDisklessSync: true, timeout: timeout);
411+
context.CreateConnection(useTLS: useTLS);
412+
413+
_ = context.clusterTestUtils.AddDelSlotsRange(primaryIndex, [(0, 16383)], addslot: true, logger: context.logger);
414+
context.clusterTestUtils.SetConfigEpoch(primaryIndex, primaryIndex + 1, logger: context.logger);
415+
context.clusterTestUtils.SetConfigEpoch(replicaOneIndex, replicaOneIndex + 1, logger: context.logger);
416+
417+
context.clusterTestUtils.Meet(primaryIndex, replicaOneIndex, logger: context.logger);
418+
context.clusterTestUtils.WaitUntilNodeIsKnown(replicaOneIndex, primaryIndex, logger: context.logger);
419+
420+
try
421+
{
422+
ExceptionInjectionHelper.EnableException(ExceptionInjectionType.Replication_Diskless_Sync_Reset_Cts);
423+
var _resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaOneIndex, primaryNodeIndex: primaryIndex, failEx: false, logger: context.logger);
424+
ClassicAssert.AreEqual("Wait for sync task faulted", _resp);
425+
}
426+
finally
427+
{
428+
ExceptionInjectionHelper.DisableException(ExceptionInjectionType.Replication_Diskless_Sync_Reset_Cts);
429+
}
430+
431+
var resp = context.clusterTestUtils.ClusterReplicate(replicaNodeIndex: replicaOneIndex, primaryNodeIndex: primaryIndex, logger: context.logger);
432+
ClassicAssert.AreEqual("OK", resp);
433+
}
434+
#endif
398435
}
399436
}

0 commit comments

Comments
 (0)