Skip to content

Commit 25f10dc

Browse files
authored
Single state machine across main and object stores (microsoft#1077)
* Simpler state machine for checkpointing * cleanup * updates * remove dead code * updates * updates * updates * update * kill code * updates * simplify LightEpoch * move epvs to test * nits * updates * updates * formatting * fix garnet * nit * comments * remove manualLockingActive * update the barrier condition and remove checkpoint version switch barrier flag. * remove INTERMEDIATE state * Remove CPR_SHIFT_DETECTED and LatchDestination.Retry * add black box test for checkpointing version switch state machine; remove deprecated white box test * add transaction test * clean the test * cleanup * Refactor the phases of various machines * format * initial commit * remove sessionName * update LightEpoch based on PR comment * fix break * Use session-local isAcquiredLockable as signal for threads to decide whether to spin or operate in PREPARE phase. Moved isAcquiredLockable to Ctx. This commit also removes a race, re-establishing the invariant that no threads are operating in PREPARE while any thread is operating in IN_PROGRESS phase. * address review comments * nit * minor code move * use shared epoch across stores * nit * add unified checkpointing logic to garnet * nit * fix * use correct SMD * nit * fix * nit * fix test as versions are different due to unified ckpt * add comment * remove targetVersion from checkpoint API, versions always progress by 1. * non-working state * updates * nits * fix state machine * fixes * fixes * fix * format * add assert for safe index growth with locks * Reinstating cpr_shift_detected instead of barrier'ing threads on PREPARE. * fixes based on fuzz test * improve SMD test to use timing fuzzing * add test for index grow, fix bugs * updates and clean up, improve the test to be multi-iteration. * fix test * remove isAsync, clarify comment * sigh, fix format. * nit * Update cluster logic to handle { checkpoint_start, (v) and (v+1) records, checkpoint_end } in the AOF. 
* update comment * address comment * nit * add comment * fix comment * delete checkpoints only after both stores commit; check that main and object stores recovered to the same version * recover to least common version. * handle the case where object store was disabled at checkpointing time
1 parent 3ba2785 commit 25f10dc

File tree

64 files changed

+1171
-791
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

64 files changed

+1171
-791
lines changed

libs/cluster/Server/ClusterConfig.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1301,6 +1301,7 @@ public ClusterConfig BumpLocalNodeConfigEpoch()
13011301
/// Check if sender has same local worker epoch as the receiver node and resolve collision.
13021302
/// </summary>
13031303
/// <param name="senderConfig">Incoming configuration object.</param>
1304+
/// <param name="logger">Optional logger instance; defaults to null.</param>
13041305
/// <returns>ClusterConfig object with updates.</returns>
13051306
public ClusterConfig HandleConfigEpochCollision(ClusterConfig senderConfig, ILogger logger = null)
13061307
{

libs/cluster/Server/ClusterProvider.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,13 @@ public void OnCheckpointInitiated(out long CheckpointCoveredAofAddress)
201201
{
202202
Debug.Assert(serverOptions.EnableCluster);
203203
if (serverOptions.EnableAOF && clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
204-
CheckpointCoveredAofAddress = replicationManager.ReplicationOffset;
204+
{
205+
// When the replica takes a checkpoint on encountering the checkpoint end marker, it needs to truncate the AOF only
206+
// until the checkpoint start marker. Otherwise, we will be left with an AOF that starts at the checkpoint end marker.
207+
// ReplicationCheckpointStartOffset is set by { ReplicaReplayTask.Consume -> AofProcessor.ProcessAofRecordInternal } when
208+
// it encounters the checkpoint start marker.
209+
CheckpointCoveredAofAddress = replicationManager.ReplicationCheckpointStartOffset;
210+
}
205211
else
206212
CheckpointCoveredAofAddress = storeWrapper.appendOnlyFile.TailAddress;
207213

libs/cluster/Server/Replication/PrimaryOps/DisklessReplication/ReplicationSnapshotIterator.cs

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,10 +194,6 @@ public void OnStop(bool completed, long numberOfRecords, bool isMainStore, long
194194
// Wait for flush and response to complete
195195
replicationSyncManager.WaitForFlush().GetAwaiter().GetResult();
196196

197-
// Enqueue commit end marker
198-
var entryType = isMainStore ? AofEntryType.MainStoreStreamingCheckpointEndCommit : AofEntryType.ObjectStoreStreamingCheckpointEndCommit;
199-
replicationSyncManager.ClusterProvider.storeWrapper.EnqueueCommit(entryType, targetVersion);
200-
201197
logger?.LogTrace("{OnStop} {store} {numberOfRecords} {targetVersion}",
202198
nameof(OnStop), isMainStore ? "MAIN STORE" : "OBJECT STORE", numberOfRecords, targetVersion);
203199

libs/cluster/Server/Replication/ReplicaOps/ReplicaReplayTask.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,11 @@ public unsafe void Consume(byte* record, int recordLength, long currentAddress,
6060
var payloadLength = storeWrapper.appendOnlyFile.UnsafeGetLength(ptr);
6161
if (payloadLength > 0)
6262
{
63-
aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true);
63+
aofProcessor.ProcessAofRecordInternal(ptr + entryLength, payloadLength, true, out var isCheckpointStart);
64+
// Encountered checkpoint start marker, record the current ReplicationOffset as ReplicationCheckpointStartOffset so we know the correct AOF truncation
65+
// point when we take a checkpoint at the checkpoint end marker
66+
if (isCheckpointStart)
67+
ReplicationCheckpointStartOffset = ReplicationOffset;
6468
entryLength += TsavoriteLog.UnsafeAlign(payloadLength);
6569
}
6670
else if (payloadLength < 0)

libs/cluster/Server/Replication/ReplicationLogCheckpointManager.cs

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,14 +25,16 @@ internal sealed class ReplicationLogCheckpointManager(
2525
public string RecoveredReplicationId = string.Empty;
2626

2727
readonly bool isMainStore = isMainStore;
28-
public Action<bool, long, long> checkpointVersionShift;
28+
public Action<bool, long, long, bool> checkpointVersionShiftStart;
29+
public Action<bool, long, long, bool> checkpointVersionShiftEnd;
2930

3031
readonly bool safelyRemoveOutdated = removeOutdated;
3132

32-
public override void CheckpointVersionShift(long oldVersion, long newVersion)
33-
{
34-
checkpointVersionShift?.Invoke(isMainStore, oldVersion, newVersion);
35-
}
33+
public override void CheckpointVersionShiftStart(long oldVersion, long newVersion, bool isStreaming)
34+
=> checkpointVersionShiftStart?.Invoke(isMainStore, oldVersion, newVersion, isStreaming);
35+
36+
public override void CheckpointVersionShiftEnd(long oldVersion, long newVersion, bool isStreaming)
37+
=> checkpointVersionShiftEnd?.Invoke(isMainStore, oldVersion, newVersion, isStreaming);
3638

3739
public void DeleteLogCheckpoint(Guid logToken)
3840
=> deviceFactory.Delete(checkpointNamingScheme.LogCheckpointBase(logToken));

libs/cluster/Server/Replication/ReplicationManager.cs

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,12 @@ public long ReplicationOffset
5353
set { replicationOffset = value; }
5454
}
5555

56+
/// <summary>
57+
/// Replication offset corresponding to the checkpoint start marker. We will truncate only to this point after taking a checkpoint (the checkpoint
58+
/// is taken only when we encounter a checkpoint end marker).
59+
/// </summary>
60+
public long ReplicationCheckpointStartOffset;
61+
5662
/// <summary>
5763
/// Replication offset until which AOF address is valid for old primary if failover has occurred
5864
/// </summary>
@@ -112,9 +118,13 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
112118
ReplicationOffset = 0;
113119

114120
// Set the checkpoint version shift start/end callbacks for all stores
115-
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShift = CheckpointVersionShift;
121+
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShiftStart = CheckpointVersionShiftStart;
122+
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Main).checkpointVersionShiftEnd = CheckpointVersionShiftEnd;
116123
if (storeWrapper.objectStore != null)
117-
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShift = CheckpointVersionShift;
124+
{
125+
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShiftStart = CheckpointVersionShiftStart;
126+
clusterProvider.GetReplicationLogCheckpointManager(StoreType.Object).checkpointVersionShiftEnd = CheckpointVersionShiftEnd;
127+
}
118128

119129
// If this node starts as replica, it cannot serve requests until it is connected to primary
120130
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA && clusterProvider.serverOptions.Recover && !StartRecovery(RecoveryStatus.InitializeRecover))
@@ -157,14 +167,44 @@ public ReplicationManager(ClusterProvider clusterProvider, ILogger logger = null
157167

158168
public string GetBufferPoolStats() => networkPool.GetStats();
159169

160-
void CheckpointVersionShift(bool isMainStore, long oldVersion, long newVersion)
170+
void CheckpointVersionShiftStart(bool isMainStore, long oldVersion, long newVersion, bool isStreaming)
171+
{
172+
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
173+
return;
174+
175+
if (isStreaming)
176+
{
177+
if (isMainStore)
178+
storeWrapper.EnqueueCommit(AofEntryType.MainStoreStreamingCheckpointStartCommit, newVersion);
179+
else
180+
storeWrapper.EnqueueCommit(AofEntryType.ObjectStoreStreamingCheckpointStartCommit, newVersion);
181+
}
182+
else
183+
{
184+
// We enqueue a single checkpoint start marker, since we have unified checkpointing
185+
if (isMainStore)
186+
storeWrapper.EnqueueCommit(AofEntryType.CheckpointStartCommit, newVersion);
187+
}
188+
}
189+
190+
void CheckpointVersionShiftEnd(bool isMainStore, long oldVersion, long newVersion, bool isStreaming)
161191
{
162192
if (clusterProvider.clusterManager.CurrentConfig.LocalNodeRole == NodeRole.REPLICA)
163193
return;
164-
var entryType = clusterProvider.serverOptions.ReplicaDisklessSync ?
165-
(isMainStore ? AofEntryType.MainStoreStreamingCheckpointStartCommit : AofEntryType.ObjectStoreStreamingCheckpointStartCommit) :
166-
(isMainStore ? AofEntryType.MainStoreCheckpointStartCommit : AofEntryType.ObjectStoreCheckpointStartCommit);
167-
storeWrapper.EnqueueCommit(entryType, newVersion);
194+
195+
if (isStreaming)
196+
{
197+
if (isMainStore)
198+
storeWrapper.EnqueueCommit(AofEntryType.MainStoreStreamingCheckpointEndCommit, newVersion);
199+
else
200+
storeWrapper.EnqueueCommit(AofEntryType.ObjectStoreStreamingCheckpointEndCommit, newVersion);
201+
}
202+
else
203+
{
204+
// We enqueue a single checkpoint end marker, since we have unified checkpointing
205+
if (isMainStore)
206+
storeWrapper.EnqueueCommit(AofEntryType.CheckpointEndCommit, newVersion);
207+
}
168208
}
169209

170210
/// <summary>

libs/host/GarnetServer.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,7 @@ private void InitializeServer()
227227
if (!setMax && !ThreadPool.SetMaxThreads(maxThreads, maxCPThreads))
228228
throw new Exception($"Unable to call ThreadPool.SetMaxThreads with {maxThreads}, {maxCPThreads}");
229229

230+
opts.Initialize(loggerFactory);
230231
CreateMainStore(clusterFactory, out var checkpointDir);
231232
CreateObjectStore(clusterFactory, customCommandManager, checkpointDir, out var objectStoreSizeTracker);
232233

@@ -324,7 +325,7 @@ private void CreateObjectStore(IClusterFactory clusterFactory, CustomCommandMana
324325
objectStoreSizeTracker = null;
325326
if (!opts.DisableObjects)
326327
{
327-
objKvSettings = opts.GetObjectStoreSettings(this.loggerFactory?.CreateLogger("TsavoriteKV [obj]"),
328+
objKvSettings = opts.GetObjectStoreSettings(loggerFactory,
328329
out var objHeapMemorySize, out var objReadCacheHeapMemorySize);
329330

330331
// Run checkpoint on its own thread to control p99

libs/server/AOF/AofEntryType.cs

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -42,41 +42,34 @@ public enum AofEntryType : byte
4242
/// </summary>
4343
TxnAbort = 0x22,
4444
/// <summary>
45-
/// Checkpoint for main store start
45+
/// Checkpoint start marker for unified checkpoint
4646
/// </summary>
47-
MainStoreCheckpointStartCommit = 0x30,
47+
CheckpointStartCommit = 0x30,
4848
/// <summary>
49-
/// Checkpoint for object store start
49+
/// Checkpoint end marker for unified checkpoint
5050
/// </summary>
51-
ObjectStoreCheckpointStartCommit = 0x31,
52-
/// <summary>
53-
/// Checkpoint for main store end
54-
/// </summary>
55-
MainStoreCheckpointEndCommit = 0x32,
51+
CheckpointEndCommit = 0x32,
5652
/// <summary>
57-
/// Checkpoint for object store end
58-
/// </summary>
59-
ObjectStoreCheckpointEndCommit = 0x33,
60-
/// <summary>
61-
/// Streaming checkpoint for main store start
53+
/// Streaming checkpoint start marker for main store
6254
/// </summary>
6355
MainStoreStreamingCheckpointStartCommit = 0x40,
6456
/// <summary>
65-
/// Streaming checkpoint for object store start
57+
/// Streaming checkpoint start marker for object store
6658
/// </summary>
6759
ObjectStoreStreamingCheckpointStartCommit = 0x41,
6860
/// <summary>
69-
/// Streaming checkpoint for main store end
61+
/// Streaming checkpoint end marker for main store
7062
/// </summary>
7163
MainStoreStreamingCheckpointEndCommit = 0x42,
7264
/// <summary>
73-
/// Streaming checkpoint for object store end
65+
/// Streaming checkpoint end marker for object store
7466
/// </summary>
7567
ObjectStoreStreamingCheckpointEndCommit = 0x43,
7668
/// <summary>
7769
/// StoredProcedure
7870
/// </summary>
7971
StoredProcedure = 0x50,
72+
8073
/// <summary>
8174
/// Flush all
8275
/// </summary>
@@ -85,6 +78,17 @@ public enum AofEntryType : byte
8578
/// Flush db
8679
/// </summary>
8780
FlushDb = 0x61,
81+
82+
#region Deprecated markers
83+
/// <summary>
84+
/// Deprecated with unified checkpointing: Checkpoint for object store start
85+
/// </summary>
86+
ObjectStoreCheckpointStartCommit = 0x31,
87+
/// <summary>
88+
/// Deprecated with unified checkpointing: Checkpoint for object store end
89+
/// </summary>
90+
ObjectStoreCheckpointEndCommit = 0x33,
91+
#endregion
8892
}
8993

9094
internal enum AofStoreType : byte

libs/server/AOF/AofHeader.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ struct AofHeader
1212
// * Layout, size, contents of this struct
1313
// * Any of the AofEntryType or AofStoreType enums' existing value mappings
1414
// * SpanByte format or header
15-
const byte AofHeaderVersion = 1;
15+
const byte AofHeaderVersion = 2;
1616

1717
/// <summary>
1818
/// Version of AOF

0 commit comments

Comments
 (0)