From 7566782c2e172408bbbfd32bb5c3b505bfd8d5e1 Mon Sep 17 00:00:00 2001 From: zhangsong Date: Wed, 30 Jun 2021 17:28:37 +0800 Subject: [PATCH] add proxy mode to read non-local volumes --- weed/command/server.go | 2 +- weed/command/volume.go | 6 +-- weed/server/volume_server.go | 6 +-- weed/server/volume_server_handlers.go | 2 + weed/server/volume_server_handlers_read.go | 51 ++++++++++++++++++---- 5 files changed, 52 insertions(+), 15 deletions(-) diff --git a/weed/command/server.go b/weed/command/server.go index d2bd6466e..a4050aa51 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -107,7 +107,7 @@ func init() { serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") - serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.") + serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "redirect", "[local|remote|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") diff --git a/weed/command/volume.go b/weed/command/volume.go index 139a3791e..e974a9082 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -51,7 +51,7 @@ type VolumeServerOptions struct { indexType *string diskType *string fixJpgOrientation *bool - readRedirect *bool + readMode *string cpuProfile *string memProfile *string compactionMBPerSecond *int @@ -80,7 +80,7 @@ func init() { v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|] hard drive or solid state drive or any tag") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") - v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.") + v.readMode = cmdVolume.Flag.String("readMode", "redirect", "[local|remote|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") @@ -228,7 +228,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v volumeNeedleMapKind, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, v.whiteList, - *v.fixJpgOrientation, *v.readRedirect, + *v.fixJpgOrientation, *v.readMode, *v.compactionMBPerSecond, *v.fileSizeLimitMB, int64(*v.concurrentUploadLimitMB)*1024*1024, diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index f7359ea6b..74c5f72c6 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -28,7 +28,7 @@ type VolumeServer struct { needleMapKind storage.NeedleMapKind FixJpgOrientation bool - ReadRedirect bool + ReadMode string compactionBytePerSecond int64 metricsAddress string metricsIntervalSec int @@ -50,7 +50,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool, + readMode string, compactionMBPerSecond int, fileSizeLimitMB int, concurrentUploadLimit int64, @@ -72,7 +72,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, rack: rack, needleMapKind: needleMapKind, FixJpgOrientation: fixJpgOrientation, - ReadRedirect: readRedirect, + ReadMode: readMode, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 8ac3c0d90..d35146d56 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "fmt" "net/http" "strconv" "strings" @@ -81,6 +82,7 @@ func getContentLength(r *http.Request) int64 { } func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { + fmt.Printf("publicReadOnlyHandler in.") w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) if r.Header.Get("Origin") != "" { w.Header().Set("Access-Control-Allow-Origin", "*") diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 3e977cfd4..2100eb620 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -58,14 +58,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) hasVolume := vs.store.HasVolume(volumeId) _, hasEcVolume := vs.store.FindEcVolume(volumeId) if !hasVolume && !hasEcVolume { - if !vs.ReadRedirect { - glog.V(2).Infoln("volume is not local:", err, r.URL.Path) + if vs.ReadMode == "local" { + glog.V(0).Infoln("volume is not local:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) return } lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil && len(lookupResult.Locations) > 0 { + if err != nil || len(lookupResult.Locations) <= 0{ + glog.V(0).Infoln("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if vs.ReadMode == "remote" { + // proxy client request to target server + u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url)) + r.URL.Host = u.Host + r.URL.Scheme = u.Scheme + request, err := http.NewRequest("GET", r.URL.String(), nil) + if err != nil { + glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) + return + } + for k, vv := range r.Header { + for _, v := range vv { + request.Header.Add(k, v) + } + } + + response, err := client.Do(request) + if err != nil { + glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer util.CloseResponse(response) + // proxy target response to client + for k, vv := range response.Header { + for _, v := range vv { + w.Header().Add(k, v) + } + } + w.WriteHeader(response.StatusCode) + io.Copy(w, response.Body) + return + } else { + // redirect u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} @@ -74,12 +113,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } u.RawQuery = arg.Encode() http.Redirect(w, r, u.String(), http.StatusMovedPermanently) - - } else { - glog.V(2).Infoln("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) + return } - return } cookie := n.Cookie