Browse Source

volume servers always connect to the master leader

pull/442/head
Chris Lu 8 years ago
parent
commit
3065506b38
  1. 12
      weed/operation/list_masters.go
  2. 11
      weed/server/filer_server.go
  3. 3
      weed/server/volume_grpc_client.go
  4. 31
      weed/storage/store.go

12
weed/operation/list_masters.go

@ -13,20 +13,20 @@ type ClusterStatusResult struct {
Peers []string `json:"Peers,omitempty"`
}
func ListMasters(server string) ([]string, error) {
func ListMasters(server string) (leader string, peers []string, err error) {
jsonBlob, err := util.Get("http://" + server + "/cluster/status")
glog.V(2).Info("list masters result :", string(jsonBlob))
if err != nil {
return nil, err
return "", nil, err
}
var ret ClusterStatusResult
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, err
return "", nil, err
}
masters := ret.Peers
peers = ret.Peers
if ret.IsLeader {
masters = append(masters, ret.Leader)
peers = append(peers, ret.Leader)
}
return masters, nil
return ret.Leader, peers, nil
}

11
weed/server/filer_server.go

@ -117,17 +117,6 @@ func NewFilerServer(r *http.ServeMux, ip string, port int, master string, dir st
fs.masterNodes = storage.NewMasterNodes(fs.master)
glog.V(0).Infof("Filer server bootstraps with master %s", fs.getMasterNode())
//force initialize with all available master nodes
for {
_, err := fs.masterNodes.FindMaster()
if err != nil {
glog.Infof("filer server failed to get master cluster info:%s", err.Error())
time.Sleep(3 * time.Second)
} else {
break
}
}
for {
glog.V(4).Infof("Filer server sending to master %s", fs.getMasterNode())
master, err := fs.detectHealthyMaster(fs.getMasterNode())

3
weed/server/volume_grpc_client.go

@ -23,13 +23,14 @@ func (vs *VolumeServer) heartbeat() {
err := vs.doHeartbeat(time.Duration(vs.pulseSeconds) * time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(3*vs.pulseSeconds) * time.Second)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
}
}
}
func (vs *VolumeServer) doHeartbeat(sleepInterval time.Duration) error {
vs.masterNodes.Reset()
masterNode, err := vs.masterNodes.FindMaster()
if err != nil {
return fmt.Errorf("No master found: %v", err)

31
weed/storage/store.go

@ -3,7 +3,6 @@ package storage
import (
"errors"
"fmt"
"math/rand"
"strconv"
"strings"
@ -18,48 +17,46 @@ const (
type MasterNodes struct {
nodes []string
lastNode int
leader string
}
func (mn *MasterNodes) String() string {
return fmt.Sprintf("nodes:%v, lastNode:%d", mn.nodes, mn.lastNode)
return fmt.Sprintf("nodes:%v, leader:%s", mn.nodes, mn.leader)
}
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
mn = &MasterNodes{nodes: []string{bootstrapNode}, leader: ""}
return
}
func (mn *MasterNodes) Reset() {
glog.V(4).Infof("Resetting master nodes: %v", mn)
if len(mn.nodes) > 1 && mn.lastNode >= 0 {
glog.V(0).Infof("Reset master %s from: %v", mn.nodes[mn.lastNode], mn.nodes)
mn.lastNode = -mn.lastNode - 1
if mn.leader != "" {
mn.leader = ""
glog.V(0).Infof("Resetting master nodes: %v", mn)
}
}
func (mn *MasterNodes) FindMaster() (string, error) {
func (mn *MasterNodes) FindMaster() (leader string, err error) {
if len(mn.nodes) == 0 {
return "", errors.New("No master node found!")
}
if mn.lastNode < 0 {
if mn.leader == "" {
for _, m := range mn.nodes {
glog.V(4).Infof("Listing masters on %s", m)
if masters, e := operation.ListMasters(m); e == nil {
if len(masters) == 0 {
continue
}
if leader, masters, e := operation.ListMasters(m); e == nil {
if leader != "" {
mn.nodes = append(masters, m)
mn.lastNode = rand.Intn(len(mn.nodes))
mn.leader = leader
glog.V(2).Infof("current master nodes is %v", mn)
break
}
} else {
glog.V(4).Infof("Failed listing masters on %s: %v", m, e)
}
}
}
if mn.lastNode < 0 {
if mn.leader == "" {
return "", errors.New("No master node available!")
}
return mn.nodes[mn.lastNode], nil
return mn.leader, nil
}
/*

Loading…
Cancel
Save