Browse Source

send peers info to filers

pull/2427/head
Chris Lu 3 years ago
parent
commit
84bb8e7365
  1. 64
      weed/election/cluster.go
  2. 35
      weed/server/master_grpc_server.go
  3. 4
      weed/server/master_server.go
  4. 9
      weed/wdclient/masterclient.go

64
weed/election/cluster.go

@ -2,6 +2,7 @@ package election
import ( import (
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math" "math"
"sync" "sync"
"time" "time"
@ -31,14 +32,14 @@ func NewCluster() *Cluster {
} }
} }
func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) {
func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse {
switch nodeType { switch nodeType {
case "filer": case "filer":
cluster.nodesLock.Lock() cluster.nodesLock.Lock()
defer cluster.nodesLock.Unlock() defer cluster.nodesLock.Unlock()
if existingNode, found := cluster.nodes[address]; found { if existingNode, found := cluster.nodes[address]; found {
existingNode.counter++ existingNode.counter++
return
return nil
} }
cluster.nodes[address] = &ClusterNode{ cluster.nodes[address] = &ClusterNode{
Address: address, Address: address,
@ -46,27 +47,29 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress
counter: 1, counter: 1,
createdTs: time.Now(), createdTs: time.Now(),
} }
cluster.ensureLeader(true, address)
return cluster.ensureLeader(true, nodeType, address)
case "master": case "master":
} }
return nil
} }
func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) {
func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse {
switch nodeType { switch nodeType {
case "filer": case "filer":
cluster.nodesLock.Lock() cluster.nodesLock.Lock()
defer cluster.nodesLock.Unlock() defer cluster.nodesLock.Unlock()
if existingNode, found := cluster.nodes[address]; !found { if existingNode, found := cluster.nodes[address]; !found {
return
return nil
} else { } else {
existingNode.counter-- existingNode.counter--
if existingNode.counter <= 0 { if existingNode.counter <= 0 {
delete(cluster.nodes, address) delete(cluster.nodes, address)
cluster.ensureLeader(false, address)
return cluster.ensureLeader(false, nodeType, address)
} }
} }
case "master": case "master":
} }
return nil
} }
func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) {
@ -82,13 +85,40 @@ func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode)
return return
} }
func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) {
func (cluster *Cluster) ensureLeader(isAdd bool, nodeType string, address pb.ServerAddress) (result []*master_pb.KeepConnectedResponse) {
if isAdd { if isAdd {
if cluster.leaders.addLeaderIfVacant(address) { if cluster.leaders.addLeaderIfVacant(address) {
// has added the address as one leader // has added the address as one leader
result = append(result, &master_pb.KeepConnectedResponse{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsLeader: true,
IsAdd: true,
},
})
} else {
result = append(result, &master_pb.KeepConnectedResponse{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsLeader: false,
IsAdd: true,
},
})
} }
} else { } else {
if cluster.leaders.removeLeaderIfExists(address) { if cluster.leaders.removeLeaderIfExists(address) {
result = append(result, &master_pb.KeepConnectedResponse{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsLeader: true,
IsAdd: false,
},
})
// pick the freshest one, since it is less likely to go away // pick the freshest one, since it is less likely to go away
var shortestDuration int64 = math.MaxInt64 var shortestDuration int64 = math.MaxInt64
now := time.Now() now := time.Now()
@ -105,10 +135,28 @@ func (cluster *Cluster) ensureLeader(isAdd bool, address pb.ServerAddress) {
} }
if candidateAddress != "" { if candidateAddress != "" {
cluster.leaders.addLeaderIfVacant(candidateAddress) cluster.leaders.addLeaderIfVacant(candidateAddress)
// added a new leader
result = append(result, &master_pb.KeepConnectedResponse{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(candidateAddress),
IsLeader: true,
IsAdd: true,
},
})
} }
// removed the leader, and maybe added a new leader
} else {
result = append(result, &master_pb.KeepConnectedResponse{
ClusterNodeUpdate: &master_pb.ClusterNodeUpdate{
NodeType: nodeType,
Address: string(address),
IsLeader: false,
IsAdd: false,
},
})
} }
} }
return
} }
func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) { func (leaders *Leaders) addLeaderIfVacant(address pb.ServerAddress) (hasChanged bool) {

35
weed/server/master_grpc_server.go

@ -44,11 +44,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
if len(message.DeletedVids) > 0 { if len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
for _, ch := range ms.clientChans {
ch <- message
}
ms.clientChansLock.RUnlock()
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
} }
} }
}() }()
@ -153,12 +149,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} }
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
for host, ch := range ms.clientChans {
glog.V(0).Infof("master send to %s: %s", host, message.String())
ch <- message
}
ms.clientChansLock.RUnlock()
ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
} }
// tell the volume servers about the leader // tell the volume servers about the leader
@ -195,10 +186,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
stopChan := make(chan bool, 1) stopChan := make(chan bool, 1)
clientName, messageChan := ms.addClient(req.ClientType, peerAddress) clientName, messageChan := ms.addClient(req.ClientType, peerAddress)
ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version)
for _, update := range ms.Cluster.AddClusterNode(req.ClientType, peerAddress, req.Version) {
ms.broadcastToClients(update)
}
defer func() { defer func() {
ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress)
for _, update := range ms.Cluster.RemoveClusterNode(req.ClientType, peerAddress) {
ms.broadcastToClients(update)
}
ms.deleteClient(clientName) ms.deleteClient(clientName)
}() }()
@ -223,7 +218,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
for { for {
select { select {
case message := <-messageChan: case message := <-messageChan:
if err := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); err != nil {
if err := stream.Send(message); err != nil {
glog.V(0).Infof("=> client %v: %+v", clientName, message) glog.V(0).Infof("=> client %v: %+v", clientName, message)
return err return err
} }
@ -238,6 +233,14 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
} }
func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) {
ms.clientChansLock.RLock()
for _, ch := range ms.clientChans {
ch <- message
}
ms.clientChansLock.RUnlock()
}
func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error { func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
leader, err := ms.Topo.Leader() leader, err := ms.Topo.Leader()
if err != nil { if err != nil {
@ -254,7 +257,7 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe
return nil return nil
} }
func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.VolumeLocation) {
func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
clientName = clientType + "@" + string(clientAddress) clientName = clientType + "@" + string(clientAddress)
glog.V(0).Infof("+ client %v", clientName) glog.V(0).Infof("+ client %v", clientName)
@ -263,7 +266,7 @@ func (ms *MasterServer) addClient(clientType string, clientAddress pb.ServerAddr
// trying to send to it in SendHeartbeat and so we can't lock the // trying to send to it in SendHeartbeat and so we can't lock the
// clientChansLock to remove the channel and we're stuck writing to it // clientChansLock to remove the channel and we're stuck writing to it
// 100 is probably overkill // 100 is probably overkill
messageChan = make(chan *master_pb.VolumeLocation, 100)
messageChan = make(chan *master_pb.KeepConnectedResponse, 100)
ms.clientChansLock.Lock() ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan ms.clientChans[clientName] = messageChan

4
weed/server/master_server.go

@ -61,7 +61,7 @@ type MasterServer struct {
// notifying clients // notifying clients
clientChansLock sync.RWMutex clientChansLock sync.RWMutex
clientChans map[string]chan *master_pb.VolumeLocation
clientChans map[string]chan *master_pb.KeepConnectedResponse
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
@ -102,7 +102,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre
option: option, option: option,
preallocateSize: preallocateSize, preallocateSize: preallocateSize,
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.VolumeLocation),
clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers),
adminLocks: NewAdminLocks(), adminLocks: NewAdminLocks(),

9
weed/wdclient/masterclient.go

@ -149,6 +149,15 @@ func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedL
} }
} }
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
if update.IsAdd {
glog.V(0).Infof("+ %s %s leader:%v\n", update.NodeType, update.Address, update.IsLeader)
} else {
glog.V(0).Infof("- %s %s leader:%v\n", update.NodeType, update.Address, update.IsLeader)
}
}
} }
}) })

Loading…
Cancel
Save