diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 08ff97198..fa01bd03d 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -29,12 +29,15 @@ type Cluster struct { filers map[pb.ServerAddress]*ClusterNode filersLock sync.RWMutex filerLeaders *Leaders + brokers map[pb.ServerAddress]*ClusterNode + brokersLock sync.RWMutex } func NewCluster() *Cluster { return &Cluster{ filers: make(map[pb.ServerAddress]*ClusterNode), filerLeaders: &Leaders{}, + brokers: make(map[pb.ServerAddress]*ClusterNode), } } @@ -54,6 +57,28 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress createdTs: time.Now(), } return cluster.ensureFilerLeaders(true, nodeType, address) + case BrokerType: + cluster.brokersLock.Lock() + defer cluster.brokersLock.Unlock() + if existingNode, found := cluster.brokers[address]; found { + existingNode.counter++ + return nil + } + cluster.brokers[address] = &ClusterNode{ + Address: address, + Version: version, + counter: 1, + createdTs: time.Now(), + } + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: true, + }, + }, + } case MasterType: } return nil @@ -73,6 +98,26 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr return cluster.ensureFilerLeaders(false, nodeType, address) } } + case BrokerType: + cluster.brokersLock.Lock() + defer cluster.brokersLock.Unlock() + if existingNode, found := cluster.brokers[address]; !found { + return nil + } else { + existingNode.counter-- + if existingNode.counter <= 0 { + delete(cluster.brokers, address) + return []*master_pb.KeepConnectedResponse{ + { + ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ + NodeType: nodeType, + Address: string(address), + IsAdd: false, + }, + }, + } + } + } case MasterType: } return nil @@ -86,6 +131,12 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) for _, node := range cluster.filers { nodes = append(nodes, node) } + case BrokerType: + cluster.brokersLock.RLock() + defer cluster.brokersLock.RUnlock() + for _, node := range cluster.brokers { + nodes = append(nodes, node) + } case MasterType: } return