Browse Source

fix: store canonical replica addresses in heartbeat state

setupReplicaReceiver now reads back canonical addresses from
the ReplicaReceiver (which applies CP13-2 canonicalization)
instead of storing raw assignment addresses in replStates.

This fixes the API-level leak where replica_data_addr showed
":port" instead of "ip:port" in /block/volumes responses,
even though the engine-level CP13-2 fix was working.

New BlockVol.ReplicaReceiverAddr() returns canonical addresses
from the running receiver. Falls back to assignment addresses
if receiver didn't report.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
feature/sw-block
pingqiu 3 days ago
parent
commit
ae87a31d22
  1. 26
      weed/server/volume_server_block.go
  2. 19
      weed/storage/blockvol/blockvol.go

26
weed/server/volume_server_block.go

@ -421,22 +421,40 @@ func (bs *BlockService) setupPrimaryReplicationMulti(path string, addrs []blockv
// setupReplicaReceiver starts the replica WAL receiver.
func (bs *BlockService) setupReplicaReceiver(path, dataAddr, ctrlAddr string) {
// Store canonical addresses from the receiver (not raw assignment addresses).
// The receiver canonicalizes wildcard ":port" to "ip:port" via CP13-2.
var canonDataAddr, canonCtrlAddr string
if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error {
return vol.StartReplicaReceiver(dataAddr, ctrlAddr)
if err := vol.StartReplicaReceiver(dataAddr, ctrlAddr); err != nil {
return err
}
// Read back canonical addresses from the receiver.
if vol.ReplicaReceiverAddr() != nil {
canonDataAddr = vol.ReplicaReceiverAddr().DataAddr
canonCtrlAddr = vol.ReplicaReceiverAddr().CtrlAddr
}
return nil
}); err != nil {
glog.Warningf("block service: setup replica receiver %s: %v", path, err)
return
}
// Fallback to assignment addresses if receiver didn't report.
if canonDataAddr == "" {
canonDataAddr = dataAddr
}
if canonCtrlAddr == "" {
canonCtrlAddr = ctrlAddr
}
bs.replMu.Lock()
if bs.replStates == nil {
bs.replStates = make(map[string]*volReplState)
}
bs.replStates[path] = &volReplState{
replicaDataAddr: dataAddr,
replicaCtrlAddr: ctrlAddr,
replicaDataAddr: canonDataAddr,
replicaCtrlAddr: canonCtrlAddr,
}
bs.replMu.Unlock()
glog.V(0).Infof("block service: replica %s receiving on %s/%s", path, dataAddr, ctrlAddr)
glog.V(0).Infof("block service: replica %s receiving on %s/%s", path, canonDataAddr, canonCtrlAddr)
}
// startRebuild starts a rebuild in the background.

19
weed/storage/blockvol/blockvol.go

@ -864,6 +864,25 @@ func (v *BlockVol) ReplicaShipperStates() []ReplicaShipperStatus {
return v.shipperGroup.ShipperStates()
}
// ReplicaReceiverAddrInfo holds canonical addresses from the replica receiver.
type ReplicaReceiverAddrInfo struct {
DataAddr string
CtrlAddr string
}
// ReplicaReceiverAddr returns the canonical addresses of the replica receiver,
// or nil if no receiver is running. Used by the VS to store canonical addresses
// in heartbeat state instead of raw assignment addresses.
func (v *BlockVol) ReplicaReceiverAddr() *ReplicaReceiverAddrInfo {
if v.replRecv == nil {
return nil
}
return &ReplicaReceiverAddrInfo{
DataAddr: v.replRecv.DataAddr(),
CtrlAddr: v.replRecv.CtrlAddr(),
}
}
// StartReplicaReceiver starts listening for replicated WAL entries from a primary.
func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error {
recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr)

Loading…
Cancel
Save