Browse Source

test DirtyDatas.Search

pull/283/head
tnextday 10 years ago
parent
commit
864e92550b
  1. 111
      go/storage/volume_replicate.go
  2. 22
      go/storage/volume_replicate_test.go

111
go/storage/volume_replicate.go

@ -1,15 +1,12 @@
package storage
import (
"sort"
"io"
"os"
"sort"
"sync"
"github.com/chrislusf/seaweedfs/go/util"
"io/ioutil"
)
type DirtyData struct {
@ -26,11 +23,11 @@ func (s DirtyDatas) Sort() { sort.Sort(s) }
func (s DirtyDatas) Search(offset int64) int {
return sort.Search(len(s), func(i int) bool {
v := &s[i]
return /*v.Offset <= offset &&*/ v.Offset+int64(v.Size) > offset
return v.Offset+int64(v.Size) > offset
})
}
type CleanDataReader struct {
type CleanReader struct {
Dirtys DirtyDatas
DataFile *os.File
pr *io.PipeReader
@ -50,7 +47,7 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) {
m.Set(k, offset, size)
} else {
if nv, ok := m.Get(k); ok {
//mark old needle file as dirty data
//mark old needle data as dirty data
if int64(nv.Size)-NeedleHeaderSize > 0 {
dirtys = append(dirtys, DirtyData{
Offset: int64(nv.Offset)*8 + NeedleHeaderSize,
@ -65,28 +62,28 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) {
return dirtys
}
func (cf *CleanDataReader) Seek(offset int64, whence int) (int64, error) {
off, e := cf.DataFile.Seek(0, 1)
func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) {
off, e := cr.DataFile.Seek(0, 1)
if e != nil {
return 0, nil
}
if off != offset {
cf.Close()
cr.Close()
}
return cf.DataFile.Seek(offset, whence)
return cr.DataFile.Seek(offset, whence)
}
func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) {
off, e := cf.DataFile.Seek(0, 1)
func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) {
off, e := cdr.DataFile.Seek(0, 1)
if e != nil {
return 0, nil
}
const ZeroBufSize = 32 * 1024
zeroBuf := make([]byte, ZeroBufSize)
dirtyIndex := cf.Dirtys.Search(off)
dirtyIndex := cdr.Dirtys.Search(off)
var nextDirty *DirtyData
if dirtyIndex < len(cf.Dirtys) {
nextDirty = &cf.Dirtys[dirtyIndex]
if dirtyIndex < len(cdr.Dirtys) {
nextDirty = &cdr.Dirtys[dirtyIndex]
if nextDirty.Offset+int64(nextDirty.Size) < off {
nextDirty = nil
}
@ -108,12 +105,12 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) {
off += int64(n)
}
dirtyIndex++
if dirtyIndex < len(cf.Dirtys) {
nextDirty = &cf.Dirtys[dirtyIndex]
if dirtyIndex < len(cdr.Dirtys) {
nextDirty = &cdr.Dirtys[dirtyIndex]
} else {
nextDirty = nil
}
if _, e = cf.DataFile.Seek(off, 0); e != nil {
if _, e = cdr.DataFile.Seek(off, 0); e != nil {
return
}
} else {
@ -122,11 +119,11 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) {
sz = nextDirty.Offset - off
}
if sz > 0 {
if n, e = io.CopyN(w, cf.DataFile, sz); e != nil {
if n, e = io.CopyN(w, cdr.DataFile, sz); e != nil {
return
}
} else {
if n, e = io.Copy(w, cf.DataFile); e != nil {
if n, e = io.Copy(w, cdr.DataFile); e != nil {
return
}
}
@ -137,49 +134,67 @@ func (cf *CleanDataReader) WriteTo(w io.Writer) (written int64, err error) {
return
}
func (cf *CleanDataReader) ReadAt(p []byte, off int64) (n int, err error) {
cf.Seek(off, 0)
return cf.Read(p)
func (cr *CleanReader) ReadAt(p []byte, off int64) (n int, err error) {
cr.Seek(off, 0)
return cr.Read(p)
}
func (cf *CleanDataReader) Read(p []byte) (int, error) {
return cf.getPipeReader().Read(p)
func (cr *CleanReader) Read(p []byte) (int, error) {
return cr.getPipeReader().Read(p)
}
func (cf *CleanDataReader) Close() (e error) {
cf.mutex.Lock()
defer cf.mutex.Unlock()
cf.closePipe()
return cf.DataFile.Close()
func (cr *CleanReader) Close() (e error) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
cr.closePipe()
return cr.DataFile.Close()
}
func (cf *CleanDataReader) closePipe() (e error) {
if cf.pr != nil {
if err := cf.pr.Close(); err != nil {
func (cr *CleanReader) closePipe() (e error) {
if cr.pr != nil {
if err := cr.pr.Close(); err != nil {
e = err
}
}
cf.pr = nil
if cf.pw != nil {
if err := cf.pw.Close(); err != nil {
cr.pr = nil
if cr.pw != nil {
if err := cr.pw.Close(); err != nil {
e = err
}
}
cf.pw = nil
cr.pw = nil
return e
}
func (cf *CleanDataReader) getPipeReader() io.Reader {
cf.mutex.Lock()
defer cf.mutex.Unlock()
if cf.pr != nil && cf.pw != nil {
return cf.pr
func (cr *CleanReader) getPipeReader() io.Reader {
cr.mutex.Lock()
defer cr.mutex.Unlock()
if cr.pr != nil && cr.pw != nil {
return cr.pr
}
cf.closePipe()
cf.pr, cf.pw = io.Pipe()
cr.closePipe()
cr.pr, cr.pw = io.Pipe()
go func(pw *io.PipeWriter) {
_, e := cf.WriteTo(pw)
_, e := cr.WriteTo(pw)
pw.CloseWithError(e)
}(cf.pw)
return cf.pr
}(cr.pw)
return cr.pr
}
func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) {
var dirtys DirtyDatas
if indexData, e := v.nm.IndexFileContent(); e != nil {
return nil, err
} else {
dirtys = ScanDirtyData(indexData)
}
dataFile, e := os.Open(v.FileName())
if e != nil {
return nil, e
}
cr = &CleanReader{
Dirtys: dirtys,
DataFile: dataFile,
}
return
}

22
go/storage/volume_replicate_test.go

@ -0,0 +1,22 @@
package storage
import "testing"
func TestDirtyDataSearch(t *testing.T) {
testData := DirtyDatas{
{30, 20}, {106, 200}, {5, 20}, {512, 68}, {412, 50},
}
testOffset := []int64{
0, 150, 480, 1024,
}
testData.Sort()
t.Logf("TestData = %v", testData)
for _, off := range testOffset {
i := testData.Search(off)
if i < testData.Len() {
t.Logf("(%d) nearest chunk[%d]: %v", off, i, testData[i])
} else {
t.Logf("Search %d return %d ", off, i)
}
}
}
Loading…
Cancel
Save