From 864e92550bfd476c4b91b840d8cd980ffb04cde9 Mon Sep 17 00:00:00 2001 From: tnextday Date: Tue, 8 Dec 2015 13:07:03 +0800 Subject: [PATCH] test DirtyDatas.Search --- go/storage/volume_replicate.go | 111 ++++++++++++++++------------ go/storage/volume_replicate_test.go | 22 ++++++ 2 files changed, 85 insertions(+), 48 deletions(-) create mode 100644 go/storage/volume_replicate_test.go diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index 93c52e55c..2fc12fc9e 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -1,15 +1,12 @@ package storage import ( - "sort" - "io" "os" - + "sort" "sync" "github.com/chrislusf/seaweedfs/go/util" - "io/ioutil" ) type DirtyData struct { @@ -26,11 +23,11 @@ func (s DirtyDatas) Sort() { sort.Sort(s) } func (s DirtyDatas) Search(offset int64) int { return sort.Search(len(s), func(i int) bool { v := &s[i] - return /*v.Offset <= offset &&*/ v.Offset+int64(v.Size) > offset + return v.Offset+int64(v.Size) > offset }) } -type CleanDataReader struct { +type CleanReader struct { Dirtys DirtyDatas DataFile *os.File pr *io.PipeReader @@ -50,7 +47,7 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { m.Set(k, offset, size) } else { if nv, ok := m.Get(k); ok { - //mark old needle file as dirty data + //mark old needle data as dirty data if int64(nv.Size)-NeedleHeaderSize > 0 { dirtys = append(dirtys, DirtyData{ Offset: int64(nv.Offset)*8 + NeedleHeaderSize, @@ -65,28 +62,28 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) { return dirtys } -func (cf *CleanDataReader) Seek(offset int64, whence int) (int64, error) { - off, e := cf.DataFile.Seek(0, 1) +func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { + off, e := cr.DataFile.Seek(0, 1) if e != nil { return 0, nil } if off != offset { - cf.Close() + cr.Close() } - return cf.DataFile.Seek(offset, whence) + return cr.DataFile.Seek(offset, whence) } -func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { - off, e := cf.DataFile.Seek(0, 1) +func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { + off, e := cdr.DataFile.Seek(0, 1) if e != nil { return 0, nil } const ZeroBufSize = 32 * 1024 zeroBuf := make([]byte, ZeroBufSize) - dirtyIndex := cf.Dirtys.Search(off) + dirtyIndex := cdr.Dirtys.Search(off) var nextDirty *DirtyData - if dirtyIndex < len(cf.Dirtys) { - nextDirty = &cf.Dirtys[dirtyIndex] + if dirtyIndex < len(cdr.Dirtys) { + nextDirty = &cdr.Dirtys[dirtyIndex] if nextDirty.Offset+int64(nextDirty.Size) < off { nextDirty = nil } @@ -108,12 +105,12 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { off += int64(n) } dirtyIndex++ - if dirtyIndex < len(cf.Dirtys) { - nextDirty = &cf.Dirtys[dirtyIndex] + if dirtyIndex < len(cdr.Dirtys) { + nextDirty = &cdr.Dirtys[dirtyIndex] } else { nextDirty = nil } - if _, e = cf.DataFile.Seek(off, 0); e != nil { + if _, e = cdr.DataFile.Seek(off, 0); e != nil { return } } else { @@ -122,11 +119,11 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { sz = nextDirty.Offset - off } if sz > 0 { - if n, e = io.CopyN(w, cf.DataFile, sz); e != nil { + if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil { return } } else { - if n, e = io.Copy(w, cf.DataFile); e != nil { + if n, e = io.Copy(w, cdr.DataFile); e != nil { return } } @@ -137,49 +134,67 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) { return } -func (cf *CleanDataReader) ReadAt(p []byte, off int64) (n int, err error) { - cf.Seek(off, 0) - return cf.Read(p) +func (cr *CleanReader) ReadAt(p []byte, off int64) (n int, err error) { + cr.Seek(off, 0) + return cr.Read(p) } -func (cf *CleanDataReader) Read(p []byte) (int, error) { - return cf.getPipeReader().Read(p) +func (cr *CleanReader) Read(p []byte) (int, error) { + return cr.getPipeReader().Read(p) } -func (cf *CleanDataReader) Close() (e error) { - cf.mutex.Lock() - defer cf.mutex.Unlock() - cf.closePipe() - return cf.DataFile.Close() +func (cr *CleanReader) Close() (e error) { + cr.mutex.Lock() + defer cr.mutex.Unlock() + cr.closePipe() + return cr.DataFile.Close() } -func (cf *CleanDataReader) closePipe() (e error) { - if cf.pr != nil { - if err := cf.pr.Close(); err != nil { +func (cr *CleanReader) closePipe() (e error) { + if cr.pr != nil { + if err := cr.pr.Close(); err != nil { e = err } } - cf.pr = nil - if cf.pw != nil { - if err := cf.pw.Close(); err != nil { + cr.pr = nil + if cr.pw != nil { + if err := cr.pw.Close(); err != nil { e = err } } - cf.pw = nil + cr.pw = nil return e } -func (cf *CleanDataReader) getPipeReader() io.Reader { - cf.mutex.Lock() - defer cf.mutex.Unlock() - if cf.pr != nil && cf.pw != nil { - return cf.pr +func (cr *CleanReader) getPipeReader() io.Reader { + cr.mutex.Lock() + defer cr.mutex.Unlock() + if cr.pr != nil && cr.pw != nil { + return cr.pr } - cf.closePipe() - cf.pr, cf.pw = io.Pipe() + cr.closePipe() + cr.pr, cr.pw = io.Pipe() go func(pw *io.PipeWriter) { - _, e := cf.WriteTo(pw) + _, e := cr.WriteTo(pw) pw.CloseWithError(e) - }(cf.pw) - return cf.pr + }(cr.pw) + return cr.pr +} + +func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { + var dirtys DirtyDatas + if indexData, e := v.nm.IndexFileContent(); e != nil { + return nil, err + } else { + dirtys = ScanDirtyData(indexData) + } + dataFile, e := os.Open(v.FileName()) + if e != nil { + return nil, e + } + cr = &CleanReader{ + Dirtys: dirtys, + DataFile: dataFile, + } + return } diff --git a/go/storage/volume_replicate_test.go b/go/storage/volume_replicate_test.go new file mode 100644 index 000000000..d1da211c3 --- /dev/null +++ b/go/storage/volume_replicate_test.go @@ -0,0 +1,22 @@ +package storage + +import "testing" + +func TestDirtyDataSearch(t *testing.T) { + testData := DirtyDatas{ + {30, 20}, {106, 200}, {5, 20}, {512, 68}, {412, 50}, + } + testOffset := []int64{ + 0, 150, 480, 1024, + } + testData.Sort() + t.Logf("TestData = %v", testData) + for _, off := range testOffset { + i := testData.Search(off) + if i < testData.Len() { + t.Logf("(%d) nearest chunk[%d]: %v", off, i, testData[i]) + } else { + t.Logf("Search %d return %d ", off, i) + } + } +}