|
|
@ -10,7 +10,8 @@ import ( |
|
|
|
|
|
|
|
const ( |
|
|
|
MasterType = "master" |
|
|
|
FilerType = "filer" |
|
|
|
FilerType = "filer" |
|
|
|
BrokerType = "broker" |
|
|
|
) |
|
|
|
|
|
|
|
type ClusterNode struct { |
|
|
@ -25,34 +26,34 @@ type Leaders struct { |
|
|
|
} |
|
|
|
|
|
|
|
type Cluster struct { |
|
|
|
nodes map[pb.ServerAddress]*ClusterNode |
|
|
|
nodesLock sync.RWMutex |
|
|
|
leaders *Leaders |
|
|
|
filers map[pb.ServerAddress]*ClusterNode |
|
|
|
filersLock sync.RWMutex |
|
|
|
filerLeaders *Leaders |
|
|
|
} |
|
|
|
|
|
|
|
func NewCluster() *Cluster { |
|
|
|
return &Cluster{ |
|
|
|
nodes: make(map[pb.ServerAddress]*ClusterNode), |
|
|
|
leaders: &Leaders{}, |
|
|
|
filers: make(map[pb.ServerAddress]*ClusterNode), |
|
|
|
filerLeaders: &Leaders{}, |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { |
|
|
|
switch nodeType { |
|
|
|
case FilerType: |
|
|
|
cluster.nodesLock.Lock() |
|
|
|
defer cluster.nodesLock.Unlock() |
|
|
|
if existingNode, found := cluster.nodes[address]; found { |
|
|
|
cluster.filersLock.Lock() |
|
|
|
defer cluster.filersLock.Unlock() |
|
|
|
if existingNode, found := cluster.filers[address]; found { |
|
|
|
existingNode.counter++ |
|
|
|
return nil |
|
|
|
} |
|
|
|
cluster.nodes[address] = &ClusterNode{ |
|
|
|
cluster.filers[address] = &ClusterNode{ |
|
|
|
Address: address, |
|
|
|
Version: version, |
|
|
|
counter: 1, |
|
|
|
createdTs: time.Now(), |
|
|
|
} |
|
|
|
return cluster.ensureLeader(true, nodeType, address) |
|
|
|
return cluster.ensureFilerLeaders(true, nodeType, address) |
|
|
|
case MasterType: |
|
|
|
} |
|
|
|
return nil |
|
|
@ -61,15 +62,15 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress |
|
|
|
func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { |
|
|
|
switch nodeType { |
|
|
|
case FilerType: |
|
|
|
cluster.nodesLock.Lock() |
|
|
|
defer cluster.nodesLock.Unlock() |
|
|
|
if existingNode, found := cluster.nodes[address]; !found { |
|
|
|
cluster.filersLock.Lock() |
|
|
|
defer cluster.filersLock.Unlock() |
|
|
|
if existingNode, found := cluster.filers[address]; !found { |
|
|
|
return nil |
|
|
|
} else { |
|
|
|
existingNode.counter-- |
|
|
|
if existingNode.counter <= 0 { |
|
|
|
delete(cluster.nodes, address) |
|
|
|
return cluster.ensureLeader(false, nodeType, address) |
|
|
|
delete(cluster.filers, address) |
|
|
|
return cluster.ensureFilerLeaders(false, nodeType, address) |
|
|
|
} |
|
|
|
} |
|
|
|
case MasterType: |
|
|
@ -80,9 +81,9 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr |
|
|
|
func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { |
|
|
|
switch nodeType { |
|
|
|
case FilerType: |
|
|
|
cluster.nodesLock.RLock() |
|
|
|
defer cluster.nodesLock.RUnlock() |
|
|
|
for _, node := range cluster.nodes { |
|
|
|
cluster.filersLock.RLock() |
|
|
|
defer cluster.filersLock.RUnlock() |
|
|
|
for _, node := range cluster.filers { |
|
|
|
nodes = append(nodes, node) |
|
|
|
} |
|
|
|
case MasterType: |
|
|
@ -91,12 +92,12 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) |
|
|
|
} |
|
|
|
|
|
|
|
func (cluster *Cluster) IsOneLeader(address pb.ServerAddress) bool { |
|
|
|
return cluster.leaders.isOneLeader(address) |
|
|
|
return cluster.filerLeaders.isOneLeader(address) |
|
|
|
} |
|
|
|
|
|
|
|
func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { |
|
|
|
func (cluster *Cluster) ensureFilerLeaders(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) { |
|
|
|
if isAdd { |
|
|
|
if cluster.leaders.addLeaderIfVacant(address) { |
|
|
|
if cluster.filerLeaders.addLeaderIfVacant(address) { |
|
|
|
// has added the address as one leader
|
|
|
|
result = append(result, &master_pb.KeepConnectedResponse{ |
|
|
|
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ |
|
|
@ -117,7 +118,7 @@ func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.Ser |
|
|
|
}) |
|
|
|
} |
|
|
|
} else { |
|
|
|
if cluster.leaders.removeLeaderIfExists(address) { |
|
|
|
if cluster.filerLeaders.removeLeaderIfExists(address) { |
|
|
|
|
|
|
|
result = append(result, &master_pb.KeepConnectedResponse{ |
|
|
|
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ |
|
|
@ -132,8 +133,8 @@ func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.Ser |
|
|
|
var shortestDuration int64 = math.MaxInt64 |
|
|
|
now := time.Now() |
|
|
|
var candidateAddress pb.ServerAddress |
|
|
|
for _, node := range cluster.nodes { |
|
|
|
if cluster.leaders.isOneLeader(node.Address) { |
|
|
|
for _, node := range cluster.filers { |
|
|
|
if cluster.filerLeaders.isOneLeader(node.Address) { |
|
|
|
continue |
|
|
|
} |
|
|
|
duration := now.Sub(node.createdTs).Nanoseconds() |
|
|
@ -143,7 +144,7 @@ func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.Ser |
|
|
|
} |
|
|
|
} |
|
|
|
if candidateAddress != "" { |
|
|
|
cluster.leaders.addLeaderIfVacant(candidateAddress) |
|
|
|
cluster.filerLeaders.addLeaderIfVacant(candidateAddress) |
|
|
|
// added a new leader
|
|
|
|
result = append(result, &master_pb.KeepConnectedResponse{ |
|
|
|
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{ |
|
|
|