From ae87a31d229c50b6c2c018f31a00e5c609c5960a Mon Sep 17 00:00:00 2001 From: pingqiu Date: Thu, 26 Mar 2026 19:08:48 -0700 Subject: [PATCH] fix: store canonical replica addresses in heartbeat state setupReplicaReceiver now reads back canonical addresses from the ReplicaReceiver (which applies CP13-2 canonicalization) instead of storing raw assignment addresses in replStates. This fixes the API-level leak where replica_data_addr showed ":port" instead of "ip:port" in /block/volumes responses, even though the engine-level CP13-2 fix was working. New BlockVol.ReplicaReceiverAddr() returns canonical addresses from the running receiver. Falls back to assignment addresses if receiver didn't report. Co-Authored-By: Claude Opus 4.6 (1M context) --- weed/server/volume_server_block.go | 26 ++++++++++++++++++++++---- weed/storage/blockvol/blockvol.go | 19 +++++++++++++++++++ 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go index 05562ecc6..728f40841 100644 --- a/weed/server/volume_server_block.go +++ b/weed/server/volume_server_block.go @@ -421,22 +421,40 @@ func (bs *BlockService) setupPrimaryReplicationMulti(path string, addrs []blockv // setupReplicaReceiver starts the replica WAL receiver. func (bs *BlockService) setupReplicaReceiver(path, dataAddr, ctrlAddr string) { + // Store canonical addresses from the receiver (not raw assignment addresses). + // The receiver canonicalizes wildcard ":port" to "ip:port" via CP13-2. + var canonDataAddr, canonCtrlAddr string if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { - return vol.StartReplicaReceiver(dataAddr, ctrlAddr) + if err := vol.StartReplicaReceiver(dataAddr, ctrlAddr); err != nil { + return err + } + // Read back canonical addresses from the receiver. + if vol.ReplicaReceiverAddr() != nil { + canonDataAddr = vol.ReplicaReceiverAddr().DataAddr + canonCtrlAddr = vol.ReplicaReceiverAddr().CtrlAddr + } + return nil }); err != nil { glog.Warningf("block service: setup replica receiver %s: %v", path, err) return } + // Fallback to assignment addresses if receiver didn't report. + if canonDataAddr == "" { + canonDataAddr = dataAddr + } + if canonCtrlAddr == "" { + canonCtrlAddr = ctrlAddr + } bs.replMu.Lock() if bs.replStates == nil { bs.replStates = make(map[string]*volReplState) } bs.replStates[path] = &volReplState{ - replicaDataAddr: dataAddr, - replicaCtrlAddr: ctrlAddr, + replicaDataAddr: canonDataAddr, + replicaCtrlAddr: canonCtrlAddr, } bs.replMu.Unlock() - glog.V(0).Infof("block service: replica %s receiving on %s/%s", path, dataAddr, ctrlAddr) + glog.V(0).Infof("block service: replica %s receiving on %s/%s", path, canonDataAddr, canonCtrlAddr) } // startRebuild starts a rebuild in the background. diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index db4297fd2..4be0ff9e8 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -864,6 +864,25 @@ func (v *BlockVol) ReplicaShipperStates() []ReplicaShipperStatus { return v.shipperGroup.ShipperStates() } +// ReplicaReceiverAddrInfo holds canonical addresses from the replica receiver. +type ReplicaReceiverAddrInfo struct { + DataAddr string + CtrlAddr string +} + +// ReplicaReceiverAddr returns the canonical addresses of the replica receiver, +// or nil if no receiver is running. Used by the VS to store canonical addresses +// in heartbeat state instead of raw assignment addresses. +func (v *BlockVol) ReplicaReceiverAddr() *ReplicaReceiverAddrInfo { + if v.replRecv == nil { + return nil + } + return &ReplicaReceiverAddrInfo{ + DataAddr: v.replRecv.DataAddr(), + CtrlAddr: v.replRecv.CtrlAddr(), + } +} + // StartReplicaReceiver starts listening for replicated WAL entries from a primary. func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error { recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr)