diff --git a/go/storage/chunked_file_reader.go b/go/storage/chunked_file_reader.go index 755d2966b..d7ba964a7 100644 --- a/go/storage/chunked_file_reader.go +++ b/go/storage/chunked_file_reader.go @@ -63,7 +63,7 @@ func (cf *ChunkedFileReader) readRemoteChunkNeedle(fid string, w io.Writer, offs req.Header.Set("Range", fmt.Sprintf("bytes=%d-", offset)) } - resp, err := util.Do(req) + resp, err := util.HttpDo(req) if err != nil { return written, err } diff --git a/go/storage/file_id.go b/go/storage/file_id.go index 3cc14eaf3..0cd0abc11 100644 --- a/go/storage/file_id.go +++ b/go/storage/file_id.go @@ -1,12 +1,10 @@ package storage import ( - "encoding/hex" "errors" "strings" "github.com/chrislusf/seaweedfs/go/glog" - "github.com/chrislusf/seaweedfs/go/util" ) type FileId struct { @@ -38,12 +36,11 @@ func ParseFileId(fid string) (*FileId, error) { key, hash, e := ParseIdCookie(key_hash_string) return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e } + func (n *FileId) String() string { - bytes := make([]byte, 12) - util.Uint64toBytes(bytes[0:8], n.Key) - util.Uint32toBytes(bytes[8:12], n.Hashcode) - nonzero_index := 0 - for ; bytes[nonzero_index] == 0; nonzero_index++ { - } - return n.VolumeId.String() + "," + hex.EncodeToString(bytes[nonzero_index:]) + return n.VolumeId.String() + "," + n.Nid() +} + +func (n *FileId) Nid() string { + return ToNid(n.Key, n.Hashcode) } diff --git a/go/storage/needle.go b/go/storage/needle.go index e7758c41e..0e2b639f2 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -179,42 +179,46 @@ func NewNeedle(r *http.Request, fixJpgOrientation bool) (n *Needle, e error) { commaSep := strings.LastIndex(r.URL.Path, ",") dotSep := strings.LastIndex(r.URL.Path, ".") - fid := r.URL.Path[commaSep+1:] + nid := r.URL.Path[commaSep+1:] if dotSep > 0 { - fid = r.URL.Path[commaSep+1 : dotSep] + nid = r.URL.Path[commaSep+1 : dotSep] } - e = n.ParsePath(fid) + e = n.ParseNid(nid) return } -func (n *Needle) ParsePath(fid string) (err error) { - length := len(fid) +func (n *Needle) ParseNid(nid string) (err error) { + length := len(nid) if length <= 8 { - return errors.New("Invalid fid:" + fid) + return errors.New("Invalid nid:" + nid) } - n.Id, n.Cookie, err = ParseIdCookie(fid) + n.Id, n.Cookie, err = ParseIdCookie(nid) if err != nil { return err } return err } -func ParseIdCookie(fid string) (uint64, uint32, error) { +func (n *Needle) Nid() string { + return ToNid(n.Id, n.Cookie) +} + +func ParseIdCookie(nid string) (uint64, uint32, error) { delta := "" - deltaIndex := strings.LastIndex(fid, "_") + deltaIndex := strings.LastIndex(nid, "_") if deltaIndex > 0 { - fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:] + nid, delta = nid[0:deltaIndex], nid[deltaIndex+1:] } - if len(fid)%2 == 1 { - fid = "0" + fid + if len(nid)%2 == 1 { + nid = "0" + nid } - key_hash_bytes, khe := hex.DecodeString(fid) + key_hash_bytes, khe := hex.DecodeString(nid) key_hash_len := len(key_hash_bytes) if khe != nil || key_hash_len <= 4 { - glog.V(0).Infoln("Invalid key_hash", fid, "length:", key_hash_len, "error", khe) - return 0, 0, errors.New("Invalid key and hash:" + fid) + glog.V(0).Infoln("Invalid key_hash", nid, "length:", key_hash_len, "error", khe) + return 0, 0, errors.New("Invalid key and hash:" + nid) } id := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4]) cookie := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len]) @@ -228,3 +232,13 @@ func ParseIdCookie(fid string) (uint64, uint32, error) { } return id, cookie, nil } + +func ToNid(key uint64, cookie uint32) string { + bytes := make([]byte, 12) + util.Uint64toBytes(bytes[0:8], key) + util.Uint32toBytes(bytes[8:12], cookie) + nonzero_index := 0 + for ; bytes[nonzero_index] == 0; nonzero_index++ { + } + return hex.EncodeToString(bytes[nonzero_index:]) +} diff --git a/go/util/http_util.go b/go/util/http_util.go index 2fbd76fb9..722c63276 100644 --- a/go/util/http_util.go +++ b/go/util/http_util.go @@ -248,7 +248,7 @@ func DownloadToFile(fileUrl, savePath string) (e error) { return } -func Do(req *http.Request) (resp *http.Response, err error) { +func HttpDo(req *http.Request) (resp *http.Response, err error) { return client.Do(req) } diff --git a/go/weed/server.go b/go/weed/server.go index 56490c093..74362228a 100644 --- a/go/weed/server.go +++ b/go/weed/server.go @@ -10,12 +10,13 @@ import ( "sync" "time" + "net" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" "github.com/chrislusf/seaweedfs/go/weed/weed_server" "github.com/gorilla/mux" - "net" ) type ServerOptions struct { @@ -72,6 +73,7 @@ var ( volumeIndexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") volumeFixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", true, "Adjust jpg orientation when uploading.") volumeReadRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") + volumeReadRemoteNeedle = cmdServer.Flag.Bool("volume.read.remote.needle", false, "Read remote needle when have non-local volumes.") volumeServerPublicUrl = cmdServer.Flag.String("volume.publicUrl", "", "publicly accessible address") isStartingFiler = cmdServer.Flag.Bool("filer", false, "whether to start filer") @@ -254,7 +256,7 @@ func runServer(cmd *Command, args []string) bool { folders, maxCounts, volumeNeedleMapKind, net.JoinHostPort(*serverIp, strconv.Itoa(*masterPort)), *volumePulse, *serverDataCenter, *serverRack, - serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, + serverWhiteList, *volumeFixJpgOrientation, *volumeReadRedirect, *volumeReadRemoteNeedle, ) glog.V(0).Infoln("Start Seaweed volume server", util.VERSION, "at", net.JoinHostPort(*serverIp, strconv.Itoa(*volumePort))) diff --git a/go/weed/volume.go b/go/weed/volume.go index 2ab8491d4..b22728445 100644 --- a/go/weed/volume.go +++ b/go/weed/volume.go @@ -8,11 +8,12 @@ import ( "strings" "time" + "net" + "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/storage" "github.com/chrislusf/seaweedfs/go/util" "github.com/chrislusf/seaweedfs/go/weed/weed_server" - "net" ) var ( @@ -37,6 +38,7 @@ type VolumeServerOptions struct { indexType *string fixJpgOrientation *bool readRedirect *bool + readRemoteNeedle *bool } func init() { @@ -55,6 +57,8 @@ func init() { v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|boltdb] mode for memory~performance balance.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", true, "Adjust jpg orientation when uploading.") v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") + v.readRemoteNeedle = cmdVolume.Flag.Bool("read.remote.needle", false, "Read remote needle when have non-local volumes.") + } var cmdVolume = &Command{ @@ -132,7 +136,7 @@ func runVolume(cmd *Command, args []string) bool { volumeNeedleMapKind, *v.master, *v.pulseSeconds, *v.dataCenter, *v.rack, v.whiteList, - *v.fixJpgOrientation, *v.readRedirect, + *v.fixJpgOrientation, *v.readRedirect, *v.readRemoteNeedle, ) listeningAddress := net.JoinHostPort(*v.bindIp, strconv.Itoa(*v.port)) diff --git a/go/weed/weed_server/common.go b/go/weed/weed_server/common.go index 89499af40..582e3e4ef 100644 --- a/go/weed/weed_server/common.go +++ b/go/weed/weed_server/common.go @@ -136,19 +136,19 @@ func deleteForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st writeJsonQuiet(w, r, http.StatusAccepted, ret) } -func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly bool) { +func parseURLPath(path string) (vid, nid, filename, ext string, isVolumeIdOnly bool) { switch strings.Count(path, "/") { case 3: parts := strings.Split(path, "/") - vid, fid, filename = parts[1], parts[2], parts[3] + vid, nid, filename = parts[1], parts[2], parts[3] ext = filepath.Ext(filename) case 2: parts := strings.Split(path, "/") - vid, fid = parts[1], parts[2] - dotIndex := strings.LastIndex(fid, ".") + vid, nid = parts[1], parts[2] + dotIndex := strings.LastIndex(nid, ".") if dotIndex > 0 { - ext = fid[dotIndex:] - fid = fid[0:dotIndex] + ext = nid[dotIndex:] + nid = nid[0:dotIndex] } default: sepIndex := strings.LastIndex(path, "/") @@ -159,10 +159,10 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b } dotIndex := strings.LastIndex(path[sepIndex:], ".") vid = path[sepIndex+1 : commaIndex] - fid = path[commaIndex+1:] + nid = path[commaIndex+1:] ext = "" if dotIndex > 0 { - fid = path[commaIndex+1 : dotIndex] + nid = path[commaIndex+1 : dotIndex] ext = path[dotIndex:] } } diff --git a/go/weed/weed_server/filer_server_handlers.go b/go/weed/weed_server/filer_server_handlers.go index d7867284e..d3772ebbd 100644 --- a/go/weed/weed_server/filer_server_handlers.go +++ b/go/weed/weed_server/filer_server_handlers.go @@ -106,7 +106,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, ContentLength: r.ContentLength, } glog.V(3).Infoln("retrieving from", u) - resp, do_err := util.Do(request) + resp, do_err := util.HttpDo(request) if do_err != nil { glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) writeJsonError(w, r, http.StatusInternalServerError, do_err) @@ -197,7 +197,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { Host: r.Host, ContentLength: r.ContentLength, } - resp, do_err := util.Do(request) + resp, do_err := util.HttpDo(request) if do_err != nil { glog.V(0).Infoln("failing to connect to volume server", r.RequestURI, do_err.Error()) writeJsonError(w, r, http.StatusInternalServerError, do_err) diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index fbf0339e3..fd6832f7b 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -22,6 +22,7 @@ type VolumeServer struct { FixJpgOrientation bool ReadRedirect bool + ReadRemoteNeedle bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -32,13 +33,14 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool) *VolumeServer { + readRedirect, readRemoteNeedle bool) *VolumeServer { vs := &VolumeServer{ pulseSeconds: pulseSeconds, dataCenter: dataCenter, rack: rack, FixJpgOrientation: fixJpgOrientation, ReadRedirect: readRedirect, + ReadRemoteNeedle: readRemoteNeedle, } vs.SetMasterNode(masterNode) vs.store = storage.NewStore(port, ip, publicUrl, folders, maxCounts, needleMapKind) @@ -57,6 +59,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler)) + adminMux.HandleFunc("/admin/sync/needle", vs.guard.WhiteList(vs.getNeedleHandler)) adminMux.HandleFunc("/admin/task/new", vs.guard.WhiteList(vs.newTaskHandler)) adminMux.HandleFunc("/admin/task/query", vs.guard.WhiteList(vs.queryTaskHandler)) adminMux.HandleFunc("/admin/task/commit", vs.guard.WhiteList(vs.commitTaskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go index b1a1d66ec..c4ed1d9cd 100644 --- a/go/weed/weed_server/volume_server_handlers_read.go +++ b/go/weed/weed_server/volume_server_handlers_read.go @@ -13,6 +13,9 @@ import ( "net/url" + "io/ioutil" + + "errors" "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/images" "github.com/chrislusf/seaweedfs/go/operation" @@ -24,14 +27,14 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { n := new(storage.Needle) - vid, fid, filename, ext, _ := parseURLPath(r.URL.Path) + vid, nid, filename, ext, _ := parseURLPath(r.URL.Path) volumeId, err := storage.NewVolumeId(vid) if err != nil { glog.V(2).Infoln("parsing error:", err, r.URL.Path) w.WriteHeader(http.StatusBadRequest) return } - err = n.ParsePath(fid) + err = n.ParseNid(nid) if err != nil { glog.V(2).Infoln("parsing fid error:", err, r.URL.Path) w.WriteHeader(http.StatusBadRequest) @@ -39,12 +42,29 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } glog.V(4).Infoln("volume", volumeId, "reading", n) - if !vs.store.HasVolume(volumeId) { - if !vs.ReadRedirect { - glog.V(2).Infoln("volume is not local:", err, r.URL.Path) + if vs.store.HasVolume(volumeId) { + cookie := n.Cookie + count, e := vs.store.ReadVolumeNeedle(volumeId, n) + glog.V(4).Infoln("read local bytes", count, "error", e) + if e != nil || count <= 0 { + glog.V(0).Infoln("read local error:", e, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if n.Cookie != cookie { + glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) + w.WriteHeader(http.StatusNotFound) + return + } + } else if vs.ReadRemoteNeedle { + count, e := vs.readRemoteNeedle(volumeId.String(), n, r.FormValue("collection")) + glog.V(4).Infoln("read remote needle bytes ", count, "error", e) + if e != nil || count <= 0 { + glog.V(2).Infoln("read remote needle error:", e, r.URL.Path) w.WriteHeader(http.StatusNotFound) return } + } else if vs.ReadRedirect { lookupResult, err := operation.Lookup(vs.GetMasterNode(), volumeId.String(), r.FormValue("collection")) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) if err == nil && len(lookupResult.Locations) > 0 { @@ -56,20 +76,12 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotFound) } return - } - cookie := n.Cookie - count, e := vs.store.ReadVolumeNeedle(volumeId, n) - glog.V(4).Infoln("read bytes", count, "error", e) - if e != nil || count <= 0 { - glog.V(0).Infoln("read error:", e, r.URL.Path) - w.WriteHeader(http.StatusNotFound) - return - } - if n.Cookie != cookie { - glog.V(0).Infoln("request", r.URL.Path, "with unmaching cookie seen:", cookie, "expected:", n.Cookie, "from", r.RemoteAddr, "agent", r.UserAgent()) + } else { + glog.V(2).Infoln("volume is not local:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) return } + if n.LastModified != 0 { w.Header().Set("Last-Modified", time.Unix(int64(n.LastModified), 0).UTC().Format(http.TimeFormat)) if r.Header.Get("If-Modified-Since") != "" { @@ -278,3 +290,48 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re _, e = io.CopyN(w, sendContent, sendSize) return e } + +func (vs *VolumeServer) readRemoteNeedle(vid string, n *storage.Needle, collection string) (int, error) { + lookupResult, err := operation.Lookup(vs.GetMasterNode(), vid, collection) + glog.V(2).Infoln("volume", vid, "found on", lookupResult, "error", err) + if err != nil || len(lookupResult.Locations) == 0 { + return 0, errors.New("lookup error:" + err.Error()) + } + u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations.PickForRead().PublicUrl)) + u.Path = "/admin/sync/needle" + args := url.Values{ + "vid": {vid}, + "nid": {n.Nid()}, + } + u.RawQuery = args.Encode() + req, _ := http.NewRequest("GET", u.String(), nil) + resp, err := util.HttpDo(req) + if err != nil { + return 0, err + } + defer resp.Body.Close() + if n.Data, err = ioutil.ReadAll(resp.Body); err != nil { + return 0, err + } + n.DataSize = uint32(len(n.Data)) + if h := resp.Header.Get("SFS-FLAGS"); h != "" { + if i, err := strconv.ParseInt(h, 16, 64); err == nil { + n.Flags = byte(i) + } + } + if h := resp.Header.Get("SFS-LastModified"); h != "" { + if i, err := strconv.ParseUint(h, 16, 64); err == nil { + n.LastModified = i + n.SetHasLastModifiedDate() + } + } + if h := resp.Header.Get("SFS-Name"); h != "" { + n.Name = []byte(h) + n.SetHasName() + } + if h := resp.Header.Get("SFS-Mime"); h != "" { + n.Mime = []byte(h) + n.SetHasMime() + } + return int(n.DataSize), nil +} diff --git a/go/weed/weed_server/volume_server_handlers_sync.go b/go/weed/weed_server/volume_server_handlers_sync.go index 6f0d5aa8b..5227f536c 100644 --- a/go/weed/weed_server/volume_server_handlers_sync.go +++ b/go/weed/weed_server/volume_server_handlers_sync.go @@ -142,3 +142,48 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http } lz4w.Close() } + +func (vs *VolumeServer) getNeedleHandler(w http.ResponseWriter, r *http.Request) { + vid, err := storage.NewVolumeId(r.FormValue("volume")) + if err != nil { + e := fmt.Errorf("parsing volume error: %v", err) + glog.V(2).Infoln(e) + writeJsonError(w, r, http.StatusBadRequest, e) + return + } + nid := r.FormValue("nid") + n := new(storage.Needle) + err = n.ParseNid(nid) + if err != nil { + e := fmt.Errorf("parsing fid (%s) error: %v", nid, err) + glog.V(2).Infoln(e) + writeJsonError(w, r, http.StatusBadRequest, e) + return + } + cookie := n.Cookie + count, e := vs.store.ReadVolumeNeedle(vid, n) + glog.V(4).Infoln("read bytes", count, "error", e) + if e != nil || count <= 0 { + e := fmt.Errorf("read needle (%v,%v) error: %v", vid, nid, err) + glog.V(2).Infoln(e) + writeJsonError(w, r, http.StatusNotFound, e) + return + } + if n.Cookie != cookie { + e := fmt.Errorf("request (%v,%v) with unmaching cookie seen: %v expected: %v", vid, nid, cookie, n.Cookie) + glog.V(2).Infoln(e) + writeJsonError(w, r, http.StatusNotFound, e) + return + } + w.Header().Set("SFS-FLAGS", strconv.FormatInt(int64(n.Flags), 16)) + if n.HasLastModifiedDate() { + w.Header().Set("SFS-LastModified", strconv.FormatUint(n.LastModified, 16)) + } + if n.HasName() && n.NameSize > 0 { + w.Header().Set("SFS-Name", string(n.Name)) + } + if n.HasMime() && n.MimeSize > 0 { + w.Header().Set("SFS-Mime", string(n.Mime)) + } + w.Write(n.Data) +} diff --git a/go/weed/weed_server/volume_server_handlers_write.go b/go/weed/weed_server/volume_server_handlers_write.go index 10f14e5ad..db2431046 100644 --- a/go/weed/weed_server/volume_server_handlers_write.go +++ b/go/weed/weed_server/volume_server_handlers_write.go @@ -47,9 +47,9 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { n := new(storage.Needle) - vid, fid, _, _, _ := parseURLPath(r.URL.Path) + vid, nid, _, _, _ := parseURLPath(r.URL.Path) volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(fid) + n.ParseNid(nid) glog.V(2).Infoln("deleting", n) @@ -111,7 +111,7 @@ func (vs *VolumeServer) batchDeleteHandler(w http.ResponseWriter, r *http.Reques } n := new(storage.Needle) volumeId, _ := storage.NewVolumeId(vid) - n.ParsePath(id_cookie) + n.ParseNid(id_cookie) glog.V(4).Infoln("batch deleting", n) cookie := n.Cookie if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {