From bee8fdb088d65b586698aef92a71a3f60411525e Mon Sep 17 00:00:00 2001
From: tnextday
Date: Fri, 11 Mar 2016 01:09:01 +0800
Subject: [PATCH] `ChunkedFileReader` will read chunk needles from the local volume first

---
 go/operation/chunked_manifest.go              |  61 +++++++++
 .../chunked_file_reader.go}                   | 118 +++++++-----
 go/storage/file_id.go                         |  10 +-
 go/storage/needle.go                          |  44 +++----
 .../volume_server_handlers_read.go            |   3 +-
 5 files changed, 140 insertions(+), 96 deletions(-)
 create mode 100644 go/operation/chunked_manifest.go
 rename go/{operation/chunked_file.go => storage/chunked_file_reader.go} (63%)

diff --git a/go/operation/chunked_manifest.go b/go/operation/chunked_manifest.go
new file mode 100644
index 000000000..2fdcb6039
--- /dev/null
+++ b/go/operation/chunked_manifest.go
@@ -0,0 +1,61 @@
+package operation
+
+import (
+	"encoding/json"
+	"errors"
+	"sort"
+
+	"github.com/chrislusf/seaweedfs/go/glog"
+)
+
+type ChunkInfo struct {
+	Fid    string `json:"fid"`
+	Offset int64  `json:"offset"`
+	Size   int64  `json:"size"`
+}
+
+type ChunkList []*ChunkInfo
+
+type ChunkManifest struct {
+	Name   string    `json:"name,omitempty"`
+	Mime   string    `json:"mime,omitempty"`
+	Size   int64     `json:"size,omitempty"`
+	Chunks ChunkList `json:"chunks,omitempty"`
+}
+
+func (s ChunkList) Len() int           { return len(s) }
+func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
+func (s ChunkList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+
+func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
+	if isGzipped {
+		var err error
+		if buffer, err = UnGzipData(buffer); err != nil {
+			return nil, err
+		}
+	}
+	cm := ChunkManifest{}
+	if e := json.Unmarshal(buffer, &cm); e != nil {
+		return nil, e
+	}
+	sort.Sort(cm.Chunks)
+	return &cm, nil
+}
+
+func (cm *ChunkManifest) Marshal() ([]byte, error) {
+	return json.Marshal(cm)
+}
+
+func (cm *ChunkManifest) DeleteChunks(master, collection string) error {
+	deleteError := 0
+	for _, ci := range cm.Chunks {
+		if e := DeleteFile(master, ci.Fid, collection, ""); e != nil {
+			deleteError++
+			glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
+		}
+	}
+	if deleteError > 0 {
+		return errors.New("Not all chunks deleted.")
+	}
+	return nil
+}
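
For orientation (a sketch, not part of the patch): the manifest is a plain JSON
document stored in its own needle, and LoadChunkManifest sorts the chunk list by
offset so the reader can walk it sequentially. Building and marshaling one with
the types above; the file name, fids, and sizes here are made up for illustration:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/go/operation"
    )

    func main() {
        cm := &operation.ChunkManifest{
            Name: "movie.mp4",
            Mime: "video/mp4",
            Size: 200,
            Chunks: operation.ChunkList{
                // each chunk records where its bytes live (fid) and where
                // they fall in the logical file (offset, size)
                {Fid: "3,01637037d6", Offset: 0, Size: 100},
                {Fid: "4,02731079f2", Offset: 100, Size: 100},
            },
        }
        buf, _ := cm.Marshal()
        fmt.Println(string(buf))
        // {"name":"movie.mp4","mime":"video/mp4","size":200,"chunks":[...]}
    }
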
diff --git a/go/operation/chunked_file.go b/go/storage/chunked_file_reader.go
similarity index 63%
rename from go/operation/chunked_file.go
rename to go/storage/chunked_file_reader.go
index bcdae8617..637c841ee 100644
--- a/go/operation/chunked_file.go
+++ b/go/storage/chunked_file_reader.go
@@ -1,16 +1,13 @@
-package operation
+package storage
 
 import (
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
 	"net/http"
-	"sort"
 	"sync"
 
-	"github.com/chrislusf/seaweedfs/go/glog"
+	"github.com/chrislusf/seaweedfs/go/operation"
 	"github.com/chrislusf/seaweedfs/go/util"
 )
 
@@ -21,70 +18,43 @@ var (
 	ErrInvalidRange = errors.New("Invalid range")
 )
 
-type ChunkInfo struct {
-	Fid    string `json:"fid"`
-	Offset int64  `json:"offset"`
-	Size   int64  `json:"size"`
-}
-
-type ChunkList []*ChunkInfo
-
-type ChunkManifest struct {
-	Name   string    `json:"name,omitempty"`
-	Mime   string    `json:"mime,omitempty"`
-	Size   int64     `json:"size,omitempty"`
-	Chunks ChunkList `json:"chunks,omitempty"`
-}
-
 // seekable chunked file reader
 type ChunkedFileReader struct {
-	Manifest *ChunkManifest
+	Manifest   *operation.ChunkManifest
 	Master     string
 	Collection string
+	Store      *Store
 	pos        int64
 	pr         *io.PipeReader
 	pw         *io.PipeWriter
 	mutex      sync.Mutex
 }
 
-func (s ChunkList) Len() int           { return len(s) }
-func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
-func (s ChunkList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-
-func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
-	if isGzipped {
-		var err error
-		if buffer, err = UnGzipData(buffer); err != nil {
-			return nil, err
-		}
+func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
+	var err error
+	switch whence {
+	case 0:
+	case 1:
+		offset += cf.pos
+	case 2:
+		offset = cf.Manifest.Size - offset
 	}
-	cm := ChunkManifest{}
-	if e := json.Unmarshal(buffer, &cm); e != nil {
-		return nil, e
+	if offset > cf.Manifest.Size {
+		err = ErrInvalidRange
 	}
-	sort.Sort(cm.Chunks)
-	return &cm, nil
-}
-
-func (cm *ChunkManifest) Marshal() ([]byte, error) {
-	return json.Marshal(cm)
+	if cf.pos != offset {
+		cf.Close()
+	}
+	cf.pos = offset
+	return cf.pos, err
 }
 
-func (cm *ChunkManifest) DeleteChunks(master, collection string) error {
-	deleteError := 0
-	for _, ci := range cm.Chunks {
-		if e := DeleteFile(master, ci.Fid, collection, ""); e != nil {
-			deleteError++
-			glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
-		}
+func (cf *ChunkedFileReader) readRemoteChunkNeedle(fid string, w io.Writer, offset int64) (written int64, e error) {
+	fileUrl, lookupError := operation.LookupFileId(cf.Master, fid, cf.Collection, true)
+	if lookupError != nil {
+		return 0, lookupError
 	}
-	if deleteError > 0 {
-		return errors.New("Not all chunks deleted.")
-	}
-	return nil
-}
-
-func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
 	req, err := http.NewRequest("GET", fileUrl, nil)
 	if err != nil {
 		return written, err
 	}
@@ -115,23 +85,21 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64,
 	return io.Copy(w, resp.Body)
 }
 
-func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
-	var err error
-	switch whence {
-	case 0:
-	case 1:
-		offset += cf.pos
-	case 2:
-		offset = cf.Manifest.Size - offset
+func (cf *ChunkedFileReader) readLocalChunkNeedle(fid *FileId, w io.Writer, offset int64) (written int64, e error) {
+	n := &Needle{
+		Id:     fid.Key,
+		Cookie: fid.Hashcode,
 	}
-	if offset > cf.Manifest.Size {
-		err = ErrInvalidRange
+	cookie := n.Cookie
+	count, e := cf.Store.ReadVolumeNeedle(fid.VolumeId, n)
+	if e != nil || count <= 0 {
+		return 0, e
 	}
-	if cf.pos != offset {
-		cf.Close()
+	if n.Cookie != cookie {
+		return 0, fmt.Errorf("read error: mismatching cookie, seen: %d expected: %d", n.Cookie, cookie)
 	}
-	cf.pos = offset
-	return cf.pos, err
+	wn, e := w.Write(n.Data[offset:])
+	return int64(wn), e
 }
 
 func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
@@ -150,12 +118,18 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
 	}
 	for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
 		ci := cm.Chunks[chunkIndex]
-		// if we need read date from local volume server first?
-		fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, cf.Collection, true)
-		if lookupError != nil {
-			return n, lookupError
+		fid, e := ParseFileId(ci.Fid)
+		if e != nil {
+			return n, e
 		}
-		if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil {
+		var wn int64
+		if cf.Store != nil && cf.Store.HasVolume(fid.VolumeId) {
+			wn, e = cf.readLocalChunkNeedle(fid, w, chunkStartOffset)
+		} else {
+			wn, e = cf.readRemoteChunkNeedle(ci.Fid, w, chunkStartOffset)
+		}
+
+		if e != nil {
 			return n, e
 		} else {
 			n += wn
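
A side note on the reader's contract (a sketch, not part of the patch): WriteTo
starts from the current position kept in cf.pos and resolves the chunk that
contains it, so a ranged read is just a Seek followed by WriteTo. The function
and parameter names below (serveFrom, start) are illustrative:

    package main

    import (
        "io"

        "github.com/chrislusf/seaweedfs/go/storage"
    )

    // serveFrom streams bytes [start, EOF) of the chunked file to w. The
    // reader cf is assumed to be fully populated (Manifest, Master, and
    // optionally Store).
    func serveFrom(cf *storage.ChunkedFileReader, w io.Writer, start int64) error {
        // An absolute seek; when the position actually moves, Seek also
        // tears down any half-read pipe via cf.Close().
        if _, err := cf.Seek(start, 0); err != nil {
            return err
        }
        // WriteTo locates the chunk containing the current position, then
        // streams chunk by chunk: from the local store when it hosts the
        // volume, otherwise over HTTP after a master lookup.
        _, err := cf.WriteTo(w)
        return err
    }

    func main() {} // placeholder so the sketch builds standalone
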
diff --git a/go/storage/file_id.go b/go/storage/file_id.go
index 64b61ba89..3cc14eaf3 100644
--- a/go/storage/file_id.go
+++ b/go/storage/file_id.go
@@ -22,14 +22,20 @@ func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId {
 	return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
 }
 func ParseFileId(fid string) (*FileId, error) {
-	a := strings.Split(fid, ",")
+	var a []string
+	if strings.Contains(fid, ",") {
+		a = strings.Split(fid, ",")
+	} else {
+		a = strings.Split(fid, "/")
+	}
 	if len(a) != 2 {
 		glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a))
 		return nil, errors.New("Invalid fid " + fid)
 	}
+
 	vid_string, key_hash_string := a[0], a[1]
 	volumeId, _ := NewVolumeId(vid_string)
-	key, hash, e := ParseKeyHash(key_hash_string)
+	key, hash, e := ParseIdCookie(key_hash_string)
 	return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e
 }
 func (n *FileId) String() string {
diff --git a/go/storage/needle.go b/go/storage/needle.go
index e49368820..e7758c41e 100644
--- a/go/storage/needle.go
+++ b/go/storage/needle.go
@@ -193,36 +193,38 @@ func (n *Needle) ParsePath(fid string) (err error) {
 	if length <= 8 {
 		return errors.New("Invalid fid:" + fid)
 	}
+	n.Id, n.Cookie, err = ParseIdCookie(fid)
+	if err != nil {
+		return err
+	}
+	return err
+}
+
+func ParseIdCookie(fid string) (uint64, uint32, error) {
 	delta := ""
 	deltaIndex := strings.LastIndex(fid, "_")
 	if deltaIndex > 0 {
 		fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
 	}
-	n.Id, n.Cookie, err = ParseKeyHash(fid)
-	if err != nil {
-		return err
+
+	if len(fid)%2 == 1 {
+		fid = "0" + fid
+	}
+	key_hash_bytes, khe := hex.DecodeString(fid)
+	key_hash_len := len(key_hash_bytes)
+	if khe != nil || key_hash_len <= 4 {
+		glog.V(0).Infoln("Invalid key_hash", fid, "length:", key_hash_len, "error", khe)
+		return 0, 0, errors.New("Invalid key and hash:" + fid)
 	}
+	id := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4])
+	cookie := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len])
+
 	if delta != "" {
 		if d, e := strconv.ParseUint(delta, 10, 64); e == nil {
-			n.Id += d
+			id += d
 		} else {
-			return e
+			return 0, 0, e
 		}
 	}
-	return err
-}
-
-func ParseKeyHash(key_hash_string string) (uint64, uint32, error) {
-	if len(key_hash_string)%2 == 1 {
-		key_hash_string = "0" + key_hash_string
-	}
-	key_hash_bytes, khe := hex.DecodeString(key_hash_string)
-	key_hash_len := len(key_hash_bytes)
-	if khe != nil || key_hash_len <= 4 {
-		glog.V(0).Infoln("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe)
-		return 0, 0, errors.New("Invalid key and hash:" + key_hash_string)
-	}
-	key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4])
-	hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len])
-	return key, hash, nil
+	return id, cookie, nil
 }
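
What the parsing change buys, spelled out (a sketch, not part of the patch):
ParseFileId now accepts both the comma form and the URL-path form of a fid, and
ParseIdCookie keeps the old layout: everything but the last four bytes of the
decoded hex string is the needle key, the last four bytes are the cookie, and an
optional `_N` suffix is added to the key. For example:

    package main

    import (
        "fmt"

        "github.com/chrislusf/seaweedfs/go/storage"
    )

    func main() {
        // Both spellings of the same fid now parse identically:
        // "3,01637037d6" (comma form) and "3/01637037d6" (path form).
        for _, s := range []string{"3,01637037d6", "3/01637037d6"} {
            fid, err := storage.ParseFileId(s)
            if err != nil {
                panic(err)
            }
            // "01637037d6" is 5 hex bytes: the first byte (0x01) is the
            // key, the trailing 4 bytes (0x637037d6) are the cookie.
            fmt.Println(fid.VolumeId, fid.Key, fid.Hashcode)
        }
    }
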
diff --git a/go/weed/weed_server/volume_server_handlers_read.go b/go/weed/weed_server/volume_server_handlers_read.go
index ebe323d1c..b1a1d66ec 100644
--- a/go/weed/weed_server/volume_server_handlers_read.go
+++ b/go/weed/weed_server/volume_server_handlers_read.go
@@ -156,9 +156,10 @@ func (vs *VolumeServer) tryHandleChunkedFile(vid storage.VolumeId, n *storage.Ne
 
 	w.Header().Set("X-File-Store", "chunked")
 
-	chunkedFileReader := &operation.ChunkedFileReader{
+	chunkedFileReader := &storage.ChunkedFileReader{
 		Manifest: chunkManifest,
 		Master:   vs.GetMasterNode(),
+		Store:    vs.store,
 	}
 	if v := vs.store.GetVolume(vid); v != nil {
 		chunkedFileReader.Collection = v.Collection
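
End to end, the change wires the volume server's own *storage.Store into the
reader, so chunks that happen to live on a local volume are read straight from
disk instead of over HTTP. A minimal usage sketch under assumed names
(serveChunkedFile, master, and manifestBytes are illustrative, not part of the
patch's API):

    package main

    import (
        "net/http"

        "github.com/chrislusf/seaweedfs/go/operation"
        "github.com/chrislusf/seaweedfs/go/storage"
    )

    // serveChunkedFile streams a chunked file to the response. store may be
    // nil; then every chunk takes the remote HTTP path, which is exactly
    // the pre-patch behavior.
    func serveChunkedFile(w http.ResponseWriter, store *storage.Store, master string, manifestBytes []byte, isGzipped bool) error {
        cm, err := operation.LoadChunkManifest(manifestBytes, isGzipped)
        if err != nil {
            return err
        }
        cf := &storage.ChunkedFileReader{
            Manifest: cm,
            Master:   master,
            Store:    store, // a non-nil Store enables the local fast path
        }
        _, err = cf.WriteTo(w)
        return err
    }

    func main() {} // placeholder so the sketch builds standalone

Leaving Store nil keeps the old behavior, with every chunk looked up on the
master and fetched over HTTP, so callers outside the volume server continue to
work unchanged.
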