Browse Source
refactoring to separate master and volume server, so that these servers
refactoring to separate master and volume server, so that these servers
can be embedded into other applicationspull/2/head
Chris Lu
11 years ago
8 changed files with 741 additions and 531 deletions
-
244go/weed/master.go
-
296go/weed/volume.go
-
34go/weed/weed.go
-
158go/weed/weed_server/common.go
-
69go/weed/weed_server/master_server.go
-
168go/weed/weed_server/master_server_handlers.go
-
67go/weed/weed_server/volume_server.go
-
236go/weed/weed_server/volume_server_handlers.go
@ -0,0 +1,158 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"bytes" |
|||
"code.google.com/p/weed-fs/go/glog" |
|||
"code.google.com/p/weed-fs/go/operation" |
|||
"code.google.com/p/weed-fs/go/storage" |
|||
"encoding/json" |
|||
"fmt" |
|||
"net" |
|||
"net/http" |
|||
"path/filepath" |
|||
"strconv" |
|||
"strings" |
|||
) |
|||
|
|||
var IsDebug *bool |
|||
|
|||
func writeJson(w http.ResponseWriter, r *http.Request, obj interface{}) (err error) { |
|||
w.Header().Set("Content-Type", "application/javascript") |
|||
var bytes []byte |
|||
if r.FormValue("pretty") != "" { |
|||
bytes, err = json.MarshalIndent(obj, "", " ") |
|||
} else { |
|||
bytes, err = json.Marshal(obj) |
|||
} |
|||
if err != nil { |
|||
return |
|||
} |
|||
callback := r.FormValue("callback") |
|||
if callback == "" { |
|||
_, err = w.Write(bytes) |
|||
} else { |
|||
if _, err = w.Write([]uint8(callback)); err != nil { |
|||
return |
|||
} |
|||
if _, err = w.Write([]uint8("(")); err != nil { |
|||
return |
|||
} |
|||
fmt.Fprint(w, string(bytes)) |
|||
if _, err = w.Write([]uint8(")")); err != nil { |
|||
return |
|||
} |
|||
} |
|||
return |
|||
} |
|||
|
|||
// wrapper for writeJson - just logs errors
|
|||
func writeJsonQuiet(w http.ResponseWriter, r *http.Request, obj interface{}) { |
|||
if err := writeJson(w, r, obj); err != nil { |
|||
glog.V(0).Infof("error writing JSON %s: %s", obj, err.Error()) |
|||
} |
|||
} |
|||
func writeJsonError(w http.ResponseWriter, r *http.Request, err error) { |
|||
m := make(map[string]interface{}) |
|||
m["error"] = err.Error() |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
|
|||
func debug(params ...interface{}) { |
|||
if *IsDebug { |
|||
glog.V(0).Infoln(params) |
|||
} |
|||
} |
|||
|
|||
func secure(whiteList []string, f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { |
|||
return func(w http.ResponseWriter, r *http.Request) { |
|||
if len(whiteList) == 0 { |
|||
f(w, r) |
|||
return |
|||
} |
|||
host, _, err := net.SplitHostPort(r.RemoteAddr) |
|||
if err == nil { |
|||
for _, ip := range whiteList { |
|||
if ip == host { |
|||
f(w, r) |
|||
return |
|||
} |
|||
} |
|||
} |
|||
writeJsonQuiet(w, r, map[string]interface{}{"error": "No write permisson from " + host}) |
|||
} |
|||
} |
|||
|
|||
func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl string) { |
|||
m := make(map[string]interface{}) |
|||
if r.Method != "POST" { |
|||
m["error"] = "Only submit via POST!" |
|||
writeJsonQuiet(w, r, m) |
|||
return |
|||
} |
|||
|
|||
debug("parsing upload file...") |
|||
fname, data, mimeType, isGzipped, lastModified, pe := storage.ParseUpload(r) |
|||
if pe != nil { |
|||
writeJsonError(w, r, pe) |
|||
return |
|||
} |
|||
|
|||
debug("assigning file id for", fname) |
|||
assignResult, ae := operation.Assign(masterUrl, 1, r.FormValue("replication")) |
|||
if ae != nil { |
|||
writeJsonError(w, r, ae) |
|||
return |
|||
} |
|||
|
|||
url := "http://" + assignResult.PublicUrl + "/" + assignResult.Fid |
|||
if lastModified != 0 { |
|||
url = url + "?ts=" + strconv.FormatUint(lastModified, 10) |
|||
} |
|||
|
|||
debug("upload file to store", url) |
|||
uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType) |
|||
if err != nil { |
|||
writeJsonError(w, r, err) |
|||
return |
|||
} |
|||
|
|||
m["fileName"] = fname |
|||
m["fid"] = assignResult.Fid |
|||
m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid |
|||
m["size"] = uploadResult.Size |
|||
writeJsonQuiet(w, r, m) |
|||
return |
|||
} |
|||
|
|||
func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { |
|||
switch strings.Count(path, "/") { |
|||
case 3: |
|||
parts := strings.Split(path, "/") |
|||
vid, fid, filename = parts[1], parts[2], parts[3] |
|||
ext = filepath.Ext(filename) |
|||
case 2: |
|||
parts := strings.Split(path, "/") |
|||
vid, fid = parts[1], parts[2] |
|||
dotIndex := strings.LastIndex(fid, ".") |
|||
if dotIndex > 0 { |
|||
ext = fid[dotIndex:] |
|||
fid = fid[0:dotIndex] |
|||
} |
|||
default: |
|||
sepIndex := strings.LastIndex(path, "/") |
|||
commaIndex := strings.LastIndex(path[sepIndex:], ",") |
|||
if commaIndex <= 0 { |
|||
vid, isVolumeIdOnly = path[sepIndex+1:], true |
|||
return |
|||
} |
|||
dotIndex := strings.LastIndex(path[sepIndex:], ".") |
|||
vid = path[sepIndex+1 : commaIndex] |
|||
fid = path[commaIndex+1:] |
|||
ext = "" |
|||
if dotIndex > 0 { |
|||
fid = path[commaIndex+1 : dotIndex] |
|||
ext = path[dotIndex:] |
|||
} |
|||
} |
|||
return |
|||
} |
@ -0,0 +1,69 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"code.google.com/p/weed-fs/go/glog" |
|||
"code.google.com/p/weed-fs/go/replication" |
|||
"code.google.com/p/weed-fs/go/sequence" |
|||
"code.google.com/p/weed-fs/go/topology" |
|||
"github.com/gorilla/mux" |
|||
"path" |
|||
"sync" |
|||
) |
|||
|
|||
type MasterServer struct { |
|||
port int |
|||
metaFolder string |
|||
volumeSizeLimitMB uint |
|||
pulseSeconds int |
|||
defaultRepType string |
|||
garbageThreshold string |
|||
whiteList []string |
|||
version string |
|||
|
|||
topo *topology.Topology |
|||
vg *replication.VolumeGrowth |
|||
vgLock sync.Mutex |
|||
} |
|||
|
|||
func NewMasterServer(r *mux.Router, version string, port int, metaFolder string, |
|||
volumeSizeLimitMB uint, |
|||
pulseSeconds int, |
|||
confFile string, |
|||
defaultRepType string, |
|||
garbageThreshold string, |
|||
whiteList []string) *MasterServer { |
|||
ms := &MasterServer{ |
|||
version: version, |
|||
volumeSizeLimitMB: volumeSizeLimitMB, |
|||
pulseSeconds: pulseSeconds, |
|||
defaultRepType: defaultRepType, |
|||
garbageThreshold: garbageThreshold, |
|||
whiteList: whiteList, |
|||
} |
|||
//if len(*etcdCluster) == 0 {
|
|||
seq := sequence.NewFileSequencer(path.Join(metaFolder, "weed.seq")) |
|||
//} else {
|
|||
// seq = sequence.NewEtcdSequencer(*etcdCluster)
|
|||
//}
|
|||
var e error |
|||
if ms.topo, e = topology.NewTopology("topo", confFile, seq, |
|||
uint64(volumeSizeLimitMB)*1024*1024, pulseSeconds); e != nil { |
|||
glog.Fatalf("cannot create topology:%s", e) |
|||
} |
|||
ms.vg = replication.NewDefaultVolumeGrowth() |
|||
glog.V(0).Infoln("Volume Size Limit is", volumeSizeLimitMB, "MB") |
|||
|
|||
r.HandleFunc("/dir/assign", secure(ms.whiteList, ms.dirAssignHandler)) |
|||
r.HandleFunc("/dir/lookup", secure(ms.whiteList, ms.dirLookupHandler)) |
|||
r.HandleFunc("/dir/join", secure(ms.whiteList, ms.dirJoinHandler)) |
|||
r.HandleFunc("/dir/status", secure(ms.whiteList, ms.dirStatusHandler)) |
|||
r.HandleFunc("/vol/grow", secure(ms.whiteList, ms.volumeGrowHandler)) |
|||
r.HandleFunc("/vol/status", secure(ms.whiteList, ms.volumeStatusHandler)) |
|||
r.HandleFunc("/vol/vacuum", secure(ms.whiteList, ms.volumeVacuumHandler)) |
|||
r.HandleFunc("/submit", secure(ms.whiteList, ms.submitFromMasterServerHandler)) |
|||
r.HandleFunc("/", ms.redirectHandler) |
|||
|
|||
ms.topo.StartRefreshWritableVolumes(garbageThreshold) |
|||
|
|||
return ms |
|||
} |
@ -0,0 +1,168 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"code.google.com/p/weed-fs/go/storage" |
|||
"encoding/json" |
|||
"errors" |
|||
"net/http" |
|||
"strconv" |
|||
"strings" |
|||
) |
|||
|
|||
func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) { |
|||
vid := r.FormValue("volumeId") |
|||
collection := r.FormValue("collection") //optional, but can be faster if too many collections
|
|||
commaSep := strings.Index(vid, ",") |
|||
if commaSep > 0 { |
|||
vid = vid[0:commaSep] |
|||
} |
|||
volumeId, err := storage.NewVolumeId(vid) |
|||
if err == nil { |
|||
machines := ms.topo.Lookup(collection, volumeId) |
|||
if machines != nil { |
|||
ret := []map[string]string{} |
|||
for _, dn := range machines { |
|||
ret = append(ret, map[string]string{"url": dn.Url(), "publicUrl": dn.PublicUrl}) |
|||
} |
|||
writeJsonQuiet(w, r, map[string]interface{}{"locations": ret}) |
|||
} else { |
|||
w.WriteHeader(http.StatusNotFound) |
|||
writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) |
|||
} |
|||
} else { |
|||
w.WriteHeader(http.StatusNotAcceptable) |
|||
writeJsonQuiet(w, r, map[string]string{"error": "unknown volumeId format " + vid}) |
|||
} |
|||
} |
|||
|
|||
func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) { |
|||
c, e := strconv.Atoi(r.FormValue("count")) |
|||
if e != nil { |
|||
c = 1 |
|||
} |
|||
repType := r.FormValue("replication") |
|||
if repType == "" { |
|||
repType = ms.defaultRepType |
|||
} |
|||
collection := r.FormValue("collection") |
|||
dataCenter := r.FormValue("dataCenter") |
|||
rt, err := storage.NewReplicationTypeFromString(repType) |
|||
if err != nil { |
|||
w.WriteHeader(http.StatusNotAcceptable) |
|||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) |
|||
return |
|||
} |
|||
|
|||
if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { |
|||
if ms.topo.FreeSpace() <= 0 { |
|||
w.WriteHeader(http.StatusNotFound) |
|||
writeJsonQuiet(w, r, map[string]string{"error": "No free volumes left!"}) |
|||
return |
|||
} else { |
|||
ms.vgLock.Lock() |
|||
defer ms.vgLock.Unlock() |
|||
if ms.topo.GetVolumeLayout(collection, rt).GetActiveVolumeCount(dataCenter) <= 0 { |
|||
if _, err = ms.vg.AutomaticGrowByType(collection, rt, dataCenter, ms.topo); err != nil { |
|||
writeJsonQuiet(w, r, map[string]string{"error": "Cannot grow volume group! " + err.Error()}) |
|||
return |
|||
} |
|||
} |
|||
} |
|||
} |
|||
fid, count, dn, err := ms.topo.PickForWrite(collection, rt, c, dataCenter) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, map[string]interface{}{"fid": fid, "url": dn.Url(), "publicUrl": dn.PublicUrl, "count": count}) |
|||
} else { |
|||
w.WriteHeader(http.StatusNotAcceptable) |
|||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) |
|||
} |
|||
} |
|||
|
|||
func (ms *MasterServer) dirJoinHandler(w http.ResponseWriter, r *http.Request) { |
|||
init := r.FormValue("init") == "true" |
|||
ip := r.FormValue("ip") |
|||
if ip == "" { |
|||
ip = r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")] |
|||
} |
|||
port, _ := strconv.Atoi(r.FormValue("port")) |
|||
maxVolumeCount, _ := strconv.Atoi(r.FormValue("maxVolumeCount")) |
|||
s := r.RemoteAddr[0:strings.Index(r.RemoteAddr, ":")+1] + r.FormValue("port") |
|||
publicUrl := r.FormValue("publicUrl") |
|||
volumes := new([]storage.VolumeInfo) |
|||
if err := json.Unmarshal([]byte(r.FormValue("volumes")), volumes); err != nil { |
|||
writeJsonQuiet(w, r, map[string]string{"error": "Cannot unmarshal \"volumes\": " + err.Error()}) |
|||
return |
|||
} |
|||
debug(s, "volumes", r.FormValue("volumes")) |
|||
ms.topo.RegisterVolumes(init, *volumes, ip, port, publicUrl, maxVolumeCount, r.FormValue("dataCenter"), r.FormValue("rack")) |
|||
m := make(map[string]interface{}) |
|||
m["VolumeSizeLimit"] = uint64(ms.volumeSizeLimitMB) * 1024 * 1024 |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
|
|||
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { |
|||
m := make(map[string]interface{}) |
|||
m["Version"] = ms.version |
|||
m["Topology"] = ms.topo.ToMap() |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
|
|||
func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Request) { |
|||
gcThreshold := r.FormValue("garbageThreshold") |
|||
if gcThreshold == "" { |
|||
gcThreshold = ms.garbageThreshold |
|||
} |
|||
debug("garbageThreshold =", gcThreshold) |
|||
ms.topo.Vacuum(gcThreshold) |
|||
ms.dirStatusHandler(w, r) |
|||
} |
|||
|
|||
func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request) { |
|||
count := 0 |
|||
rt, err := storage.NewReplicationTypeFromString(r.FormValue("replication")) |
|||
if err == nil { |
|||
if count, err = strconv.Atoi(r.FormValue("count")); err == nil { |
|||
if ms.topo.FreeSpace() < count*rt.GetCopyCount() { |
|||
err = errors.New("Only " + strconv.Itoa(ms.topo.FreeSpace()) + " volumes left! Not enough for " + strconv.Itoa(count*rt.GetCopyCount())) |
|||
} else { |
|||
count, err = ms.vg.GrowByCountAndType(count, r.FormValue("collection"), rt, r.FormValue("dataCneter"), ms.topo) |
|||
} |
|||
} else { |
|||
err = errors.New("parameter count is not found") |
|||
} |
|||
} |
|||
if err != nil { |
|||
w.WriteHeader(http.StatusNotAcceptable) |
|||
writeJsonQuiet(w, r, map[string]string{"error": "parameter replication " + err.Error()}) |
|||
} else { |
|||
w.WriteHeader(http.StatusNotAcceptable) |
|||
writeJsonQuiet(w, r, map[string]interface{}{"count": count}) |
|||
} |
|||
} |
|||
|
|||
func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { |
|||
m := make(map[string]interface{}) |
|||
m["Version"] = ms.version |
|||
m["Volumes"] = ms.topo.ToVolumeMap() |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
|
|||
func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) { |
|||
vid, _, _, _, _ := parseURLPath(r.URL.Path) |
|||
volumeId, err := storage.NewVolumeId(vid) |
|||
if err != nil { |
|||
debug("parsing error:", err, r.URL.Path) |
|||
return |
|||
} |
|||
machines := ms.topo.Lookup("", volumeId) |
|||
if machines != nil && len(machines) > 0 { |
|||
http.Redirect(w, r, "http://"+machines[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) |
|||
} else { |
|||
w.WriteHeader(http.StatusNotFound) |
|||
writeJsonQuiet(w, r, map[string]string{"error": "volume id " + volumeId.String() + " not found. "}) |
|||
} |
|||
} |
|||
|
|||
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { |
|||
submitForClientHandler(w, r, "localhost:"+strconv.Itoa(ms.port)) |
|||
} |
@ -0,0 +1,67 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"code.google.com/p/weed-fs/go/glog" |
|||
"code.google.com/p/weed-fs/go/storage" |
|||
"github.com/gorilla/mux" |
|||
"math/rand" |
|||
"time" |
|||
) |
|||
|
|||
type VolumeServer struct { |
|||
masterNode string |
|||
pulseSeconds int |
|||
dataCenter string |
|||
rack string |
|||
whiteList []string |
|||
store *storage.Store |
|||
version string |
|||
} |
|||
|
|||
func NewVolumeServer(r *mux.Router, version string, ip string, port int, publicUrl string, folders []string, maxCounts []int, |
|||
masterNode string, pulseSeconds int, |
|||
dataCenter string, rack string, |
|||
whiteList []string) *VolumeServer { |
|||
vs := &VolumeServer{ |
|||
version: version, |
|||
masterNode: masterNode, |
|||
pulseSeconds: pulseSeconds, |
|||
dataCenter: dataCenter, |
|||
rack: rack, |
|||
whiteList: whiteList, |
|||
} |
|||
vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts) |
|||
|
|||
r.HandleFunc("/submit", secure(vs.whiteList, vs.submitFromVolumeServerHandler)) |
|||
r.HandleFunc("/status", secure(vs.whiteList, vs.statusHandler)) |
|||
r.HandleFunc("/admin/assign_volume", secure(vs.whiteList, vs.assignVolumeHandler)) |
|||
r.HandleFunc("/admin/vacuum_volume_check", secure(vs.whiteList, vs.vacuumVolumeCheckHandler)) |
|||
r.HandleFunc("/admin/vacuum_volume_compact", secure(vs.whiteList, vs.vacuumVolumeCompactHandler)) |
|||
r.HandleFunc("/admin/vacuum_volume_commit", secure(vs.whiteList, vs.vacuumVolumeCommitHandler)) |
|||
r.HandleFunc("/admin/freeze_volume", secure(vs.whiteList, vs.freezeVolumeHandler)) |
|||
r.HandleFunc("/", vs.storeHandler) |
|||
|
|||
go func() { |
|||
connected := true |
|||
vs.store.SetMaster(vs.masterNode) |
|||
vs.store.SetDataCenter(vs.dataCenter) |
|||
vs.store.SetRack(vs.rack) |
|||
for { |
|||
err := vs.store.Join() |
|||
if err == nil { |
|||
if !connected { |
|||
connected = true |
|||
glog.V(0).Infoln("Reconnected with master") |
|||
} |
|||
} else { |
|||
if connected { |
|||
connected = false |
|||
} |
|||
} |
|||
time.Sleep(time.Duration(float32(vs.pulseSeconds*1e3)*(1+rand.Float32())) * time.Millisecond) |
|||
} |
|||
}() |
|||
glog.V(0).Infoln("store joined at", vs.masterNode) |
|||
|
|||
return vs |
|||
} |
@ -0,0 +1,236 @@ |
|||
package weed_server |
|||
|
|||
import ( |
|||
"code.google.com/p/weed-fs/go/glog" |
|||
"code.google.com/p/weed-fs/go/operation" |
|||
"code.google.com/p/weed-fs/go/replication" |
|||
"code.google.com/p/weed-fs/go/storage" |
|||
"mime" |
|||
"net/http" |
|||
"strconv" |
|||
"strings" |
|||
"time" |
|||
) |
|||
|
|||
var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") |
|||
|
|||
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { |
|||
m := make(map[string]interface{}) |
|||
m["Version"] = vs.version |
|||
m["Volumes"] = vs.store.Status() |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
func (vs *VolumeServer) assignVolumeHandler(w http.ResponseWriter, r *http.Request) { |
|||
err := vs.store.AddVolume(r.FormValue("volume"), r.FormValue("collection"), r.FormValue("replicationType")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, map[string]string{"error": ""}) |
|||
} else { |
|||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) |
|||
} |
|||
debug("assign volume =", r.FormValue("volume"), ", collection =", r.FormValue("collection"), ", replicationType =", r.FormValue("replicationType"), ", error =", err) |
|||
} |
|||
func (vs *VolumeServer) vacuumVolumeCheckHandler(w http.ResponseWriter, r *http.Request) { |
|||
err, ret := vs.store.CheckCompactVolume(r.FormValue("volume"), r.FormValue("garbageThreshold")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, map[string]interface{}{"error": "", "result": ret}) |
|||
} else { |
|||
writeJsonQuiet(w, r, map[string]interface{}{"error": err.Error(), "result": false}) |
|||
} |
|||
debug("checked compacting volume =", r.FormValue("volume"), "garbageThreshold =", r.FormValue("garbageThreshold"), "vacuum =", ret) |
|||
} |
|||
func (vs *VolumeServer) vacuumVolumeCompactHandler(w http.ResponseWriter, r *http.Request) { |
|||
err := vs.store.CompactVolume(r.FormValue("volume")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, map[string]string{"error": ""}) |
|||
} else { |
|||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) |
|||
} |
|||
debug("compacted volume =", r.FormValue("volume"), ", error =", err) |
|||
} |
|||
func (vs *VolumeServer) vacuumVolumeCommitHandler(w http.ResponseWriter, r *http.Request) { |
|||
err := vs.store.CommitCompactVolume(r.FormValue("volume")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) |
|||
} else { |
|||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) |
|||
} |
|||
debug("commit compact volume =", r.FormValue("volume"), ", error =", err) |
|||
} |
|||
func (vs *VolumeServer) freezeVolumeHandler(w http.ResponseWriter, r *http.Request) { |
|||
//TODO: notify master that this volume will be read-only
|
|||
err := vs.store.FreezeVolume(r.FormValue("volume")) |
|||
if err == nil { |
|||
writeJsonQuiet(w, r, map[string]interface{}{"error": ""}) |
|||
} else { |
|||
writeJsonQuiet(w, r, map[string]string{"error": err.Error()}) |
|||
} |
|||
debug("freeze volume =", r.FormValue("volume"), ", error =", err) |
|||
} |
|||
func (vs *VolumeServer) submitFromVolumeServerHandler(w http.ResponseWriter, r *http.Request) { |
|||
submitForClientHandler(w, r, vs.masterNode) |
|||
} |
|||
|
|||
func (vs *VolumeServer) storeHandler(w http.ResponseWriter, r *http.Request) { |
|||
switch r.Method { |
|||
case "GET": |
|||
vs.GetOrHeadHandler(w, r, true) |
|||
case "HEAD": |
|||
vs.GetOrHeadHandler(w, r, false) |
|||
case "DELETE": |
|||
secure(vs.whiteList, vs.DeleteHandler)(w, r) |
|||
case "PUT": |
|||
secure(vs.whiteList, vs.PostHandler)(w, r) |
|||
case "POST": |
|||
secure(vs.whiteList, vs.PostHandler)(w, r) |
|||
} |
|||
} |
|||
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { |
|||
n := new(storage.Needle) |
|||
vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) |
|||
volumeId, err := storage.NewVolumeId(vid) |
|||
if err != nil { |
|||
debug("parsing error:", err, r.URL.Path) |
|||
return |
|||
} |
|||
n.ParsePath(fid) |
|||
|
|||
debug("volume", volumeId, "reading", n) |
|||
if !vs.store.HasVolume(volumeId) { |
|||
lookupResult, err := operation.Lookup(vs.masterNode, volumeId) |
|||
debug("volume", volumeId, "found on", lookupResult, "error", err) |
|||
if err == nil { |
|||
http.Redirect(w, r, "http://"+lookupResult.Locations[0].PublicUrl+r.URL.Path, http.StatusMovedPermanently) |
|||
} else { |
|||
debug("lookup error:", err, r.URL.Path) |
|||
w.WriteHeader(http.StatusNotFound) |
|||
} |
|||
return |
|||
} |
|||
cookie := n.Cookie |
|||
count, e := vs.store.Read(volumeId, n) |
|||
debug("read bytes", count, "error", e) |
|||
if e != nil || count <= 0 { |
|||
debug("read error:", e, r.URL.Path) |
|||
w.WriteHeader(http.StatusNotFound) |
|||
return |
|||
} |
|||
if n.Cookie != cookie { |
|||
glog.V(0).Infoln("request with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) |
|||
w.WriteHeader(http.StatusNotFound) |
|||
return |
|||
} |
|||
if n.LastModified != 0 { |
|||
w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) |
|||
if r.Header.Get("If-Modified-Since") != "" { |
|||
if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { |
|||
if t.Unix() >= int64(n.LastModified) { |
|||
w.WriteHeader(http.StatusNotModified) |
|||
return |
|||
} |
|||
} |
|||
} |
|||
} |
|||
if n.NameSize > 0 && filename == "" { |
|||
filename = string(n.Name) |
|||
dotIndex := strings.LastIndex(filename, ".") |
|||
if dotIndex > 0 { |
|||
ext = filename[dotIndex:] |
|||
} |
|||
} |
|||
mtype := "" |
|||
if ext != "" { |
|||
mtype = mime.TypeByExtension(ext) |
|||
} |
|||
if n.MimeSize > 0 { |
|||
mtype = string(n.Mime) |
|||
} |
|||
if mtype != "" { |
|||
w.Header().Set("Content-Type", mtype) |
|||
} |
|||
if filename != "" { |
|||
w.Header().Set("Content-Disposition", "filename="+fileNameEscaper.Replace(filename)) |
|||
} |
|||
if ext != ".gz" { |
|||
if n.IsGzipped() { |
|||
if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { |
|||
w.Header().Set("Content-Encoding", "gzip") |
|||
} else { |
|||
if n.Data, err = storage.UnGzipData(n.Data); err != nil { |
|||
debug("lookup error:", err, r.URL.Path) |
|||
} |
|||
} |
|||
} |
|||
} |
|||
w.Header().Set("Content-Length", strconv.Itoa(len(n.Data))) |
|||
if isGetMethod { |
|||
if _, e = w.Write(n.Data); e != nil { |
|||
debug("response write error:", e) |
|||
} |
|||
} |
|||
} |
|||
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { |
|||
m := make(map[string]interface{}) |
|||
if e := r.ParseForm(); e != nil { |
|||
debug("form parse error:", e) |
|||
writeJsonError(w, r, e) |
|||
return |
|||
} |
|||
vid, _, _, _, _ := parseURLPath(r.URL.Path) |
|||
volumeId, ve := storage.NewVolumeId(vid) |
|||
if ve != nil { |
|||
debug("NewVolumeId error:", ve) |
|||
writeJsonError(w, r, ve) |
|||
return |
|||
} |
|||
needle, ne := storage.NewNeedle(r) |
|||
if ne != nil { |
|||
writeJsonError(w, r, ne) |
|||
return |
|||
} |
|||
ret, errorStatus := replication.ReplicatedWrite(vs.masterNode, vs.store, volumeId, needle, r) |
|||
if errorStatus == "" { |
|||
w.WriteHeader(http.StatusCreated) |
|||
} else { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
m["error"] = errorStatus |
|||
} |
|||
m["size"] = ret |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { |
|||
n := new(storage.Needle) |
|||
vid, fid, _, _, _ := parseURLPath(r.URL.Path) |
|||
volumeId, _ := storage.NewVolumeId(vid) |
|||
n.ParsePath(fid) |
|||
|
|||
debug("deleting", n) |
|||
|
|||
cookie := n.Cookie |
|||
count, ok := vs.store.Read(volumeId, n) |
|||
|
|||
if ok != nil { |
|||
m := make(map[string]uint32) |
|||
m["size"] = 0 |
|||
writeJsonQuiet(w, r, m) |
|||
return |
|||
} |
|||
|
|||
if n.Cookie != cookie { |
|||
glog.V(0).Infoln("delete with unmaching cookie from ", r.RemoteAddr, "agent", r.UserAgent()) |
|||
return |
|||
} |
|||
|
|||
n.Size = 0 |
|||
ret := replication.ReplicatedDelete(vs.masterNode, vs.store, volumeId, n, r) |
|||
|
|||
if ret != 0 { |
|||
w.WriteHeader(http.StatusAccepted) |
|||
} else { |
|||
w.WriteHeader(http.StatusInternalServerError) |
|||
} |
|||
|
|||
m := make(map[string]uint32) |
|||
m["size"] = uint32(count) |
|||
writeJsonQuiet(w, r, m) |
|||
} |
|||
|
Write
Preview
Loading…
Cancel
Save
Reference in new issue