From 330d1fde7f6a9634c1242a0490fab30cbdb12c6c Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 6 Nov 2021 04:07:38 -0700 Subject: [PATCH] send peers info to filers --- weed/election/cluster.go | 64 +++++++++++++++++++++++++++---- weed/server/master_grpc_server.go | 35 +++++++++-------- weed/server/master_server.go | 4 +- weed/wdclient/masterclient.go | 9 +++++ 4 files changed, 86 insertions(+), 26 deletions(-) diff --git a/weed/election/cluster.go b/weed/election/cluster.go index 7c7c1089b..7f247f2cf 100644 --- a/weed/election/cluster.go +++ b/weed/election/cluster.go @@ -2,6 +2,7 @@ package election import ( "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math" "sync" "time" @@ -31,14 +32,14 @@ func NewCluster() *Cluster { } } -func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { +func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { switch nodeType { case "filer": cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; found { existingNode.counter++ - return + return nil } cluster.nodes[address] = &ClusterNode{ Address: address, @@ -46,27 +47,29 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress counter: 1, createdTs: time.Now(), } - cluster.ensureLeader(true, address) + return cluster.ensureLeader(true, nodeType, address) case "master": } + return nil } -func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { +func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { switch nodeType { case "filer": cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; !found { - return + return nil } else { existingNode.counter-- if existingNode.counter <= 0 { delete(cluster.nodes, address) - cluster.ensureLeader(false, address) + return cluster.ensureLeader(false, nodeType, address) } } case "master": } + return nil } func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { @@ -82,13 +85,40 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) return } -func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { +func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { if isAdd { if cluster.leaders.addLeaderIfVacant(address) { // has added the address as one leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: true, + }, + }) + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: true, + }, + }) } } else { if cluster.leaders.removeLeaderIfExists(address) { + + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: true, + IsAdd: false, + }, + }) + // pick the freshest one, since it is less likely to go away var shortestDuration int64 = math.MaxInt64 now := time.Now() @@ -105,10 +135,28 @@ func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { } if candidateAddress != "" { cluster.leaders.addLeaderIfVacant(candidateAddress) + // added a new leader + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(candidateAddress), + IsLeader: true, + IsAdd: true, + }, + }) } - // removed the leader, and maybe added a new leader + } else { + result = append(result, &master_pb.KeepConnectedResponse{ + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsLeader: false, + IsAdd: false, + }, + }) } } + return } func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1e4bbd8e4..7411bbc99 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -44,11 +44,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for _, ch := range ms.clientChans { - ch <- message - } - ms.clientChansLock.RUnlock() + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } } }() @@ -153,12 +149,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for host, ch := range ms.clientChans { - glog.V(0).Infof("master send to %s: %s", host, message.String()) - ch <- message - } - ms.clientChansLock.RUnlock() + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } // tell the volume servers about the leader @@ -195,10 +186,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ stopChan := make(chan bool, 1) clientName, messageChan := ms.addClient(req.ClientType, peerAddress) - ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) + for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) { + ms.broadcastToClients(update) + } defer func() { - ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) + for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) { + ms.broadcastToClients(update) + } ms.deleteClient(clientName) }() @@ -223,7 +218,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ for { select { case message := <-messageChan: - if err := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); err != nil { + if err := stream.Send(message); err != nil { glog.V(0).Infof("=> client %v: %+v", clientName, message) return err } @@ -238,6 +233,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } +func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() +} + func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error { leader, err := ms.Topo.Leader() if err != nil { @@ -254,7 +257,7 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe return nil } -func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.VolumeLocation) { +func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) { clientName = clientType + "@" + string(clientAddress) glog.V(0).Infof("+ client %v", clientName) @@ -263,7 +266,7 @@ func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddr // trying to send to it in SendHeartbeat and so we can't lock the // clientChansLock to remove the channel and we're stuck writing to it // 100 is probably overkill - messageChan = make(chan *master_pb.VolumeLocation, 100) + messageChan = make(chan *master_pb.KeepConnectedResponse, 100) ms.clientChansLock.Lock() ms.clientChans[clientName] = messageChan diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 26ac91e8f..39812f641 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -61,7 +61,7 @@ type MasterServer struct { // notifying clients clientChansLock sync.RWMutex - clientChans map[string]chan *master_pb.VolumeLocation + clientChans map[string]chan *master_pb.KeepConnectedResponse grpcDialOption grpc.DialOption @@ -102,7 +102,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre option: option, preallocateSize: preallocateSize, vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), - clientChans: make(map[string]chan *master_pb.VolumeLocation), + clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), adminLocks: NewAdminLocks(), diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 6d8e2d06e..a2f6c7ffb 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -149,6 +149,15 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL } } + if resp.ClusterNodeUpdate != nil { + update := resp.ClusterNodeUpdate + if update.IsAdd { + glog.V(0).Infof("+ %s %s leader:%v\n", update.NodeType, update.Address, update.IsLeader) + } else { + glog.V(0).Infof("- %s %s leader:%v\n", update.NodeType, update.Address, update.IsLeader) + } + } + } })