Browse Source

Separate vacuum speed from replication speed (#7632)

pull/7635/head
msementsov 1 week ago
committed by GitHub
parent
commit
c0dad091f1
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 1
      weed/command/server.go
  2. 3
      weed/command/volume.go
  3. 4
      weed/server/volume_grpc_copy.go
  4. 23
      weed/server/volume_server.go

1
weed/command/server.go

@ -139,6 +139,7 @@ func init() {
serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.") serverOptions.v.fixJpgOrientation = cmdServer.Flag.Bool("volume.images.fix.orientation", false, "Adjust jpg orientation when uploading.")
serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.") serverOptions.v.readMode = cmdServer.Flag.String("volume.readMode", "proxy", "[local|proxy|redirect] how to deal with non-local volume: 'not found|read in remote node|redirect volume location'.")
serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second") serverOptions.v.compactionMBPerSecond = cmdServer.Flag.Int("volume.compactionMBps", 0, "limit compaction speed in mega bytes per second")
serverOptions.v.maintenanceMBPerSecond = cmdServer.Flag.Int("volume.maintenanceMBps", 0, "limit maintenance (replication / balance) IO rate in MB/s. Unset is 0, no limitation.")
serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory") serverOptions.v.fileSizeLimitMB = cmdServer.Flag.Int("volume.fileSizeLimitMB", 256, "limit file size to avoid out of memory")
serverOptions.v.ldbTimeout = cmdServer.Flag.Int64("volume.index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.") serverOptions.v.ldbTimeout = cmdServer.Flag.Int64("volume.index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.")
serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size") serverOptions.v.concurrentUploadLimitMB = cmdServer.Flag.Int("volume.concurrentUploadLimitMB", 64, "limit total concurrent upload size")

3
weed/command/volume.go

@ -58,6 +58,7 @@ type VolumeServerOptions struct {
cpuProfile *string cpuProfile *string
memProfile *string memProfile *string
compactionMBPerSecond *int compactionMBPerSecond *int
maintenanceMBPerSecond *int
fileSizeLimitMB *int fileSizeLimitMB *int
concurrentUploadLimitMB *int concurrentUploadLimitMB *int
concurrentDownloadLimitMB *int concurrentDownloadLimitMB *int
@ -96,6 +97,7 @@ func init() {
v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file") v.cpuProfile = cmdVolume.Flag.String("cpuprofile", "", "cpu profile output file")
v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file") v.memProfile = cmdVolume.Flag.String("memprofile", "", "memory profile output file")
v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second") v.compactionMBPerSecond = cmdVolume.Flag.Int("compactionMBps", 0, "limit background compaction or copying speed in mega bytes per second")
v.maintenanceMBPerSecond = cmdVolume.Flag.Int("maintenanceMBps", 0, "limit maintenance (replication / balance) IO rate in MB/s. Unset is 0, no limitation.")
v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory") v.fileSizeLimitMB = cmdVolume.Flag.Int("fileSizeLimitMB", 256, "limit file size to avoid out of memory")
v.ldbTimeout = cmdVolume.Flag.Int64("index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.") v.ldbTimeout = cmdVolume.Flag.Int64("index.leveldbTimeout", 0, "alive time for leveldb (default to 0). If leveldb of volume is not accessed in ldbTimeout hours, it will be off loaded to reduce opened files and memory consumption.")
v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size") v.concurrentUploadLimitMB = cmdVolume.Flag.Int("concurrentUploadLimitMB", 256, "limit total concurrent upload size")
@ -267,6 +269,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v
v.whiteList, v.whiteList,
*v.fixJpgOrientation, *v.readMode, *v.fixJpgOrientation, *v.readMode,
*v.compactionMBPerSecond, *v.compactionMBPerSecond,
*v.maintenanceMBPerSecond,
*v.fileSizeLimitMB, *v.fileSizeLimitMB,
int64(*v.concurrentUploadLimitMB)*1024*1024, int64(*v.concurrentUploadLimitMB)*1024*1024,
int64(*v.concurrentDownloadLimitMB)*1024*1024, int64(*v.concurrentDownloadLimitMB)*1024*1024,

4
weed/server/volume_grpc_copy.go

@ -115,7 +115,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
var sendErr error var sendErr error
var ioBytePerSecond int64 var ioBytePerSecond int64
if req.IoBytePerSecond <= 0 { if req.IoBytePerSecond <= 0 {
ioBytePerSecond = vs.compactionBytePerSecond
ioBytePerSecond = vs.maintenanceBytePerSecond
} else { } else {
ioBytePerSecond = req.IoBytePerSecond ioBytePerSecond = req.IoBytePerSecond
} }
@ -199,7 +199,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre
} }
func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.compactionBytePerSecond))
return vs.doCopyFileWithThrottler(client, isEcVolume, collection, vid, compactRevision, stopOffset, baseFileName, ext, isAppend, ignoreSourceFileNotFound, progressFn, util.NewWriteThrottler(vs.maintenanceBytePerSecond))
} }
func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) { func (vs *VolumeServer) doCopyFileWithThrottler(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc, throttler *util.WriteThrottler) (modifiedTsNs int64, err error) {

23
weed/server/volume_server.go

@ -42,16 +42,17 @@ type VolumeServer struct {
guard *security.Guard guard *security.Guard
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
needleMapKind storage.NeedleMapKind
ldbTimout int64
FixJpgOrientation bool
ReadMode string
compactionBytePerSecond int64
metricsAddress string
metricsIntervalSec int
fileSizeLimitBytes int64
isHeartbeating bool
stopChan chan bool
needleMapKind storage.NeedleMapKind
ldbTimout int64
FixJpgOrientation bool
ReadMode string
compactionBytePerSecond int64
maintenanceBytePerSecond int64
metricsAddress string
metricsIntervalSec int
fileSizeLimitBytes int64
isHeartbeating bool
stopChan chan bool
} }
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@ -65,6 +66,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fixJpgOrientation bool, fixJpgOrientation bool,
readMode string, readMode string,
compactionMBPerSecond int, compactionMBPerSecond int,
maintenanceMBPerSecond int,
fileSizeLimitMB int, fileSizeLimitMB int,
concurrentUploadLimit int64, concurrentUploadLimit int64,
concurrentDownloadLimit int64, concurrentDownloadLimit int64,
@ -94,6 +96,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
ReadMode: readMode, ReadMode: readMode,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
maintenanceBytePerSecond: int64(maintenanceMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
isHeartbeating: true, isHeartbeating: true,
stopChan: make(chan bool), stopChan: make(chan bool),

Loading…
Cancel
Save