Browse Source

Merge pull request #2168 from song-zhang/master

add proxy mode to read non-local volumes
pull/2170/head
Chris Lu 4 years ago
committed by GitHub
parent
commit
4a0ba3a1a9
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      weed/command/server.go
  2. 6
      weed/command/volume.go
  3. 6
      weed/server/volume_server.go
  4. 51
      weed/server/volume_server_handlers_read.go

2
weed/command/server.go

@ -107,7 +107,7 @@ func init() {
serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") serverOptions.v.indexType = cmdServer.Flag.String("volume.index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") serverOptions.v.diskType = cmdServer.Flag.String("volume.disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readRedirect = cmdServer.Flag.Bool("volume.read.redirect", true, "Redirect moved or non-local volumes.")
serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "redirect", "[local|remote|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")

6
weed/command/volume.go

@ -51,7 +51,7 @@ type VolumeServerOptions struct {
indexType *string indexType *string
diskType *string diskType *string
fixJpgOrientation *bool fixJpgOrientation *bool
readRedirect *bool
readMode *string
cpuProfile *string cpuProfile *string
memProfile *string memProfile *string
compactionMBPerSecond *int compactionMBPerSecond *int
@ -80,7 +80,7 @@ func init() {
v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.") v.indexType = cmdVolume.Flag.String("index", "memory", "Choose [memory|leveldb|leveldbMedium|leveldbLarge] mode for memory~performance balance.")
v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag") v.diskType = cmdVolume.Flag.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.") v.fixJpgOrientation = cmdVolume.Flag.Bool("images.fix.orientation", false, "Adjust jpg orientation when uploading.")
v.readRedirect = cmdVolume.Flag.Bool("read.redirect", true, "Redirect moved or non-local volumes.")
v.readMode = cmdVolume.Flag.String("readMode", "redirect", "[local|proxy|redirect] how to deal with non-local volume: 'not found|proxy to remote node|redirect volume location'.")
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
@ -228,7 +228,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
volumeNeedleMapKind, volumeNeedleMapKind,
strings.Split(masters, ","), 5, *v.dataCenter, *v.rack, strings.Split(masters, ","), 5, *v.dataCenter, *v.rack,
v.whiteList, v.whiteList,
*v.fixJpgOrientation, *v.readRedirect,
*v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond, *v.compactionMBPerSecond,
*v.fileSizeLimitMB, *v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024, int64(*v.concurrentUploadLimitMB)*1024*1024,

6
weed/server/volume_server.go

@ -28,7 +28,7 @@ type VolumeServer struct {
needleMapKind storage.NeedleMapKind needleMapKind storage.NeedleMapKind
FixJpgOrientation bool FixJpgOrientation bool
ReadRedirect bool
ReadMode string
compactionBytePerSecond int64 compactionBytePerSecond int64
metricsAddress string metricsAddress string
metricsIntervalSec int metricsIntervalSec int
@ -50,7 +50,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string, dataCenter string, rack string,
whiteList []string, whiteList []string,
fixJpgOrientation bool, fixJpgOrientation bool,
readRedirect bool,
readMode string,
compactionMBPerSecond int, compactionMBPerSecond int,
fileSizeLimitMB int, fileSizeLimitMB int,
concurrentUploadLimit int64, concurrentUploadLimit int64,
@ -72,7 +72,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
rack: rack, rack: rack,
needleMapKind: needleMapKind, needleMapKind: needleMapKind,
FixJpgOrientation: fixJpgOrientation, FixJpgOrientation: fixJpgOrientation,
ReadRedirect: readRedirect,
ReadMode: readMode,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,

51
weed/server/volume_server_handlers_read.go

@ -58,14 +58,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
hasVolume := vs.store.HasVolume(volumeId) hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId) _, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume { if !hasVolume && !hasEcVolume {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
if vs.ReadMode == "local" {
glog.V(0).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound) w.WriteHeader(http.StatusNotFound)
return return
} }
lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String()) lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
if err == nil && len(lookupResult.Locations) > 0 {
if err != nil || len(lookupResult.Locations) <= 0{
glog.V(0).Infoln("lookup error:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
return
}
if vs.ReadMode == "proxy" {
// proxy client request to target server
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
r.URL.Host = u.Host
r.URL.Scheme = u.Scheme
request, err := http.NewRequest("GET", r.URL.String(), nil)
if err != nil {
glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
w.WriteHeader(http.StatusInternalServerError)
return
}
for k, vv := range r.Header {
for _, v := range vv {
request.Header.Add(k, v)
}
}
response, err := client.Do(request)
if err != nil {
glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
w.WriteHeader(http.StatusInternalServerError)
return
}
defer util.CloseResponse(response)
// proxy target response to client
for k, vv := range response.Header {
for _, v := range vv {
w.Header().Add(k, v)
}
}
w.WriteHeader(response.StatusCode)
io.Copy(w, response.Body)
return
} else {
// redirect
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{} arg := url.Values{}
@ -74,13 +113,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
} }
u.RawQuery = arg.Encode() u.RawQuery = arg.Encode()
http.Redirect(w, r, u.String(), http.StatusMovedPermanently) http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
} else {
glog.V(2).Infoln("lookup error:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
}
return return
} }
}
cookie := n.Cookie cookie := n.Cookie
readOption := &storage.ReadOption{ readOption := &storage.ReadOption{

Loading…
Cancel
Save