
Merge 356751f001 into a87fe8ffce

pull/283/merge
tnextday 10 years ago
commit e2c6aca286
  1. Makefile (3)
  2. go/filer/flat_namespace/flat_namespace_store.go (2)
  3. go/storage/needle_read_write.go (12)
  4. go/storage/volume_info.go (3)
  5. go/storage/volume_replicate.go (212)
  6. go/storage/volume_replicate_test.go (22)
  7. go/storage/volume_sync.go (3)
  8. go/weed/weed_server/volume_server.go (1)
  9. go/weed/weed_server/volume_server_handlers_replicate.go (66)

Makefile (3)

@@ -20,3 +20,6 @@ build: deps
 linux: deps
 	mkdir -p linux
 	GOOS=linux GOARCH=amd64 go build $(GO_FLAGS) -o linux/$(BINARY) $(SOURCE_DIR)
+
+imports:
+	goimports -w $(SOURCE_DIR)

go/filer/flat_namespace/flat_namespace_store.go (2)

@@ -1,7 +1,5 @@
 package flat_namespace
 
-import ()
-
 type FlatNamespaceStore interface {
 	Put(fullFileName string, fid string) (err error)
 	Get(fullFileName string) (fid string, err error)

go/storage/needle_read_write.go (12)

@@ -238,13 +238,25 @@ func (n *Needle) ReadNeedleBody(r *os.File, version Version, offset int64, bodyL
 		}
 		n.Data = bytes[:n.Size]
 		n.Checksum = NewCRC(n.Data)
+		checksum := binary.BigEndian.Uint32(bytes[n.Size : n.Size+NeedleChecksumSize])
+		if n.Checksum.Value() != checksum {
+			glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id)
+		}
 	case Version2:
 		bytes := make([]byte, bodyLength)
 		if _, err = r.ReadAt(bytes, offset); err != nil {
 			return
 		}
 		n.readNeedleDataVersion2(bytes[0:n.Size])
+		if n.DataSize == 0 {
+			return
+		}
 		n.Checksum = NewCRC(n.Data)
+		checksum := binary.BigEndian.Uint32(bytes[n.Size : n.Size+NeedleChecksumSize])
+		if n.Checksum.Value() != checksum {
+			glog.V(0).Infof("CRC error! Data On Disk Corrupted, needle id = %x", n.Id)
+		}
 	default:
 		err = fmt.Errorf("Unsupported Version! (%d)", version)
 	}

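For context on the CRC check added above, here is a standalone sketch (not part of the commit) of the same verification pattern using hash/crc32 directly. The Castagnoli polynomial and the data-then-4-byte-checksum layout are assumptions about what NewCRC and the needle body format use.

package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

func main() {
	// Assumed layout: needle data immediately followed by a 4-byte big-endian CRC.
	data := []byte("hello needle")
	table := crc32.MakeTable(crc32.Castagnoli) // assumption: NewCRC is Castagnoli-based
	body := make([]byte, len(data)+4)
	copy(body, data)
	binary.BigEndian.PutUint32(body[len(data):], crc32.Checksum(data, table))

	// Read side, mirroring bytes[n.Size : n.Size+NeedleChecksumSize] above.
	stored := binary.BigEndian.Uint32(body[len(data) : len(data)+4])
	recomputed := crc32.Checksum(body[:len(data)], table)
	fmt.Println("checksum ok:", stored == recomputed)
}
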
go/storage/volume_info.go (3)

@@ -2,8 +2,9 @@ package storage
 import (
 	"fmt"
-	"github.com/chrislusf/seaweedfs/go/operation"
 	"sort"
+
+	"github.com/chrislusf/seaweedfs/go/operation"
 )
 
 type VolumeInfo struct {

go/storage/volume_replicate.go (212)

@@ -0,0 +1,212 @@
package storage

import (
	"encoding/binary"
	"io"
	"os"
	"sort"
	"sync"
)
type DirtyData struct {
	Offset int64  `comment:"Dirty data start offset"`
	Size   uint32 `comment:"Size of the dirty data"`
}

type DirtyDatas []DirtyData

func (s DirtyDatas) Len() int           { return len(s) }
func (s DirtyDatas) Less(i, j int) bool { return s[i].Offset < s[j].Offset }
func (s DirtyDatas) Swap(i, j int)      { s[i], s[j] = s[j], s[i] }
func (s DirtyDatas) Sort()              { sort.Sort(s) }

func (s DirtyDatas) Search(offset int64) int {
	return sort.Search(len(s), func(i int) bool {
		v := &s[i]
		return v.Offset+int64(v.Size) > offset
	})
}

type CleanReader struct {
	Dirtys   DirtyDatas
	DataFile *os.File
	pr       *io.PipeReader
	pw       *io.PipeWriter
	mutex    sync.Mutex
}

func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) {
	m := NewCompactMap()
	for i := 0; i+16 <= len(indexFileContent); i += 16 {
		bytes := indexFileContent[i : i+16]
		key := binary.BigEndian.Uint64(bytes[:8])
		offset := binary.BigEndian.Uint32(bytes[8:12])
		size := binary.BigEndian.Uint32(bytes[12:16])
		k := Key(key)
		if offset != 0 && size != 0 {
			m.Set(k, offset, size)
		} else {
			if nv, ok := m.Get(k); ok {
				// mark old needle data as dirty data
				if int64(nv.Size)-NeedleHeaderSize > 0 {
					dirtys = append(dirtys, DirtyData{
						Offset: int64(nv.Offset)*8 + NeedleHeaderSize,
						Size:   nv.Size,
					})
				}
			}
			m.Delete(k)
		}
	}
	dirtys.Sort()
	return dirtys
}

func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) {
	oldOff, e := cr.DataFile.Seek(0, 1)
	if e != nil {
		return 0, e
	}
	newOff, e := cr.DataFile.Seek(offset, whence)
	if e != nil {
		return 0, e
	}
	if oldOff != newOff {
		cr.closePipe(true)
	}
	return newOff, nil
}

func (cr *CleanReader) Size() (int64, error) {
	fi, e := cr.DataFile.Stat()
	if e != nil {
		return 0, e
	}
	return fi.Size(), nil
}
func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) {
	off, err := cdr.DataFile.Seek(0, 1)
	if err != nil {
		return 0, err
	}
	const ZeroBufSize = 32 * 1024
	zeroBuf := make([]byte, ZeroBufSize)
	dirtyIndex := cdr.Dirtys.Search(off)
	var nextDirty *DirtyData
	if dirtyIndex < len(cdr.Dirtys) {
		nextDirty = &cdr.Dirtys[dirtyIndex]
	}
	for {
		if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) {
			sz := nextDirty.Offset + int64(nextDirty.Size) - off
			for sz > 0 {
				mn := int64(ZeroBufSize)
				if mn > sz {
					mn = sz
				}
				var n int
				if n, err = w.Write(zeroBuf[:mn]); err != nil {
					return
				}
				written += int64(n)
				sz -= int64(n)
				off += int64(n)
			}
			dirtyIndex++
			if dirtyIndex < len(cdr.Dirtys) {
				nextDirty = &cdr.Dirtys[dirtyIndex]
			} else {
				nextDirty = nil
			}
			if _, err = cdr.DataFile.Seek(off, 0); err != nil {
				return
			}
		} else {
			var n, sz int64
			if nextDirty != nil {
				sz = nextDirty.Offset - off
			}
			if sz <= 0 {
				// copy until eof
				n, err = io.Copy(w, cdr.DataFile)
				written += n
				return
			}
			if n, err = io.CopyN(w, cdr.DataFile, sz); err != nil {
				return
			}
			off += n
			written += n
		}
	}
}
func (cr *CleanReader) ReadAt(p []byte, off int64) (n int, err error) {
	if _, err = cr.Seek(off, 0); err != nil {
		return
	}
	return cr.Read(p)
}
func (cr *CleanReader) Read(p []byte) (int, error) {
	return cr.getPipeReader().Read(p)
}

func (cr *CleanReader) Close() (e error) {
	cr.closePipe(true)
	return cr.DataFile.Close()
}

func (cr *CleanReader) closePipe(lock bool) (e error) {
	if lock {
		cr.mutex.Lock()
		defer cr.mutex.Unlock()
	}
	if cr.pr != nil {
		if err := cr.pr.Close(); err != nil {
			e = err
		}
	}
	cr.pr = nil
	if cr.pw != nil {
		if err := cr.pw.Close(); err != nil {
			e = err
		}
	}
	cr.pw = nil
	return e
}

func (cr *CleanReader) getPipeReader() io.Reader {
	cr.mutex.Lock()
	defer cr.mutex.Unlock()
	if cr.pr != nil && cr.pw != nil {
		return cr.pr
	}
	cr.closePipe(false)
	cr.pr, cr.pw = io.Pipe()
	go func(pw *io.PipeWriter) {
		_, e := cr.WriteTo(pw)
		pw.CloseWithError(e)
	}(cr.pw)
	return cr.pr
}

func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) {
	var dirtys DirtyDatas
	if indexData, e := v.nm.IndexFileContent(); e != nil {
		return nil, e
	} else {
		dirtys = ScanDirtyData(indexData)
	}
	dataFile, e := os.Open(v.FileName() + ".dat")
	if e != nil {
		return nil, e
	}
	cr = &CleanReader{
		Dirtys:   dirtys,
		DataFile: dataFile,
	}
	return
}

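A minimal usage sketch of the new reader, not part of the commit: copyCleanVolume is a hypothetical helper showing how GetVolumeCleanReader and the io.Copy / WriteTo path might be driven; the destination-path handling is illustrative.

package storage

import (
	"io"
	"os"
)

// copyCleanVolume is a hypothetical helper (illustration only): it streams a
// volume's .dat file through CleanReader, which zero-fills the extents that
// ScanDirtyData marked as deleted, and writes the result to dstPath.
func copyCleanVolume(v *Volume, dstPath string) error {
	cr, err := v.GetVolumeCleanReader()
	if err != nil {
		return err
	}
	defer cr.Close()
	dst, err := os.Create(dstPath)
	if err != nil {
		return err
	}
	defer dst.Close()
	// io.Copy picks up CleanReader.WriteTo (it implements io.WriterTo), so the
	// zero-filling happens while copying, without buffering the dirty extents.
	_, err = io.Copy(dst, cr)
	return err
}
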
go/storage/volume_replicate_test.go (22)

@@ -0,0 +1,22 @@
package storage

import "testing"

func TestDirtyDataSearch(t *testing.T) {
	testData := DirtyDatas{
		{30, 20}, {106, 200}, {5, 20}, {512, 68}, {412, 50},
	}
	testOffset := []int64{
		0, 150, 480, 1024,
	}
	testData.Sort()
	t.Logf("TestData = %v", testData)
	for _, off := range testOffset {
		i := testData.Search(off)
		if i < testData.Len() {
			t.Logf("(%d) nearest chunk[%d]: %v", off, i, testData[i])
		} else {
			t.Logf("Search %d return %d", off, i)
		}
	}
}

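A further hedged sketch, not part of the commit, of how an index buffer for ScanDirtyData could be assembled in a test. The 16-byte record layout (8-byte key, 4-byte offset, 4-byte size, big-endian) mirrors the parsing loop in volume_replicate.go; appendIndexEntry is a hypothetical helper.

package storage

import "encoding/binary"

// appendIndexEntry is a hypothetical test helper mirroring the 16-byte index
// record that ScanDirtyData parses: key (8 bytes), offset (4), size (4).
func appendIndexEntry(buf []byte, key uint64, offset, size uint32) []byte {
	rec := make([]byte, 16)
	binary.BigEndian.PutUint64(rec[0:8], key)
	binary.BigEndian.PutUint32(rec[8:12], offset)
	binary.BigEndian.PutUint32(rec[12:16], size)
	return append(buf, rec...)
}

Writing a needle record and then a deletion record for the same key, e.g. appendIndexEntry(idx, 1, 8, 100) followed by appendIndexEntry(idx, 1, 0, 0), would make ScanDirtyData report the needle's old extent as dirty.
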
go/storage/volume_sync.go (3)

@@ -202,6 +202,9 @@ func (v *Volume) fetchNeedle(volumeDataContentHandlerUrl string,
 	if err != nil {
 		return fmt.Errorf("Reading from %s error: %v", volumeDataContentHandlerUrl, err)
 	}
+	if needleValue.Size != uint32(len(b)) {
+		return fmt.Errorf("Reading from %s error: size incorrect", volumeDataContentHandlerUrl)
+	}
 	offset, err := v.AppendBlob(b)
 	if err != nil {
 		return fmt.Errorf("Appending volume %d error: %v", v.Id, err)

go/weed/weed_server/volume_server.go (1)

@@ -57,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
 	adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
 	adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
+	adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler))
 	adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
 	adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
 	adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))

go/weed/weed_server/volume_server_handlers_replicate.go (66)

@@ -0,0 +1,66 @@
package weed_server

import (
	"fmt"
	"io"
	"net/http"
	"strconv"

	"github.com/chrislusf/seaweedfs/go/glog"
	"github.com/pierrec/lz4"
)

func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) {
	v, e := vs.getVolume("volume", r)
	if v == nil {
		http.Error(w, fmt.Sprintf("Not Found volume: %v", e), http.StatusBadRequest)
		return
	}
	cr, e := v.GetVolumeCleanReader()
	if e != nil {
		http.Error(w, fmt.Sprintf("Get volume clean reader: %v", e), http.StatusInternalServerError)
		return
	}
	totalSize, e := cr.Size()
	if e != nil {
		http.Error(w, fmt.Sprintf("Get volume size: %v", e), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Accept-Ranges", "bytes")
	w.Header().Set("Content-Disposition", fmt.Sprintf(`filename="%d.dat.lz4"`, v.Id))
	rangeReq := r.Header.Get("Range")
	if rangeReq == "" {
		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
		w.Header().Set("Content-Encoding", "lz4")
		lz4w := lz4.NewWriter(w)
		if _, e = io.Copy(lz4w, cr); e != nil {
			glog.V(4).Infoln("response write error:", e)
		}
		lz4w.Close()
		return
	}
	ranges, e := parseRange(rangeReq, totalSize)
	if e != nil {
		http.Error(w, e.Error(), http.StatusRequestedRangeNotSatisfiable)
		return
	}
	if len(ranges) != 1 {
		http.Error(w, "Only support one range", http.StatusNotImplemented)
		return
	}
	ra := ranges[0]
	if _, e := cr.Seek(ra.start, 0); e != nil {
		http.Error(w, e.Error(), http.StatusInternalServerError)
		return
	}
	w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
	w.Header().Set("Content-Range", ra.contentRange(totalSize))
	w.Header().Set("Content-Encoding", "lz4")
	w.WriteHeader(http.StatusPartialContent)
	lz4w := lz4.NewWriter(w)
	if _, e = io.CopyN(lz4w, cr, ra.length); e != nil {
		glog.V(2).Infoln("response write error:", e)
	}
	lz4w.Close()
}
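
Below the handler, a hedged client-side sketch (not part of the commit) of how the /admin/sync/vol_data route registered in volume_server.go might be consumed; the host, port, volume id, and output path are assumptions.

package main

import (
	"fmt"
	"io"
	"net/http"
	"os"

	"github.com/pierrec/lz4"
)

func main() {
	// Hypothetical volume server address and volume id.
	resp, err := http.Get("http://127.0.0.1:8080/admin/sync/vol_data?volume=1")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		panic(fmt.Sprintf("unexpected status: %s", resp.Status))
	}
	out, err := os.Create("1.dat") // hypothetical destination
	if err != nil {
		panic(err)
	}
	defer out.Close()
	// The handler compresses the stream with lz4, so decode it on the way out.
	if _, err := io.Copy(out, lz4.NewReader(resp.Body)); err != nil {
		panic(err)
	}
}

A ranged request would additionally set a Range header and expect a 206 response carrying an lz4-compressed slice of the clean volume data.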