Browse Source

feat: durable progress truth — replicaFlushedLSN in barrier (CP13-3)

Barrier response extended from 1-byte status to 9-byte payload
carrying the replica's durable WAL progress (FlushedLSN). FlushedLSN is
updated only after a successful fd.Sync(), never on receive/append/send.

Replica side: new flushedLSN field on ReplicaReceiver, advanced
only in handleBarrier after proven contiguous receipt + sync.
max() guard prevents regression.

Shipper side: new replicaFlushedLSN (authoritative) supersedes
ShippedLSN, which is kept for diagnostics only. Monotonic CAS update
from the barrier response. hasFlushedProgress flag tracks whether the
replica supports the extended protocol.

ShipperGroup: MinReplicaFlushedLSN() returns (uint64, bool) —
minimum across shippers with known progress. (0, false) for empty
groups or legacy replicas.

Backward compat: 1-byte legacy responses decoded as FlushedLSN=0.
Legacy replicas explicitly excluded from sync_all correctness.

7 new tests: roundtrip, backward compat, flush-only-after-sync,
not-on-receive, shipper update, monotonicity, group minimum.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
Ping Qiu 1 week ago
parent
commit
499e244b8e
  1. 26
      weed/storage/blockvol/repl_proto.go
  2. 9
      weed/storage/blockvol/replica_apply.go
  3. 13
      weed/storage/blockvol/replica_barrier.go
  4. 24
      weed/storage/blockvol/shipper_group.go
  5. 1440
      weed/storage/blockvol/sync_all_protocol_test.go
  6. 49
      weed/storage/blockvol/wal_shipper.go

26
weed/storage/blockvol/repl_proto.go

@ -34,8 +34,32 @@ type BarrierRequest struct {
}
// BarrierResponse is the replica's reply to a barrier request.
//
// Wire format: [1B status][8B flushedLSN] = 9 bytes.
// Legacy replicas send only the 1-byte status, in which case FlushedLSN
// is left at 0.
type BarrierResponse struct {
	Status     byte   // barrier outcome code (e.g. BarrierOK)
	FlushedLSN uint64 // replica's durable WAL progress after this barrier
}
// EncodeBarrierResponse renders resp in its 9-byte wire form: the status
// byte first, followed by FlushedLSN as an 8-byte big-endian integer.
func EncodeBarrierResponse(resp BarrierResponse) []byte {
	var wire [9]byte
	wire[0] = resp.Status
	binary.BigEndian.PutUint64(wire[1:], resp.FlushedLSN)
	return wire[:]
}
// DecodeBarrierResponse parses a barrier reply payload.
//
// It accepts both the extended 9-byte form (status + big-endian FlushedLSN)
// and the legacy 1-byte form (status only, FlushedLSN stays 0). Bytes past
// the ninth are ignored. An empty buf yields the zero BarrierResponse.
//
// NOTE(review): the zero Status may coincide with a success code; callers
// presumably reject empty payloads before decoding — confirm.
func DecodeBarrierResponse(buf []byte) BarrierResponse {
	var resp BarrierResponse
	if len(buf) == 0 {
		return resp
	}
	resp.Status = buf[0]
	if len(buf) < 9 {
		// Short (legacy) reply: no durable-progress field present.
		return resp
	}
	resp.FlushedLSN = binary.BigEndian.Uint64(buf[1:9])
	return resp
}
// Frame header: [1B type][4B payload_len].

9
weed/storage/blockvol/replica_apply.go

