From f20ef922fd50ff76b4bb39e61022089f02cc241b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 13 Apr 2014 23:41:34 -0700 Subject: [PATCH] 1. add batched volume lookup handler 2. working-in-progress batch delete --- go/weed/weed_server/common.go | 12 ++- go/weed/weed_server/master_server.go | 2 + go/weed/weed_server/master_server_handlers.go | 85 +++++++++++++++---- 3 files changed, 79 insertions(+), 20 deletions(-) diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index fe18cdd19..9703302f3 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -130,6 +130,14 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st return } +func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { + r.ParseForm() + fids := r.Form["fid"] + fids = fids + m := make(map[string]interface{}) + writeJsonQuiet(w, r, m) +} + func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { switch strings.Count(path, "/") { case 3: @@ -162,6 +170,7 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b } return } + func statsCounterHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION @@ -169,9 +178,6 @@ func statsCounterHandler(w http.ResponseWriter, r *http.Request) { writeJsonQuiet(w, r, m) } -type MemoryStatistics struct { -} - func statsMemoryHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) m["Version"] = util.VERSION diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go index 9d8af156f..482cceae5 100644 --- a/go/weed/weed_server/master_server.go +++ b/go/weed/weed_server/master_server.go @@ -61,10 +61,12 @@ func NewMasterServer(r *mux.Router, port int, metaFolder string, r.HandleFunc("/dir/join", ms.proxyToLeader(secure(ms.whiteList, ms.dirJoinHandler))) r.HandleFunc("/dir/status", ms.proxyToLeader(secure(ms.whiteList, ms.dirStatusHandler))) r.HandleFunc("/col/delete", ms.proxyToLeader(secure(ms.whiteList, ms.collectionDeleteHandler))) + r.HandleFunc("/vol/lookup", ms.proxyToLeader(secure(ms.whiteList, ms.volumeLookupHandler))) r.HandleFunc("/vol/grow", ms.proxyToLeader(secure(ms.whiteList, ms.volumeGrowHandler))) r.HandleFunc("/vol/status", ms.proxyToLeader(secure(ms.whiteList, ms.volumeStatusHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(secure(ms.whiteList, ms.volumeVacuumHandler))) r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler)) + r.HandleFunc("/delete", secure(ms.whiteList, ms.deleteFromMasterServerHandler)) r.HandleFunc("/{filekey}", ms.redirectHandler) r.HandleFunc("/stats/counter", secure(ms.whiteList, statsCounterHandler)) r.HandleFunc("/stats/memory", secure(ms.whiteList, statsMemoryHandler)) diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go index 884689a5e..b1705560b 100644 --- a/go/weed/weed_server/master_server_handlers.go +++ b/go/weed/weed_server/master_server_handlers.go @@ -12,30 +12,73 @@ import ( "strings" ) +type LookupResultLocation struct { + Url string `json:"url,omitempty"` + PublicUrl string `json:"publicUrl,omitempty"` +} +type LookupResult struct { + VolumeId string `json:"volumeId,omitempty"` + Locations []LookupResultLocation `json:"locations,omitempty"` + Error string `json:"error,omitempty"` +} + +func (ms *MasterServer) lookupVolumeId(vids []string, collection string) (volumeLocations map[string]LookupResult) { + volumeLocations = make(map[string]LookupResult) + for _, vid := range vids { + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + if _, ok := volumeLocations[vid]; ok { + continue + } + volumeId, err := storage.NewVolumeId(vid) + if err == nil { + machines := ms.Topo.Lookup(collection, volumeId) + if machines != nil { + var ret []LookupResultLocation + for _, dn := range machines { + ret = append(ret, LookupResultLocation{Url: dn.Url(), PublicUrl: dn.PublicUrl}) + } + volumeLocations[vid] = LookupResult{VolumeId: vid, Locations: ret} + } else { + volumeLocations[vid] = LookupResult{VolumeId: vid, Error: "volumeId not found."} + } + } else { + volumeLocations[vid] = LookupResult{VolumeId: vid, Error: "Unknown volumeId format."} + } + } + return +} + +// Takes one volumeId only, can not do batch lookup func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { vid := r.FormValue("volumeId") - collection := r.FormValue("collection") //optional, but can be faster if too many collections commaSep := strings.Index(vid, ",") if commaSep > 0 { vid = vid[0:commaSep] } - volumeId, err := storage.NewVolumeId(vid) - if err == nil { - machines := ms.Topo.Lookup(collection, volumeId) - if machines != nil { - ret := []map[string]string{} - for _, dn := range machines { - ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) - } - writeJsonQuiet(w, r, map[string]interface{}{"locations": ret}) - } else { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + vids := []string{vid} + collection := r.FormValue("collection") //optional, but can be faster if too many collections + volumeLocations := ms.lookupVolumeId(vids, collection) + location := volumeLocations[vid] + if location.Error != "" { + w.WriteHeader(http.StatusNotFound) + } + writeJsonQuiet(w, r, location) +} + +// This can take batched volumeIds, &volumeId=x&volumeId=y&volumeId=z +func (ms *MasterServer) volumeLookupHandler(w http.ResponseWriter, r *http.Request) { + r.ParseForm() + vids := r.Form["volumeId"] + collection := r.FormValue("collection") //optional, but can be faster if too many collections + volumeLocations := ms.lookupVolumeId(vids, collection) + var ret []LookupResult + for _, volumeLocation := range volumeLocations { + ret = append(ret, volumeLocation) } + writeJsonQuiet(w, r, ret) } func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { @@ -191,6 +234,14 @@ func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r * } } +func (ms *MasterServer) deleteFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + if ms.Topo.IsLeader() { + deleteForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) + } else { + deleteForClientHandler(w, r, ms.Topo.RaftServer.Leader()) + } +} + func (ms *MasterServer) hasWriableVolume(option *topology.VolumeGrowOption) bool { vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement) return vl.GetActiveVolumeCount(option) > 0