diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index f1ceeedcb..7da78d556 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -4,10 +4,6 @@ import ( "code.google.com/p/weed-fs/go/operation" "code.google.com/p/weed-fs/go/stats" "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" - "code.google.com/p/weed-fs/go/util" - "encoding/json" - "errors" "net/http" "strconv" "strings" @@ -110,149 +106,3 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) } } - -func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { - collection, ok := ms.Topo.GetCollection(r.FormValue("collection")) - if !ok { - writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"}) - return - } - for _, server := range collection.ListVolumeServers() { - _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) - if err != nil { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - return - } - } - ms.Topo.DeleteCollection(r.FormValue("collection")) -} - -func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { - init := r.FormValue("init") == "true" - ip := r.FormValue("ip") - if ip == "" { - ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] - } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { - writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) - return - } - debug(s, "volumes", r.FormValue("volumes")) - ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) - m := make(map[string]interface{}) - m["VolumeSizeLimit"] = uint64(ms.volumeSizeLimitMB) * 1024 * 1024 - writeJsonQuiet(w, r, m) -} - -func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Topology"] = ms.Topo.ToMap() - writeJsonQuiet(w, r, m) -} - -func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - gcThreshold := r.FormValue("garbageThreshold") - if gcThreshold == "" { - gcThreshold = ms.garbageThreshold - } - debug("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(gcThreshold) - ms.dirStatusHandler(w, r) -} - -func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - option, err := ms.getVolumeGrowOption(r) - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - return - } - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) - } else { - count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) - } - } else { - err = errors.New("parameter count is not found") - } - } - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]interface{}{"count": count}) - } -} - -func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = util.VERSION - m["Volumes"] = ms.Topo.ToVolumeMap() - writeJsonQuiet(w, r, m) -} - -func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - machines := ms.Topo.Lookup("", volumeId) - if machines != nil && len(machines) > 0 { - http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } -} - -func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { - if ms.Topo.IsLeader() { - submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) - } else { - submitForClientHandler(w, r, ms.Topo.RaftServer.Leader()) - } -} - -func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { - if ms.Topo.IsLeader() { - deleteForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) - } else { - deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader()) - } -} - -func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) - return vl.GetActiveVolumeCount(option) > 0 -} - -func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { - replicationString := r.FormValue("replication") - if replicationString == "" { - replicationString = ms.defaultReplicaPlacement - } - replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) - if err != nil { - return nil, err - } - volumeGrowOption := &topology.VolumeGrowOption{ - Collection: r.FormValue("collection"), - ReplicaPlacement: replicaPlacement, - DataCenter: r.FormValue("dataCenter"), - Rack: r.FormValue("rack"), - DataNode: r.FormValue("dataNode"), - } - return volumeGrowOption, nil -} diff --git a/go/weed/weed_server/master_server_handlers_admin.go b/go/weed/weed_server/master_server_handlers_admin.go new file mode 100644 index 000000000..44080d393 --- /dev/null +++ b/go/weed/weed_server/master_server_handlers_admin.go @@ -0,0 +1,158 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/topology" + "code.google.com/p/weed-fs/go/util" + "encoding/json" + "errors" + "net/http" + "strconv" + "strings" +) + +func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) { + collection, ok := ms.Topo.GetCollection(r.FormValue("collection")) + if !ok { + writeJsonQuiet(w, r, map[string]interface{}{"error": "collection " + r.FormValue("collection") + "does not exist!"}) + return + } + for _, server := range collection.ListVolumeServers() { + _, err := util.Get("http://" + server.Ip + ":" + strconv.Itoa(server.Port) + "/admin/delete_collection?collection=" + r.FormValue("collection")) + if err != nil { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + return + } + } + ms.Topo.DeleteCollection(r.FormValue("collection")) +} + +func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { + init := r.FormValue("init") == "true" + ip := r.FormValue("ip") + if ip == "" { + ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + } + port, _ := strconv.Atoi(r.FormValue("port")) + maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) + return + } + debug(s, "volumes", r.FormValue("volumes")) + ms.Topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) + m := make(map[string]interface{}) + m["VolumeSizeLimit"] = uint64(ms.volumeSizeLimitMB) * 1024 * 1024 + writeJsonQuiet(w, r, m) +} + +func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Topology"] = ms.Topo.ToMap() + writeJsonQuiet(w, r, m) +} + +func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { + gcThreshold := r.FormValue("garbageThreshold") + if gcThreshold == "" { + gcThreshold = ms.garbageThreshold + } + debug("garbageThreshold =", gcThreshold) + ms.Topo.Vacuum(gcThreshold) + ms.dirStatusHandler(w, r) +} + +func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + count := 0 + option, err := ms.getVolumeGrowOption(r) + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + return + } + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if ms.Topo.FreeSpace() < count*option.ReplicaPlacement.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(ms.Topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*option.ReplicaPlacement.GetCopyCount())) + } else { + count, err = ms.vg.GrowByCountAndType(count, option, ms.Topo) + } + } else { + err = errors.New("parameter count is not found") + } + } + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]interface{}{"count": count}) + } +} + +func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = util.VERSION + m["Volumes"] = ms.Topo.ToVolumeMap() + writeJsonQuiet(w, r, m) +} + +func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _, _, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + machines := ms.Topo.Lookup("", volumeId) + if machines != nil && len(machines) > 0 { + http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + w.WriteHeader(http.StatusNotFound) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } +} + +func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() { + submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) + } else { + submitForClientHandler(w, r, ms.Topo.RaftServer.Leader()) + } +} + +func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() { + deleteForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) + } else { + deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader()) + } +} + +func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) + return vl.GetActiveVolumeCount(option) > 0 +} + +func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { + replicationString := r.FormValue("replication") + if replicationString == "" { + replicationString = ms.defaultReplicaPlacement + } + replicaPlacement, err := storage.NewReplicaPlacementFromString(replicationString) + if err != nil { + return nil, err + } + volumeGrowOption := &topology.VolumeGrowOption{ + Collection: r.FormValue("collection"), + ReplicaPlacement: replicaPlacement, + DataCenter: r.FormValue("dataCenter"), + Rack: r.FormValue("rack"), + DataNode: r.FormValue("dataNode"), + } + return volumeGrowOption, nil +}