diff --git a/weed/server/volume_server_block.go b/weed/server/volume_server_block.go index 05562ecc6..728f40841 100644 --- a/weed/server/volume_server_block.go +++ b/weed/server/volume_server_block.go @@ -421,22 +421,40 @@ func (bs *BlockService) setupPrimaryReplicationMulti(path string, addrs []blockv // setupReplicaReceiver starts the replica WAL receiver. func (bs *BlockService) setupReplicaReceiver(path, dataAddr, ctrlAddr string) { + // Store canonical addresses from the receiver (not raw assignment addresses). + // The receiver canonicalizes wildcard ":port" to "ip:port" via CP13-2. + var canonDataAddr, canonCtrlAddr string if err := bs.blockStore.WithVolume(path, func(vol *blockvol.BlockVol) error { - return vol.StartReplicaReceiver(dataAddr, ctrlAddr) + if err := vol.StartReplicaReceiver(dataAddr, ctrlAddr); err != nil { + return err + } + // Read back canonical addresses from the receiver. + if vol.ReplicaReceiverAddr() != nil { + canonDataAddr = vol.ReplicaReceiverAddr().DataAddr + canonCtrlAddr = vol.ReplicaReceiverAddr().CtrlAddr + } + return nil }); err != nil { glog.Warningf("block service: setup replica receiver %s: %v", path, err) return } + // Fallback to assignment addresses if receiver didn't report. + if canonDataAddr == "" { + canonDataAddr = dataAddr + } + if canonCtrlAddr == "" { + canonCtrlAddr = ctrlAddr + } bs.replMu.Lock() if bs.replStates == nil { bs.replStates = make(map[string]*volReplState) } bs.replStates[path] = &volReplState{ - replicaDataAddr: dataAddr, - replicaCtrlAddr: ctrlAddr, + replicaDataAddr: canonDataAddr, + replicaCtrlAddr: canonCtrlAddr, } bs.replMu.Unlock() - glog.V(0).Infof("block service: replica %s receiving on %s/%s", path, dataAddr, ctrlAddr) + glog.V(0).Infof("block service: replica %s receiving on %s/%s", path, canonDataAddr, canonCtrlAddr) } // startRebuild starts a rebuild in the background. diff --git a/weed/storage/blockvol/blockvol.go b/weed/storage/blockvol/blockvol.go index db4297fd2..4be0ff9e8 100644 --- a/weed/storage/blockvol/blockvol.go +++ b/weed/storage/blockvol/blockvol.go @@ -864,6 +864,25 @@ func (v *BlockVol) ReplicaShipperStates() []ReplicaShipperStatus { return v.shipperGroup.ShipperStates() } +// ReplicaReceiverAddrInfo holds canonical addresses from the replica receiver. +type ReplicaReceiverAddrInfo struct { + DataAddr string + CtrlAddr string +} + +// ReplicaReceiverAddr returns the canonical addresses of the replica receiver, +// or nil if no receiver is running. Used by the VS to store canonical addresses +// in heartbeat state instead of raw assignment addresses. +func (v *BlockVol) ReplicaReceiverAddr() *ReplicaReceiverAddrInfo { + if v.replRecv == nil { + return nil + } + return &ReplicaReceiverAddrInfo{ + DataAddr: v.replRecv.DataAddr(), + CtrlAddr: v.replRecv.CtrlAddr(), + } +} + // StartReplicaReceiver starts listening for replicated WAL entries from a primary. func (v *BlockVol) StartReplicaReceiver(dataAddr, ctrlAddr string) error { recv, err := NewReplicaReceiver(v, dataAddr, ctrlAddr)