diff --git a/weed-fs/src/cmd/weed/master.go b/weed-fs/src/cmd/weed/master.go index d94526675..6d44913a3 100644 --- a/weed-fs/src/cmd/weed/master.go +++ b/weed-fs/src/cmd/weed/master.go @@ -1,87 +1,87 @@ package main import ( - "pkg/directory" - "encoding/json" - "log" - "net/http" - "pkg/storage" - "strconv" - "strings" + "encoding/json" + "log" + "net/http" + "pkg/directory" + "pkg/storage" + "strconv" + "strings" ) func init() { - cmdMaster.Run = runMaster // break init cycle - IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") + cmdMaster.Run = runMaster // break init cycle + IsDebug = cmdMaster.Flag.Bool("debug", false, "enable debug mode") } var cmdMaster = &Command{ - UsageLine: "master -port=9333", - Short: "start a master server", - Long: `start a master server to provide volume=>location mapping service + UsageLine: "master -port=9333", + Short: "start a master server", + Long: `start a master server to provide volume=>location mapping service and sequence number of file ids `, } var ( - mport = cmdMaster.Flag.Int("port", 9333, "http listen port") - metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") - capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") - mapper *directory.Mapper - volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") + mport = cmdMaster.Flag.Int("port", 9333, "http listen port") + metaFolder = cmdMaster.Flag.String("mdir", "/tmp", "data directory to store mappings") + capacity = cmdMaster.Flag.Int("capacity", 100, "maximum number of volumes to hold") + mapper *directory.Mapper + volumeSizeLimitMB = cmdMaster.Flag.Uint("volumeSizeLimitMB", 32*1024, "Default Volume Size in MegaBytes") ) func dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - volumeId, _ := storage.NewVolumeId(vid) - machine, e := mapper.Get(volumeId) - if e == nil { - writeJson(w, r, machine.Server) - } else { - log.Println("Invalid volume id", volumeId) - writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found"}) - } + vid := r.FormValue("volumeId") + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, _ := storage.NewVolumeId(vid) + machine, e := mapper.Get(volumeId) + if e == nil { + writeJson(w, r, map[string]string{"url": machine.Url, "publicUrl": machine.PublicUrl}) + } else { + log.Println("Invalid volume id", volumeId) + writeJson(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found"}) + } } func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c:=r.FormValue("count") - fid, count, machine, err := mapper.PickForWrite(c) - if err == nil { - writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl":machine.PublicUrl, "count":strconv.Itoa(count)}) - } else { - writeJson(w, r, map[string]string{"error": err.Error()}) - } + c := r.FormValue("count") + fid, count, machine, err := mapper.PickForWrite(c) + if err == nil { + writeJson(w, r, map[string]string{"fid": fid, "url": machine.Url, "publicUrl": machine.PublicUrl, "count": strconv.Itoa(count)}) + } else { + writeJson(w, r, map[string]string{"error": err.Error()}) + } } func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - json.Unmarshal([]byte(r.FormValue("volumes")), volumes) - if *IsDebug { - log.Println(s, "volumes", r.FormValue("volumes")) - } - mapper.Add(*directory.NewMachine(s, publicUrl, *volumes)) + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + json.Unmarshal([]byte(r.FormValue("volumes")), volumes) + if *IsDebug { + log.Println(s, "volumes", r.FormValue("volumes")) + } + mapper.Add(*directory.NewMachine(s, publicUrl, *volumes)) } func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - writeJson(w, r, mapper) + writeJson(w, r, mapper) } func runMaster(cmd *Command, args []string) bool { - log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") - mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024) - http.HandleFunc("/dir/assign", dirAssignHandler) - http.HandleFunc("/dir/lookup", dirLookupHandler) - http.HandleFunc("/dir/join", dirJoinHandler) - http.HandleFunc("/dir/status", dirStatusHandler) + log.Println("Volume Size Limit is", *volumeSizeLimitMB, "MB") + mapper = directory.NewMapper(*metaFolder, "directory", uint64(*volumeSizeLimitMB)*1024*1024) + http.HandleFunc("/dir/assign", dirAssignHandler) + http.HandleFunc("/dir/lookup", dirLookupHandler) + http.HandleFunc("/dir/join", dirJoinHandler) + http.HandleFunc("/dir/status", dirStatusHandler) - log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) - e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) - if e != nil { - log.Fatal("Fail to start:", e) - } - return true + log.Println("Start directory service at http://127.0.0.1:" + strconv.Itoa(*mport)) + e := http.ListenAndServe(":"+strconv.Itoa(*mport), nil) + if e != nil { + log.Fatal("Fail to start:", e) + } + return true } diff --git a/weed-fs/src/pkg/directory/volume_mapping.go b/weed-fs/src/pkg/directory/volume_mapping.go index 9d58a1e77..b40ab51eb 100644 --- a/weed-fs/src/pkg/directory/volume_mapping.go +++ b/weed-fs/src/pkg/directory/volume_mapping.go @@ -10,17 +10,10 @@ import ( "sync" ) -const ( - FileIdSaveInterval = 10000 -) - -type MachineInfo struct { - Url string //[:port] - PublicUrl string -} type Machine struct { - Server MachineInfo Volumes []storage.VolumeInfo + Url string //[:port] + PublicUrl string } type Mapper struct { @@ -35,7 +28,7 @@ type Mapper struct { } func NewMachine(server, publicUrl string, volumes []storage.VolumeInfo) *Machine { - return &Machine{Server: MachineInfo{Url: server, PublicUrl: publicUrl}, Volumes: volumes} + return &Machine{Url: server, PublicUrl: publicUrl, Volumes: volumes} } func NewMapper(dirname string, filename string, volumeSizeLimit uint64) (m *Mapper) { @@ -49,7 +42,7 @@ func NewMapper(dirname string, filename string, volumeSizeLimit uint64) (m *Mapp return } -func (m *Mapper) PickForWrite(c string) (string, int, *MachineInfo, error) { +func (m *Mapper) PickForWrite(c string) (string, int, *Machine, error) { len_writers := len(m.Writers) if len_writers <= 0 { log.Println("No more writable volumes!") @@ -61,11 +54,11 @@ func (m *Mapper) PickForWrite(c string) (string, int, *MachineInfo, error) { machine := m.Machines[machine_id-1] fileId, count := m.sequence.NextFileId(util.ParseInt(c, 1)) if count == 0 { - return "", 0, &m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strange count:" + c) + return "", 0, m.Machines[rand.Intn(len(m.Machines))], errors.New("Strange count:" + c) } - return NewFileId(vid, fileId, rand.Uint32()).String(), count, &machine.Server, nil + return NewFileId(vid, fileId, rand.Uint32()).String(), count, machine, nil } - return "", 0, &m.Machines[rand.Intn(len(m.Machines))].Server, errors.New("Strangely vid " + vid.String() + " is on no machine!") + return "", 0, m.Machines[rand.Intn(len(m.Machines))], errors.New("Strangely vid " + vid.String() + " is on no machine!") } func (m *Mapper) Get(vid storage.VolumeId) (*Machine, error) { machineId := m.vid2machineId[vid] @@ -80,7 +73,7 @@ func (m *Mapper) Add(machine Machine) { m.volumeLock.Lock() foundExistingMachineId := -1 for index, entry := range m.Machines { - if machine.Server.Url == entry.Server.Url { + if machine.Url == entry.Url { foundExistingMachineId = index break }