Chris Lu
3 years ago
5 changed files with 207 additions and 81 deletions
-
155weed/election/cluster.go
-
47weed/election/cluster_test.go
-
4weed/server/master_grpc_server_cluster.go
-
5weed/server/master_server.go
-
77weed/server/master_server_cluster.go
@ -0,0 +1,155 @@ |
|||
package election |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"math" |
|||
"sync" |
|||
"time" |
|||
) |
|||
|
|||
type ClusterNode struct { |
|||
Address pb.ServerAddress |
|||
Version string |
|||
counter int |
|||
createdTs time.Time |
|||
} |
|||
|
|||
type Leaders struct { |
|||
leaders [3]pb.ServerAddress |
|||
} |
|||
|
|||
type Cluster struct { |
|||
nodes map[pb.ServerAddress]*ClusterNode |
|||
nodesLock sync.RWMutex |
|||
leaders *Leaders |
|||
} |
|||
|
|||
func NewCluster() *Cluster { |
|||
return &Cluster{ |
|||
nodes: make(map[pb.ServerAddress]*ClusterNode), |
|||
leaders: &Leaders{}, |
|||
} |
|||
} |
|||
|
|||
func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { |
|||
switch nodeType { |
|||
case "filer": |
|||
cluster.nodesLock.Lock() |
|||
defer cluster.nodesLock.Unlock() |
|||
if existingNode, found := cluster.nodes[address]; found { |
|||
existingNode.counter++ |
|||
return |
|||
} |
|||
cluster.nodes[address] = &ClusterNode{ |
|||
Address: address, |
|||
Version: version, |
|||
counter: 1, |
|||
createdTs: time.Now(), |
|||
} |
|||
cluster.ensureLeader(true, address) |
|||
case "master": |
|||
} |
|||
} |
|||
|
|||
func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { |
|||
switch nodeType { |
|||
case "filer": |
|||
cluster.nodesLock.Lock() |
|||
defer cluster.nodesLock.Unlock() |
|||
if existingNode, found := cluster.nodes[address]; !found { |
|||
return |
|||
} else { |
|||
existingNode.counter-- |
|||
if existingNode.counter <= 0 { |
|||
delete(cluster.nodes, address) |
|||
cluster.ensureLeader(false, address) |
|||
} |
|||
} |
|||
case "master": |
|||
} |
|||
} |
|||
|
|||
func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { |
|||
switch nodeType { |
|||
case "filer": |
|||
cluster.nodesLock.RLock() |
|||
defer cluster.nodesLock.RUnlock() |
|||
for _, node := range cluster.nodes { |
|||
nodes = append(nodes, node) |
|||
} |
|||
case "master": |
|||
} |
|||
return |
|||
} |
|||
|
|||
func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) { |
|||
if isAdd { |
|||
if cluster.leaders.addLeaderIfVacant(address) { |
|||
// has added the address as one leader
|
|||
} |
|||
} else { |
|||
if cluster.leaders.removeLeaderIfExists(address) { |
|||
// pick the freshest one, since it is less likely to go away
|
|||
var shortestDuration int64 = math.MaxInt64 |
|||
now := time.Now() |
|||
var candidateAddress pb.ServerAddress |
|||
for _, node := range cluster.nodes { |
|||
if cluster.leaders.isOneLeader(node.Address) { |
|||
continue |
|||
} |
|||
duration := now.Sub(node.createdTs).Nanoseconds() |
|||
if duration < shortestDuration { |
|||
shortestDuration = duration |
|||
candidateAddress = node.Address |
|||
} |
|||
} |
|||
if candidateAddress != "" { |
|||
cluster.leaders.addLeaderIfVacant(candidateAddress) |
|||
} |
|||
// removed the leader, and maybe added a new leader
|
|||
} |
|||
} |
|||
} |
|||
|
|||
func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { |
|||
if leaders.isOneLeader(address) { |
|||
return |
|||
} |
|||
for i := 0; i < len(leaders.leaders); i++ { |
|||
if leaders.leaders[i] == "" { |
|||
leaders.leaders[i] = address |
|||
hasChanged = true |
|||
return |
|||
} |
|||
} |
|||
return |
|||
} |
|||
func (leaders *Leaders) removeLeaderIfExists(address pb.ServerAddress) (hasChanged bool) { |
|||
if !leaders.isOneLeader(address) { |
|||
return |
|||
} |
|||
for i := 0; i < len(leaders.leaders); i++ { |
|||
if leaders.leaders[i] == address { |
|||
leaders.leaders[i] = "" |
|||
hasChanged = true |
|||
return |
|||
} |
|||
} |
|||
return |
|||
} |
|||
func (leaders *Leaders) isOneLeader(address pb.ServerAddress) bool { |
|||
for i := 0; i < len(leaders.leaders); i++ { |
|||
if leaders.leaders[i] == address { |
|||
return true |
|||
} |
|||
} |
|||
return false |
|||
} |
|||
func (leaders *Leaders) GetLeaders() (addresses []pb.ServerAddress) { |
|||
for i := 0; i < len(leaders.leaders); i++ { |
|||
if leaders.leaders[i] != "" { |
|||
addresses = append(addresses, leaders.leaders[i]) |
|||
} |
|||
} |
|||
return |
|||
} |
@ -0,0 +1,47 @@ |
|||
package election |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"github.com/stretchr/testify/assert" |
|||
"testing" |
|||
) |
|||
|
|||
func TestClusterAddRemoveNodes(t *testing.T) { |
|||
c := NewCluster() |
|||
|
|||
c.AddClusterNode("filer", pb.ServerAddress("111:1"), "23.45") |
|||
c.AddClusterNode("filer", pb.ServerAddress("111:2"), "23.45") |
|||
assert.Equal(t, []pb.ServerAddress{ |
|||
pb.ServerAddress("111:1"), |
|||
pb.ServerAddress("111:2"), |
|||
}, c.leaders.GetLeaders()) |
|||
|
|||
c.AddClusterNode("filer", pb.ServerAddress("111:3"), "23.45") |
|||
c.AddClusterNode("filer", pb.ServerAddress("111:4"), "23.45") |
|||
assert.Equal(t, []pb.ServerAddress{ |
|||
pb.ServerAddress("111:1"), |
|||
pb.ServerAddress("111:2"), |
|||
pb.ServerAddress("111:3"), |
|||
}, c.leaders.GetLeaders()) |
|||
|
|||
c.AddClusterNode("filer", pb.ServerAddress("111:5"), "23.45") |
|||
c.AddClusterNode("filer", pb.ServerAddress("111:6"), "23.45") |
|||
c.RemoveClusterNode("filer", pb.ServerAddress("111:4")) |
|||
assert.Equal(t, []pb.ServerAddress{ |
|||
pb.ServerAddress("111:1"), |
|||
pb.ServerAddress("111:2"), |
|||
pb.ServerAddress("111:3"), |
|||
}, c.leaders.GetLeaders()) |
|||
|
|||
// remove oldest
|
|||
c.RemoveClusterNode("filer", pb.ServerAddress("111:1")) |
|||
assert.Equal(t, []pb.ServerAddress{ |
|||
pb.ServerAddress("111:6"), |
|||
pb.ServerAddress("111:2"), |
|||
pb.ServerAddress("111:3"), |
|||
}, c.leaders.GetLeaders()) |
|||
|
|||
// remove oldest
|
|||
c.RemoveClusterNode("filer", pb.ServerAddress("111:1")) |
|||
|
|||
} |
@ -1,77 +0,0 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"github.com/chrislusf/seaweedfs/weed/pb" |
|||
"sync" |
|||
) |
|||
|
|||
type NodeType int |
|||
|
|||
const ( |
|||
filerNodeType NodeType = iota |
|||
) |
|||
|
|||
type ClusterNode struct { |
|||
address pb.ServerAddress |
|||
version string |
|||
counter int |
|||
} |
|||
|
|||
type Cluster struct { |
|||
filers map[pb.ServerAddress]*ClusterNode |
|||
filersLock sync.RWMutex |
|||
} |
|||
|
|||
func NewCluster() *Cluster { |
|||
return &Cluster{ |
|||
filers: make(map[pb.ServerAddress]*ClusterNode), |
|||
} |
|||
} |
|||
|
|||
func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) { |
|||
switch nodeType { |
|||
case "filer": |
|||
cluster.filersLock.Lock() |
|||
defer cluster.filersLock.Unlock() |
|||
if existingNode, found := cluster.filers[address]; found { |
|||
existingNode.counter++ |
|||
return |
|||
} |
|||
cluster.filers[address] = &ClusterNode{ |
|||
address: address, |
|||
version: version, |
|||
counter: 1, |
|||
} |
|||
case "master": |
|||
} |
|||
} |
|||
|
|||
func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) { |
|||
switch nodeType { |
|||
case "filer": |
|||
cluster.filersLock.Lock() |
|||
defer cluster.filersLock.Unlock() |
|||
if existingNode, found := cluster.filers[address]; !found { |
|||
return |
|||
} else { |
|||
existingNode.counter-- |
|||
if existingNode.counter <= 0 { |
|||
delete(cluster.filers, address) |
|||
} |
|||
} |
|||
case "master": |
|||
} |
|||
} |
|||
|
|||
func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { |
|||
switch nodeType { |
|||
case "filer": |
|||
cluster.filersLock.RLock() |
|||
defer cluster.filersLock.RUnlock() |
|||
for _, node := range cluster.filers { |
|||
nodes = append(nodes, node) |
|||
} |
|||
case "master": |
|||
} |
|||
return |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue