Browse Source

Volume server add `/admin/sync/vol_data` handler to serve cleaned volume date

pull/283/head
tnextday 10 years ago
parent
commit
9d03f763dc
  1. 12
      go/storage/volume_replicate.go
  2. 1
      go/weed/weed_server/volume_server.go
  3. 60
      go/weed/weed_server/volume_server_handlers_replicate.go

12
go/storage/volume_replicate.go

@ -73,6 +73,14 @@ func (cr *CleanReader) Seek(offset int64, whence int) (int64, error) {
return cr.DataFile.Seek(offset, whence)
}
func (cr *CleanReader) Size() (int64, error) {
fi, e := cr.DataFile.Stat()
if e != nil {
return 0, e
}
return fi.Size(), nil
}
func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) {
off, e := cdr.DataFile.Seek(0, 1)
if e != nil {
@ -84,9 +92,6 @@ func (cdr *CleanReader) WriteTo(w io.Writer) (written int64, err error) {
var nextDirty *DirtyData
if dirtyIndex < len(cdr.Dirtys) {
nextDirty = &cdr.Dirtys[dirtyIndex]
if nextDirty.Offset+int64(nextDirty.Size) < off {
nextDirty = nil
}
}
for {
if nextDirty != nil && off >= nextDirty.Offset && off < nextDirty.Offset+int64(nextDirty.Size) {
@ -189,6 +194,7 @@ func (v *Volume) GetVolumeCleanReader() (cr *CleanReader, err error) {
dirtys = ScanDirtyData(indexData)
}
dataFile, e := os.Open(v.FileName())
if e != nil {
return nil, e
}

1
go/weed/weed_server/volume_server.go

@ -57,6 +57,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
adminMux.HandleFunc("/admin/sync/status", vs.guard.WhiteList(vs.getVolumeSyncStatusHandler))
adminMux.HandleFunc("/admin/sync/index", vs.guard.WhiteList(vs.getVolumeIndexContentHandler))
adminMux.HandleFunc("/admin/sync/data", vs.guard.WhiteList(vs.getVolumeDataContentHandler))
adminMux.HandleFunc("/admin/sync/vol_data", vs.guard.WhiteList(vs.getVolumeCleanDataHandler))
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler))

60
go/weed/weed_server/volume_server_handlers_replicate.go

@ -0,0 +1,60 @@
package weed_server
import (
"fmt"
"net/http"
"strconv"
"github.com/chrislusf/seaweedfs/go/glog"
"io"
"github.com/pierrec/lz4"
)
func (vs *VolumeServer) getVolumeCleanDataHandler(w http.ResponseWriter, r *http.Request) {
v, err := vs.getVolume("volume", r)
if v == nil {
http.Error(w, fmt.Sprintf("Not Found volume: %v", err), http.StatusBadRequest)
return
}
cr, e := v.GetVolumeCleanReader()
if e != nil {
http.Error(w, fmt.Sprintf("Get volume clean reader: %v", err), http.StatusInternalServerError)
return
}
totalSize, e := cr.Size()
if e != nil {
http.Error(w, fmt.Sprintf("Get volume size: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Encoding", "lz4")
lz4w := lz4.NewWriter(w)
defer lz4w.Close()
rangeReq := r.Header.Get("Range")
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
if _, e = io.Copy(lz4w, cr); e != nil {
glog.V(4).Infoln("response write error:", e)
}
return
}
ranges, err := parseRange(rangeReq, totalSize)
if err != nil {
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return
}
if len(ranges) != 1 {
http.Error(w, "Only support one range", http.StatusNotImplemented)
return
}
ra := ranges[0]
if _, e := cr.Seek(ra.start, 0); e != nil {
http.Error(w, fmt.Sprintf("Seek: %v", err), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
w.WriteHeader(http.StatusPartialContent)
if _, e = io.CopyN(lz4w, cr, ra.length); e != nil {
glog.V(2).Infoln("response write error:", e)
}
}
Loading…
Cancel
Save