diff --git a/go/operation/list_masters.go b/go/operation/list_masters.go new file mode 100644 index 000000000..05235aed0 --- /dev/null +++ b/go/operation/list_masters.go @@ -0,0 +1,27 @@ +package operation + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/util" + "encoding/json" +) + +type ClusterStatusResult struct { + IsLeader bool + Leader string + Peers []string +} + +func ListMasters(server string) ([]string, error) { + jsonBlob, err := util.Get("http://" + server + "/cluster/status") + glog.V(2).Info("list masters result :", string(jsonBlob)) + if err != nil { + return nil, err + } + var ret ClusterStatusResult + err = json.Unmarshal(jsonBlob, &ret) + if err != nil { + return nil, err + } + return ret.Peers, nil +} diff --git a/go/operation/lookup_volume_id.go b/go/operation/lookup_volume_id.go index cd1d3b1bd..6e6035fae 100644 --- a/go/operation/lookup_volume_id.go +++ b/go/operation/lookup_volume_id.go @@ -1,12 +1,12 @@ package operation import ( - "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/util" "encoding/json" "errors" _ "fmt" "net/url" + "strings" ) type Location struct { @@ -18,9 +18,9 @@ type LookupResult struct { Error string `json:"error"` } -func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { +func Lookup(server string, vid string) (*LookupResult, error) { values := make(url.Values) - values.Add("volumeId", vid.String()) + values.Add("volumeId", vid) jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) if err != nil { return nil, err @@ -37,11 +37,11 @@ func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) { } func LookupFileId(server string, fileId string) (fullUrl string, err error) { - fid, parseErr := storage.ParseFileId(fileId) - if parseErr != nil { - return "", parseErr + a := strings.Split(fileId, ",") + if len(a) != 2 { + return "", errors.New("Invalid fileId " + fileId) } - lookup, lookupError := Lookup(server, fid.VolumeId) + lookup, lookupError := Lookup(server, a[0]) if lookupError != nil { return "", lookupError } diff --git a/go/operation/allocate_volume.go b/go/replication/allocate_volume.go similarity index 97% rename from go/operation/allocate_volume.go rename to go/replication/allocate_volume.go index 3f96583e5..0f5ebc00f 100644 --- a/go/operation/allocate_volume.go +++ b/go/replication/allocate_volume.go @@ -1,4 +1,4 @@ -package operation +package replication import ( "code.google.com/p/weed-fs/go/storage" diff --git a/go/replication/store_replicate.go b/go/replication/store_replicate.go index bc630c5d1..3e709de44 100644 --- a/go/replication/store_replicate.go +++ b/go/replication/store_replicate.go @@ -71,7 +71,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage. } func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { - if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId); lookupErr == nil { + if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil { length := 0 selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) results := make(chan bool) diff --git a/go/replication/volume_growth.go b/go/replication/volume_growth.go index 6e5bf1f5c..d7d1c90bd 100644 --- a/go/replication/volume_growth.go +++ b/go/replication/volume_growth.go @@ -2,7 +2,6 @@ package replication import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/topology" "errors" @@ -200,7 +199,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType } func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error { for _, server := range servers { - if err := operation.AllocateVolume(server, vid, collection, repType); err == nil { + if err := AllocateVolume(server, vid, collection, repType); err == nil { vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion} server.AddOrUpdateVolume(vi) topo.RegisterVolumeLayout(&vi, server) diff --git a/go/storage/store.go b/go/storage/store.go index 84386cd86..52e78d27d 100644 --- a/go/storage/store.go +++ b/go/storage/store.go @@ -2,10 +2,13 @@ package storage import ( "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/util" "encoding/json" + "errors" "fmt" "io/ioutil" + "math/rand" "net/url" "strconv" "strings" @@ -16,16 +19,53 @@ type DiskLocation struct { maxVolumeCount int volumes map[VolumeId]*Volume } +type MasterNodes struct { + nodes []string + lastNode int +} + +func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) { + mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1} + return +} +func (mn *MasterNodes) reset() { + if len(mn.nodes) > 1 && mn.lastNode > 0 { + mn.lastNode = -mn.lastNode + } +} +func (mn *MasterNodes) findMaster() (string, error) { + if len(mn.nodes) == 0 { + return "", errors.New("No master node found!") + } + if mn.lastNode < 0 { + for _, m := range mn.nodes { + if masters, e := operation.ListMasters(m); e == nil { + mn.nodes = masters + mn.lastNode = rand.Intn(len(mn.nodes)) + glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode]) + break + } + } + } + if len(mn.nodes) == 1 { + return mn.nodes[0], nil + } + if mn.lastNode < 0 { + return "", errors.New("No master node avalable!") + } + return mn.nodes[mn.lastNode], nil +} + type Store struct { Port int Ip string PublicUrl string locations []*DiskLocation - masterNode string dataCenter string //optional informaton, overwriting master setting if exists rack string //optional information, overwriting master setting if exists connected bool volumeSizeLimit uint64 //read from the master + masterNodes *MasterNodes } func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) { @@ -199,16 +239,21 @@ type JoinResult struct { VolumeSizeLimit uint64 } -func (s *Store) SetMaster(mserver string) { - s.masterNode = mserver -} func (s *Store) SetDataCenter(dataCenter string) { s.dataCenter = dataCenter } func (s *Store) SetRack(rack string) { s.rack = rack } + +func (s *Store) SetBootstrapMaster(bootstrapMaster string) { + s.masterNodes = NewMasterNodes(bootstrapMaster) +} func (s *Store) Join() error { + masterNode, e := s.masterNodes.findMaster() + if e != nil { + return e + } stats := new([]*VolumeInfo) maxVolumeCount := 0 for _, location := range s.locations { @@ -237,8 +282,9 @@ func (s *Store) Join() error { values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount)) values.Add("dataCenter", s.dataCenter) values.Add("rack", s.rack) - jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values) + jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values) if err != nil { + s.masterNodes.reset() return err } var ret JoinResult diff --git a/go/util/post.go b/go/util/http_util.go similarity index 58% rename from go/util/post.go rename to go/util/http_util.go index cbc6dcfd5..80589dcfa 100644 --- a/go/util/post.go +++ b/go/util/http_util.go @@ -21,3 +21,18 @@ func Post(url string, values url.Values) ([]byte, error) { } return b, nil } + +func Get(url string) ([]byte, error) { + r, err := http.Get(url) + if err != nil { + glog.V(0).Infoln("getting ", url, err) + return nil, err + } + defer r.Body.Close() + b, err := ioutil.ReadAll(r.Body) + if err != nil { + glog.V(0).Infoln("read get result from", url, err) + return nil, err + } + return b, nil +} diff --git a/go/weed/master.go b/go/weed/master.go index 146974166..97def1948 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -28,8 +28,9 @@ var cmdMaster = &Command{ var ( mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - mip = cmdMaster.Flag.String("ip", "localhost", "http listen port") + masterIp = cmdMaster.Flag.String("ip", "", "master ip address") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") + masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") @@ -55,16 +56,28 @@ func runMaster(cmd *Command, args []string) bool { } r := mux.NewRouter() - weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder, + ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder, *volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList, ) - glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *mip+":"+strconv.Itoa(*mport)) + glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport)) + srv := &http.Server{ - Addr: *mip + ":" + strconv.Itoa(*mport), + Addr: *masterIp + ":" + strconv.Itoa(*mport), Handler: r, ReadTimeout: time.Duration(*mReadTimeout) * time.Second, } + + go func() { + time.Sleep(100 * time.Millisecond) + var peers []string + if *masterPeers != "" { + peers = strings.Split(*masterPeers, ",") + } + raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder) + ms.SetRaftServer(raftServer) + }() + e := srv.ListenAndServe() if e != nil { glog.Fatalf("Fail to start:%s", e) diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 732097bde..f42585da2 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -43,7 +43,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ go func() { connected := true - vs.store.SetMaster(vs.masterNode) + vs.store.SetBootstrapMaster(vs.masterNode) vs.store.SetDataCenter(vs.dataCenter) vs.store.SetRack(vs.rack) for { @@ -58,7 +58,11 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ connected = false } } - time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) + if connected { + time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) + } else { + time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)* 0.25) * time.Millisecond) + } } }() glog.V(0).Infoln("store joined at", vs.masterNode) diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go index c7671fe37..2f4673763 100644 --- a/go/weed/weed_server/volume_server_handlers.go +++ b/go/weed/weed_server/volume_server_handlers.go @@ -102,7 +102,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, glog.V(2).Infoln("volume", volumeId, "reading", n) if !vs.store.HasVolume(volumeId) { - lookupResult, err := operation.Lookup(vs.masterNode, volumeId) + lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)