
`ChunkedFileReader` will read chunk needles from the local volume first

pull/279/head
tnextday committed 10 years ago · commit bee8fdb088
5 changed files:

  1. go/operation/chunked_manifest.go (61 changed lines)
  2. go/storage/chunked_file_reader.go (118 changed lines)
  3. go/storage/file_id.go (10 changed lines)
  4. go/storage/needle.go (44 changed lines)
  5. go/weed/weed_server/volume_server_handlers_read.go (3 changed lines)
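At the core of the change, WriteTo now parses each chunk's fid and, when that chunk's volume is mounted on the local Store, reads the needle directly instead of doing a master lookup plus an HTTP fetch. A minimal sketch of that dispatch, where readLocal and readRemote are illustrative stand-ins for the readLocalChunkNeedle/readRemoteChunkNeedle methods added in the diffs below:

// Sketch only: Store, FileId and ParseFileId are the types from the diffs
// below; readLocal and readRemote stand in for the new reader methods.
func writeChunk(store *Store, fidStr string, w io.Writer, offset int64) (int64, error) {
	fid, err := ParseFileId(fidStr)
	if err != nil {
		return 0, err
	}
	if store != nil && store.HasVolume(fid.VolumeId) {
		return readLocal(store, fid, w, offset) // direct needle read, no HTTP hop
	}
	return readRemote(fidStr, w, offset) // master lookup + HTTP GET, as before
}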

go/operation/chunked_manifest.go (61 changed lines)

@@ -0,0 +1,61 @@
+package operation
+
+import (
+	"encoding/json"
+	"errors"
+	"sort"
+
+	"github.com/chrislusf/seaweedfs/go/glog"
+)
+
+type ChunkInfo struct {
+	Fid    string `json:"fid"`
+	Offset int64  `json:"offset"`
+	Size   int64  `json:"size"`
+}
+
+type ChunkList []*ChunkInfo
+
+type ChunkManifest struct {
+	Name   string    `json:"name,omitempty"`
+	Mime   string    `json:"mime,omitempty"`
+	Size   int64     `json:"size,omitempty"`
+	Chunks ChunkList `json:"chunks,omitempty"`
+}
+
+func (s ChunkList) Len() int           { return len(s) }
+func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
+func (s ChunkList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
+
+func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
+	if isGzipped {
+		var err error
+		if buffer, err = UnGzipData(buffer); err != nil {
+			return nil, err
+		}
+	}
+	cm := ChunkManifest{}
+	if e := json.Unmarshal(buffer, &cm); e != nil {
+		return nil, e
+	}
+	sort.Sort(cm.Chunks)
+	return &cm, nil
+}
+
+func (cm *ChunkManifest) Marshal() ([]byte, error) {
+	return json.Marshal(cm)
+}
+
+func (cm *ChunkManifest) DeleteChunks(master, collection string) error {
+	deleteError := 0
+	for _, ci := range cm.Chunks {
+		if e := DeleteFile(master, ci.Fid, collection, ""); e != nil {
+			deleteError++
+			glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
+		}
+	}
+	if deleteError > 0 {
+		return errors.New("Not all chunks deleted.")
+	}
+	return nil
+}
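As a usage sketch (the fids and sizes here are made up), a manifest is plain JSON, and its chunk list comes back sorted by offset:

package main

import (
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/go/operation"
)

func main() {
	// Illustrative manifest; fids and sizes are made up.
	buf := []byte(`{"name":"movie.mp4","size":30,"chunks":[
		{"fid":"3,02a2eb9cfa","offset":10,"size":20},
		{"fid":"3,01637037d6","offset":0,"size":10}]}`)
	cm, err := operation.LoadChunkManifest(buf, false) // false: buffer is not gzipped
	if err != nil {
		log.Fatal(err)
	}
	for _, ci := range cm.Chunks { // sorted by Offset after loading
		fmt.Printf("chunk %s covers bytes [%d, %d)\n", ci.Fid, ci.Offset, ci.Offset+ci.Size)
	}
}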

go/operation/chunked_file.go → go/storage/chunked_file_reader.go (118 changed lines)

@@ -1,16 +1,13 @@
-package operation
+package storage

 import (
-	"encoding/json"
 	"errors"
 	"fmt"
 	"io"
 	"net/http"
-	"sort"
 	"sync"

-	"github.com/chrislusf/seaweedfs/go/glog"
+	"github.com/chrislusf/seaweedfs/go/operation"
 	"github.com/chrislusf/seaweedfs/go/util"
 )
@@ -21,70 +18,43 @@ var (
 	ErrInvalidRange = errors.New("Invalid range")
 )

-type ChunkInfo struct {
-	Fid    string `json:"fid"`
-	Offset int64  `json:"offset"`
-	Size   int64  `json:"size"`
-}
-
-type ChunkList []*ChunkInfo
-
-type ChunkManifest struct {
-	Name   string    `json:"name,omitempty"`
-	Mime   string    `json:"mime,omitempty"`
-	Size   int64     `json:"size,omitempty"`
-	Chunks ChunkList `json:"chunks,omitempty"`
-}
-
 // seekable chunked file reader
 type ChunkedFileReader struct {
-	Manifest   *ChunkManifest
+	Manifest   *operation.ChunkManifest
 	Master     string
 	Collection string
+	Store      *Store
 	pos        int64
 	pr         *io.PipeReader
 	pw         *io.PipeWriter
 	mutex      sync.Mutex
 }

-func (s ChunkList) Len() int           { return len(s) }
-func (s ChunkList) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
-func (s ChunkList) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
-
-func LoadChunkManifest(buffer []byte, isGzipped bool) (*ChunkManifest, error) {
-	if isGzipped {
-		var err error
-		if buffer, err = UnGzipData(buffer); err != nil {
-			return nil, err
-		}
-	}
-	cm := ChunkManifest{}
-	if e := json.Unmarshal(buffer, &cm); e != nil {
-		return nil, e
-	}
-	sort.Sort(cm.Chunks)
-	return &cm, nil
-}
-
-func (cm *ChunkManifest) Marshal() ([]byte, error) {
-	return json.Marshal(cm)
-}
-
-func (cm *ChunkManifest) DeleteChunks(master, collection string) error {
-	deleteError := 0
-	for _, ci := range cm.Chunks {
-		if e := DeleteFile(master, ci.Fid, collection, ""); e != nil {
-			deleteError++
-			glog.V(0).Infof("Delete %s error: %v, master: %s", ci.Fid, e, master)
-		}
-	}
-	if deleteError > 0 {
-		return errors.New("Not all chunks deleted.")
-	}
-	return nil
-}
+func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
+	var err error
+	switch whence {
+	case 0:
+	case 1:
+		offset += cf.pos
+	case 2:
+		offset = cf.Manifest.Size - offset
+	}
+	if offset > cf.Manifest.Size {
+		err = ErrInvalidRange
+	}
+	if cf.pos != offset {
+		cf.Close()
+	}
+	cf.pos = offset
+	return cf.pos, err
+}

-func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64, e error) {
+func (cf *ChunkedFileReader) readRemoteChunkNeedle(fid string, w io.Writer, offset int64) (written int64, e error) {
+	fileUrl, lookupError := operation.LookupFileId(cf.Master, fid, cf.Collection, true)
+	if lookupError != nil {
+		return 0, lookupError
+	}
 	req, err := http.NewRequest("GET", fileUrl, nil)
 	if err != nil {
 		return written, err
@@ -115,23 +85,21 @@ func readChunkNeedle(fileUrl string, w io.Writer, offset int64) (written int64,
 	return io.Copy(w, resp.Body)
 }

-func (cf *ChunkedFileReader) Seek(offset int64, whence int) (int64, error) {
-	var err error
-	switch whence {
-	case 0:
-	case 1:
-		offset += cf.pos
-	case 2:
-		offset = cf.Manifest.Size - offset
-	}
-	if offset > cf.Manifest.Size {
-		err = ErrInvalidRange
-	}
-	if cf.pos != offset {
-		cf.Close()
-	}
-	cf.pos = offset
-	return cf.pos, err
+func (cf *ChunkedFileReader) readLocalChunkNeedle(fid *FileId, w io.Writer, offset int64) (written int64, e error) {
+	n := &Needle{
+		Id:     fid.Key,
+		Cookie: fid.Hashcode,
+	}
+	cookie := n.Cookie
+	count, e := cf.Store.ReadVolumeNeedle(fid.VolumeId, n)
+	if e != nil || count <= 0 {
+		return 0, e
+	}
+	if n.Cookie != cookie {
+		return 0, fmt.Errorf("read error: with unmatching cookie seen: %d expected: %d", cookie, n.Cookie)
+	}
+	wn, e := w.Write(n.Data[offset:])
+	return int64(wn), e
 }

@@ -150,12 +118,18 @@ func (cf *ChunkedFileReader) WriteTo(w io.Writer) (n int64, err error) {
 	}
 	for ; chunkIndex < cm.Chunks.Len(); chunkIndex++ {
 		ci := cm.Chunks[chunkIndex]
-		fileUrl, lookupError := LookupFileId(cf.Master, ci.Fid, cf.Collection, true)
-		if lookupError != nil {
-			return n, lookupError
+		// do we need to read data from the local volume server first?
+		fid, e := ParseFileId(ci.Fid)
+		if e != nil {
+			return n, e
 		}
-		if wn, e := readChunkNeedle(fileUrl, w, chunkStartOffset); e != nil {
+		var wn int64
+		if cf.Store != nil && cf.Store.HasVolume(fid.VolumeId) {
+			wn, e = cf.readLocalChunkNeedle(fid, w, chunkStartOffset)
+		} else {
+			wn, e = cf.readRemoteChunkNeedle(ci.Fid, w, chunkStartOffset)
+		}
+		if e != nil {
 			return n, e
 		} else {
 			n += wn
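Taken together, a caller holding a loaded manifest wires the reader up roughly like this; a sketch only, with an illustrative master address, and Store optional (nil sends every chunk through the remote path):

// Sketch: serve a chunked file over HTTP using the reader from this commit.
// The master address and rangeStart are illustrative assumptions.
func serveChunked(w http.ResponseWriter, chunkManifest *operation.ChunkManifest,
	localStore *storage.Store, rangeStart int64) error {
	cfr := &storage.ChunkedFileReader{
		Manifest: chunkManifest,
		Master:   "localhost:9333", // illustrative master address
		Store:    localStore,       // nil forces the remote path for every chunk
	}
	if _, err := cfr.Seek(rangeStart, 0); err != nil { // whence 0: absolute offset
		return err
	}
	_, err := cfr.WriteTo(w) // streams chunks in order, local volume first when present
	return err
}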

go/storage/file_id.go (10 changed lines)

@@ -22,14 +22,20 @@ func NewFileId(VolumeId VolumeId, Key uint64, Hashcode uint32) *FileId {
 	return &FileId{VolumeId: VolumeId, Key: Key, Hashcode: Hashcode}
 }

 func ParseFileId(fid string) (*FileId, error) {
-	a := strings.Split(fid, ",")
+	var a []string
+	if strings.Contains(fid, ",") {
+		a = strings.Split(fid, ",")
+	} else {
+		a = strings.Split(fid, "/")
+	}
 	if len(a) != 2 {
 		glog.V(1).Infoln("Invalid fid ", fid, ", split length ", len(a))
 		return nil, errors.New("Invalid fid " + fid)
 	}
 	vid_string, key_hash_string := a[0], a[1]
 	volumeId, _ := NewVolumeId(vid_string)
-	key, hash, e := ParseKeyHash(key_hash_string)
+	key, hash, e := ParseIdCookie(key_hash_string)
 	return &FileId{VolumeId: volumeId, Key: key, Hashcode: hash}, e
 }

 func (n *FileId) String() string {
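With this change, both fid spellings parse to the same FileId: the comma form used in query strings and the slash form that appears in URL paths. For example (the ids are illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func main() {
	a, _ := storage.ParseFileId("3,01637037d6") // comma form, as in ?fid=3,01637037d6
	b, _ := storage.ParseFileId("3/01637037d6") // slash form, as in /3/01637037d6
	fmt.Println(a.VolumeId == b.VolumeId, a.Key == b.Key, a.Hashcode == b.Hashcode)
	// Output: true true true
}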

go/storage/needle.go (44 changed lines)

@@ -193,36 +193,38 @@ func (n *Needle) ParsePath(fid string) (err error) {
 	if length <= 8 {
 		return errors.New("Invalid fid:" + fid)
 	}
+	n.Id, n.Cookie, err = ParseIdCookie(fid)
+	if err != nil {
+		return err
+	}
+	return err
+}
+
+func ParseIdCookie(fid string) (uint64, uint32, error) {
 	delta := ""
 	deltaIndex := strings.LastIndex(fid, "_")
 	if deltaIndex > 0 {
 		fid, delta = fid[0:deltaIndex], fid[deltaIndex+1:]
 	}
-	n.Id, n.Cookie, err = ParseKeyHash(fid)
-	if err != nil {
-		return err
+	if len(fid)%2 == 1 {
+		fid = "0" + fid
+	}
+	key_hash_bytes, khe := hex.DecodeString(fid)
+	key_hash_len := len(key_hash_bytes)
+	if khe != nil || key_hash_len <= 4 {
+		glog.V(0).Infoln("Invalid key_hash", fid, "length:", key_hash_len, "error", khe)
+		return 0, 0, errors.New("Invalid key and hash:" + fid)
 	}
+	id := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4])
+	cookie := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len])
 	if delta != "" {
 		if d, e := strconv.ParseUint(delta, 10, 64); e == nil {
-			n.Id += d
+			id += d
 		} else {
-			return e
+			return 0, 0, e
 		}
 	}
-	return err
-}
-
-func ParseKeyHash(key_hash_string string) (uint64, uint32, error) {
-	if len(key_hash_string)%2 == 1 {
-		key_hash_string = "0" + key_hash_string
-	}
-	key_hash_bytes, khe := hex.DecodeString(key_hash_string)
-	key_hash_len := len(key_hash_bytes)
-	if khe != nil || key_hash_len <= 4 {
-		glog.V(0).Infoln("Invalid key_hash", key_hash_string, "length:", key_hash_len, "error", khe)
-		return 0, 0, errors.New("Invalid key and hash:" + key_hash_string)
-	}
-	key := util.BytesToUint64(key_hash_bytes[0 : key_hash_len-4])
-	hash := util.BytesToUint32(key_hash_bytes[key_hash_len-4 : key_hash_len])
-	return key, hash, nil
+	return id, cookie, nil
 }
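The extracted ParseIdCookie keeps the old ParseKeyHash behavior, zero-padding odd-length hex and splitting the decoded bytes into an id plus a trailing 4-byte cookie, but now also folds in the _delta suffix handling that used to live in ParsePath. Roughly (values illustrative):

package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/go/storage"
)

func main() {
	id, cookie, _ := storage.ParseIdCookie("01637037d6")
	fmt.Printf("id=%d cookie=%x\n", id, cookie) // id=1 cookie=637037d6

	id2, _, _ := storage.ParseIdCookie("01637037d6_2")
	fmt.Println(id2 == id+2) // the decimal "_2" delta is added to the id: true
}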

go/weed/weed_server/volume_server_handlers_read.go (3 changed lines)

@@ -156,9 +156,10 @@ func (vs *VolumeServer) tryHandleChunkedFile(vid storage.VolumeId, n *storage.Ne
 	w.Header().Set("X-File-Store", "chunked")

-	chunkedFileReader := &operation.ChunkedFileReader{
+	chunkedFileReader := &storage.ChunkedFileReader{
 		Manifest: chunkManifest,
 		Master:   vs.GetMasterNode(),
+		Store:    vs.store,
 	}
 	if v := vs.store.GetVolume(vid); v != nil {
 		chunkedFileReader.Collection = v.Collection
