diff --git a/weed/storage/blockvol/replica_apply.go b/weed/storage/blockvol/replica_apply.go index eb3e68024..bf570a417 100644 --- a/weed/storage/blockvol/replica_apply.go +++ b/weed/storage/blockvol/replica_apply.go @@ -58,17 +58,23 @@ func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string, advertisedHost if len(advertisedHost) > 0 { advHost = advertisedHost[0] } - // Initialize receivedLSN/flushedLSN from the volume's persisted state. - // This handles the case where a ReplicaReceiver is recreated on a - // volume that already has data (e.g., after process restart or reconnect). - initLSN := uint64(0) + // Initialize from the volume's persisted state on receiver recreation. + // receivedLSN: highest LSN in the WAL (may include unflushed entries). + // flushedLSN: only checkpoint LSN is guaranteed durable (fd.Sync completed + // during flusher cycle). Using nextLSN would overstate durability because + // applyEntry advances nextLSN before barrier fd.Sync. + initReceived := uint64(0) if vol.nextLSN.Load() > 1 { - initLSN = vol.nextLSN.Load() - 1 + initReceived = vol.nextLSN.Load() - 1 + } + initFlushed := uint64(0) + if vol.flusher != nil { + initFlushed = vol.flusher.CheckpointLSN() } r := &ReplicaReceiver{ vol: vol, - receivedLSN: initLSN, - flushedLSN: initLSN, + receivedLSN: initReceived, + flushedLSN: initFlushed, barrierTimeout: defaultBarrierTimeout, advertisedHost: advHost, dataListener: dataLn, diff --git a/weed/storage/blockvol/wal_shipper.go b/weed/storage/blockvol/wal_shipper.go index ef8c2576e..ec0f09e56 100644 --- a/weed/storage/blockvol/wal_shipper.go +++ b/weed/storage/blockvol/wal_shipper.go @@ -353,12 +353,15 @@ func (s *WALShipper) resetConnections() { // doReconnectAndCatchUp runs the full reconnect handshake + catch-up protocol. // On success, transitions to InSync and resets ctrl connection for barrier. func (s *WALShipper) doReconnectAndCatchUp() error { - targetState, _, err := s.reconnectWithHandshake() + targetState, replicaFlushed, err := s.reconnectWithHandshake() switch targetState { case ReplicaInSync: s.markInSync() case ReplicaCatchingUp: - if catchErr := s.runCatchUp(s.replicaFlushedLSN.Load()); catchErr != nil { + // Use the handshake-reported flushedLSN as catch-up start point, + // NOT the shipper's cached value. The replica may have lost progress + // since the shipper last heard from it. + if catchErr := s.runCatchUp(replicaFlushed); catchErr != nil { s.catchupFailures++ if s.catchupFailures >= maxCatchupRetries { s.state.Store(uint32(ReplicaNeedsRebuild))