Browse Source

Normalize hashicorp raft peer ids (#8253)

* Normalize raft voter ids

* 4.11

* Update raft_hashicorp.go
pull/8271/head
Chris Lu 2 days ago
committed by GitHub
parent
commit
cb9e21cdc5
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 4
      k8s/charts/seaweedfs/Chart.yaml
  2. 2
      weed/server/master_server.go
  3. 45
      weed/server/raft_hashicorp.go
  4. 87
      weed/server/raft_hashicorp_test.go
  5. 2
      weed/util/version/constants.go

4
k8s/charts/seaweedfs/Chart.yaml

@ -1,6 +1,6 @@
apiVersion: v1 apiVersion: v1
description: SeaweedFS description: SeaweedFS
name: seaweedfs name: seaweedfs
appVersion: "4.10"
appVersion: "4.11"
# Dev note: Trigger a helm chart release by `git tag -a helm-<version>` # Dev note: Trigger a helm chart release by `git tag -a helm-<version>`
version: 4.0.410
version: 4.0.411

2
weed/server/master_server.go

@ -418,7 +418,7 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF
glog.V(4).Infof("OnPeerUpdate: %+v", update) glog.V(4).Infof("OnPeerUpdate: %+v", update)
peerAddress := pb.ServerAddress(update.Address) peerAddress := pb.ServerAddress(update.Address)
peerName := string(peerAddress)
peerName := raftServerID(peerAddress)
if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader { if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader {
return return
} }

45
weed/server/raft_hashicorp.go

@ -11,7 +11,6 @@ import (
"path" "path"
"path/filepath" "path/filepath"
"sort" "sort"
"strings"
"time" "time"
transport "github.com/Jille/raft-grpc-transport" transport "github.com/Jille/raft-grpc-transport"
@ -34,26 +33,34 @@ const (
) )
func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int { func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
peers := make([]pb.ServerAddress, 0, len(mapPeers))
peerIDs := make([]string, 0, len(mapPeers))
seen := make(map[string]struct{}, len(mapPeers))
for _, peer := range mapPeers { for _, peer := range mapPeers {
peers = append(peers, peer)
}
sort.Slice(peers, func(i, j int) bool {
return strings.Compare(string(peers[i]), string(peers[j])) < 0
})
for i, peer := range peers {
if string(peer) == string(self) {
return i
id := raftServerID(peer)
if _, ok := seen[id]; ok {
continue
} }
seen[id] = struct{}{}
peerIDs = append(peerIDs, id)
}
sort.Strings(peerIDs)
selfID := raftServerID(self)
idx := sort.SearchStrings(peerIDs, selfID)
if idx < len(peerIDs) && peerIDs[idx] == selfID {
return idx
} }
return -1 return -1
} }
func raftServerID(server pb.ServerAddress) string {
return server.ToHttpAddress()
}
func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
for _, peer := range s.peers { for _, peer := range s.peers {
cfg.Servers = append(cfg.Servers, raft.Server{ cfg.Servers = append(cfg.Servers, raft.Server{
Suffrage: raft.Voter, Suffrage: raft.Voter,
ID: raft.ServerID(peer),
ID: raft.ServerID(raftServerID(peer)),
Address: raft.ServerAddress(peer.ToGrpcAddress()), Address: raft.ServerAddress(peer.ToGrpcAddress()),
}) })
} }
@ -98,7 +105,12 @@ func (s *RaftServer) monitorLeaderLoop(updatePeers bool) {
} }
func (s *RaftServer) updatePeers() { func (s *RaftServer) updatePeers() {
peerLeader := string(s.serverAddr)
peerLeader := raftServerID(s.serverAddr)
desiredPeers := make(map[string]pb.ServerAddress, len(s.peers))
for _, peer := range s.peers {
desiredPeers[raftServerID(peer)] = peer
}
existsPeerName := make(map[string]bool) existsPeerName := make(map[string]bool)
for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
if string(server.ID) == peerLeader { if string(server.ID) == peerLeader {
@ -106,8 +118,7 @@ func (s *RaftServer) updatePeers() {
} }
existsPeerName[string(server.ID)] = true existsPeerName[string(server.ID)] = true
} }
for _, peer := range s.peers {
peerName := string(peer)
for peerName, peer := range desiredPeers {
if peerName == peerLeader || existsPeerName[peerName] { if peerName == peerLeader || existsPeerName[peerName] {
continue continue
} }
@ -116,12 +127,12 @@ func (s *RaftServer) updatePeers() {
raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
} }
for peer := range existsPeerName { for peer := range existsPeerName {
if _, found := s.peers[peer]; !found {
if _, found := desiredPeers[peer]; !found {
glog.V(0).Infof("removing old peer: %s", peer) glog.V(0).Infof("removing old peer: %s", peer)
s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
} }
} }
if _, found := s.peers[peerLeader]; !found {
if _, found := desiredPeers[peerLeader]; !found {
glog.V(0).Infof("removing old leader peer: %s", peerLeader) glog.V(0).Infof("removing old leader peer: %s", peerLeader)
s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
} }
@ -136,7 +147,7 @@ func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
} }
c := raft.DefaultConfig() c := raft.DefaultConfig()
c.LocalID = raft.ServerID(s.serverAddr) // TODO maybee the IP:port address will change
c.LocalID = raft.ServerID(raftServerID(s.serverAddr))
c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
c.ElectionTimeout = option.ElectionTimeout c.ElectionTimeout = option.ElectionTimeout
if c.LeaderLeaseTimeout > c.HeartbeatTimeout { if c.LeaderLeaseTimeout > c.HeartbeatTimeout {

87
weed/server/raft_hashicorp_test.go

@ -0,0 +1,87 @@
package weed_server
import (
"sort"
"testing"
"github.com/hashicorp/raft"
"github.com/seaweedfs/seaweedfs/weed/pb"
)
func TestRaftServerID(t *testing.T) {
tests := []struct {
name string
addr pb.ServerAddress
want string
}{
{
name: "without grpc suffix",
addr: pb.ServerAddress("master-0:9333"),
want: "master-0:9333",
},
{
name: "with grpc suffix",
addr: pb.NewServerAddress("master-0", 9333, 19333),
want: "master-0:9333",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := raftServerID(tt.addr); got != tt.want {
t.Fatalf("raftServerID(%q) = %q, want %q", tt.addr, got, tt.want)
}
})
}
}
func TestGetPeerIdxUsesCanonicalID(t *testing.T) {
peers := map[string]pb.ServerAddress{
"master-0:9333": pb.ServerAddress("master-0:9333"),
"master-1:9333": pb.ServerAddress("master-1:9333"),
"master-2:9333": pb.ServerAddress("master-2:9333"),
}
self := pb.NewServerAddress("master-2", 9333, 19333)
if got := getPeerIdx(self, peers); got != 2 {
t.Fatalf("getPeerIdx(%q) = %d, want 2", self, got)
}
}
func TestAddPeersConfigurationUsesCanonicalIDs(t *testing.T) {
rs := &RaftServer{
peers: map[string]pb.ServerAddress{
"master-0:9333": pb.ServerAddress("master-0:9333"),
"master-1:9333": pb.ServerAddress("master-1:9333"),
"master-2:9333": pb.ServerAddress("master-2:9333"),
},
}
cfg := rs.AddPeersConfiguration()
if len(cfg.Servers) != 3 {
t.Fatalf("len(cfg.Servers) = %d, want 3", len(cfg.Servers))
}
var ids []string
var addrs []string
for _, s := range cfg.Servers {
if s.Suffrage != raft.Voter {
t.Fatalf("server %q has suffrage %q, want %q", s.ID, s.Suffrage, raft.Voter)
}
ids = append(ids, string(s.ID))
addrs = append(addrs, string(s.Address))
}
sort.Strings(ids)
sort.Strings(addrs)
wantIDs := []string{"master-0:9333", "master-1:9333", "master-2:9333"}
wantAddrs := []string{"master-0:19333", "master-1:19333", "master-2:19333"}
for i := range wantIDs {
if ids[i] != wantIDs[i] {
t.Fatalf("ids[%d] = %q, want %q", i, ids[i], wantIDs[i])
}
if addrs[i] != wantAddrs[i] {
t.Fatalf("addrs[%d] = %q, want %q", i, addrs[i], wantAddrs[i])
}
}
}

2
weed/util/version/constants.go

@ -9,7 +9,7 @@ import (
var ( var (
MAJOR_VERSION = int32(4) MAJOR_VERSION = int32(4)
MINOR_VERSION = int32(10)
MINOR_VERSION = int32(11)
VERSION_NUMBER = fmt.Sprintf("%d.%02d", MAJOR_VERSION, MINOR_VERSION) VERSION_NUMBER = fmt.Sprintf("%d.%02d", MAJOR_VERSION, MINOR_VERSION)
VERSION = util.SizeLimit + " " + VERSION_NUMBER VERSION = util.SizeLimit + " " + VERSION_NUMBER
COMMIT = "" COMMIT = ""

Loading…
Cancel
Save