diff --git a/go/storage/volume_replicate.go b/go/storage/volume_replicate.go index 2fc12fc9e..1ea6a8765 100644 --- a/go/storage/volume_replicate.go +++ b/go/storage/volume_replicate.go @@ -73,6 +73,14 @@ func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) { return cr.DataFile.Seek(offset, whence) } +func (cr *CleanReader) Size() (int64, error) { + fi, e := cr.DataFile.Stat() + if e != nil { + return 0, e + } + return fi.Size(), nil +} + func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { off, e := cdr.DataFile.Seek(0, 1) if e != nil { @@ -84,9 +92,6 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) { var nextDirty *DirtyData if dirtyIndex < len(cdr.Dirtys) { nextDirty = &cdr.Dirtys[dirtyIndex] - if nextDirty.Offset+int64(nextDirty.Size) < off { - nextDirty = nil - } } for { if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) { @@ -189,6 +194,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) { dirtys = ScanDirtyData(indexData) } dataFile, e := os.Open(v.FileName()) + if e != nil { return nil, e } diff --git a/go/weed/weed_server/volume_server.go b/go/weed/weed_server/volume_server.go index 8becdd0f1..b8472235e 100644 --- a/go/weed/weed_server/volume_server.go +++ b/go/weed/weed_server/volume_server.go @@ -57,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler)) adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler)) adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler)) + adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler)) adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) diff --git a/go/weed/weed_server/volume_server_handlers_replicate.go b/go/weed/weed_server/volume_server_handlers_replicate.go new file mode 100644 index 000000000..c8b807ffb --- /dev/null +++ b/go/weed/weed_server/volume_server_handlers_replicate.go @@ -0,0 +1,60 @@ +package weed_server + +import ( + "fmt" + "net/http" + "strconv" + "github.com/chrislusf/seaweedfs/go/glog" + "io" + "github.com/pierrec/lz4" +) + +func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) { + v, err := vs.getVolume("volume", r) + if v == nil { + http.Error(w, fmt.Sprintf("Not Found volume: %v", err), http.StatusBadRequest) + return + } + cr, e := v.GetVolumeCleanReader() + if e != nil { + http.Error(w, fmt.Sprintf("Get volume clean reader: %v", err), http.StatusInternalServerError) + return + } + totalSize, e := cr.Size() + if e != nil { + http.Error(w, fmt.Sprintf("Get volume size: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Accept-Ranges", "bytes") + w.Header().Set("Content-Encoding", "lz4") + lz4w := lz4.NewWriter(w) + defer lz4w.Close() + rangeReq := r.Header.Get("Range") + if rangeReq == "" { + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + if _, e = io.Copy(lz4w, cr); e != nil { + glog.V(4).Infoln("response write error:", e) + } + return + } + ranges, err := parseRange(rangeReq, totalSize) + if err != nil { + http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + return + } + if len(ranges) != 1 { + http.Error(w, "Only support one range", http.StatusNotImplemented) + return + } + ra := ranges[0] + if _, e := cr.Seek(ra.start, 0); e != nil { + http.Error(w, fmt.Sprintf("Seek: %v", err), http.StatusInternalServerError) + return + } + w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.WriteHeader(http.StatusPartialContent) + if _, e = io.CopyN(lz4w, cr, ra.length); e != nil { + glog.V(2).Infoln("response write error:", e) + } +}