@ -24,6 +24,7 @@ type ReplicaReceiver struct {
mu sync.Mutex
receivedLSN uint64
flushedLSN uint64 // highest LSN durably persisted (fd.Sync completed); updated only in handleBarrier
cond *sync.Cond
connMu sync.Mutex // protects activeConns
@ -302,6 +303,14 @@ func (r *ReplicaReceiver) ReceivedLSN() uint64 {
return r.receivedLSN
}
// FlushedLSN reports the replica's durable progress: the highest LSN
// recorded as persisted after a successful WAL fd.Sync. The value is read
// under r.mu; it is advanced by handleBarrier.
func (r *ReplicaReceiver) FlushedLSN() uint64 {
	r.mu.Lock()
	lsn := r.flushedLSN
	r.mu.Unlock()
	return lsn
}
// DataAddr returns the data listener's canonical address (ip:port).
// Wildcard listener addresses are resolved using the advertised host
// or outbound-IP fallback.

13
weed/storage/blockvol/replica_barrier.go

@ -40,7 +40,7 @@ func (r *ReplicaReceiver) handleControlConn(conn net.Conn) {
resp := r.handleBarrier(req)
respPayload := []byte{resp.Status}
respPayload := EncodeBarrierResponse(resp)
if err := WriteFrame(conn, MsgBarrierResp, respPayload); err != nil {
log.Printf("replica: write barrier response: %v", err)
return
@ -97,5 +97,14 @@ func (r *ReplicaReceiver) handleBarrier(req BarrierRequest) BarrierResponse {
return BarrierResponse{Status: BarrierFsyncFailed}
}
return BarrierResponse{Status: BarrierOK}
// Advance durable progress. Only after fd.Sync() succeeds and contiguous
// receipt through req.LSN has been proven (step 2 above).
r.mu.Lock()
if req.LSN > r.flushedLSN {
r.flushedLSN = req.LSN
}
flushed := r.flushedLSN
r.mu.Unlock()
return BarrierResponse{Status: BarrierOK, FlushedLSN: flushed}
}

24
weed/storage/blockvol/shipper_group.go

@ -103,6 +103,30 @@ func (sg *ShipperGroup) Len() int {
return len(sg.shippers)
}
// MinReplicaFlushedLSN reports the smallest ReplicaFlushedLSN among the
// shippers whose HasFlushedProgress is true.
//
// The second result is false when no shipper qualifies — the group is
// empty, or none has reported a valid FlushedLSN yet — and the first
// result is then 0. Shippers without reported progress are skipped, not
// treated as 0.
//
// Used by WAL retention (CP13-6) to gate WAL reclaim.
func (sg *ShipperGroup) MinReplicaFlushedLSN() (uint64, bool) {
	sg.mu.RLock()
	defer sg.mu.RUnlock()

	var (
		lowest uint64
		known  bool
	)
	for _, shipper := range sg.shippers {
		if shipper.HasFlushedProgress() {
			flushed := shipper.ReplicaFlushedLSN()
			switch {
			case !known:
				// First shipper with progress seeds the minimum.
				lowest, known = flushed, true
			case flushed < lowest:
				lowest = flushed
			}
		}
	}
	return lowest, known
}
// Shipper returns the shipper at index i. For internal/test use.
func (sg *ShipperGroup) Shipper(i int) *WALShipper {
sg.mu.RLock()

1440
weed/storage/blockvol/sync_all_protocol_test.go
File diff suppressed because it is too large
View File

49
weed/storage/blockvol/wal_shipper.go

@ -31,9 +31,11 @@ type WALShipper struct {
ctrlMu sync.Mutex // protects ctrlConn
ctrlConn net.Conn
shippedLSN atomic.Uint64
degraded atomic.Bool
stopped atomic.Bool
shippedLSN atomic.Uint64 // diagnostic: highest LSN sent to TCP socket
replicaFlushedLSN atomic.Uint64 // authoritative: highest LSN durably persisted on replica
hasFlushedProgress atomic.Bool // true once replica returns a valid (non-zero) FlushedLSN
degraded atomic.Bool
stopped atomic.Bool
}
// NewWALShipper creates a WAL shipper. Connections are established lazily on
@ -53,7 +55,8 @@ func NewWALShipper(dataAddr, controlAddr string, epochFn func() uint64, metrics
}
// Ship sends a WAL entry to the replica over the data channel.
// On write error, the shipper enters degraded mode permanently.
// On write error, the shipper enters degraded mode. Recovery requires
// the full reconnect protocol. See design/sync-all-reconnect-protocol.md.
func (s *WALShipper) Ship(entry *WALEntry) error {
if s.stopped.Load() || s.degraded.Load() {
return nil
@ -98,7 +101,9 @@ func (s *WALShipper) Ship(entry *WALEntry) error {
// Barrier sends a barrier request on the control channel and waits for the
// replica to confirm durability up to lsnMax. Returns ErrReplicaDegraded if
// the shipper is in degraded mode.
// the shipper is in degraded mode. Reconnection requires the full reconnect
// protocol (ResumeShipReq handshake + WAL catch-up), not just TCP retry.
// See design/sync-all-reconnect-protocol.md.
func (s *WALShipper) Barrier(lsnMax uint64) error {
if s.stopped.Load() {
return ErrShipperStopped
@ -144,8 +149,23 @@ func (s *WALShipper) Barrier(lsnMax uint64) error {
return ErrReplicaDegraded
}
switch payload[0] {
resp := DecodeBarrierResponse(payload)
switch resp.Status {
case BarrierOK:
// Update authoritative durable progress (monotonic: only advance).
if resp.FlushedLSN > 0 {
s.hasFlushedProgress.Store(true)
for {
cur := s.replicaFlushedLSN.Load()
if resp.FlushedLSN <= cur {
break
}
if s.replicaFlushedLSN.CompareAndSwap(cur, resp.FlushedLSN) {
break
}
}
}
s.recordBarrierMetric(barrierStart, false)
return nil
case BarrierEpochMismatch:
@ -173,11 +193,26 @@ func (s *WALShipper) recordBarrierMetric(start time.Time, failed bool) {
}
}
// ShippedLSN reports the highest LSN successfully sent to the replica.
// It is a diagnostic value only and is NOT authoritative for sync
// durability; use ReplicaFlushedLSN for that.
func (s *WALShipper) ShippedLSN() uint64 {
	sent := s.shippedLSN.Load()
	return sent
}
// ReplicaFlushedLSN reports the highest LSN the replica has declared
// durably persisted, as carried in its barrier responses after fd.Sync().
// This is the authoritative durable-progress value for sync_all
// correctness.
func (s *WALShipper) ReplicaFlushedLSN() uint64 {
	durable := s.replicaFlushedLSN.Load()
	return durable
}
// HasFlushedProgress reports whether the replica has ever returned a valid
// (non-zero) FlushedLSN. Legacy replicas that only send 1-byte barrier
// responses never set this flag and must not count toward sync_all.
func (s *WALShipper) HasFlushedProgress() bool {
	seen := s.hasFlushedProgress.Load()
	return seen
}
// IsDegraded returns true if the replica is unreachable.
func (s *WALShipper) IsDegraded() bool {
return s.degraded.Load()

Loading…
Cancel
Save