Skip to content

Commit 7c40aa0

Browse files
Fixing replica checkpointing divergence with explicit CommitAOF calls (#1305)
* repro for failure * move test to appropriate file; implement ReadRole lock for recovery, fix replica commits (still have other paths to lock down) * cleanup * extend role prevention changes to all ops on StoreApi; refactor a bit so we can express this with a using; save reusable instance of the locking object, acquire in one place * derp; restore checking the actual role -_- * failing test for when ReplicaSyncTask dies - killing it the same way we've seen in practice, which is the replica throwing, causing the connection from the primary to be torn down * restore ignored parameter * this works, but I don't love the polling * Revert "this works, but I don't love the polling" This reverts commit 7a05d61. * attempt to resume replication in response to a GOSSIP and the fact that no active replication task exists * throttle re-establishment attempts; put behind a setting * include new setting in .conf * introduce notion of an upgradeable read lock for role reading, and hold it while resuming replication; prevents a TOCTOU issue with resuming replication from a replica * downgrade logic was broken; corrected * big ol' test for our particular setup (1:1 primary replica, matched settings, out of band commits); example of Replica losing all data if intervention for promotion is not timely; allow Replica to come up AS a Replica even if its Primary is down * wait for replica takeover to complete before proceeding in test * add test that Replica comes back up as Replica, even if Primary unreachable * fault on Primary side too, knock out that todo * formatting * on Replicas, wait until we can connect to a Primary to toss any loaded data; punch a config in to allow Replicas to load checkpoint & aof from disk; clean up some new tests for reliability * update defaults.conf with new setting * restore cancellation for test * add tests for new configs * address feedback; wrap this giant list of parameters in a record struct * address feedback; do not block GOSSIP message while resyncing, 
move that work onto a background task * formatting * address feedback; default values in GarnetServerOptions * kick up KeraLua so Windows Lua bits have CFG * per call discussion; bump version --------- Co-authored-by: Vasileios Zois <[email protected]>
1 parent 162ca67 commit 7c40aa0

35 files changed

+1475
-141
lines changed

Directory.Packages.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
<PackageVersion Include="BenchmarkDotNet.Diagnostics.Windows" Version="0.13.12" />
1010
<PackageVersion Include="CommandLineParser" Version="2.9.1" />
1111
<PackageVersion Include="JsonPath.Net" Version="1.1.6" />
12-
<PackageVersion Include="KeraLua" Version="1.4.4" />
12+
<PackageVersion Include="KeraLua" Version="1.4.6" />
1313
<PackageVersion Include="Microsoft.Identity.Client" Version="4.73.1" />
1414
<PackageVersion Include="NUnit" Version="4.1.0" />
1515
<PackageVersion Include="NUnit3TestAdapter" Version="4.6.0" />

Version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
<Project>
22
<!-- VersionPrefix property for builds and packages -->
33
<PropertyGroup>
4-
<VersionPrefix>1.0.79</VersionPrefix>
4+
<VersionPrefix>1.0.80</VersionPrefix>
55
</PropertyGroup>
66
</Project>

benchmark/BDN.benchmark/Embedded/GarnetServerEmbedded.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,28 @@ public GarnetServerEmbedded() : base(new IPEndPoint(IPAddress.Loopback, 0), 1 <<
1818
{
1919
}
2020

21+
/// <inheritdoc/>
22+
public override IEnumerable<IMessageConsumer> ActiveConsumers()
23+
{
24+
foreach (var kvp in activeHandlers)
25+
{
26+
var consumer = kvp.Key.Session;
27+
if (consumer != null)
28+
yield return consumer;
29+
}
30+
}
31+
32+
/// <inheritdoc/>
33+
public override IEnumerable<IClusterSession> ActiveClusterSessions()
34+
{
35+
foreach (var kvp in activeHandlers)
36+
{
37+
var consumer = kvp.Key.Session;
38+
if (consumer != null)
39+
yield return ((RespServerSession)consumer).clusterSession;
40+
}
41+
}
42+
2143
public EmbeddedNetworkHandler CreateNetworkHandler(SslClientAuthenticationOptions tlsOptions = null, string remoteEndpointName = null)
2244
{
2345
var networkSender = new EmbeddedNetworkSender();

libs/cluster/Server/ClusterManagerWorkerState.cs

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
using System;
55
using System.Collections.Generic;
6+
using System.Diagnostics;
67
using System.Text;
78
using System.Threading;
89
using Garnet.common;
@@ -141,11 +142,17 @@ public ReadOnlySpan<byte> TryReset(bool soft, int expirySeconds = 60)
141142
/// Try to make this node a replica of node with nodeid
142143
/// </summary>
143144
/// <param name="nodeid"></param>
144-
/// <param name="force">Check if node is clean (i.e. is PRIMARY without any assigned nodes)</param>
145+
/// <param name="force">If false, checks if node is clean (i.e. is PRIMARY without any assigned nodes) before making changes.</param>
146+
/// <param name="upgradeLock">If true, allows for a <see cref="RecoveryStatus.ReadRole"/> read lock to be upgraded to <see cref="RecoveryStatus.ClusterReplicate"/>.</param>
145147
/// <param name="errorMessage">The ASCII encoded error response if the method returned <see langword="false"/>; otherwise <see langword="default"/></param>
146148
/// <param name="logger"></param>
147-
public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> errorMessage, ILogger logger = null)
149+
public bool TryAddReplica(string nodeid, bool force, bool upgradeLock, out ReadOnlySpan<byte> errorMessage, ILogger logger = null)
148150
{
151+
Debug.Assert(
152+
!upgradeLock || clusterProvider.replicationManager.currentRecoveryStatus == RecoveryStatus.ReadRole,
153+
"Lock upgrades are only allowed if caller holds a ReadRole lock"
154+
);
155+
149156
errorMessage = default;
150157
while (true)
151158
{
@@ -188,7 +195,7 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro
188195

189196
// Transition to recovering state
190197
// Only one caller will succeed in becoming a replica for the provided node-id
191-
if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterReplicate))
198+
if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterReplicate, upgradeLock))
192199
{
193200
logger?.LogError($"{nameof(TryAddReplica)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
194201
errorMessage = CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK;
@@ -200,7 +207,14 @@ public bool TryAddReplica(string nodeid, bool force, out ReadOnlySpan<byte> erro
200207
break;
201208

202209
// If we reach here then we failed to update config so we need to suspend recovery and retry to update the config
203-
clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery);
210+
if (upgradeLock)
211+
{
212+
clusterProvider.replicationManager.EndRecovery(RecoveryStatus.ReadRole, downgradeLock: true);
213+
}
214+
else
215+
{
216+
clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery, downgradeLock: false);
217+
}
204218
}
205219
FlushConfig();
206220
return true;

