From d949a238b83dc05fe79a1fb02a047a25355feb7b Mon Sep 17 00:00:00 2001 From: famosss Date: Fri, 16 Sep 2022 15:30:40 +0800 Subject: [PATCH 1/4] volume: add "readBufSize" option to customize read optimization (#3702) * simplify a bit * feat: volume: add "readBufSize" option to customize read optimization * refactor : redbufSIze -> readBufferSize * simplify a bit * simplify a bit --- weed/command/server.go | 1 + weed/command/volume.go | 3 +++ weed/server/volume_server.go | 3 +++ weed/server/volume_server_handlers_read.go | 5 +++-- weed/storage/store.go | 4 ++++ weed/storage/volume_read.go | 2 +- 6 files changed, 15 insertions(+), 3 deletions(-) diff --git a/weed/command/server.go b/weed/command/server.go index 74f0e582e..78b52ea4b 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -132,6 +132,7 @@ func init() { serverOptions.v.idxFolder = cmdServer.Flag.String("volume.dir.idx", "", "directory to store .idx files") serverOptions.v.inflightUploadDataTimeout = cmdServer.Flag.Duration("volume.inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") serverOptions.v.hasSlowRead = cmdServer.Flag.Bool("volume.hasSlowRead", false, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") + serverOptions.v.readBufferSize = cmdServer.Flag.Int("volume.readBufferSize", 1024 * 1024, " larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally") s3Options.port = cmdServer.Flag.Int("s3.port", 8333, "s3 server http listen port") s3Options.portGrpc = cmdServer.Flag.Int("s3.port.grpc", 0, "s3 server grpc listen port") diff --git a/weed/command/volume.go b/weed/command/volume.go index 3b31ada50..5b62a4844 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -67,6 +67,7 @@ type VolumeServerOptions struct { // pulseSeconds *int inflightUploadDataTimeout *time.Duration hasSlowRead *bool + readBufferSize *int } func init() { @@ -98,6 +99,7 @@ func init() { v.idxFolder = cmdVolume.Flag.String("dir.idx", "", "directory to store .idx files") v.inflightUploadDataTimeout = cmdVolume.Flag.Duration("inflightUploadDataTimeout", 60*time.Second, "inflight upload data wait timeout of volume servers") v.hasSlowRead = cmdVolume.Flag.Bool("hasSlowRead", false, " if true, this prevents slow reads from blocking other requests, but large file read P99 latency will increase.") + v.readBufferSize = cmdVolume.Flag.Int("readBufferSize", 1024 * 1024, " larger values can optimize query performance but will increase some memory usage,Use with hasSlowRead normally.") } var cmdVolume = &Command{ @@ -246,6 +248,7 @@ func (v VolumeServerOptions) startVolumeServer(volumeFolders, maxVolumeCounts, v int64(*v.concurrentDownloadLimitMB)*1024*1024, *v.inflightUploadDataTimeout, *v.hasSlowRead, + *v.readBufferSize, ) // starting grpc server grpcS := v.startGrpcService(volumeServer) diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 8bf50ce45..07bb0b9ee 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -29,6 +29,7 @@ type VolumeServer struct { inFlightDownloadDataLimitCond *sync.Cond inflightUploadDataTimeout time.Duration hasSlowRead bool + readBufferSize int SeedMasterNodes []pb.ServerAddress currentMaster pb.ServerAddress @@ -66,6 +67,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, concurrentDownloadLimit int64, inflightUploadDataTimeout time.Duration, hasSlowRead bool, + readBufferSize int, ) *VolumeServer { v := util.GetViper() @@ -96,6 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, concurrentDownloadLimit: concurrentDownloadLimit, inflightUploadDataTimeout: inflightUploadDataTimeout, hasSlowRead: hasSlowRead, + readBufferSize: readBufferSize, } vs.SeedMasterNodes = masterNodes diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index b8f4120a6..facdf2556 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -116,8 +116,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) cookie := n.Cookie readOption := &storage.ReadOption{ - ReadDeleted: r.FormValue("readDeleted") == "true", - HasSlowRead: vs.hasSlowRead, + ReadDeleted: r.FormValue("readDeleted") == "true", + HasSlowRead: vs.hasSlowRead, + ReadBufferSize: vs.readBufferSize, } var count int diff --git a/weed/storage/store.go b/weed/storage/store.go index 48736c1a9..45f87525b 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -44,6 +44,10 @@ type ReadOption struct { // * read requests should complete asap, not blocking other requests. // * write requests may see high latency when downloading large files. HasSlowRead bool + + // increasing ReadBufferSize can reduce the number of get locks times and shorten read P99 latency. + // but will increase memory usage a bit. Use with hasSlowRead normally. + ReadBufferSize int } /* diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index e045137b4..ee3cff45c 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -136,7 +136,7 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr actualOffset += int64(MaxPossibleVolumeSize) } - buf := mem.Allocate(min(1024*1024, int(size))) + buf := mem.Allocate(min(readOption.ReadBufferSize, int(size))) defer mem.Free(buf) // read needle data From 277976bd762a108d6b6f7f96500a769c5454549d Mon Sep 17 00:00:00 2001 From: Ryan Russell Date: Fri, 16 Sep 2022 04:43:17 -0500 Subject: [PATCH 2/4] refactor(storage): readability improvements (#3703) Signed-off-by: Ryan Russell Signed-off-by: Ryan Russell --- weed/storage/backend/memory_map/memory_map_windows.go | 2 +- weed/storage/needle/file_id_test.go | 2 +- weed/storage/needle/needle_parse_upload.go | 2 +- weed/storage/needle/needle_write.go | 2 +- weed/storage/store.go | 2 +- weed/storage/store_ec.go | 2 +- weed/storage/store_vacuum.go | 2 +- weed/storage/volume_vacuum.go | 2 +- weed/storage/volume_write.go | 2 +- 9 files changed, 9 insertions(+), 9 deletions(-) diff --git a/weed/storage/backend/memory_map/memory_map_windows.go b/weed/storage/backend/memory_map/memory_map_windows.go index b65e1b20a..ce0c04318 100644 --- a/weed/storage/backend/memory_map/memory_map_windows.go +++ b/weed/storage/backend/memory_map/memory_map_windows.go @@ -153,7 +153,7 @@ func allocate(hMapFile windows.Handle, offset uint64, length uint64, write bool) mBuffer := MemoryBuffer{} - //align memory allocations to the minium virtal memory allocation size + //align memory allocations to the minium virtual memory allocation size dwSysGran := systemInfo.dwAllocationGranularity start := (offset / uint64(dwSysGran)) * uint64(dwSysGran) diff --git a/weed/storage/needle/file_id_test.go b/weed/storage/needle/file_id_test.go index ee5175d39..2796470aa 100644 --- a/weed/storage/needle/file_id_test.go +++ b/weed/storage/needle/file_id_test.go @@ -15,7 +15,7 @@ func TestParseFileIdFromString(t *testing.T) { fidStr1 = "100, 12345678" _, err = ParseFileIdFromString(fidStr1) if err == nil { - t.Errorf("%s : needlId invalid syntax", fidStr1) + t.Errorf("%s : needleId invalid syntax", fidStr1) } fidStr1 = "100,123456789" diff --git a/weed/storage/needle/needle_parse_upload.go b/weed/storage/needle/needle_parse_upload.go index e733cfb99..e00729811 100644 --- a/weed/storage/needle/needle_parse_upload.go +++ b/weed/storage/needle/needle_parse_upload.go @@ -195,7 +195,7 @@ func parseMultipart(r *http.Request, sizeLimit int64, pu *ParsedUpload) (e error } contentType := part.Header.Get("Content-Type") if contentType != "" && contentType != "application/octet-stream" && mtype != contentType { - pu.MimeType = contentType // only return mime type if not deductable + pu.MimeType = contentType // only return mime type if not deducible mtype = contentType } diff --git a/weed/storage/needle/needle_write.go b/weed/storage/needle/needle_write.go index a562417df..60b6e7adb 100644 --- a/weed/storage/needle/needle_write.go +++ b/weed/storage/needle/needle_write.go @@ -128,7 +128,7 @@ func (n *Needle) Append(w backend.BackendStorageFile, version Version) (offset u return } if offset >= MaxPossibleVolumeSize && n.Size.IsValid() { - err = fmt.Errorf("Volume Size %d Exeededs %d", offset, MaxPossibleVolumeSize) + err = fmt.Errorf("Volume Size %d Exceeded %d", offset, MaxPossibleVolumeSize) return } diff --git a/weed/storage/store.go b/weed/storage/store.go index 45f87525b..76be2fa9c 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -62,7 +62,7 @@ type Store struct { GrpcPort int PublicUrl string Locations []*DiskLocation - dataCenter string // optional informaton, overwriting master setting if exists + dataCenter string // optional information, overwriting master setting if exists rack string // optional information, overwriting master setting if exists connected bool NeedleMapKind NeedleMapKind diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 8d7e890b3..b46803fb2 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -339,7 +339,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum ecVolume.ShardLocationsLock.RLock() for shardId, locations := range ecVolume.ShardLocations { - // skip currnent shard or empty shard + // skip current shard or empty shard if shardId == shardIdToRecover { continue } diff --git a/weed/storage/store_vacuum.go b/weed/storage/store_vacuum.go index e8db7d7cd..0f25c888a 100644 --- a/weed/storage/store_vacuum.go +++ b/weed/storage/store_vacuum.go @@ -10,7 +10,7 @@ import ( func (s *Store) CheckCompactVolume(volumeId needle.VolumeId) (float64, error) { if v := s.findVolume(volumeId); v != nil { - glog.V(3).Infof("volumd %d garbage level: %f", volumeId, v.garbageLevel()) + glog.V(3).Infof("volume %d garbage level: %f", volumeId, v.garbageLevel()) return v.garbageLevel(), nil } return 0, fmt.Errorf("volume id %d is not found during check compact", volumeId) diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go index 642b3eab5..2862ca94d 100644 --- a/weed/storage/volume_vacuum.go +++ b/weed/storage/volume_vacuum.go @@ -167,7 +167,7 @@ func (v *Volume) CommitCompact() error { if e = v.load(true, false, v.needleMapKind, 0); e != nil { return e } - glog.V(3).Infof("Finish commiting volume %d", v.Id) + glog.V(3).Infof("Finish committing volume %d", v.Id) return nil } diff --git a/weed/storage/volume_write.go b/weed/storage/volume_write.go index 49022cc0a..223e19577 100644 --- a/weed/storage/volume_write.go +++ b/weed/storage/volume_write.go @@ -81,7 +81,7 @@ func removeVolumeFiles(filename string) { // compaction os.Remove(filename + ".cpd") os.Remove(filename + ".cpx") - // level db indx file + // level db index file os.RemoveAll(filename + ".ldb") // marker for damaged or incomplete volume os.Remove(filename + ".note") From 824f7ad9e192c1999fdb5da3a9af8c112904cc66 Mon Sep 17 00:00:00 2001 From: Ryan Russell Date: Fri, 16 Sep 2022 04:43:49 -0500 Subject: [PATCH 3/4] refactor(shell): readability improvements (#3704) Signed-off-by: Ryan Russell Signed-off-by: Ryan Russell --- weed/shell/command_fs_configure.go | 2 +- weed/shell/command_lock_unlock.go | 2 +- weed/shell/command_remote_meta_sync.go | 2 +- weed/shell/command_s3_circuitbreaker_test.go | 2 +- weed/shell/command_s3_clean_uploads.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/weed/shell/command_fs_configure.go b/weed/shell/command_fs_configure.go index 1bf42b9ab..60edb79c2 100644 --- a/weed/shell/command_fs_configure.go +++ b/weed/shell/command_fs_configure.go @@ -98,7 +98,7 @@ func (c *commandFsConfigure) Do(args []string, commandEnv *CommandEnv, writer io return fmt.Errorf("parse replication %s: %v", *replication, err) } if *volumeGrowthCount%rp.GetCopyCount() != 0 { - return fmt.Errorf("volumeGrowthCount %d should be devided by replication copy count %d", *volumeGrowthCount, rp.GetCopyCount()) + return fmt.Errorf("volumeGrowthCount %d should be divided by replication copy count %d", *volumeGrowthCount, rp.GetCopyCount()) } } diff --git a/weed/shell/command_lock_unlock.go b/weed/shell/command_lock_unlock.go index a69be21c7..a214ea91c 100644 --- a/weed/shell/command_lock_unlock.go +++ b/weed/shell/command_lock_unlock.go @@ -21,7 +21,7 @@ func (c *commandLock) Name() string { func (c *commandLock) Help() string { return `lock in order to exclusively manage the cluster - This is a blocking operation if there is alread another lock. + This is a blocking operation if there is already another lock. ` } diff --git a/weed/shell/command_remote_meta_sync.go b/weed/shell/command_remote_meta_sync.go index ebd43c326..bc9075746 100644 --- a/weed/shell/command_remote_meta_sync.go +++ b/weed/shell/command_remote_meta_sync.go @@ -96,7 +96,7 @@ After caching the file content, the entry.RemoteEntry will be remoteEntry.LastLocalSyncTsNs == time.Now.UnixNano() Attributes.FileSize = uint64(remoteEntry.RemoteSize) Attributes.Mtime = remoteEntry.RemoteMtime - chunks = non-emtpy + chunks = non-empty When "weed filer.remote.sync" to upload local changes to remote, the criteria is: diff --git a/weed/shell/command_s3_circuitbreaker_test.go b/weed/shell/command_s3_circuitbreaker_test.go index 3d0b4ac6e..d1d92f9f8 100644 --- a/weed/shell/command_s3_circuitbreaker_test.go +++ b/weed/shell/command_s3_circuitbreaker_test.go @@ -285,7 +285,7 @@ func TestCircuitBreakerShell(t *testing.T) { t.Error(err) } if !reflect.DeepEqual(actual, expect) { - t.Fatal("result of s3 circuit breaker shell command is unexpect!") + t.Fatal("result of s3 circuit breaker shell command is unexpected!") } } } diff --git a/weed/shell/command_s3_clean_uploads.go b/weed/shell/command_s3_clean_uploads.go index 831ca47c0..81af4bbb7 100644 --- a/weed/shell/command_s3_clean_uploads.go +++ b/weed/shell/command_s3_clean_uploads.go @@ -60,7 +60,7 @@ func (c *commandS3CleanUploads) Do(args []string, commandEnv *CommandEnv, writer for _, bucket := range buckets { if err := c.cleanupUploads(commandEnv, writer, filerBucketsPath, bucket, *uploadedTimeAgo, signingKey); err != nil { - fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for backet %s: %v", bucket, err)) + fmt.Fprintf(writer, fmt.Sprintf("failed cleanup uploads for bucket %s: %v", bucket, err)) } } From 5d87ad72d8ee90146d2f6b619d54c6ac0ba4b868 Mon Sep 17 00:00:00 2001 From: Konstantin Lebedev <9497591+kmlebedev@users.noreply.github.com> Date: Fri, 16 Sep 2022 21:05:38 +0500 Subject: [PATCH 4/4] mute log filer: no entry is found in filer store (#3707) --- weed/server/filer_server_handlers_write.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index fc767dd9e..d2c296fad 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -195,6 +195,8 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { httpStatus := http.StatusInternalServerError if err == filer_pb.ErrNotFound { httpStatus = http.StatusNoContent + writeJsonQuiet(w, r, httpStatus, nil) + return } writeJsonError(w, r, httpStatus, err) return