diff --git a/weed/command/master.go b/weed/command/master.go index 2a7511fbf..565c9cc58 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -4,9 +4,11 @@ import ( "context" "crypto/tls" "fmt" + "net" "net/http" "os" "path" + "strconv" "strings" "time" @@ -338,8 +340,16 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin } peers = strings.TrimSpace(peers) - - cleanedPeers = pb.ServerAddresses(peers).ToAddresses() + seenPeers := make(map[string]struct{}) + for _, peer := range pb.ServerAddresses(peers).ToAddresses() { + normalizedPeer := normalizeMasterPeerAddress(peer, masterAddress) + key := string(normalizedPeer) + if _, found := seenPeers[key]; found { + continue + } + seenPeers[key] = struct{}{} + cleanedPeers = append(cleanedPeers, normalizedPeer) + } hasSelf := false for _, peer := range cleanedPeers { @@ -358,14 +368,31 @@ func checkPeers(masterIp string, masterPort int, masterGrpcPort int, peers strin return } +func normalizeMasterPeerAddress(peer pb.ServerAddress, self pb.ServerAddress) pb.ServerAddress { + if peer.ToHttpAddress() == self.ToHttpAddress() { + return self + } + + _, grpcPort, err := net.SplitHostPort(peer.ToGrpcAddress()) + if err != nil { + return peer + } + grpcPortValue, err := strconv.Atoi(grpcPort) + if err != nil { + return peer + } + + return pb.NewServerAddressWithGrpcPort(peer.ToHttpAddress(), grpcPortValue) +} + func isTheFirstOne(self pb.ServerAddress, peers []pb.ServerAddress) bool { slices.SortFunc(peers, func(a, b pb.ServerAddress) int { - return strings.Compare(string(a), string(b)) + return strings.Compare(a.ToHttpAddress(), b.ToHttpAddress()) }) if len(peers) <= 0 { return true } - return self == peers[0] + return self.ToHttpAddress() == peers[0].ToHttpAddress() } func (m *MasterOptions) toMasterOption(whiteList []string) *weed_server.MasterOption { diff --git a/weed/command/master_test.go b/weed/command/master_test.go new file mode 100644 index 000000000..b92254606 --- /dev/null +++ b/weed/command/master_test.go @@ -0,0 +1,66 @@ +package command + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb" +) + +func TestIsTheFirstOneIgnoresGrpcPort(t *testing.T) { + self := pb.ServerAddress("127.0.0.1:9000.19000") + peers := []pb.ServerAddress{ + "127.0.0.1:9000", + "127.0.0.1:9002.19002", + "127.0.0.1:9003.19003", + } + + if !isTheFirstOne(self, peers) { + t.Fatalf("expected first peer match by HTTP address between %q and %+v", self, peers) + } +} + +func TestCheckPeersAddsSelfWhenGrpcPortMismatches(t *testing.T) { + self, peers := checkPeers("127.0.0.1", 9000, 19000, "127.0.0.1:9002,127.0.0.1:9003") + + found := false + for _, peer := range peers { + if peer.ToHttpAddress() == self.ToHttpAddress() { + found = true + break + } + } + if !found { + t.Fatalf("expected peers %+v to contain self %s by HTTP address", peers, self) + } +} + +func TestCheckPeersCanonicalizesSelfEntry(t *testing.T) { + self, peers := checkPeers("127.0.0.1", 9000, 19000, "127.0.0.1:9000,127.0.0.1:9002,127.0.0.1:9003") + + for _, peer := range peers { + if peer.ToHttpAddress() == self.ToHttpAddress() && peer != self { + t.Fatalf("expected self peer to be canonicalized to %q, got %q", self, peer) + } + } +} + +func TestCheckPeersDeduplicatesAliasPeers(t *testing.T) { + _, peers := checkPeers("127.0.0.1", 9000, 19000, "127.0.0.1:9002,127.0.0.1:9002.19002,127.0.0.1:9003") + + if len(peers) != 3 { + t.Fatalf("expected 3 unique peers after normalization, got %d: %+v", len(peers), peers) + } + + count9002 := 0 + for _, peer := range peers { + if peer.ToHttpAddress() == "127.0.0.1:9002" { + count9002++ + if string(peer) != "127.0.0.1:9002.19002" { + t.Fatalf("expected canonical peer 127.0.0.1:9002.19002, got %q", peer) + } + } + } + if count9002 != 1 { + t.Fatalf("expected one peer for 127.0.0.1:9002, got %d in %+v", count9002, peers) + } +}