Browse Source

update

pull/283/head
tnextday 10 years ago
parent
commit
39c97f8955
  1. 30
      go/storage/volume_replicate.go
  2. 15
      go/weed/weed_server/volume_server_handlers_replicate.go

30
go/storage/volume_replicate.go

@ -63,14 +63,18 @@ func ScanDirtyData(indexFileContent []byte) (dirtys DirtyDatas) {
}
func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) {
off, e := cr.DataFile.Seek(0, 1)
oldOff, e := cr.DataFile.Seek(0, 1)
if e != nil {
return 0, nil
return 0, e
}
newOff, e := cr.DataFile.Seek(offset, whence)
if e != nil {
return 0, e
}
if off != offset {
cr.Close()
if oldOff != newOff {
cr.closePipe(true)
}
return cr.DataFile.Seek(offset, whence)
return newOff, nil
}
func (cr *CleanReader) Size() (int64, error) {
@ -125,7 +129,7 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) {
}
if sz <= 0 {
// copy until eof
n, e = io.Copy(w, cdr.DataFile);
n, e = io.Copy(w, cdr.DataFile)
written += n
return
}
@ -149,13 +153,15 @@ func (cr *CleanReader) Read(p []byte) (int, error) {
}
func (cr *CleanReader) Close() (e error) {
cr.mutex.Lock()
defer cr.mutex.Unlock()
cr.closePipe()
cr.closePipe(true)
return cr.DataFile.Close()
}
func (cr *CleanReader) closePipe() (e error) {
func (cr *CleanReader) closePipe(lock bool) (e error) {
if lock {
cr.mutex.Lock()
defer cr.mutex.Unlock()
}
if cr.pr != nil {
if err := cr.pr.Close(); err != nil {
e = err
@ -177,7 +183,7 @@ func (cr *CleanReader) getPipeReader() io.Reader {
if cr.pr != nil && cr.pw != nil {
return cr.pr
}
cr.closePipe()
cr.closePipe(false)
cr.pr, cr.pw = io.Pipe()
go func(pw *io.PipeWriter) {
_, e := cr.WriteTo(pw)
@ -193,7 +199,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) {
} else {
dirtys = ScanDirtyData(indexData)
}
dataFile, e := os.Open(v.FileName()+".dat")
dataFile, e := os.Open(v.FileName() + ".dat")
if e != nil {
return nil, e

15
go/weed/weed_server/volume_server_handlers_replicate.go

@ -2,10 +2,11 @@ package weed_server
import (
"fmt"
"io"
"net/http"
"strconv"
"github.com/chrislusf/seaweedfs/go/glog"
"io"
"github.com/pierrec/lz4"
)
@ -26,16 +27,17 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http
return
}
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Encoding", "lz4")
w.Header().Set("Content-Disposition", fmt.Sprintf(`filename="%d.dat.lz4"`, v.Id))
lz4w := lz4.NewWriter(w)
defer lz4w.Close()
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
w.Header().Set("Content-Encoding", "lz4")
lz4w := lz4.NewWriter(w)
if _, e = io.Copy(lz4w, cr); e != nil {
glog.V(4).Infoln("response write error:", e)
}
lz4w.Close()
return
}
ranges, e := parseRange(rangeReq, totalSize)
@ -49,13 +51,16 @@ func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http
}
ra := ranges[0]
if _, e := cr.Seek(ra.start, 0); e != nil {
http.Error(w, fmt.Sprintf("Seek: %v", e), http.StatusInternalServerError)
http.Error(w, e.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
w.Header().Set("Content-Encoding", "lz4")
w.WriteHeader(http.StatusPartialContent)
lz4w := lz4.NewWriter(w)
if _, e = io.CopyN(lz4w, cr, ra.length); e != nil {
glog.V(2).Infoln("response write error:", e)
}
lz4w.Close()
}
Loading…
Cancel
Save