Browse Source

1. volume server auto detect clustered master nodes

2. remove operation package dependency on storage
pull/2/head
Chris Lu 11 years ago
parent
commit
edae676913
  1. 27
      go/operation/list_masters.go
  2. 14
      go/operation/lookup_volume_id.go
  3. 2
      go/replication/allocate_volume.go
  4. 2
      go/replication/store_replicate.go
  5. 3
      go/replication/volume_growth.go
  6. 56
      go/storage/store.go
  7. 15
      go/util/http_util.go
  8. 21
      go/weed/master.go
  9. 8
      go/weed/weed_server/volume_server.go
  10. 2
      go/weed/weed_server/volume_server_handlers.go

27
go/operation/list_masters.go

@ -0,0 +1,27 @@
package operation
import (
"code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/util"
"encoding/json"
)
type ClusterStatusResult struct {
IsLeader bool
Leader string
Peers []string
}
func ListMasters(server string) ([]string, error) {
jsonBlob, err := util.Get("http://" + server + "/cluster/status")
glog.V(2).Info("list masters result :", string(jsonBlob))
if err != nil {
return nil, err
}
var ret ClusterStatusResult
err = json.Unmarshal(jsonBlob, &ret)
if err != nil {
return nil, err
}
return ret.Peers, nil
}

14
go/operation/lookup_volume_id.go

