From a74e2bed2cb1b98c2791c2a9ea5819e482ff7cb0 Mon Sep 17 00:00:00 2001 From: tnextday Date: Sat, 5 Dec 2015 21:59:05 +0800 Subject: [PATCH 1/8] go imports `Needle.ReadNeedleBody` add CRC check and warning --- go/filer/flat_namespace/flat_namespace_store.go | 2 -- go/storage/needle.go | 2 +- go/storage/needle_read_write.go | 8 ++++++++ go/storage/volume_info.go | 3 ++- go/storage/volume_sync.go | 3 +++ 5 files changed, 14 insertions(+), 4 deletions(-) diff --git a/go/filer/flat_namespace/flat_namespace_store.go b/go/filer/flat_namespace/flat_namespace_store.go index 832b70e40..068201adf 100644 --- a/go/filer/flat_namespace/flat_namespace_store.go +++ b/go/filer/flat_namespace/flat_namespace_store.go @@ -1,7 +1,5 @@ package flat_namespace -import () - type FlatNamespaceStore interface { Put(fullFileName string, fid string) (err error) Get(fullFileName string) (fid string, err error) diff --git a/go/storage/needle.go b/go/storage/needle.go index 32ebdae7d..c9124a681 100644 --- a/go/storage/needle.go +++ b/go/storage/needle.go @@ -14,8 +14,8 @@ import ( "github.com/chrislusf/seaweedfs/go/glog" "github.com/chrislusf/seaweedfs/go/images" - "github.com/chrislusf/seaweedfs/go/util" "github.com/chrislusf/seaweedfs/go/operation" + "github.com/chrislusf/seaweedfs/go/util" ) const ( diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index 9d7af600a..eb7989884 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -238,6 +238,10 @@ func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyL } n.Data = bytes[:n.Size] n.Checksum = NewCRC(n.Data) + checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize]) + if n.Checksum.Value() != checksum { + glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id) + } case Version2: bytes := make([]byte, bodyLength) if _, err = r.ReadAt(bytes, offset); err != nil { @@ -245,6 +249,10 @@ func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyL } n.readNeedleDataVersion2(bytes[0:n.Size]) n.Checksum = NewCRC(n.Data) + checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize]) + if n.Checksum.Value() != checksum { + glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id) + } default: err = fmt.Errorf("Unsupported Version! (%d)", version) } diff --git a/go/storage/volume_info.go b/go/storage/volume_info.go index a2f139c89..e4979c790 100644 --- a/go/storage/volume_info.go +++ b/go/storage/volume_info.go @@ -2,8 +2,9 @@ package storage import ( "fmt" - "github.com/chrislusf/seaweedfs/go/operation" "sort" + + "github.com/chrislusf/seaweedfs/go/operation" ) type VolumeInfo struct { diff --git a/go/storage/volume_sync.go b/go/storage/volume_sync.go index 2c72d62f0..01d59d6ae 100644 --- a/go/storage/volume_sync.go +++ b/go/storage/volume_sync.go @@ -202,6 +202,9 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string, if err != nil { return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err) } + if needleValue.Size != uint32(len(b)) { + return fmt.Errorf("Reading from %s error: size incorrect", volumeDataContentHandlerUrl) + } offset, err := v.AppendBlob(b) if err != nil { return fmt.Errorf("Appending volume %d error: %v", v.Id, err) From 77e3581a52efecbc6eb3e0bbc57563a61147564a Mon Sep 17 00:00:00 2001 From: tnextday Date: Mon, 7 Dec 2015 21:14:57 +0800 Subject: [PATCH 2/8] Add CleanDataReader, auto fill zero in deleted needle --- Makefile | 3 + go/storage/volume_replicate.go | 185 +++++++++++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 go/storage/volume_replicate.go diff --git a/Makefile b/Makefile index 6719a7bdd..68bd916be 100644 --- a/Makefile +++ b/Makefile @@ -20,3 +20,6 @@ build: deps linux: deps mkdir -p linux GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o linux/$(BINARY) $(SOURCE_DIR) + +imports: + goimports -w $(SOURCE_DIR) \ No newline at end of file diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go new file mode 100644 index 000000000..93c52e55c --- /dev/null +++ b/go/storage/volume_replicate.go @@ -0,0 +1,185 @@ +package storage + +import ( + "sort" + + "io" + "os" + + "sync" + + "github.com/chrislusf/seaweedfs/go/util" + "io/ioutil" +) + +type DirtyData struct { + Offset int64 `comment:"Dirty data start offset"` + Size uint32 `comment:"Size of the dirty data"` +} + +type DirtyDatas []DirtyData + +func (s DirtyDatas) Len() int { return len(s) } +func (s DirtyDatas) Less(i, j int) bool { return s[i].Offset < s[j].Offset } +func (s DirtyDatas) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s DirtyDatas) Sort() { sort.Sort(s) } +func (s DirtyDatas) Search(offset int64) int { + return sort.Search(len(s), func(i int) bool { + v := &s[i] + return /*v.Offset <= offset &&*/ v.Offset+int64(v.Size) > offset + }) +} + +type CleanDataReader struct { + Dirtys DirtyDatas + DataFile *os.File + pr *io.PipeReader + pw *io.PipeWriter + mutex sync.Mutex +} + +func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { + m := NewCompactMap() + for i := 0; i+16 <= len(indexFileContent); i += 16 { + bytes := indexFileContent[i : i+16] + key := util.BytesToUint64(bytes[:8]) + offset := util.BytesToUint32(bytes[8:12]) + size := util.BytesToUint32(bytes[12:16]) + k := Key(key) + if offset != 0 && size != 0 { + m.Set(k, offset, size) + } else { + if nv, ok := m.Get(k); ok { + //mark old needle file as dirty data + if int64(nv.Size)-NeedleHeaderSize > 0 { + dirtys = append(dirtys, DirtyData{ + Offset: int64(nv.Offset)*8 + NeedleHeaderSize, + Size: nv.Size - NeedleHeaderSize, + }) + } + } + m.Delete(k) + } + } + dirtys.Sort() + return dirtys +} + +func (cf *CleanDataReader) Seek(offset int64, whence int) (int64, error) { + off, e := cf.DataFile.Seek(0, 1) + if e != nil { + return 0, nil + } + if off != offset { + cf.Close() + } + return cf.DataFile.Seek(offset, whence) +} + +func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { + off, e := cf.DataFile.Seek(0, 1) + if e != nil { + return 0, nil + } + const ZeroBufSize = 32 * 1024 + zeroBuf := make([]byte, ZeroBufSize) + dirtyIndex := cf.Dirtys.Search(off) + var nextDirty *DirtyData + if dirtyIndex < len(cf.Dirtys) { + nextDirty = &cf.Dirtys[dirtyIndex] + if nextDirty.Offset+int64(nextDirty.Size) < off { + nextDirty = nil + } + } + for { + if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) { + sz := nextDirty.Offset + int64(nextDirty.Size) - off + for sz > 0 { + mn := int64(ZeroBufSize) + if mn > sz { + mn = sz + } + var n int + if n, e = w.Write(zeroBuf[:mn]); e != nil { + return + } + written += int64(n) + sz -= int64(n) + off += int64(n) + } + dirtyIndex++ + if dirtyIndex < len(cf.Dirtys) { + nextDirty = &cf.Dirtys[dirtyIndex] + } else { + nextDirty = nil + } + if _, e = cf.DataFile.Seek(off, 0); e != nil { + return + } + } else { + var n, sz int64 + if nextDirty != nil { + sz = nextDirty.Offset - off + } + if sz > 0 { + if n, e = io.CopyN(w, cf.DataFile, sz); e != nil { + return + } + } else { + if n, e = io.Copy(w, cf.DataFile); e != nil { + return + } + } + off += n + written += n + } + } + return +} + +func (cf *CleanDataReader) ReadAt(p []byte, off int64) (n int, err error) { + cf.Seek(off, 0) + return cf.Read(p) +} + +func (cf *CleanDataReader) Read(p []byte) (int, error) { + return cf.getPipeReader().Read(p) +} + +func (cf *CleanDataReader) Close() (e error) { + cf.mutex.Lock() + defer cf.mutex.Unlock() + cf.closePipe() + return cf.DataFile.Close() +} + +func (cf *CleanDataReader) closePipe() (e error) { + if cf.pr != nil { + if err := cf.pr.Close(); err != nil { + e = err + } + } + cf.pr = nil + if cf.pw != nil { + if err := cf.pw.Close(); err != nil { + e = err + } + } + cf.pw = nil + return e +} + +func (cf *CleanDataReader) getPipeReader() io.Reader { + cf.mutex.Lock() + defer cf.mutex.Unlock() + if cf.pr != nil && cf.pw != nil { + return cf.pr + } + cf.closePipe() + cf.pr, cf.pw = io.Pipe() + go func(pw *io.PipeWriter) { + _, e := cf.WriteTo(pw) + pw.CloseWithError(e) + }(cf.pw) + return cf.pr +} From 864e92550bfd476c4b91b840d8cd980ffb04cde9 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 13:07:03 +0800 Subject: [PATCH 3/8] test DirtyDatas.Search --- go/storage/volume_replicate.go | 111 ++++++++++++++++------------ go/storage/volume_replicate_test.go | 22 ++++++ 2 files changed, 85 insertions(+), 48 deletions(-) create mode 100644 go/storage/volume_replicate_test.go diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index 93c52e55c..2fc12fc9e 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -1,15 +1,12 @@ package storage import ( - "sort" - "io" "os" - + "sort" "sync" "github.com/chrislusf/seaweedfs/go/util" - "io/ioutil" ) type DirtyData struct { @@ -26,11 +23,11 @@ func (s DirtyDatas) Sort() { sort.Sort(s) } func (s DirtyDatas) Search(offset int64) int { return sort.Search(len(s), func(i int) bool { v := &s[i] - return /*v.Offset <= offset &&*/ v.Offset+int64(v.Size) > offset + return v.Offset+int64(v.Size) > offset }) } -type CleanDataReader struct { +type CleanReader struct { Dirtys DirtyDatas DataFile *os.File pr *io.PipeReader @@ -50,7 +47,7 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { m.Set(k, offset, size) } else { if nv, ok := m.Get(k); ok { - //mark old needle file as dirty data + //mark old needle data as dirty data if int64(nv.Size)-NeedleHeaderSize > 0 { dirtys = append(dirtys, DirtyData{ Offset: int64(nv.Offset)*8 + NeedleHeaderSize, @@ -65,28 +62,28 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { return dirtys } -func (cf *CleanDataReader) Seek(offset int64, whence int) (int64, error) { - off, e := cf.DataFile.Seek(0, 1) +func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { + off, e := cr.DataFile.Seek(0, 1) if e != nil { return 0, nil } if off != offset { - cf.Close() + cr.Close() } - return cf.DataFile.Seek(offset, whence) + return cr.DataFile.Seek(offset, whence) } -func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { - off, e := cf.DataFile.Seek(0, 1) +func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { + off, e := cdr.DataFile.Seek(0, 1) if e != nil { return 0, nil } const ZeroBufSize = 32 * 1024 zeroBuf := make([]byte, ZeroBufSize) - dirtyIndex := cf.Dirtys.Search(off) + dirtyIndex := cdr.Dirtys.Search(off) var nextDirty *DirtyData - if dirtyIndex < len(cf.Dirtys) { - nextDirty = &cf.Dirtys[dirtyIndex] + if dirtyIndex < len(cdr.Dirtys) { + nextDirty = &cdr.Dirtys[dirtyIndex] if nextDirty.Offset+int64(nextDirty.Size) < off { nextDirty = nil } @@ -108,12 +105,12 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { off += int64(n) } dirtyIndex++ - if dirtyIndex < len(cf.Dirtys) { - nextDirty = &cf.Dirtys[dirtyIndex] + if dirtyIndex < len(cdr.Dirtys) { + nextDirty = &cdr.Dirtys[dirtyIndex] } else { nextDirty = nil } - if _, e = cf.DataFile.Seek(off, 0); e != nil { + if _, e = cdr.DataFile.Seek(off, 0); e != nil { return } } else { @@ -122,11 +119,11 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { sz = nextDirty.Offset - off } if sz > 0 { - if n, e = io.CopyN(w, cf.DataFile, sz); e != nil { + if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil { return } } else { - if n, e = io.Copy(w, cf.DataFile); e != nil { + if n, e = io.Copy(w, cdr.DataFile); e != nil { return } } @@ -137,49 +134,67 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { return } -func (cf *CleanDataReader) ReadAt(p []byte, off int64) (n int, err error) { - cf.Seek(off, 0) - return cf.Read(p) +func (cr *CleanReader) ReadAt(p []byte, off int64) (n int, err error) { + cr.Seek(off, 0) + return cr.Read(p) } -func (cf *CleanDataReader) Read(p []byte) (int, error) { - return cf.getPipeReader().Read(p) +func (cr *CleanReader) Read(p []byte) (int, error) { + return cr.getPipeReader().Read(p) } -func (cf *CleanDataReader) Close() (e error) { - cf.mutex.Lock() - defer cf.mutex.Unlock() - cf.closePipe() - return cf.DataFile.Close() +func (cr *CleanReader) Close() (e error) { + cr.mutex.Lock() + defer cr.mutex.Unlock() + cr.closePipe() + return cr.DataFile.Close() } -func (cf *CleanDataReader) closePipe() (e error) { - if cf.pr != nil { - if err := cf.pr.Close(); err != nil { +func (cr *CleanReader) closePipe() (e error) { + if cr.pr != nil { + if err := cr.pr.Close(); err != nil { e = err } } - cf.pr = nil - if cf.pw != nil { - if err := cf.pw.Close(); err != nil { + cr.pr = nil + if cr.pw != nil { + if err := cr.pw.Close(); err != nil { e = err } } - cf.pw = nil + cr.pw = nil return e } -func (cf *CleanDataReader) getPipeReader() io.Reader { - cf.mutex.Lock() - defer cf.mutex.Unlock() - if cf.pr != nil && cf.pw != nil { - return cf.pr +func (cr *CleanReader) getPipeReader() io.Reader { + cr.mutex.Lock() + defer cr.mutex.Unlock() + if cr.pr != nil && cr.pw != nil { + return cr.pr } - cf.closePipe() - cf.pr, cf.pw = io.Pipe() + cr.closePipe() + cr.pr, cr.pw = io.Pipe() go func(pw *io.PipeWriter) { - _, e := cf.WriteTo(pw) + _, e := cr.WriteTo(pw) pw.CloseWithError(e) - }(cf.pw) - return cf.pr + }(cr.pw) + return cr.pr +} + +func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { + var dirtys DirtyDatas + if indexData, e := v.nm.IndexFileContent(); e != nil { + return nil, err + } else { + dirtys = ScanDirtyData(indexData) + } + dataFile, e := os.Open(v.FileName()) + if e != nil { + return nil, e + } + cr = &CleanReader{ + Dirtys: dirtys, + DataFile: dataFile, + } + return } diff --git a/go/storage/volume_replicate_test.go b/go/storage/volume_replicate_test.go new file mode 100644 index 000000000..d1da211c3 --- /dev/null +++ b/go/storage/volume_replicate_test.go @@ -0,0 +1,22 @@ +package storage + +import "testing" + +func TestDirtyDataSearch(t *testing.T) { + testData := DirtyDatas{ + {30, 20}, {106, 200}, {5, 20}, {512, 68}, {412, 50}, + } + testOffset := []int64{ + 0, 150, 480, 1024, + } + testData.Sort() + t.Logf("TestData = %v", testData) + for _, off := range testOffset { + i := testData.Search(off) + if i < testData.Len() { + t.Logf("(%d) nearest chunk[%d]: %v", off, i, testData[i]) + } else { + t.Logf("Search %d return %d ", off, i) + } + } +} From 9d03f763dc3cb05ae2805b9ebbf10c5e61fc7d03 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 16:47:55 +0800 Subject: [PATCH 4/8] Volume server add `/admin/sync/vol_data` handler to serve cleaned volume date --- go/storage/volume_replicate.go | 12 +++- go/weed/weed_server/volume_server.go | 1 + .../volume_server_handlers_replicate.go | 60 +++++++++++++++++++ 3 files changed, 70 insertions(+), 3 deletions(-) create mode 100644 go/weed/weed_server/volume_server_handlers_replicate.go diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index 2fc12fc9e..1ea6a8765 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -73,6 +73,14 @@ func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { return cr.DataFile.Seek(offset, whence) } +func (cr *CleanReader) Size() (int64, error) { + fi, e := cr.DataFile.Stat() + if e != nil { + return 0, e + } + return fi.Size(), nil +} + func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { off, e := cdr.DataFile.Seek(0, 1) if e != nil { @@ -84,9 +92,6 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { var nextDirty *DirtyData if dirtyIndex < len(cdr.Dirtys) { nextDirty = &cdr.Dirtys[dirtyIndex] - if nextDirty.Offset+int64(nextDirty.Size) < off { - nextDirty = nil - } } for { if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) { @@ -189,6 +194,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { dirtys = ScanDirtyData(indexData) } dataFile, e := os.Open(v.FileName()) + if e != nil { return nil, e } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 8becdd0f1..b8472235e 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -57,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go new file mode 100644 index 000000000..c8b807ffb --- /dev/null +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -0,0 +1,60 @@ +package weed_server + +import ( + "fmt" + "net/http" + "strconv" + "github.com/chrislusf/seaweedfs/go/glog" + "io" + "github.com/pierrec/lz4" +) + +func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + http.Error(w, fmt.Sprintf("Not Found volume: %v", err), http.StatusBadRequest) + return + } + cr, e := v.GetVolumeCleanReader() + if e != nil { + http.Error(w, fmt.Sprintf("Get volume clean reader: %v", err), http.StatusInternalServerError) + return + } + totalSize, e := cr.Size() + if e != nil { + http.Error(w, fmt.Sprintf("Get volume size: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Accept-Ranges", "bytes") + w.Header().Set("Content-Encoding", "lz4") + lz4w := lz4.NewWriter(w) + defer lz4w.Close() + rangeReq := r.Header.Get("Range") + if rangeReq == "" { + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + if _, e = io.Copy(lz4w, cr); e != nil { + glog.V(4).Infoln("response write error:", e) + } + return + } + ranges, err := parseRange(rangeReq, totalSize) + if err != nil { + http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + return + } + if len(ranges) != 1 { + http.Error(w, "Only support one range", http.StatusNotImplemented) + return + } + ra := ranges[0] + if _, e := cr.Seek(ra.start, 0); e != nil { + http.Error(w, fmt.Sprintf("Seek: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.WriteHeader(http.StatusPartialContent) + if _, e = io.CopyN(lz4w, cr, ra.length); e != nil { + glog.V(2).Infoln("response write error:", e) + } +} From 8454b053196e75d1a7ec78a1892e2c96c77aeb56 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 17:33:58 +0800 Subject: [PATCH 5/8] fix ScanDirtyData bug --- go/storage/volume_replicate.go | 4 ++-- .../volume_server_handlers_replicate.go | 17 +++++++++-------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index 1ea6a8765..4ca21f105 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -51,7 +51,7 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { if int64(nv.Size)-NeedleHeaderSize > 0 { dirtys = append(dirtys, DirtyData{ Offset: int64(nv.Offset)*8 + NeedleHeaderSize, - Size: nv.Size - NeedleHeaderSize, + Size: nv.Size, }) } } @@ -193,7 +193,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { } else { dirtys = ScanDirtyData(indexData) } - dataFile, e := os.Open(v.FileName()) + dataFile, e := os.Open(v.FileName()+".dat") if e != nil { return nil, e diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go index c8b807ffb..c8b429a7b 100644 --- a/go/weed/weed_server/volume_server_handlers_replicate.go +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -10,23 +10,24 @@ import ( ) func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) { - v, err := vs.getVolume("volume", r) + v, e := vs.getVolume("volume", r) if v == nil { - http.Error(w, fmt.Sprintf("Not Found volume: %v", err), http.StatusBadRequest) + http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest) return } cr, e := v.GetVolumeCleanReader() if e != nil { - http.Error(w, fmt.Sprintf("Get volume clean reader: %v", err), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError) return } totalSize, e := cr.Size() if e != nil { - http.Error(w, fmt.Sprintf("Get volume size: %v", err), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("Get volume size: %v", e), http.StatusInternalServerError) return } w.Header().Set("Accept-Ranges", "bytes") w.Header().Set("Content-Encoding", "lz4") + w.Header().Set("Content-Disposition", fmt.Sprintf(`filename="%d.dat.lz4"`, v.Id)) lz4w := lz4.NewWriter(w) defer lz4w.Close() rangeReq := r.Header.Get("Range") @@ -37,9 +38,9 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http } return } - ranges, err := parseRange(rangeReq, totalSize) - if err != nil { - http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + ranges, e := parseRange(rangeReq, totalSize) + if e != nil { + http.Error(w, e.Error(), http.StatusRequestedRangeNotSatisfiable) return } if len(ranges) != 1 { @@ -48,7 +49,7 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http } ra := ranges[0] if _, e := cr.Seek(ra.start, 0); e != nil { - http.Error(w, fmt.Sprintf("Seek: %v", err), http.StatusInternalServerError) + http.Error(w, fmt.Sprintf("Seek: %v", e), http.StatusInternalServerError) return } w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) From 848930ab5a9ec2a99ae5250723dc2c175390a09d Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 17:43:52 +0800 Subject: [PATCH 6/8] fix forever loop problem --- go/storage/volume_replicate.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index 4ca21f105..f40cf60da 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -123,14 +123,14 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { if nextDirty != nil { sz = nextDirty.Offset - off } - if sz > 0 { - if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil { - return - } - } else { - if n, e = io.Copy(w, cdr.DataFile); e != nil { - return - } + if sz <= 0 { + // copy until eof + n, e = io.Copy(w, cdr.DataFile); + written += n + return + } + if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil { + return } off += n written += n From acf4d44f7e41544201545ee23493b5a49e800024 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 17:56:57 +0800 Subject: [PATCH 7/8] update --- go/storage/needle_read_write.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go/storage/needle_read_write.go b/go/storage/needle_read_write.go index eb7989884..adb5058be 100644 --- a/go/storage/needle_read_write.go +++ b/go/storage/needle_read_write.go @@ -248,6 +248,9 @@ func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyL return } n.readNeedleDataVersion2(bytes[0:n.Size]) + if n.DataSize == 0 { + return + } n.Checksum = NewCRC(n.Data) checksum := util.BytesToUint32(bytes[n.Size : n.Size+NeedleChecksumSize]) if n.Checksum.Value() != checksum { From 39c97f8955f71a0ba06ac51929236f9b66b97aa9 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 20:12:12 +0800 Subject: [PATCH 8/8] update --- go/storage/volume_replicate.go | 30 +++++++++++-------- .../volume_server_handlers_replicate.go | 15 ++++++---- 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index f40cf60da..00b9cd14e 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -63,14 +63,18 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { } func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { - off, e := cr.DataFile.Seek(0, 1) + oldOff, e := cr.DataFile.Seek(0, 1) if e != nil { - return 0, nil + return 0, e + } + newOff, e := cr.DataFile.Seek(offset, whence) + if e != nil { + return 0, e } - if off != offset { - cr.Close() + if oldOff != newOff { + cr.closePipe(true) } - return cr.DataFile.Seek(offset, whence) + return newOff, nil } func (cr *CleanReader) Size() (int64, error) { @@ -125,7 +129,7 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { } if sz <= 0 { // copy until eof - n, e = io.Copy(w, cdr.DataFile); + n, e = io.Copy(w, cdr.DataFile) written += n return } @@ -149,13 +153,15 @@ func (cr *CleanReader) Read(p []byte) (int, error) { } func (cr *CleanReader) Close() (e error) { - cr.mutex.Lock() - defer cr.mutex.Unlock() - cr.closePipe() + cr.closePipe(true) return cr.DataFile.Close() } -func (cr *CleanReader) closePipe() (e error) { +func (cr *CleanReader) closePipe(lock bool) (e error) { + if lock { + cr.mutex.Lock() + defer cr.mutex.Unlock() + } if cr.pr != nil { if err := cr.pr.Close(); err != nil { e = err @@ -177,7 +183,7 @@ func (cr *CleanReader) getPipeReader() io.Reader { if cr.pr != nil && cr.pw != nil { return cr.pr } - cr.closePipe() + cr.closePipe(false) cr.pr, cr.pw = io.Pipe() go func(pw *io.PipeWriter) { _, e := cr.WriteTo(pw) @@ -193,7 +199,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { } else { dirtys = ScanDirtyData(indexData) } - dataFile, e := os.Open(v.FileName()+".dat") + dataFile, e := os.Open(v.FileName() + ".dat") if e != nil { return nil, e diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go index c8b429a7b..411967326 100644 --- a/go/weed/weed_server/volume_server_handlers_replicate.go +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -2,10 +2,11 @@ package weed_server import ( "fmt" + "io" "net/http" "strconv" + "github.com/chrislusf/seaweedfs/go/glog" - "io" "github.com/pierrec/lz4" ) @@ -26,16 +27,17 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http return } w.Header().Set("Accept-Ranges", "bytes") - w.Header().Set("Content-Encoding", "lz4") w.Header().Set("Content-Disposition", fmt.Sprintf(`filename="%d.dat.lz4"`, v.Id)) - lz4w := lz4.NewWriter(w) - defer lz4w.Close() + rangeReq := r.Header.Get("Range") if rangeReq == "" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + w.Header().Set("Content-Encoding", "lz4") + lz4w := lz4.NewWriter(w) if _, e = io.Copy(lz4w, cr); e != nil { glog.V(4).Infoln("response write error:", e) } + lz4w.Close() return } ranges, e := parseRange(rangeReq, totalSize) @@ -49,13 +51,16 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http } ra := ranges[0] if _, e := cr.Seek(ra.start, 0); e != nil { - http.Error(w, fmt.Sprintf("Seek: %v", e), http.StatusInternalServerError) + http.Error(w, e.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.Header().Set("Content-Encoding", "lz4") w.WriteHeader(http.StatusPartialContent) + lz4w := lz4.NewWriter(w) if _, e = io.CopyN(lz4w, cr, ra.length); e != nil { glog.V(2).Infoln("response write error:", e) } + lz4w.Close() }