diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 445b89a36..8496a7f6e 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -15,24 +15,13 @@ const ( BrokerType = "broker" ) -type FilerGroup string -type Filers struct { - members map[pb.ServerAddress]*ClusterNode - leaders *Leaders -} -type Leaders struct { - leaders [3]pb.ServerAddress -} - +type FilerGroupName string type DataCenter string type Rack string -type DataCenterBrokers struct { - brokers map[Rack]*RackBrokers -} -type RackBrokers struct { - brokers map[pb.ServerAddress]*ClusterNode -} +type Leaders struct { + leaders [3]pb.ServerAddress +} type ClusterNode struct { Address pb.ServerAddress Version string @@ -41,92 +30,131 @@ type ClusterNode struct { DataCenter DataCenter Rack Rack } - +type GroupMembers struct { + members map[pb.ServerAddress]*ClusterNode + leaders *Leaders +} +type ClusterNodeGroups struct { + groupMembers map[FilerGroupName]*GroupMembers + sync.RWMutex +} type Cluster struct { - filerGroup2filers map[FilerGroup]*Filers - filersLock sync.RWMutex - brokers map[DataCenter]*DataCenterBrokers - brokersLock sync.RWMutex + filerGroups *ClusterNodeGroups + brokerGroups *ClusterNodeGroups } -func NewCluster() *Cluster { - return &Cluster{ - filerGroup2filers: make(map[FilerGroup]*Filers), - brokers: make(map[DataCenter]*DataCenterBrokers), +func newClusterNodeGroups() *ClusterNodeGroups { + return &ClusterNodeGroups{ + groupMembers: map[FilerGroupName]*GroupMembers{}, } } - -func (cluster *Cluster) getFilers(filerGroup FilerGroup, createIfNotFound bool) *Filers { - filers, found := cluster.filerGroup2filers[filerGroup] +func (g *ClusterNodeGroups) getGroupMembers(filerGroup FilerGroupName, createIfNotFound bool) *GroupMembers { + filers, found := g.groupMembers[filerGroup] if !found && createIfNotFound { - filers = &Filers{ + filers = &GroupMembers{ members: make(map[pb.ServerAddress]*ClusterNode), leaders: &Leaders{}, } - cluster.filerGroup2filers[filerGroup] = filers + g.groupMembers[filerGroup] = filers } return filers } -func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { - filerGroup := FilerGroup(ns) +func (m *GroupMembers) addMember(dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) *ClusterNode { + if existingNode, found := m.members[address]; found { + existingNode.counter++ + return nil + } + t := &ClusterNode{ + Address: address, + Version: version, + counter: 1, + CreatedTs: time.Now(), + DataCenter: dataCenter, + Rack: rack, + } + m.members[address] = t + return t +} +func (m *GroupMembers) removeMember(address pb.ServerAddress) bool { + if existingNode, found := m.members[address]; !found { + return false + } else { + existingNode.counter-- + if existingNode.counter <= 0 { + delete(m.members, address) + return true + } + } + return false +} + +func (g *ClusterNodeGroups) AddClusterNode(filerGroup FilerGroupName, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { + g.Lock() + defer g.Unlock() + m := g.getGroupMembers(filerGroup, true) + if t := m.addMember(dataCenter, rack, address, version); t != nil { + return ensureGroupLeaders(m, true, filerGroup, nodeType, address) + } + return nil +} +func (g *ClusterNodeGroups) RemoveClusterNode(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { + g.Lock() + defer g.Unlock() + m := g.getGroupMembers(filerGroup, false) + if m == nil { + return nil + } + if m.removeMember(address) { + return ensureGroupLeaders(m, false, filerGroup, nodeType, address) + } + return nil +} +func (g *ClusterNodeGroups) ListClusterNode(filerGroup FilerGroupName) (nodes []*ClusterNode) { + g.Lock() + defer g.Unlock() + m := g.getGroupMembers(filerGroup, false) + if m == nil { + return nil + } + for _, node := range m.members { + nodes = append(nodes, node) + } + return +} +func (g *ClusterNodeGroups) IsOneLeader(filerGroup FilerGroupName, address pb.ServerAddress) bool { + g.Lock() + defer g.Unlock() + m := g.getGroupMembers(filerGroup, false) + if m == nil { + return false + } + return m.leaders.isOneLeader(address) +} +func NewCluster() *Cluster { + return &Cluster{ + filerGroups: newClusterNodeGroups(), + brokerGroups: newClusterNodeGroups(), + } +} + +func (cluster *Cluster) getGroupMembers(filerGroup FilerGroupName, nodeType string, createIfNotFound bool) *GroupMembers { switch nodeType { case FilerType: - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() - filers := cluster.getFilers(filerGroup, true) - if existingNode, found := filers.members[address]; found { - existingNode.counter++ - return nil - } - filers.members[address] = &ClusterNode{ - Address: address, - Version: version, - counter: 1, - CreatedTs: time.Now(), - DataCenter: dataCenter, - Rack: rack, - } - return ensureFilerLeaders(filers, true, filerGroup, nodeType, address) + return cluster.filerGroups.getGroupMembers(filerGroup, createIfNotFound) case BrokerType: - cluster.brokersLock.Lock() - defer cluster.brokersLock.Unlock() - existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter] - if !foundDataCenter { - existingDataCenterBrokers = &DataCenterBrokers{ - brokers: make(map[Rack]*RackBrokers), - } - cluster.brokers[dataCenter] = existingDataCenterBrokers - } - existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[rack] - if !foundRack { - existingRackBrokers = &RackBrokers{ - brokers: make(map[pb.ServerAddress]*ClusterNode), - } - existingDataCenterBrokers.brokers[rack] = existingRackBrokers - } + return cluster.brokerGroups.getGroupMembers(filerGroup, createIfNotFound) + } + return nil +} - if existingBroker, found := existingRackBrokers.brokers[address]; found { - existingBroker.counter++ - return nil - } - existingRackBrokers.brokers[address] = &ClusterNode{ - Address: address, - Version: version, - counter: 1, - CreatedTs: time.Now(), - DataCenter: dataCenter, - Rack: rack, - } - return []*master_pb.KeepConnectedResponse{ - { - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsAdd: true, - }, - }, - } +func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { + filerGroup := FilerGroupName(ns) + switch nodeType { + case FilerType: + return cluster.filerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version) + case BrokerType: + return cluster.brokerGroups.AddClusterNode(filerGroup, nodeType, dataCenter, rack, address, version) case MasterType: return []*master_pb.KeepConnectedResponse{ { @@ -141,57 +169,13 @@ func (cluster *Cluster) AddClusterNode(ns, nodeType string, dataCenter DataCente return nil } -func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter DataCenter, rack Rack, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { - filerGroup := FilerGroup(ns) +func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { + filerGroup := FilerGroupName(ns) switch nodeType { case FilerType: - cluster.filersLock.Lock() - defer cluster.filersLock.Unlock() - filers := cluster.getFilers(filerGroup, false) - if filers == nil { - return nil - } - if existingNode, found := filers.members[address]; !found { - return nil - } else { - existingNode.counter-- - if existingNode.counter <= 0 { - delete(filers.members, address) - return ensureFilerLeaders(filers, false, filerGroup, nodeType, address) - } - } + return cluster.filerGroups.RemoveClusterNode(filerGroup, nodeType, address) case BrokerType: - cluster.brokersLock.Lock() - defer cluster.brokersLock.Unlock() - - existingDataCenterBrokers, foundDataCenter := cluster.brokers[dataCenter] - if !foundDataCenter { - return nil - } - existingRackBrokers, foundRack := existingDataCenterBrokers.brokers[Rack(rack)] - if !foundRack { - return nil - } - - existingBroker, found := existingRackBrokers.brokers[address] - if !found { - return nil - } - existingBroker.counter-- - if existingBroker.counter <= 0 { - delete(existingRackBrokers.brokers, address) - return []*master_pb.KeepConnectedResponse{ - { - ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ - NodeType: nodeType, - Address: string(address), - IsAdd: false, - }, - }, - } - } - return nil - + return cluster.brokerGroups.RemoveClusterNode(filerGroup, nodeType, address) case MasterType: return []*master_pb.KeepConnectedResponse{ { @@ -206,44 +190,31 @@ func (cluster *Cluster) RemoveClusterNode(ns string, nodeType string, dataCenter return nil } -func (cluster *Cluster) ListClusterNode(filerGroup FilerGroup, nodeType string) (nodes []*ClusterNode) { +func (cluster *Cluster) ListClusterNode(filerGroup FilerGroupName, nodeType string) (nodes []*ClusterNode) { switch nodeType { case FilerType: - cluster.filersLock.RLock() - defer cluster.filersLock.RUnlock() - filers := cluster.getFilers(filerGroup, false) - if filers == nil { - return - } - for _, node := range filers.members { - nodes = append(nodes, node) - } + return cluster.filerGroups.ListClusterNode(filerGroup) case BrokerType: - cluster.brokersLock.RLock() - defer cluster.brokersLock.RUnlock() - for _, dcNodes := range cluster.brokers { - for _, rackNodes := range dcNodes.brokers { - for _, node := range rackNodes.brokers { - nodes = append(nodes, node) - } - } - } + return cluster.brokerGroups.ListClusterNode(filerGroup) case MasterType: } return } -func (cluster *Cluster) IsOneLeader(filerGroup FilerGroup, address pb.ServerAddress) bool { - filers := cluster.getFilers(filerGroup, false) - if filers == nil { - return false +func (cluster *Cluster) IsOneLeader(filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) bool { + switch nodeType { + case FilerType: + return cluster.filerGroups.IsOneLeader(filerGroup, address) + case BrokerType: + return cluster.brokerGroups.IsOneLeader(filerGroup, address) + case MasterType: } - return filers.leaders.isOneLeader(address) + return false } -func ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { +func ensureGroupLeaders(m *GroupMembers, isAdd bool, filerGroup FilerGroupName, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { if isAdd { - if filers.leaders.addLeaderIfVacant(address) { + if m.leaders.addLeaderIfVacant(address) { // has added the address as one leader result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ @@ -266,7 +237,7 @@ func ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeT }) } } else { - if filers.leaders.removeLeaderIfExists(address) { + if m.leaders.removeLeaderIfExists(address) { result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ @@ -282,8 +253,8 @@ func ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeT var shortestDuration int64 = math.MaxInt64 now := time.Now() var candidateAddress pb.ServerAddress - for _, node := range filers.members { - if filers.leaders.isOneLeader(node.Address) { + for _, node := range m.members { + if m.leaders.isOneLeader(node.Address) { continue } duration := now.Sub(node.CreatedTs).Nanoseconds() @@ -293,7 +264,7 @@ func ensureFilerLeaders(filers *Filers, isAdd bool, filerGroup FilerGroup, nodeT } } if candidateAddress != "" { - filers.leaders.addLeaderIfVacant(candidateAddress) + m.leaders.addLeaderIfVacant(candidateAddress) // added a new leader result = append(result, &master_pb.KeepConnectedResponse{ ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ diff --git a/weed/cluster/cluster_test.go b/weed/cluster/cluster_test.go index 76cff2a3e..57ee4a023 100644 --- a/weed/cluster/cluster_test.go +++ b/weed/cluster/cluster_test.go @@ -16,7 +16,7 @@ func TestClusterAddRemoveNodes(t *testing.T) { assert.Equal(t, []pb.ServerAddress{ pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), - }, c.getFilers("", false).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:3"), "23.45") c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:4"), "23.45") @@ -24,27 +24,27 @@ func TestClusterAddRemoveNodes(t *testing.T) { pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.getFilers("", false).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:5"), "23.45") c.AddClusterNode("", "filer", "", "", pb.ServerAddress("111:6"), "23.45") - c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress("111:4")) + c.RemoveClusterNode("", "filer", pb.ServerAddress("111:4")) assert.Equal(t, []pb.ServerAddress{ pb.ServerAddress("111:1"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.getFilers("", false).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) // remove oldest - c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress("111:1")) + c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1")) assert.Equal(t, []pb.ServerAddress{ pb.ServerAddress("111:6"), pb.ServerAddress("111:2"), pb.ServerAddress("111:3"), - }, c.getFilers("", false).leaders.GetLeaders()) + }, c.getGroupMembers("", "filer", true).leaders.GetLeaders()) // remove oldest - c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress("111:1")) + c.RemoveClusterNode("", "filer", pb.ServerAddress("111:1")) } @@ -66,7 +66,7 @@ func TestConcurrentAddRemoveNodes(t *testing.T) { go func(i int) { defer wg.Done() address := strconv.Itoa(i) - node := c.RemoveClusterNode("", "filer", "", "", pb.ServerAddress(address)) + node := c.RemoveClusterNode("", "filer", pb.ServerAddress(address)) if len(node) == 0 { t.Errorf("TestConcurrentAddRemoveNodes: node[%s] not found", address) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index d530e238e..363aa2979 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -259,7 +259,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } defer func() { - for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, cluster.DataCenter(req.DataCenter), cluster.Rack(req.Rack), peerAddress) { + for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, peerAddress) { ms.broadcastToClients(update) } ms.deleteClient(clientName) diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go index 31d7a6658..e368cb763 100644 --- a/weed/server/master_grpc_server_cluster.go +++ b/weed/server/master_grpc_server_cluster.go @@ -10,14 +10,14 @@ import ( func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) { resp := &master_pb.ListClusterNodesResponse{} - filerGroup := cluster.FilerGroup(req.FilerGroup) + filerGroup := cluster.FilerGroupName(req.FilerGroup) clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType) for _, node := range clusterNodes { resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{ Address: string(node.Address), Version: node.Version, - IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address), + IsLeader: ms.Cluster.IsOneLeader(filerGroup, req.ClientType, node.Address), CreatedAtNs: node.CreatedTs.UnixNano(), DataCenter: string(node.DataCenter), Rack: string(node.Rack), @@ -26,13 +26,13 @@ func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.Lis return resp, nil } -func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroup) pb.ServerAddress { +func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroupName) pb.ServerAddress { clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType) var filers []pb.ServerAddress for _, node := range clusterNodes { - if ms.Cluster.IsOneLeader(filerGroup, node.Address) { + if ms.Cluster.IsOneLeader(filerGroup, cluster.FilerType, node.Address) { filers = append(filers, node.Address) } } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 756d14dbc..92d38d648 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -287,7 +287,7 @@ func (ms *MasterServer) startAdminScripts() { for { time.Sleep(time.Duration(sleepMinutes) * time.Minute) if ms.Topo.IsLeader() { - shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup)) + shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroupName(*shellOptions.FilerGroup)) if shellOptions.FilerAddress == "" { continue }