diff --git a/go/weed/master.go b/go/weed/master.go index 6255f85f5..a7b86ec8a 100644 --- a/go/weed/master.go +++ b/go/weed/master.go @@ -1,23 +1,14 @@ package main import ( - "bytes" "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/replication" - "code.google.com/p/weed-fs/go/sequence" - "code.google.com/p/weed-fs/go/storage" - "code.google.com/p/weed-fs/go/topology" - "encoding/json" - "errors" + "code.google.com/p/weed-fs/go/weed/weed_server" "github.com/gorilla/mux" "net/http" "os" - "path" "runtime" "strconv" "strings" - "sync" "time" ) @@ -51,168 +42,6 @@ var ( masterWhiteList []string ) -var topo *topology.Topology -var vg *replication.VolumeGrowth -var vgLock sync.Mutex - -func dirLookupHandler(w http.ResponseWriter, r *http.Request) { - vid := r.FormValue("volumeId") - collection := r.FormValue("collection") //optional, but can be faster if too many collections - commaSep := strings.Index(vid, ",") - if commaSep > 0 { - vid = vid[0:commaSep] - } - volumeId, err := storage.NewVolumeId(vid) - if err == nil { - machines := topo.Lookup(collection, volumeId) - if machines != nil { - ret := []map[string]string{} - for _, dn := range machines { - ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) - } - writeJsonQuiet(w, r, map[string]interface{}{"locations": ret}) - } else { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid}) - } -} - -func dirAssignHandler(w http.ResponseWriter, r *http.Request) { - c, e := strconv.Atoi(r.FormValue("count")) - if e != nil { - c = 1 - } - repType := r.FormValue("replication") - if repType == "" { - repType = *defaultRepType - } - collection := r.FormValue("collection") - dataCenter := r.FormValue("dataCenter") - rt, err := storage.NewReplicationTypeFromString(repType) - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - return - } - - if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { - if topo.FreeSpace() <= 0 { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) - return - } else { - vgLock.Lock() - defer vgLock.Unlock() - if topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { - if _, err = vg.AutomaticGrowByType(collection, rt, dataCenter, topo); err != nil { - writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) - return - } - } - } - } - fid, count, dn, err := topo.PickForWrite(collection, rt, c, dataCenter) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } -} - -func dirJoinHandler(w http.ResponseWriter, r *http.Request) { - init := r.FormValue("init") == "true" - ip := r.FormValue("ip") - if ip == "" { - ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] - } - port, _ := strconv.Atoi(r.FormValue("port")) - maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) - s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") - publicUrl := r.FormValue("publicUrl") - volumes := new([]storage.VolumeInfo) - if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { - writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) - return - } - debug(s, "volumes", r.FormValue("volumes")) - topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) - m := make(map[string]interface{}) - m["VolumeSizeLimit"] = uint64(*volumeSizeLimitMB) * 1024 * 1024 - writeJsonQuiet(w, r, m) -} - -func dirStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Topology"] = topo.ToMap() - writeJsonQuiet(w, r, m) -} - -func volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { - gcThreshold := r.FormValue("garbageThreshold") - if gcThreshold == "" { - gcThreshold = *garbageThreshold - } - debug("garbageThreshold =", gcThreshold) - topo.Vacuum(gcThreshold) - dirStatusHandler(w, r) -} - -func volumeGrowHandler(w http.ResponseWriter, r *http.Request) { - count := 0 - rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) - if err == nil { - if count, err = strconv.Atoi(r.FormValue("count")); err == nil { - if topo.FreeSpace() < count*rt.GetCopyCount() { - err = errors.New("Only " + strconv.Itoa(topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) - } else { - count, err = vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), topo) - } - } else { - err = errors.New("parameter count is not found") - } - } - if err != nil { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()}) - } else { - w.WriteHeader(http.StatusNotAcceptable) - writeJsonQuiet(w, r, map[string]interface{}{"count": count}) - } -} - -func volumeStatusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = topo.ToVolumeMap() - writeJsonQuiet(w, r, m) -} - -func redirectHandler(w http.ResponseWriter, r *http.Request) { - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - machines := topo.Lookup("", volumeId) - if machines != nil && len(machines) > 0 { - http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - w.WriteHeader(http.StatusNotFound) - writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) - } -} - -func submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { - submitForClientHandler(w, r, "localhost:"+strconv.Itoa(*mport)) -} - func runMaster(cmd *Command, args []string) bool { if *mMaxCpu < 1 { *mMaxCpu = runtime.NumCPU() @@ -221,32 +50,11 @@ func runMaster(cmd *Command, args []string) bool { if *masterWhiteListOption != "" { masterWhiteList = strings.Split(*masterWhiteListOption, ",") } - var seq sequence.Sequencer - //if len(*etcdCluster) == 0 { - seq = sequence.NewFileSequencer(path.Join(*metaFolder, "weed.seq")) - //} else { - // seq = sequence.NewEtcdSequencer(*etcdCluster) - //} - var e error - if topo, e = topology.NewTopology("topo", *confFile, seq, - uint64(*volumeSizeLimitMB)*1024*1024, *mpulse); e != nil { - glog.Fatalf("cannot create topology:%s", e) - } - vg = replication.NewDefaultVolumeGrowth() - glog.V(0).Infoln("Volume Size Limit is", *volumeSizeLimitMB, "MB") r := mux.NewRouter() - r.HandleFunc("/dir/assign", secure(masterWhiteList, dirAssignHandler)) - r.HandleFunc("/dir/lookup", secure(masterWhiteList, dirLookupHandler)) - r.HandleFunc("/dir/join", secure(masterWhiteList, dirJoinHandler)) - r.HandleFunc("/dir/status", secure(masterWhiteList, dirStatusHandler)) - r.HandleFunc("/vol/grow", secure(masterWhiteList, volumeGrowHandler)) - r.HandleFunc("/vol/status", secure(masterWhiteList, volumeStatusHandler)) - r.HandleFunc("/vol/vacuum", secure(masterWhiteList, volumeVacuumHandler)) - r.HandleFunc("/submit", secure(masterWhiteList, submitFromMasterServerHandler)) - r.HandleFunc("/", redirectHandler) - - topo.StartRefreshWritableVolumes(*garbageThreshold) + weed_server.NewMasterServer(r, VERSION, *mport, *metaFolder, + *volumeSizeLimitMB, *mpulse, *confFile, *defaultRepType, *garbageThreshold, masterWhiteList, + ) glog.V(0).Infoln("Start Weed Master", VERSION, "at port", strconv.Itoa(*mport)) srv := &http.Server{ @@ -254,51 +62,9 @@ func runMaster(cmd *Command, args []string) bool { Handler: r, ReadTimeout: time.Duration(*mReadTimeout) * time.Second, } - e = srv.ListenAndServe() + e := srv.ListenAndServe() if e != nil { glog.Fatalf("Fail to start:%s", e) } return true } - -func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { - m := make(map[string]interface{}) - if r.Method != "POST" { - m["error"] = "Only submit via POST!" - writeJsonQuiet(w, r, m) - return - } - - debug("parsing upload file...") - fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) - if pe != nil { - writeJsonError(w, r, pe) - return - } - - debug("assigning file id for", fname) - assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication")) - if ae != nil { - writeJsonError(w, r, ae) - return - } - - url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid - if lastModified != 0 { - url = url + "?ts=" + strconv.FormatUint(lastModified, 10) - } - - debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) - if err != nil { - writeJsonError(w, r, err) - return - } - - m["fileName"] = fname - m["fid"] = assignResult.Fid - m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid - m["size"] = uploadResult.Size - writeJsonQuiet(w, r, m) - return -} diff --git a/go/weed/volume.go b/go/weed/volume.go index f3a6038c2..50864c541 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -2,15 +2,10 @@ package main import ( "code.google.com/p/weed-fs/go/glog" - "code.google.com/p/weed-fs/go/operation" - "code.google.com/p/weed-fs/go/replication" - "code.google.com/p/weed-fs/go/storage" + "code.google.com/p/weed-fs/go/weed/weed_server" "github.com/gorilla/mux" - "math/rand" - "mime" "net/http" "os" - "path/filepath" "runtime" "strconv" "strings" @@ -44,264 +39,9 @@ var ( rack = cmdVolume.Flag.String("rack", "", "current volume server's rack name") volumeWhiteListOption = cmdVolume.Flag.String("whiteList", "", "comma separated Ip addresses having write permission. No limit if empty.") - store *storage.Store volumeWhiteList []string ) -var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") - -func statusHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - m["Version"] = VERSION - m["Volumes"] = store.Status() - writeJsonQuiet(w, r, m) -} -func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { - err := store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType")) - if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) -} -func vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { - err, ret := store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) - } else { - writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) - } - debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) -} -func vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { - err := store.CompactVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, map[string]string{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("compacted volume =", r.FormValue("volume"), ", error =", err) -} -func vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { - err := store.CommitCompactVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("commit compact volume =", r.FormValue("volume"), ", error =", err) -} -func freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { - //TODO: notify master that this volume will be read-only - err := store.FreezeVolume(r.FormValue("volume")) - if err == nil { - writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) - } else { - writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) - } - debug("freeze volume =", r.FormValue("volume"), ", error =", err) -} -func submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) { - submitForClientHandler(w, r, *masterNode) -} -func storeHandler(w http.ResponseWriter, r *http.Request) { - switch r.Method { - case "GET": - GetOrHeadHandler(w, r, true) - case "HEAD": - GetOrHeadHandler(w, r, false) - case "DELETE": - secure(volumeWhiteList, DeleteHandler)(w, r) - case "PUT": - secure(volumeWhiteList, PostHandler)(w, r) - case "POST": - secure(volumeWhiteList, PostHandler)(w, r) - } -} -func GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { - n := new(storage.Needle) - vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) - volumeId, err := storage.NewVolumeId(vid) - if err != nil { - debug("parsing error:", err, r.URL.Path) - return - } - n.ParsePath(fid) - - debug("volume", volumeId, "reading", n) - if !store.HasVolume(volumeId) { - lookupResult, err := operation.Lookup(*masterNode, volumeId) - debug("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil { - http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) - } else { - debug("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - } - return - } - cookie := n.Cookie - count, e := store.Read(volumeId, n) - debug("read bytes", count, "error", e) - if e != nil || count <= 0 { - debug("read error:", e, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - if n.Cookie != cookie { - glog.V(0).Infoln("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - w.WriteHeader(http.StatusNotFound) - return - } - if n.LastModified != 0 { - w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) - if r.Header.Get("If-Modified-Since") != "" { - if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { - if t.Unix() >= int64(n.LastModified) { - w.WriteHeader(http.StatusNotModified) - return - } - } - } - } - if n.NameSize > 0 && filename == "" { - filename = string(n.Name) - dotIndex := strings.LastIndex(filename, ".") - if dotIndex > 0 { - ext = filename[dotIndex:] - } - } - mtype := "" - if ext != "" { - mtype = mime.TypeByExtension(ext) - } - if n.MimeSize > 0 { - mtype = string(n.Mime) - } - if mtype != "" { - w.Header().Set("Content-Type", mtype) - } - if filename != "" { - w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename)) - } - if ext != ".gz" { - if n.IsGzipped() { - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - w.Header().Set("Content-Encoding", "gzip") - } else { - if n.Data, err = storage.UnGzipData(n.Data); err != nil { - debug("lookup error:", err, r.URL.Path) - } - } - } - } - w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) - if isGetMethod { - if _, e = w.Write(n.Data); e != nil { - debug("response write error:", e) - } - } -} -func PostHandler(w http.ResponseWriter, r *http.Request) { - m := make(map[string]interface{}) - if e := r.ParseForm(); e != nil { - debug("form parse error:", e) - writeJsonError(w, r, e) - return - } - vid, _, _, _, _ := parseURLPath(r.URL.Path) - volumeId, ve := storage.NewVolumeId(vid) - if ve != nil { - debug("NewVolumeId error:", ve) - writeJsonError(w, r, ve) - return - } - needle, ne := storage.NewNeedle(r) - if ne != nil { - writeJsonError(w, r, ne) - return - } - ret, errorStatus := replication.ReplicatedWrite(*masterNode, store, volumeId, needle, r) - if errorStatus == "" { - w.WriteHeader(http.StatusCreated) - } else { - w.WriteHeader(http.StatusInternalServerError) - m["error"] = errorStatus - } - m["size"] = ret - writeJsonQuiet(w, r, m) -} -func DeleteHandler(w http.ResponseWriter, r *http.Request) { - n := new(storage.Needle) - vid, fid, _, _, _ := parseURLPath(r.URL.Path) - volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(fid) - - debug("deleting", n) - - cookie := n.Cookie - count, ok := store.Read(volumeId, n) - - if ok != nil { - m := make(map[string]uint32) - m["size"] = 0 - writeJsonQuiet(w, r, m) - return - } - - if n.Cookie != cookie { - glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) - return - } - - n.Size = 0 - ret := replication.ReplicatedDelete(*masterNode, store, volumeId, n, r) - - if ret != 0 { - w.WriteHeader(http.StatusAccepted) - } else { - w.WriteHeader(http.StatusInternalServerError) - } - - m := make(map[string]uint32) - m["size"] = uint32(count) - writeJsonQuiet(w, r, m) -} - -func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { - switch strings.Count(path, "/") { - case 3: - parts := strings.Split(path, "/") - vid, fid, filename = parts[1], parts[2], parts[3] - ext = filepath.Ext(filename) - case 2: - parts := strings.Split(path, "/") - vid, fid = parts[1], parts[2] - dotIndex := strings.LastIndex(fid, ".") - if dotIndex > 0 { - ext = fid[dotIndex:] - fid = fid[0:dotIndex] - } - default: - sepIndex := strings.LastIndex(path, "/") - commaIndex := strings.LastIndex(path[sepIndex:], ",") - if commaIndex <= 0 { - vid, isVolumeIdOnly = path[sepIndex+1:], true - return - } - dotIndex := strings.LastIndex(path[sepIndex:], ".") - vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] - ext = "" - if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] - ext = path[dotIndex:] - } - } - return -} - func runVolume(cmd *Command, args []string) bool { if *vMaxCpu < 1 { *vMaxCpu = runtime.NumCPU() @@ -340,39 +80,11 @@ func runVolume(cmd *Command, args []string) bool { volumeWhiteList = strings.Split(*volumeWhiteListOption, ",") } - store = storage.NewStore(*vport, *ip, *publicUrl, folders, maxCounts) - defer store.Close() r := mux.NewRouter() - r.HandleFunc("/submit", secure(volumeWhiteList, submitFromVolumeServerHandler)) - r.HandleFunc("/status", secure(volumeWhiteList, statusHandler)) - r.HandleFunc("/admin/assign_volume", secure(volumeWhiteList, assignVolumeHandler)) - r.HandleFunc("/admin/vacuum_volume_check", secure(volumeWhiteList, vacuumVolumeCheckHandler)) - r.HandleFunc("/admin/vacuum_volume_compact", secure(volumeWhiteList, vacuumVolumeCompactHandler)) - r.HandleFunc("/admin/vacuum_volume_commit", secure(volumeWhiteList, vacuumVolumeCommitHandler)) - r.HandleFunc("/admin/freeze_volume", secure(volumeWhiteList, freezeVolumeHandler)) - r.HandleFunc("/", storeHandler) - go func() { - connected := true - store.SetMaster(*masterNode) - store.SetDataCenter(*dataCenter) - store.SetRack(*rack) - for { - err := store.Join() - if err == nil { - if !connected { - connected = true - glog.V(0).Infoln("Reconnected with master") - } - } else { - if connected { - connected = false - } - } - time.Sleep(time.Duration(float32(*vpulse*1e3)*(1+rand.Float32())) * time.Millisecond) - } - }() - glog.V(0).Infoln("store joined at", *masterNode) + weed_server.NewVolumeServer(r, VERSION, *ip, *vport, *publicUrl, folders, maxCounts, + *masterNode, *vpulse, *dataCenter, *rack, volumeWhiteList, + ) glog.V(0).Infoln("Start Weed volume server", VERSION, "at http://"+*ip+":"+strconv.Itoa(*vport)) srv := &http.Server{ diff --git a/go/weed/weed.go b/go/weed/weed.go index 3d4beac88..26a009b4b 100644 --- a/go/weed/weed.go +++ b/go/weed/weed.go @@ -10,6 +10,7 @@ import ( "net" "net/http" "os" + "path/filepath" "strings" "sync" "text/template" @@ -244,3 +245,36 @@ func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) writeJsonQuiet(w, r, map[string]interface{}{"error": "No write permisson from " + host}) } } + +func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { + switch strings.Count(path, "/") { + case 3: + parts := strings.Split(path, "/") + vid, fid, filename = parts[1], parts[2], parts[3] + ext = filepath.Ext(filename) + case 2: + parts := strings.Split(path, "/") + vid, fid = parts[1], parts[2] + dotIndex := strings.LastIndex(fid, ".") + if dotIndex > 0 { + ext = fid[dotIndex:] + fid = fid[0:dotIndex] + } + default: + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex <= 0 { + vid, isVolumeIdOnly = path[sepIndex+1:], true + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + } + return +} diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go new file mode 100644 index 000000000..929bf592d --- /dev/null +++ b/go/weed/weed_server/common.go @@ -0,0 +1,158 @@ +package weed_server + +import ( + "bytes" + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/storage" + "encoding/json" + "fmt" + "net" + "net/http" + "path/filepath" + "strconv" + "strings" +) + +var IsDebug *bool + +func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) { + w.Header().Set("Content-Type", "application/javascript") + var bytes []byte + if r.FormValue("pretty") != "" { + bytes, err = json.MarshalIndent(obj, "", " ") + } else { + bytes, err = json.Marshal(obj) + } + if err != nil { + return + } + callback := r.FormValue("callback") + if callback == "" { + _, err = w.Write(bytes) + } else { + if _, err = w.Write([]uint8(callback)); err != nil { + return + } + if _, err = w.Write([]uint8("(")); err != nil { + return + } + fmt.Fprint(w, string(bytes)) + if _, err = w.Write([]uint8(")")); err != nil { + return + } + } + return +} + +// wrapper for writeJson - just logs errors +func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) { + if err := writeJson(w, r, obj); err != nil { + glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error()) + } +} +func writeJsonError(w http.ResponseWriter, r *http.Request, err error) { + m := make(map[string]interface{}) + m["error"] = err.Error() + writeJsonQuiet(w, r, m) +} + +func debug(params ...interface{}) { + if *IsDebug { + glog.V(0).Infoln(params) + } +} + +func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { + return func(w http.ResponseWriter, r *http.Request) { + if len(whiteList) == 0 { + f(w, r) + return + } + host, _, err := net.SplitHostPort(r.RemoteAddr) + if err == nil { + for _, ip := range whiteList { + if ip == host { + f(w, r) + return + } + } + } + writeJsonQuiet(w, r, map[string]interface{}{"error": "No write permisson from " + host}) + } +} + +func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { + m := make(map[string]interface{}) + if r.Method != "POST" { + m["error"] = "Only submit via POST!" + writeJsonQuiet(w, r, m) + return + } + + debug("parsing upload file...") + fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) + if pe != nil { + writeJsonError(w, r, pe) + return + } + + debug("assigning file id for", fname) + assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication")) + if ae != nil { + writeJsonError(w, r, ae) + return + } + + url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid + if lastModified != 0 { + url = url + "?ts=" + strconv.FormatUint(lastModified, 10) + } + + debug("upload file to store", url) + uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) + if err != nil { + writeJsonError(w, r, err) + return + } + + m["fileName"] = fname + m["fid"] = assignResult.Fid + m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid + m["size"] = uploadResult.Size + writeJsonQuiet(w, r, m) + return +} + +func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { + switch strings.Count(path, "/") { + case 3: + parts := strings.Split(path, "/") + vid, fid, filename = parts[1], parts[2], parts[3] + ext = filepath.Ext(filename) + case 2: + parts := strings.Split(path, "/") + vid, fid = parts[1], parts[2] + dotIndex := strings.LastIndex(fid, ".") + if dotIndex > 0 { + ext = fid[dotIndex:] + fid = fid[0:dotIndex] + } + default: + sepIndex := strings.LastIndex(path, "/") + commaIndex := strings.LastIndex(path[sepIndex:], ",") + if commaIndex <= 0 { + vid, isVolumeIdOnly = path[sepIndex+1:], true + return + } + dotIndex := strings.LastIndex(path[sepIndex:], ".") + vid = path[sepIndex+1 : commaIndex] + fid = path[commaIndex+1:] + ext = "" + if dotIndex > 0 { + fid = path[commaIndex+1 : dotIndex] + ext = path[dotIndex:] + } + } + return +} diff --git a/go/weed/weed_server/master_server.go b/go/weed/weed_server/master_server.go new file mode 100644 index 000000000..ee1181f1d --- /dev/null +++ b/go/weed/weed_server/master_server.go @@ -0,0 +1,69 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/replication" + "code.google.com/p/weed-fs/go/sequence" + "code.google.com/p/weed-fs/go/topology" + "github.com/gorilla/mux" + "path" + "sync" +) + +type MasterServer struct { + port int + metaFolder string + volumeSizeLimitMB uint + pulseSeconds int + defaultRepType string + garbageThreshold string + whiteList []string + version string + + topo *topology.Topology + vg *replication.VolumeGrowth + vgLock sync.Mutex +} + +func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, + volumeSizeLimitMB uint, + pulseSeconds int, + confFile string, + defaultRepType string, + garbageThreshold string, + whiteList []string) *MasterServer { + ms := &MasterServer{ + version: version, + volumeSizeLimitMB: volumeSizeLimitMB, + pulseSeconds: pulseSeconds, + defaultRepType: defaultRepType, + garbageThreshold: garbageThreshold, + whiteList: whiteList, + } + //if len(*etcdCluster) == 0 { + seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) + //} else { + // seq = sequence.NewEtcdSequencer(*etcdCluster) + //} + var e error + if ms.topo, e = topology.NewTopology("topo", confFile, seq, + uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { + glog.Fatalf("cannot create topology:%s", e) + } + ms.vg = replication.NewDefaultVolumeGrowth() + glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") + + r.HandleFunc("/dir/assign", secure(ms.whiteList, ms.dirAssignHandler)) + r.HandleFunc("/dir/lookup", secure(ms.whiteList, ms.dirLookupHandler)) + r.HandleFunc("/dir/join", secure(ms.whiteList, ms.dirJoinHandler)) + r.HandleFunc("/dir/status", secure(ms.whiteList, ms.dirStatusHandler)) + r.HandleFunc("/vol/grow", secure(ms.whiteList, ms.volumeGrowHandler)) + r.HandleFunc("/vol/status", secure(ms.whiteList, ms.volumeStatusHandler)) + r.HandleFunc("/vol/vacuum", secure(ms.whiteList, ms.volumeVacuumHandler)) + r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler)) + r.HandleFunc("/", ms.redirectHandler) + + ms.topo.StartRefreshWritableVolumes(garbageThreshold) + + return ms +} diff --git a/go/weed/weed_server/master_server_handlers.go b/go/weed/weed_server/master_server_handlers.go new file mode 100644 index 000000000..fda341f02 --- /dev/null +++ b/go/weed/weed_server/master_server_handlers.go @@ -0,0 +1,168 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/storage" + "encoding/json" + "errors" + "net/http" + "strconv" + "strings" +) + +func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { + vid := r.FormValue("volumeId") + collection := r.FormValue("collection") //optional, but can be faster if too many collections + commaSep := strings.Index(vid, ",") + if commaSep > 0 { + vid = vid[0:commaSep] + } + volumeId, err := storage.NewVolumeId(vid) + if err == nil { + machines := ms.topo.Lookup(collection, volumeId) + if machines != nil { + ret := []map[string]string{} + for _, dn := range machines { + ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) + } + writeJsonQuiet(w, r, map[string]interface{}{"locations": ret}) + } else { + w.WriteHeader(http.StatusNotFound) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid}) + } +} + +func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { + c, e := strconv.Atoi(r.FormValue("count")) + if e != nil { + c = 1 + } + repType := r.FormValue("replication") + if repType == "" { + repType = ms.defaultRepType + } + collection := r.FormValue("collection") + dataCenter := r.FormValue("dataCenter") + rt, err := storage.NewReplicationTypeFromString(repType) + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + return + } + + if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { + if ms.topo.FreeSpace() <= 0 { + w.WriteHeader(http.StatusNotFound) + writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) + return + } else { + ms.vgLock.Lock() + defer ms.vgLock.Unlock() + if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { + if _, err = ms.vg.AutomaticGrowByType(collection, rt, dataCenter, ms.topo); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) + return + } + } + } + } + fid, count, dn, err := ms.topo.PickForWrite(collection, rt, c, dataCenter) + if err == nil { + writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } +} + +func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { + init := r.FormValue("init") == "true" + ip := r.FormValue("ip") + if ip == "" { + ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] + } + port, _ := strconv.Atoi(r.FormValue("port")) + maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) + s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") + publicUrl := r.FormValue("publicUrl") + volumes := new([]storage.VolumeInfo) + if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { + writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) + return + } + debug(s, "volumes", r.FormValue("volumes")) + ms.topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) + m := make(map[string]interface{}) + m["VolumeSizeLimit"] = uint64(ms.volumeSizeLimitMB) * 1024 * 1024 + writeJsonQuiet(w, r, m) +} + +func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = ms.version + m["Topology"] = ms.topo.ToMap() + writeJsonQuiet(w, r, m) +} + +func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { + gcThreshold := r.FormValue("garbageThreshold") + if gcThreshold == "" { + gcThreshold = ms.garbageThreshold + } + debug("garbageThreshold =", gcThreshold) + ms.topo.Vacuum(gcThreshold) + ms.dirStatusHandler(w, r) +} + +func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { + count := 0 + rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) + if err == nil { + if count, err = strconv.Atoi(r.FormValue("count")); err == nil { + if ms.topo.FreeSpace() < count*rt.GetCopyCount() { + err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) + } else { + count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), ms.topo) + } + } else { + err = errors.New("parameter count is not found") + } + } + if err != nil { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()}) + } else { + w.WriteHeader(http.StatusNotAcceptable) + writeJsonQuiet(w, r, map[string]interface{}{"count": count}) + } +} + +func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = ms.version + m["Volumes"] = ms.topo.ToVolumeMap() + writeJsonQuiet(w, r, m) +} + +func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { + vid, _, _, _, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + machines := ms.topo.Lookup("", volumeId) + if machines != nil && len(machines) > 0 { + http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + w.WriteHeader(http.StatusNotFound) + writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) + } +} + +func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { + submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) +} diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go new file mode 100644 index 000000000..e0624d85b --- /dev/null +++ b/go/weed/weed_server/volume_server.go @@ -0,0 +1,67 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/storage" + "github.com/gorilla/mux" + "math/rand" + "time" +) + +type VolumeServer struct { + masterNode string + pulseSeconds int + dataCenter string + rack string + whiteList []string + store *storage.Store + version string +} + +func NewVolumeServer(r *mux.Router, version string, ip string, port int, publicUrl string, folders []string, maxCounts []int, + masterNode string, pulseSeconds int, + dataCenter string, rack string, + whiteList []string) *VolumeServer { + vs := &VolumeServer{ + version: version, + masterNode: masterNode, + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + whiteList: whiteList, + } + vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts) + + r.HandleFunc("/submit", secure(vs.whiteList, vs.submitFromVolumeServerHandler)) + r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler)) + r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler)) + r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler)) + r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler)) + r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler)) + r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler)) + r.HandleFunc("/", vs.storeHandler) + + go func() { + connected := true + vs.store.SetMaster(vs.masterNode) + vs.store.SetDataCenter(vs.dataCenter) + vs.store.SetRack(vs.rack) + for { + err := vs.store.Join() + if err == nil { + if !connected { + connected = true + glog.V(0).Infoln("Reconnected with master") + } + } else { + if connected { + connected = false + } + } + time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) + } + }() + glog.V(0).Infoln("store joined at", vs.masterNode) + + return vs +} diff --git a/go/weed/weed_server/volume_server_handlers.go b/go/weed/weed_server/volume_server_handlers.go new file mode 100644 index 000000000..4d4de0a27 --- /dev/null +++ b/go/weed/weed_server/volume_server_handlers.go @@ -0,0 +1,236 @@ +package weed_server + +import ( + "code.google.com/p/weed-fs/go/glog" + "code.google.com/p/weed-fs/go/operation" + "code.google.com/p/weed-fs/go/replication" + "code.google.com/p/weed-fs/go/storage" + "mime" + "net/http" + "strconv" + "strings" + "time" +) + +var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") + +func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + m["Version"] = vs.version + m["Volumes"] = vs.store.Status() + writeJsonQuiet(w, r, m) +} +func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType")) + if err == nil { + writeJsonQuiet(w, r, map[string]string{"error": ""}) + } else { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } + debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) +} +func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { + err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) + if err == nil { + writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) + } else { + writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) + } + debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) +} +func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.CompactVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, map[string]string{"error": ""}) + } else { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } + debug("compacted volume =", r.FormValue("volume"), ", error =", err) +} +func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { + err := vs.store.CommitCompactVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) + } else { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } + debug("commit compact volume =", r.FormValue("volume"), ", error =", err) +} +func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { + //TODO: notify master that this volume will be read-only + err := vs.store.FreezeVolume(r.FormValue("volume")) + if err == nil { + writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) + } else { + writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) + } + debug("freeze volume =", r.FormValue("volume"), ", error =", err) +} +func (vs *VolumeServer) submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) { + submitForClientHandler(w, r, vs.masterNode) +} + +func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) { + switch r.Method { + case "GET": + vs.GetOrHeadHandler(w, r, true) + case "HEAD": + vs.GetOrHeadHandler(w, r, false) + case "DELETE": + secure(vs.whiteList, vs.DeleteHandler)(w, r) + case "PUT": + secure(vs.whiteList, vs.PostHandler)(w, r) + case "POST": + secure(vs.whiteList, vs.PostHandler)(w, r) + } +} +func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { + n := new(storage.Needle) + vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) + volumeId, err := storage.NewVolumeId(vid) + if err != nil { + debug("parsing error:", err, r.URL.Path) + return + } + n.ParsePath(fid) + + debug("volume", volumeId, "reading", n) + if !vs.store.HasVolume(volumeId) { + lookupResult, err := operation.Lookup(vs.masterNode, volumeId) + debug("volume", volumeId, "found on", lookupResult, "error", err) + if err == nil { + http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) + } else { + debug("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + } + return + } + cookie := n.Cookie + count, e := vs.store.Read(volumeId, n) + debug("read bytes", count, "error", e) + if e != nil || count <= 0 { + debug("read error:", e, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if n.Cookie != cookie { + glog.V(0).Infoln("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + w.WriteHeader(http.StatusNotFound) + return + } + if n.LastModified != 0 { + w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) + if r.Header.Get("If-Modified-Since") != "" { + if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { + if t.Unix() >= int64(n.LastModified) { + w.WriteHeader(http.StatusNotModified) + return + } + } + } + } + if n.NameSize > 0 && filename == "" { + filename = string(n.Name) + dotIndex := strings.LastIndex(filename, ".") + if dotIndex > 0 { + ext = filename[dotIndex:] + } + } + mtype := "" + if ext != "" { + mtype = mime.TypeByExtension(ext) + } + if n.MimeSize > 0 { + mtype = string(n.Mime) + } + if mtype != "" { + w.Header().Set("Content-Type", mtype) + } + if filename != "" { + w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename)) + } + if ext != ".gz" { + if n.IsGzipped() { + if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { + w.Header().Set("Content-Encoding", "gzip") + } else { + if n.Data, err = storage.UnGzipData(n.Data); err != nil { + debug("lookup error:", err, r.URL.Path) + } + } + } + } + w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) + if isGetMethod { + if _, e = w.Write(n.Data); e != nil { + debug("response write error:", e) + } + } +} +func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { + m := make(map[string]interface{}) + if e := r.ParseForm(); e != nil { + debug("form parse error:", e) + writeJsonError(w, r, e) + return + } + vid, _, _, _, _ := parseURLPath(r.URL.Path) + volumeId, ve := storage.NewVolumeId(vid) + if ve != nil { + debug("NewVolumeId error:", ve) + writeJsonError(w, r, ve) + return + } + needle, ne := storage.NewNeedle(r) + if ne != nil { + writeJsonError(w, r, ne) + return + } + ret, errorStatus := replication.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r) + if errorStatus == "" { + w.WriteHeader(http.StatusCreated) + } else { + w.WriteHeader(http.StatusInternalServerError) + m["error"] = errorStatus + } + m["size"] = ret + writeJsonQuiet(w, r, m) +} +func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { + n := new(storage.Needle) + vid, fid, _, _, _ := parseURLPath(r.URL.Path) + volumeId, _ := storage.NewVolumeId(vid) + n.ParsePath(fid) + + debug("deleting", n) + + cookie := n.Cookie + count, ok := vs.store.Read(volumeId, n) + + if ok != nil { + m := make(map[string]uint32) + m["size"] = 0 + writeJsonQuiet(w, r, m) + return + } + + if n.Cookie != cookie { + glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) + return + } + + n.Size = 0 + ret := replication.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r) + + if ret != 0 { + w.WriteHeader(http.StatusAccepted) + } else { + w.WriteHeader(http.StatusInternalServerError) + } + + m := make(map[string]uint32) + m["size"] = uint32(count) + writeJsonQuiet(w, r, m) +} +