@ -1,12 +1,12 @@
package operation package operation
import ( import (
"code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/util" "code.google.com/p/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors" "errors"
_ "fmt" _ "fmt"
"net/url" "net/url"
"strings"
) )
type Location struct { type Location struct {
@ -18,9 +18,9 @@ type LookupResult struct {
Error string `json:"error"` Error string `json:"error"`
} }
func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
func Lookup(server string, vid string) (*LookupResult, error) {
values := make(url.Values) values := make(url.Values)
values.Add("volumeId", vid.String())
values.Add("volumeId", vid)
jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values) jsonBlob, err := util.Post("http://"+server+"/dir/lookup", values)
if err != nil { if err != nil {
return nil, err return nil, err
@ -37,11 +37,11 @@ func Lookup(server string, vid storage.VolumeId) (*LookupResult, error) {
} }
func LookupFileId(server string, fileId string) (fullUrl string, err error) { func LookupFileId(server string, fileId string) (fullUrl string, err error) {
fid, parseErr := storage.ParseFileId(fileId)
if parseErr != nil {
return "", parseErr
a := strings.Split(fileId, ",")
if len(a) != 2 {
return "", errors.New("Invalid fileId " + fileId)
} }
lookup, lookupError := Lookup(server, fid.VolumeId)
lookup, lookupError := Lookup(server, a[0])
if lookupError != nil { if lookupError != nil {
return "", lookupError return "", lookupError
} }

2
go/operation/allocate_volume.go → go/replication/allocate_volume.go

@ -1,4 +1,4 @@
package operation
package replication
import ( import (
"code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/storage"

2
go/replication/store_replicate.go

@ -71,7 +71,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store, volumeId storage.
} }
func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool { func distributedOperation(masterNode string, store *storage.Store, volumeId storage.VolumeId, op func(location operation.Location) bool) bool {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId); lookupErr == nil {
if lookupResult, lookupErr := operation.Lookup(masterNode, volumeId.String()); lookupErr == nil {
length := 0 length := 0
selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port)) selfUrl := (store.Ip + ":" + strconv.Itoa(store.Port))
results := make(chan bool) results := make(chan bool)

3
go/replication/volume_growth.go

@ -2,7 +2,6 @@ package replication
import ( import (
"code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/storage" "code.google.com/p/weed-fs/go/storage"
"code.google.com/p/weed-fs/go/topology" "code.google.com/p/weed-fs/go/topology"
"errors" "errors"
@ -200,7 +199,7 @@ func (vg *VolumeGrowth) GrowByCountAndType(count int, collection string, repType
} }
func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error { func (vg *VolumeGrowth) grow(topo *topology.Topology, vid storage.VolumeId, collection string, repType storage.ReplicationType, servers ...*topology.DataNode) error {
for _, server := range servers { for _, server := range servers {
if err := operation.AllocateVolume(server, vid, collection, repType); err == nil {
if err := AllocateVolume(server, vid, collection, repType); err == nil {
vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion} vi := storage.VolumeInfo{Id: vid, Size: 0, Collection: collection, RepType: repType, Version: storage.CurrentVersion}
server.AddOrUpdateVolume(vi) server.AddOrUpdateVolume(vi)
topo.RegisterVolumeLayout(&vi, server) topo.RegisterVolumeLayout(&vi, server)

56
go/storage/store.go

@ -2,10 +2,13 @@ package storage
import ( import (
"code.google.com/p/weed-fs/go/glog" "code.google.com/p/weed-fs/go/glog"
"code.google.com/p/weed-fs/go/operation"
"code.google.com/p/weed-fs/go/util" "code.google.com/p/weed-fs/go/util"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"math/rand"
"net/url" "net/url"
"strconv" "strconv"
"strings" "strings"
@ -16,16 +19,53 @@ type DiskLocation struct {
maxVolumeCount int maxVolumeCount int
volumes map[VolumeId]*Volume volumes map[VolumeId]*Volume
} }
type MasterNodes struct {
nodes []string
lastNode int
}
func NewMasterNodes(bootstrapNode string) (mn *MasterNodes) {
mn = &MasterNodes{nodes: []string{bootstrapNode}, lastNode: -1}
return
}
func (mn *MasterNodes) reset() {
if len(mn.nodes) > 1 && mn.lastNode > 0 {
mn.lastNode = -mn.lastNode
}
}
func (mn *MasterNodes) findMaster() (string, error) {
if len(mn.nodes) == 0 {
return "", errors.New("No master node found!")
}
if mn.lastNode < 0 {
for _, m := range mn.nodes {
if masters, e := operation.ListMasters(m); e == nil {
mn.nodes = masters
mn.lastNode = rand.Intn(len(mn.nodes))
glog.V(2).Info("current master node is :", mn.nodes[mn.lastNode])
break
}
}
}
if len(mn.nodes) == 1 {
return mn.nodes[0], nil
}
if mn.lastNode < 0 {
return "", errors.New("No master node avalable!")
}
return mn.nodes[mn.lastNode], nil
}
type Store struct { type Store struct {
Port int Port int
Ip string Ip string
PublicUrl string PublicUrl string
locations []*DiskLocation locations []*DiskLocation
masterNode string
dataCenter string //optional informaton, overwriting master setting if exists dataCenter string //optional informaton, overwriting master setting if exists
rack string //optional information, overwriting master setting if exists rack string //optional information, overwriting master setting if exists
connected bool connected bool
volumeSizeLimit uint64 //read from the master volumeSizeLimit uint64 //read from the master
masterNodes *MasterNodes
} }
func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) { func NewStore(port int, ip, publicUrl string, dirnames []string, maxVolumeCounts []int) (s *Store) {
@ -199,16 +239,21 @@ type JoinResult struct {
VolumeSizeLimit uint64 VolumeSizeLimit uint64
} }
func (s *Store) SetMaster(mserver string) {
s.masterNode = mserver
}
func (s *Store) SetDataCenter(dataCenter string) { func (s *Store) SetDataCenter(dataCenter string) {
s.dataCenter = dataCenter s.dataCenter = dataCenter
} }
func (s *Store) SetRack(rack string) { func (s *Store) SetRack(rack string) {
s.rack = rack s.rack = rack
} }
func (s *Store) SetBootstrapMaster(bootstrapMaster string) {
s.masterNodes = NewMasterNodes(bootstrapMaster)
}
func (s *Store) Join() error { func (s *Store) Join() error {
masterNode, e := s.masterNodes.findMaster()
if e != nil {
return e
}
stats := new([]*VolumeInfo) stats := new([]*VolumeInfo)
maxVolumeCount := 0 maxVolumeCount := 0
for _, location := range s.locations { for _, location := range s.locations {
@ -237,8 +282,9 @@ func (s *Store) Join() error {
values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount)) values.Add("maxVolumeCount", strconv.Itoa(maxVolumeCount))
values.Add("dataCenter", s.dataCenter) values.Add("dataCenter", s.dataCenter)
values.Add("rack", s.rack) values.Add("rack", s.rack)
jsonBlob, err := util.Post("http://"+s.masterNode+"/dir/join", values)
jsonBlob, err := util.Post("http://"+masterNode+"/dir/join", values)
if err != nil { if err != nil {
s.masterNodes.reset()
return err return err
} }
var ret JoinResult var ret JoinResult

15
go/util/post.go → go/util/http_util.go

@ -21,3 +21,18 @@ func Post(url string, values url.Values) ([]byte, error) {
} }
return b, nil return b, nil
} }
func Get(url string) ([]byte, error) {
r, err := http.Get(url)
if err != nil {
glog.V(0).Infoln("getting ", url, err)
return nil, err
}
defer r.Body.Close()
b, err := ioutil.ReadAll(r.Body)
if err != nil {
glog.V(0).Infoln("read get result from", url, err)
return nil, err
}
return b, nil
}

21
go/weed/master.go

@ -28,8 +28,9 @@ var cmdMaster = &Command{
var ( var (
mport = cmdMaster.Flag.Int("port", 9333, "http listen port") mport = cmdMaster.Flag.Int("port", 9333, "http listen port")
mip = cmdMaster.Flag.String("ip", "localhost", "http listen port")
masterIp = cmdMaster.Flag.String("ip", "", "master ip address")
metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data") metaFolder = cmdMaster.Flag.String("mdir", os.TempDir(), "data directory to store meta data")
masterPeers = cmdMaster.Flag.String("peers", "", "other master nodes in comma separated ip:port list")
volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes") volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1000, "Default Volume Size in MegaBytes")
mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats") mpulse = cmdMaster.Flag.Int("pulseSeconds", 5, "number of seconds between heartbeats")
confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file") confFile = cmdMaster.Flag.String("conf", "/etc/weedfs/weedfs.conf", "xml configuration file")
@ -55,16 +56,28 @@ func runMaster(cmd *Command, args []string) bool {
} }
r := mux.NewRouter() r := mux.NewRouter()
weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
ms := weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder,
*volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList, *volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList,
) )
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *mip+":"+strconv.Itoa(*mport))
glog.V(0).Infoln("Start Weed Master", VERSION, "at port", *masterIp+":"+strconv.Itoa(*mport))
srv := &http.Server{ srv := &http.Server{
Addr: *mip + ":" + strconv.Itoa(*mport),
Addr: *masterIp + ":" + strconv.Itoa(*mport),
Handler: r, Handler: r,
ReadTimeout: time.Duration(*mReadTimeout) * time.Second, ReadTimeout: time.Duration(*mReadTimeout) * time.Second,
} }
go func() {
time.Sleep(100 * time.Millisecond)
var peers []string
if *masterPeers != "" {
peers = strings.Split(*masterPeers, ",")
}
raftServer := weed_server.NewRaftServer(r, VERSION, peers, *masterIp+":"+strconv.Itoa(*mport), *metaFolder)
ms.SetRaftServer(raftServer)
}()
e := srv.ListenAndServe() e := srv.ListenAndServe()
if e != nil { if e != nil {
glog.Fatalf("Fail to start:%s", e) glog.Fatalf("Fail to start:%s", e)

8
go/weed/weed_server/volume_server.go

@ -43,7 +43,7 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
go func() { go func() {
connected := true connected := true
vs.store.SetMaster(vs.masterNode)
vs.store.SetBootstrapMaster(vs.masterNode)
vs.store.SetDataCenter(vs.dataCenter) vs.store.SetDataCenter(vs.dataCenter)
vs.store.SetRack(vs.rack) vs.store.SetRack(vs.rack)
for { for {
@ -58,7 +58,11 @@ func NewVolumeServer(r *http.ServeMux, version string, ip string, port int, publ
connected = false connected = false
} }
} }
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
if connected {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond)
} else {
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)* 0.25) * time.Millisecond)
}
} }
}() }()
glog.V(0).Infoln("store joined at", vs.masterNode) glog.V(0).Infoln("store joined at", vs.masterNode)

2
go/weed/weed_server/volume_server_handlers.go

@ -102,7 +102,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
glog.V(2).Infoln("volume", volumeId, "reading", n) glog.V(2).Infoln("volume", volumeId, "reading", n)
if !vs.store.HasVolume(volumeId) { if !vs.store.HasVolume(volumeId) {
lookupResult, err := operation.Lookup(vs.masterNode, volumeId)
lookupResult, err := operation.Lookup(vs.masterNode, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 { if err == nil && len(lookupResult.Locations) > 0 {
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently)

Loading…
Cancel
Save