|
@ -49,9 +49,7 @@ func assignVolumeHandler(w http.ResponseWriter, r *http.Request) { |
|
|
} else { |
|
|
} else { |
|
|
writeJson(w, r, map[string]string{"error": err.Error()}) |
|
|
writeJson(w, r, map[string]string{"error": err.Error()}) |
|
|
} |
|
|
} |
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("volume =", r.FormValue("volume"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) |
|
|
} |
|
|
} |
|
|
func storeHandler(w http.ResponseWriter, r *http.Request) { |
|
|
func storeHandler(w http.ResponseWriter, r *http.Request) { |
|
|
switch r.Method { |
|
|
switch r.Method { |
|
@ -68,40 +66,28 @@ func GetHandler(w http.ResponseWriter, r *http.Request) { |
|
|
vid, fid, ext := parseURLPath(r.URL.Path) |
|
|
vid, fid, ext := parseURLPath(r.URL.Path) |
|
|
volumeId, err := storage.NewVolumeId(vid) |
|
|
volumeId, err := storage.NewVolumeId(vid) |
|
|
if err != nil { |
|
|
if err != nil { |
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("parsing error:", err, r.URL.Path) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("parsing error:", err, r.URL.Path) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
n.ParsePath(fid) |
|
|
n.ParsePath(fid) |
|
|
|
|
|
|
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("volume", volumeId, "reading", n) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("volume", volumeId, "reading", n) |
|
|
if !store.HasVolume(volumeId) { |
|
|
if !store.HasVolume(volumeId) { |
|
|
lookupResult, err := operation.Lookup(*masterNode, volumeId) |
|
|
lookupResult, err := operation.Lookup(*masterNode, volumeId) |
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("volume", volumeId, "found on", lookupResult, "error", err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("volume", volumeId, "found on", lookupResult, "error", err) |
|
|
if err == nil { |
|
|
if err == nil { |
|
|
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) |
|
|
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) |
|
|
} else { |
|
|
} else { |
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("lookup error:", err, r.URL.Path) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("lookup error:", err, r.URL.Path) |
|
|
w.WriteHeader(http.StatusNotFound) |
|
|
w.WriteHeader(http.StatusNotFound) |
|
|
} |
|
|
} |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
cookie := n.Cookie |
|
|
cookie := n.Cookie |
|
|
count, e := store.Read(volumeId, n) |
|
|
count, e := store.Read(volumeId, n) |
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("read bytes", count, "error", e) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("read bytes", count, "error", e) |
|
|
if e != nil || count <= 0 { |
|
|
if e != nil || count <= 0 { |
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("read error:", e, r.URL.Path) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("read error:", e, r.URL.Path) |
|
|
w.WriteHeader(http.StatusNotFound) |
|
|
w.WriteHeader(http.StatusNotFound) |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
@ -171,9 +157,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { |
|
|
volumeId, _ := storage.NewVolumeId(vid) |
|
|
volumeId, _ := storage.NewVolumeId(vid) |
|
|
n.ParsePath(fid) |
|
|
n.ParsePath(fid) |
|
|
|
|
|
|
|
|
if *IsDebug { |
|
|
|
|
|
log.Println("deleting", n) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
debug("deleting", n) |
|
|
|
|
|
|
|
|
cookie := n.Cookie |
|
|
cookie := n.Cookie |
|
|
count, ok := store.Read(volumeId, n) |
|
|
count, ok := store.Read(volumeId, n) |
|
@ -213,6 +197,7 @@ func DeleteHandler(w http.ResponseWriter, r *http.Request) { |
|
|
m["size"] = uint32(count) |
|
|
m["size"] = uint32(count) |
|
|
writeJson(w, r, m) |
|
|
writeJson(w, r, m) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
func parseURLPath(path string) (vid, fid, ext string) { |
|
|
func parseURLPath(path string) (vid, fid, ext string) { |
|
|
|
|
|
|
|
|
sepIndex := strings.LastIndex(path, "/") |
|
|
sepIndex := strings.LastIndex(path, "/") |
|
@ -234,23 +219,17 @@ func parseURLPath(path string) (vid, fid, ext string) { |
|
|
return |
|
|
return |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
type distributedFunction func(location operation.Location) bool |
|
|
|
|
|
|
|
|
|
|
|
func distributedOperation(volumeId storage.VolumeId, op distributedFunction) bool { |
|
|
|
|
|
|
|
|
func distributedOperation(volumeId storage.VolumeId, op func(location operation.Location) bool) bool { |
|
|
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { |
|
|
if lookupResult, lookupErr := operation.Lookup(*masterNode, volumeId); lookupErr == nil { |
|
|
length := 0 |
|
|
length := 0 |
|
|
sem := make(chan int, len(lookupResult.Locations)) |
|
|
|
|
|
selfUrl := (*ip + ":" + strconv.Itoa(*vport)) |
|
|
selfUrl := (*ip + ":" + strconv.Itoa(*vport)) |
|
|
results := make(chan bool) |
|
|
results := make(chan bool) |
|
|
for _, location := range lookupResult.Locations { |
|
|
for _, location := range lookupResult.Locations { |
|
|
if location.Url != selfUrl { |
|
|
if location.Url != selfUrl { |
|
|
sem <- 1 |
|
|
|
|
|
length++ |
|
|
length++ |
|
|
go func(op distributedFunction, location operation.Location, sem chan int, results chan bool) { |
|
|
|
|
|
ret := op(location) |
|
|
|
|
|
<-sem |
|
|
|
|
|
results <- ret |
|
|
|
|
|
}(op, location, sem, results) |
|
|
|
|
|
|
|
|
go func(location operation.Location, results chan bool) { |
|
|
|
|
|
results <- op(location) |
|
|
|
|
|
}(location, results) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
ret := true |
|
|
ret := true |
|
|