diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 5799e247e..c78dcac95 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -102,7 +102,7 @@ func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddres if err != nil { errLvl := glog.Level(0) if strings.Contains(err.Error(), "duplicated local subscription detected") { - errLvl = glog.Level(1) + errLvl = glog.Level(4) } glog.V(errLvl).Infof("subscribing remote %s meta change: %v", peer, err) } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d2e98c2a2..1c623388c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "fmt" "github.com/chrislusf/seaweedfs/weed/stats" "net/http" @@ -32,10 +31,8 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" - RaftServerRemovalTime = 72 * time.Minute - ResetRaftServerRemovalTimeMsg = "ResetRaftServerRemovalTime" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" ) type MasterOption struct { @@ -66,9 +63,6 @@ type MasterServer struct { boundedLeaderChan chan int - onPeerUpdateDoneCns map[string]*chan string - onPeerUpdateLock sync.RWMutex - // notifying clients clientChansLock sync.RWMutex clientChans map[string]chan *master_pb.KeepConnectedResponse @@ -121,7 +115,6 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se ms.boundedLeaderChan = make(chan int, 16) ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate - ms.onPeerUpdateDoneCns = make(map[string]*chan string) seq := ms.createSequencer(option) if nil == seq { @@ -352,97 +345,18 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd { - if isLeader { - raftServerFound := false - for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { - if string(server.ID) == peerName { - raftServerFound = true - } - } - if !raftServerFound { - glog.V(0).Infof("adding new raft server: %s", peerName) - ms.Topo.HashicorpRaft.AddVoter( - hashicorpRaft.ServerID(peerName), - hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) - } - } - ms.onPeerUpdateLock.RLock() - isGtZero := len(ms.onPeerUpdateDoneCns) > 0 - ms.onPeerUpdateLock.RUnlock() - if isGtZero { - var chanPtrs []*chan string - ms.onPeerUpdateLock.RLock() - for _, cn := range ms.onPeerUpdateDoneCns { - chanPtrs = append(chanPtrs, cn) - } - ms.onPeerUpdateLock.RUnlock() - for _, onPeerUpdateDoneCn := range chanPtrs { - *onPeerUpdateDoneCn <- peerName + if update.IsAdd && isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true } } - } else if isLeader { - if onPeerUpdateDoneCnPrev, ok := ms.onPeerUpdateDoneCns[peerName]; ok { - *onPeerUpdateDoneCnPrev <- ResetRaftServerRemovalTimeMsg - return + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } - onPeerUpdateDoneCn := make(chan string) - ms.onPeerUpdateLock.Lock() - ms.onPeerUpdateDoneCns[peerName] = &onPeerUpdateDoneCn - ms.onPeerUpdateLock.Unlock() - - go func(peerName string) { - raftServerRemovalTimeAfter := time.After(RaftServerRemovalTime) - raftServerPingTicker := time.NewTicker(5 * time.Minute) - defer func() { - ms.onPeerUpdateLock.Lock() - delete(ms.onPeerUpdateDoneCns, peerName) - ms.onPeerUpdateLock.Unlock() - close(onPeerUpdateDoneCn) - }() - for { - select { - case <-raftServerPingTicker.C: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.Ping(context.Background(), &master_pb.PingRequest{ - Target: peerName, - TargetType: cluster.MasterType, - }) - return err - }) - if err != nil { - glog.Warningf("raft server %s ping failed %+v", peerName, err) - } else { - glog.V(0).Infof("raft server %s remove canceled on ping success", peerName) - return - } - case <-raftServerRemovalTimeAfter: - err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { - _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ - Id: peerName, - Force: false, - }) - return err - }) - if err != nil { - glog.Warningf("failed to removing old raft server %s: %v", peerName, err) - return - } - glog.V(0).Infof("old raft server %s removed", peerName) - return - case peerDone := <-onPeerUpdateDoneCn: - if peerName == peerDone { - glog.V(0).Infof("raft server %s remove canceled on onPeerUpdate", peerName) - return - } - if peerDone == ResetRaftServerRemovalTimeMsg { - raftServerRemovalTimeAfter = time.After(RaftServerRemovalTime) - glog.V(0).Infof("rest wait %v for raft server %s activity, otherwise delete", - RaftServerRemovalTime, peerName) - } - } - } - }(peerName) - glog.V(0).Infof("wait %v for raft server %s activity, otherwise delete", RaftServerRemovalTime, peerName) } }