libs/cluster/Server/ClusterProvider.cs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,14 @@ public void Recover()
8484
replicationManager.Recover();
8585
}
8686

87+
/// <inheritdoc />
88+
public bool PreventRoleChange()
89+
=> replicationManager.BeginRecovery(RecoveryStatus.ReadRole, upgradeLock: false);
90+
91+
/// <inheritdoc />
92+
public void AllowRoleChange()
93+
=> replicationManager.EndRecovery(RecoveryStatus.NoRecovery, downgradeLock: false);
94+
8795
/// <inheritdoc />
8896
public void Start()
8997
{

libs/cluster/Server/Failover/ReplicaFailoverSession.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,7 @@ private bool TakeOverAsPrimary()
155155
try
156156
{
157157
// Make replica syncing unavailable by setting recovery flag
158-
if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterFailover))
158+
if (!clusterProvider.replicationManager.BeginRecovery(RecoveryStatus.ClusterFailover, upgradeLock: false))
159159
{
160160
logger?.LogWarning($"{nameof(TakeOverAsPrimary)}: {{logMessage}}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_CANNOT_ACQUIRE_RECOVERY_LOCK));
161161
return false;
@@ -181,7 +181,7 @@ private bool TakeOverAsPrimary()
181181
finally
182182
{
183183
// Disable recovering as now this node has become a primary or failed in its attempt earlier
184-
if (acquiredLock) clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery);
184+
if (acquiredLock) clusterProvider.replicationManager.EndRecovery(RecoveryStatus.NoRecovery, downgradeLock: false);
185185
}
186186

187187
return true;

libs/cluster/Server/Replication/PrimaryOps/AofSyncTaskInfo.cs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using System.Threading;
66
using System.Threading.Tasks;
77
using Garnet.client;
8+
using Garnet.common;
89
using Microsoft.Extensions.Logging;
910
using Tsavorite.core;
1011

@@ -69,6 +70,8 @@ public unsafe void Consume(byte* payloadPtr, int payloadLength, long currentAddr
6970
{
7071
try
7172
{
73+
ExceptionInjectionHelper.TriggerException(ExceptionInjectionType.Aof_Sync_Task_Consume);
74+
7275
// logger?.LogInformation("Sending {payloadLength} bytes to {remoteNodeId} at address {currentAddress}-{nextAddress}", payloadLength, remoteNodeId, currentAddress, nextAddress);
7376

7477
// This is called under epoch protection, so we have to wait for appending to complete

libs/cluster/Server/Replication/RecoveryStatus.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,9 @@ public enum RecoveryStatus : byte
3232
/// Replica has recovered the checkpoint after signal from primary
3333
/// </summary>
3434
CheckpointRecoveredAtReplica,
35+
/// <summary>
36+
/// Need to ensure a node does not change its role during a commit or checkpoint
37+
/// </summary>
38+
ReadRole,
3539
}
3640
}

libs/cluster/Server/Replication/ReplicaOps/ReplicaDisklessSync.cs

Lines changed: 27 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
using System.Text;
77
using System.Threading.Tasks;
88
using Garnet.client;
9+
using Garnet.cluster.Server.Replication;
910
using Microsoft.Extensions.Logging;
1011

1112
namespace Garnet.cluster
@@ -15,36 +16,30 @@ internal sealed partial class ReplicationManager : IDisposable
1516
/// <summary>
1617
/// Try to replicate using diskless sync
1718
/// </summary>
18-
/// <param name="session"></param>
19-
/// <param name="nodeId"></param>
20-
/// <param name="background"></param>
21-
/// <param name="force"></param>
22-
/// <param name="tryAddReplica"></param>
23-
/// <param name="errorMessage"></param>
24-
/// <returns></returns>
19+
/// <param name="session">ClusterSession for this connection.</param>
20+
/// <param name="options">Options for the sync.</param>
21+
/// <param name="errorMessage">The ASCII encoded error message if the method returned <see langword="false"/>; otherwise <see langword="default"/></param>
22+
/// <returns>A boolean indicating whether replication initiation was successful.</returns>
2523
public bool TryReplicateDisklessSync(
2624
ClusterSession session,
27-
string nodeId,
28-
bool background,
29-
bool force,
30-
bool tryAddReplica,
25+
ReplicateSyncOptions options,
3126
out ReadOnlySpan<byte> errorMessage)
3227
{
3328
errorMessage = default;
3429

3530
try
3631
{
37-
logger?.LogTrace("CLUSTER REPLICATE {nodeid}", nodeId);
38-
if (!clusterProvider.clusterManager.TryAddReplica(nodeId, force: force, out errorMessage, logger: logger))
32+
logger?.LogTrace("CLUSTER REPLICATE {nodeid}", options.NodeId);
33+
if (options.TryAddReplica && !clusterProvider.clusterManager.TryAddReplica(options.NodeId, options.Force, options.UpgradeLock, out errorMessage, logger: logger))
3934
return false;
4035

4136
// Wait for threads to agree configuration change of this node
4237
session.UnsafeBumpAndWaitForEpochTransition();
43-
if (background)
44-
_ = Task.Run(() => TryBeginReplicaSync());
38+
if (options.Background)
39+
_ = Task.Run(() => TryBeginReplicaSync(options.UpgradeLock));
4540
else
4641
{
47-
var result = TryBeginReplicaSync().Result;
42+
var result = TryBeginReplicaSync(options.UpgradeLock).Result;
4843
if (result != null)
4944
{
5045
errorMessage = Encoding.ASCII.GetBytes(result);
@@ -58,7 +53,7 @@ public bool TryReplicateDisklessSync(
5853
}
5954
return true;
6055

61-
async Task<string> TryBeginReplicaSync()
56+
async Task<string> TryBeginReplicaSync(bool downgradeLock)
6257
{
6358
var disklessSync = clusterProvider.serverOptions.ReplicaDisklessSync;
6459
var disableObjects = clusterProvider.serverOptions.DisableObjects;
@@ -129,12 +124,24 @@ async Task<string> TryBeginReplicaSync()
129124
catch (Exception ex)
130125
{
131126
logger?.LogError(ex, $"{nameof(TryBeginReplicaSync)}");
132-
clusterProvider.clusterManager.TryResetReplica();
127+
128+
if (options.AllowReplicaResetOnFailure)
129+
{
130+
clusterProvider.clusterManager.TryResetReplica();
131+
}
132+
133133
return ex.Message;
134134
}
135135
finally
136136
{
137-
EndRecovery(RecoveryStatus.NoRecovery);
137+
if (downgradeLock)
138+
{
139+
EndRecovery(RecoveryStatus.ReadRole, downgradeLock: true);
140+
}
141+
else
142+
{
143+
EndRecovery(RecoveryStatus.NoRecovery, downgradeLock: false);
144+
}
138145
gcs?.Dispose();
139146
recvCheckpointHandler?.Dispose();
140147
}
@@ -188,7 +195,7 @@ public long ReplicaRecoverDiskless(SyncMetadata primarySyncMetadata, out ReadOnl
188195
finally
189196
{
190197
// Done with recovery at this point
191-
EndRecovery(RecoveryStatus.CheckpointRecoveredAtReplica);
198+
EndRecovery(RecoveryStatus.CheckpointRecoveredAtReplica, downgradeLock: false);
192199
}
193200
}
194201
}

0 commit comments

Comments
 (0)