Browse Source

fix: use handshake-reported flushedLSN for catch-up, fix receiver init

doReconnectAndCatchUp() now uses the replicaFlushedLSN returned by
the reconnect handshake as the catch-up start point, not the
shipper's stale cached value. The replica may have less durable
progress than the shipper last knew.

ReplicaReceiver initialization: flushedLSN now set from the
volume's checkpoint LSN (durable by definition), not nextLSN
(which includes unflushed entries). receivedLSN still uses
nextLSN-1 since those entries are in the WAL buffer even if
not yet synced.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 5 days ago
parent
commit
4429f2b8d2
  1. 20
      weed/storage/blockvol/replica_apply.go
  2. 7
      weed/storage/blockvol/wal_shipper.go

20
weed/storage/blockvol/replica_apply.go

@ -58,17 +58,23 @@ func NewReplicaReceiver(vol *BlockVol, dataAddr, ctrlAddr string, advertisedHost
if len(advertisedHost) > 0 { if len(advertisedHost) > 0 {
advHost = advertisedHost[0] advHost = advertisedHost[0]
} }
// Initialize receivedLSN/flushedLSN from the volume's persisted state.
// This handles the case where a ReplicaReceiver is recreated on a
// volume that already has data (e.g., after process restart or reconnect).
initLSN := uint64(0)
// Initialize from the volume's persisted state on receiver recreation.
// receivedLSN: highest LSN in the WAL (may include unflushed entries).
// flushedLSN: only checkpoint LSN is guaranteed durable (fd.Sync completed
// during flusher cycle). Using nextLSN would overstate durability because
// applyEntry advances nextLSN before barrier fd.Sync.
initReceived := uint64(0)
if vol.nextLSN.Load() > 1 { if vol.nextLSN.Load() > 1 {
initLSN = vol.nextLSN.Load() - 1
initReceived = vol.nextLSN.Load() - 1
}
initFlushed := uint64(0)
if vol.flusher != nil {
initFlushed = vol.flusher.CheckpointLSN()
} }
r := &ReplicaReceiver{ r := &ReplicaReceiver{
vol: vol, vol: vol,
receivedLSN: initLSN,
flushedLSN: initLSN,
receivedLSN: initReceived,
flushedLSN: initFlushed,
barrierTimeout: defaultBarrierTimeout, barrierTimeout: defaultBarrierTimeout,
advertisedHost: advHost, advertisedHost: advHost,
dataListener: dataLn, dataListener: dataLn,

7
weed/storage/blockvol/wal_shipper.go

@ -353,12 +353,15 @@ func (s *WALShipper) resetConnections() {
// doReconnectAndCatchUp runs the full reconnect handshake + catch-up protocol. // doReconnectAndCatchUp runs the full reconnect handshake + catch-up protocol.
// On success, transitions to InSync and resets ctrl connection for barrier. // On success, transitions to InSync and resets ctrl connection for barrier.
func (s *WALShipper) doReconnectAndCatchUp() error { func (s *WALShipper) doReconnectAndCatchUp() error {
targetState, _, err := s.reconnectWithHandshake()
targetState, replicaFlushed, err := s.reconnectWithHandshake()
switch targetState { switch targetState {
case ReplicaInSync: case ReplicaInSync:
s.markInSync() s.markInSync()
case ReplicaCatchingUp: case ReplicaCatchingUp:
if catchErr := s.runCatchUp(s.replicaFlushedLSN.Load()); catchErr != nil {
// Use the handshake-reported flushedLSN as catch-up start point,
// NOT the shipper's cached value. The replica may have lost progress
// since the shipper last heard from it.
if catchErr := s.runCatchUp(replicaFlushed); catchErr != nil {
s.catchupFailures++ s.catchupFailures++
if s.catchupFailures >= maxCatchupRetries { if s.catchupFailures >= maxCatchupRetries {
s.state.Store(uint32(ReplicaNeedsRebuild)) s.state.Store(uint32(ReplicaNeedsRebuild))

Loading…
Cancel
Save