From 723ae11db401b791925d99968ad53240c3259305 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 11 Oct 2020 20:15:10 -0700 Subject: [PATCH 01/18] refactoring in order to adjust volume server url later --- weed/filer/reader_at.go | 2 +- weed/filesys/wfs_deletion.go | 2 +- weed/filesys/wfs_filer_client.go | 10 +++++----- weed/filesys/wfs_write.go | 8 ++++++-- weed/messaging/broker/broker_append.go | 4 ++-- weed/pb/filer_pb/filer_client.go | 2 +- weed/replication/sink/filersink/fetch_write.go | 4 ++-- weed/replication/source/filer_source.go | 4 ++-- weed/s3api/s3api_handlers.go | 4 ++-- weed/server/webdav_server.go | 4 ++-- weed/shell/commands.go | 4 ++-- 11 files changed, 26 insertions(+), 22 deletions(-) diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 835d6cfc2..da7eae621 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -75,7 +75,7 @@ func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType { } for _, loc := range locations.Locations { - volumeServerAddress := filerClient.AdjustedUrl(loc.Url) + volumeServerAddress := filerClient.AdjustedUrl(loc) targetUrl := fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId) targetUrls = append(targetUrls, targetUrl) } diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index 9791c8630..a245b6795 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -68,7 +68,7 @@ func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.Se } for _, loc := range locations.Locations { lr.Locations = append(lr.Locations, operation.Location{ - Url: wfs.AdjustedUrl(loc.Url), + Url: wfs.AdjustedUrl(loc), PublicUrl: loc.PublicUrl, }) } diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index 736df3588..8c30de3c5 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -26,15 +26,15 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro } -func (wfs *WFS) AdjustedUrl(hostAndPort string) string { +func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { if !wfs.option.OutsideContainerClusterMode { - return hostAndPort + return location.Url } - commaIndex := strings.Index(hostAndPort, ":") + commaIndex := strings.Index(location.Url, ":") if commaIndex < 0 { - return hostAndPort + return location.Url } filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") - return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) + return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], location.Url[commaIndex+1:]) } diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index fec33e4ab..e7db31203 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -38,8 +38,12 @@ func (wfs *WFS) saveDataAsChunk(dir string) filer.SaveDataAsChunkFunctionType { return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) } - fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) - host = wfs.AdjustedUrl(host) + fileId, auth = resp.FileId, security.EncodedJwt(resp.Auth) + loc := &filer_pb.Location{ + Url: resp.Url, + PublicUrl: resp.PublicUrl, + } + host = wfs.AdjustedUrl(loc) collection, replication = resp.Collection, resp.Replication return nil diff --git a/weed/messaging/broker/broker_append.go b/weed/messaging/broker/broker_append.go index 80f107e00..8e5b56fd0 100644 --- a/weed/messaging/broker/broker_append.go +++ b/weed/messaging/broker/broker_append.go @@ -108,6 +108,6 @@ func (broker *MessageBroker) WithFilerClient(fn func(filer_pb.SeaweedFilerClient } -func (broker *MessageBroker) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (broker *MessageBroker) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go index 4eacfa2e5..96a716d5b 100644 --- a/weed/pb/filer_pb/filer_client.go +++ b/weed/pb/filer_pb/filer_client.go @@ -21,7 +21,7 @@ var ( type FilerClient interface { WithFilerClient(fn func(SeaweedFilerClient) error) error - AdjustedUrl(hostAndPort string) string + AdjustedUrl(location *Location) string } func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry, err error) { diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d33669447..98330da6b 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -124,6 +124,6 @@ func (fs *FilerSink) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) }, fs.grpcAddress, fs.grpcDialOption) } -func (fs *FilerSink) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (fs *FilerSink) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index c3ef8835c..ff4f2eb26 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -111,8 +111,8 @@ func (fs *FilerSource) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) erro } -func (fs *FilerSource) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (fs *FilerSource) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } func volumeId(fileId string) string { diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index fa706cd1c..6935c75bd 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -50,8 +50,8 @@ func (s3a *S3ApiServer) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) err }, s3a.option.FilerGrpcAddress, s3a.option.GrpcDialOption) } -func (s3a *S3ApiServer) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (s3a *S3ApiServer) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } // If none of the http routes match respond with MethodNotAllowed diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index f13e73a7b..ee5fa5f65 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -118,8 +118,8 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } -func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } func clearName(name string) (string, error) { diff --git a/weed/shell/commands.go b/weed/shell/commands.go index f61ed9f82..1a937ad53 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -102,8 +102,8 @@ func (ce *CommandEnv) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error } -func (ce *CommandEnv) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { From 7704469d294c20df46d1cbd418754dfa9ace0add Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 11 Oct 2020 20:42:15 -0700 Subject: [PATCH 02/18] mount: outsideContainerClusterMode changed to use volume server publicUrl --- weed/command/mount.go | 8 +------- weed/filesys/wfs_filer_client.go | 15 +++------------ 2 files changed, 4 insertions(+), 19 deletions(-) diff --git a/weed/command/mount.go b/weed/command/mount.go index 42a79bd1f..f9700679d 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -48,7 +48,7 @@ func init() { mountOptions.allowOthers = cmdMount.Flag.Bool("allowOthers", true, "allows other users to access the file system") mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") mountOptions.nonempty = cmdMount.Flag.Bool("nonempty", false, "allows the mounting over a non-empty directory") - mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") + mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access volume servers with publicUrl") mountOptions.uidMap = cmdMount.Flag.String("map.uid", "", "map local uid to uid on filer, comma-separated :") mountOptions.gidMap = cmdMount.Flag.String("map.gid", "", "map local gid to gid on filer, comma-separated :") @@ -72,11 +72,5 @@ var cmdMount = &Command{ On OS X, it requires OSXFUSE (http://osxfuse.github.com/). - If the SeaweedFS system runs in a container cluster, e.g. managed by kubernetes or docker compose, - the volume servers are not accessible by their own ip addresses. - In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming: - * All volume server containers are accessible through the same hostname or IP address as the filer. - * All volume server container ports are open external to the cluster. - `, } diff --git a/weed/filesys/wfs_filer_client.go b/weed/filesys/wfs_filer_client.go index 8c30de3c5..096ee555f 100644 --- a/weed/filesys/wfs_filer_client.go +++ b/weed/filesys/wfs_filer_client.go @@ -1,9 +1,6 @@ package filesys import ( - "fmt" - "strings" - "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb" @@ -27,14 +24,8 @@ func (wfs *WFS) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro } func (wfs *WFS) AdjustedUrl(location *filer_pb.Location) string { - if !wfs.option.OutsideContainerClusterMode { - return location.Url - } - commaIndex := strings.Index(location.Url, ":") - if commaIndex < 0 { - return location.Url + if wfs.option.OutsideContainerClusterMode { + return location.PublicUrl } - filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") - return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], location.Url[commaIndex+1:]) - + return location.Url } From bbd0afd37efc5b07e3b9ed109b95afe5c9eaae78 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sun, 11 Oct 2020 21:25:30 -0700 Subject: [PATCH 03/18] 2.04 --- k8s/seaweedfs/Chart.yaml | 2 +- k8s/seaweedfs/values.yaml | 2 +- weed/util/constants.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/k8s/seaweedfs/Chart.yaml b/k8s/seaweedfs/Chart.yaml index 843e68e46..3425b79fa 100644 --- a/k8s/seaweedfs/Chart.yaml +++ b/k8s/seaweedfs/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: SeaweedFS name: seaweedfs -version: 2.03 \ No newline at end of file +version: 2.04 \ No newline at end of file diff --git a/k8s/seaweedfs/values.yaml b/k8s/seaweedfs/values.yaml index 82776675d..622b9b29e 100644 --- a/k8s/seaweedfs/values.yaml +++ b/k8s/seaweedfs/values.yaml @@ -4,7 +4,7 @@ global: registry: "" repository: "" imageName: chrislusf/seaweedfs - imageTag: "2.03" + imageTag: "2.04" imagePullPolicy: IfNotPresent imagePullSecrets: imagepullsecret restartPolicy: Always diff --git a/weed/util/constants.go b/weed/util/constants.go index 0f3fd52c7..3b222fbaa 100644 --- a/weed/util/constants.go +++ b/weed/util/constants.go @@ -5,7 +5,7 @@ import ( ) var ( - VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 03) + VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 04) COMMIT = "" ) From 253323ce6d140c9b9a0040c9e3b1b25ac6452dac Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Oct 2020 11:13:38 -0700 Subject: [PATCH 04/18] Hadoop 1.5.0 --- other/java/client/pom.xml | 2 +- other/java/client/pom.xml.deploy | 2 +- other/java/client/pom_debug.xml | 2 +- other/java/hdfs2/dependency-reduced-pom.xml | 2 +- other/java/hdfs2/pom.xml | 2 +- other/java/hdfs3/dependency-reduced-pom.xml | 2 +- other/java/hdfs3/pom.xml | 2 +- 7 files changed, 7 insertions(+), 7 deletions(-) diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index 01ac3dcfc..f5fb65143 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.9 + 1.5.0 org.sonatype.oss diff --git a/other/java/client/pom.xml.deploy b/other/java/client/pom.xml.deploy index 01ac3dcfc..f5fb65143 100644 --- a/other/java/client/pom.xml.deploy +++ b/other/java/client/pom.xml.deploy @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.9 + 1.5.0 org.sonatype.oss diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 14281f7a3..4cc16db84 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -5,7 +5,7 @@ com.github.chrislusf seaweedfs-client - 1.4.9 + 1.5.0 org.sonatype.oss diff --git a/other/java/hdfs2/dependency-reduced-pom.xml b/other/java/hdfs2/dependency-reduced-pom.xml index 966abd1c7..78fc3ca32 100644 --- a/other/java/hdfs2/dependency-reduced-pom.xml +++ b/other/java/hdfs2/dependency-reduced-pom.xml @@ -301,7 +301,7 @@ - 1.4.9 + 1.5.0 2.9.2 diff --git a/other/java/hdfs2/pom.xml b/other/java/hdfs2/pom.xml index ede70eea4..b89664f76 100644 --- a/other/java/hdfs2/pom.xml +++ b/other/java/hdfs2/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.9 + 1.5.0 2.9.2 diff --git a/other/java/hdfs3/dependency-reduced-pom.xml b/other/java/hdfs3/dependency-reduced-pom.xml index 9ac40b48b..47456c6f6 100644 --- a/other/java/hdfs3/dependency-reduced-pom.xml +++ b/other/java/hdfs3/dependency-reduced-pom.xml @@ -309,7 +309,7 @@ - 1.4.9 + 1.5.0 3.1.1 diff --git a/other/java/hdfs3/pom.xml b/other/java/hdfs3/pom.xml index d1a5d8e0f..825e80e40 100644 --- a/other/java/hdfs3/pom.xml +++ b/other/java/hdfs3/pom.xml @@ -5,7 +5,7 @@ 4.0.0 - 1.4.9 + 1.5.0 3.1.1 From b57d435abee18cfa5d4fdbe1d2f143e8d62535c3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Oct 2020 18:14:12 +0000 Subject: [PATCH 05/18] Bump junit from 3.8.1 to 4.13.1 in /other/java/s3copier Bumps [junit](https://github.com/junit-team/junit4) from 3.8.1 to 4.13.1. - [Release notes](https://github.com/junit-team/junit4/releases) - [Changelog](https://github.com/junit-team/junit4/blob/main/doc/ReleaseNotes4.13.1.md) - [Commits](https://github.com/junit-team/junit4/commits/r4.13.1) Signed-off-by: dependabot[bot] --- other/java/s3copier/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/other/java/s3copier/pom.xml b/other/java/s3copier/pom.xml index f8cb9e91c..c3ff30932 100644 --- a/other/java/s3copier/pom.xml +++ b/other/java/s3copier/pom.xml @@ -28,7 +28,7 @@ junit junit - 3.8.1 + 4.13.1 test From 648044f06716d9ea21d9f36a330c8594b1ac37fd Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Oct 2020 18:14:15 +0000 Subject: [PATCH 06/18] Bump junit from 4.12 to 4.13.1 in /test/random_access Bumps [junit](https://github.com/junit-team/junit4) from 4.12 to 4.13.1. - [Release notes](https://github.com/junit-team/junit4/releases) - [Changelog](https://github.com/junit-team/junit4/blob/main/doc/ReleaseNotes4.12.md) - [Commits](https://github.com/junit-team/junit4/compare/r4.12...r4.13.1) Signed-off-by: dependabot[bot] --- test/random_access/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/random_access/pom.xml b/test/random_access/pom.xml index 6c5c90eea..44a3fd9df 100644 --- a/test/random_access/pom.xml +++ b/test/random_access/pom.xml @@ -25,7 +25,7 @@ junit junit - 4.12 + 4.13.1 test From 1e07a0b791ca87bf7e00a9e9b581bdc15844afb5 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 12 Oct 2020 18:14:17 +0000 Subject: [PATCH 07/18] Bump junit from 4.12 to 4.13.1 in /other/java/client Bumps [junit](https://github.com/junit-team/junit4) from 4.12 to 4.13.1. - [Release notes](https://github.com/junit-team/junit4/releases) - [Changelog](https://github.com/junit-team/junit4/blob/main/doc/ReleaseNotes4.12.md) - [Commits](https://github.com/junit-team/junit4/compare/r4.12...r4.13.1) Signed-off-by: dependabot[bot] --- other/java/client/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index f5fb65143..9fbc074ec 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -65,7 +65,7 @@ junit junit - 4.12 + 4.13.1 test From 4e31ca9507a9be45bd5f5897fc1e4be59fca778d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Oct 2020 11:37:15 -0700 Subject: [PATCH 08/18] upgrade junit to 4.13.1 --- other/java/client/pom.xml | 2 +- other/java/client/pom_debug.xml | 2 +- test/random_access/pom.xml | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/other/java/client/pom.xml b/other/java/client/pom.xml index f5fb65143..9fbc074ec 100644 --- a/other/java/client/pom.xml +++ b/other/java/client/pom.xml @@ -65,7 +65,7 @@ junit junit - 4.12 + 4.13.1 test diff --git a/other/java/client/pom_debug.xml b/other/java/client/pom_debug.xml index 4cc16db84..9f96294de 100644 --- a/other/java/client/pom_debug.xml +++ b/other/java/client/pom_debug.xml @@ -65,7 +65,7 @@ junit junit - 4.12 + 4.13.1 test diff --git a/test/random_access/pom.xml b/test/random_access/pom.xml index 6c5c90eea..44a3fd9df 100644 --- a/test/random_access/pom.xml +++ b/test/random_access/pom.xml @@ -25,7 +25,7 @@ junit junit - 4.12 + 4.13.1 test From f022aff289d509e93a93a0e8386693876d1be240 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Oct 2020 12:26:25 -0700 Subject: [PATCH 09/18] add back http.StatusPartialContent revert https://github.com/chrislusf/seaweedfs/commit/e7c04af1d061ae0f09e044fd41969110f472447e --- weed/server/common.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/server/common.go b/weed/server/common.go index 75fc3ad9e..44098a4b5 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -274,7 +274,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 ra := ranges[0] w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Range", ra.contentRange(totalSize)) - // w.WriteHeader(http.StatusPartialContent) + w.WriteHeader(http.StatusPartialContent) err = writeFn(w, ra.start, ra.length) if err != nil { @@ -315,7 +315,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 if w.Header().Get("Content-Encoding") == "" { w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) } - // w.WriteHeader(http.StatusPartialContent) + w.WriteHeader(http.StatusPartialContent) if _, err := io.CopyN(w, sendContent, sendSize); err != nil { http.Error(w, "Internal Error", http.StatusInternalServerError) return From b18f21cce178b60531086f164d24e832a7b6eb86 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 12 Oct 2020 21:58:37 -0700 Subject: [PATCH 10/18] mount: fix bound tree with filer.path fix https://github.com/chrislusf/seaweedfs/issues/1528 --- weed/Makefile | 2 +- weed/filesys/meta_cache/meta_cache.go | 4 ++-- weed/filesys/wfs.go | 2 +- weed/util/bounded_tree/bounded_tree.go | 16 +++++++++++++--- weed/util/bounded_tree/bounded_tree_test.go | 4 ++-- 5 files changed, 19 insertions(+), 9 deletions(-) diff --git a/weed/Makefile b/weed/Makefile index f537fe051..0e2d29623 100644 --- a/weed/Makefile +++ b/weed/Makefile @@ -16,7 +16,7 @@ debug_shell: debug_mount: go build -gcflags="all=-N -l" - dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- mount -dir=~/tmp/mm + dlv --listen=:2345 --headless=true --api-version=2 --accept-multiclient exec weed -- mount -dir=~/tmp/mm -cacheCapacityMB=0 -filer.path=/buckets debug_server: go build -gcflags="all=-N -l" diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index bb81d6d27..247f0ce81 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -22,10 +22,10 @@ type MetaCache struct { uidGidMapper *UidGidMapper } -func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper) *MetaCache { +func NewMetaCache(dbFolder string, baseDir util.FullPath, uidGidMapper *UidGidMapper) *MetaCache { return &MetaCache{ localStore: openMetaStore(dbFolder), - visitedBoundary: bounded_tree.NewBoundedTree(), + visitedBoundary: bounded_tree.NewBoundedTree(baseDir), uidGidMapper: uidGidMapper, } } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 57b4c3da5..265fc95a8 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -92,7 +92,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { wfs.chunkCache = chunk_cache.NewTieredChunkCache(256, cacheDir, option.CacheSizeMB, 1024*1024) } - wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), option.UidGidMapper) + wfs.metaCache = meta_cache.NewMetaCache(path.Join(cacheDir, "meta"), util.FullPath(option.FilerMountRootPath), option.UidGidMapper) startTime := time.Now() go meta_cache.SubscribeMetaEvents(wfs.metaCache, wfs.signature, wfs, wfs.option.FilerMountRootPath, startTime.UnixNano()) grace.OnInterrupt(func() { diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go index 0e023c0d1..0182aeba1 100644 --- a/weed/util/bounded_tree/bounded_tree.go +++ b/weed/util/bounded_tree/bounded_tree.go @@ -16,13 +16,15 @@ type Node struct { type BoundedTree struct { root *Node sync.RWMutex + baseDir util.FullPath } -func NewBoundedTree() *BoundedTree { +func NewBoundedTree(baseDir util.FullPath) *BoundedTree { return &BoundedTree{ root: &Node{ Name: "/", }, + baseDir: baseDir, } } @@ -39,9 +41,12 @@ func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) { if t.root == nil { return } + if t.baseDir != "/" { + p = p[len(t.baseDir):] + } components := p.Split() // fmt.Printf("components %v %d\n", components, len(components)) - if canDelete := t.ensureVisited(t.root, util.FullPath("/"), components, 0, visitFn); canDelete { + if canDelete := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn); canDelete { t.root = nil } } @@ -60,7 +65,12 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen } else { // fmt.Printf("ensure %v\n", currentPath) - children, err := visitFn(currentPath) + filerPath := currentPath + if t.baseDir != "/" { + filerPath = t.baseDir + filerPath + } + + children, err := visitFn(filerPath) if err != nil { glog.V(0).Infof("failed to visit %s: %v", currentPath, err) return diff --git a/weed/util/bounded_tree/bounded_tree_test.go b/weed/util/bounded_tree/bounded_tree_test.go index 0b9c3177a..465f1cc9c 100644 --- a/weed/util/bounded_tree/bounded_tree_test.go +++ b/weed/util/bounded_tree/bounded_tree_test.go @@ -52,7 +52,7 @@ func TestBoundedTree(t *testing.T) { // g // h - tree := NewBoundedTree() + tree := NewBoundedTree(util.FullPath("/")) tree.EnsureVisited(util.FullPath("/a/b/c"), visitFn) @@ -100,7 +100,7 @@ func TestEmptyBoundedTree(t *testing.T) { // g // h - tree := NewBoundedTree() + tree := NewBoundedTree(util.FullPath("/")) visitFn := func(path util.FullPath) (childDirectories []string, err error) { fmt.Printf(" visit %v ...\n", path) From 3f7d1d1bf146eaeac3ddcd96e3d8de088bdf97ce Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 00:29:46 -0700 Subject: [PATCH 11/18] Only wait on retryable requests --- .../repeated_vacuum/repeated_vacuum.go | 2 +- weed/command/benchmark.go | 2 +- weed/filer/filechunk_manifest.go | 8 +++- weed/filer/stream.go | 6 ++- .../replication/repl_util/replication_utli.go | 2 +- weed/util/http_util.go | 38 ++++++++++--------- 6 files changed, 34 insertions(+), 24 deletions(-) diff --git a/unmaintained/repeated_vacuum/repeated_vacuum.go b/unmaintained/repeated_vacuum/repeated_vacuum.go index 12ac42dbe..bff5becc1 100644 --- a/unmaintained/repeated_vacuum/repeated_vacuum.go +++ b/unmaintained/repeated_vacuum/repeated_vacuum.go @@ -32,7 +32,7 @@ func main() { go func() { for { println("vacuum threshold", *garbageThreshold) - _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold)) + _, _, err := util.Get(fmt.Sprintf("http://%s/vol/vacuum?garbageThreshold=%f", *master, *garbageThreshold)) if err != nil { log.Fatalf("vacuum: %v", err) } diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 8bb585d91..e241a904e 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -290,7 +290,7 @@ func readFiles(fileIdLineChan chan string, s *stat) { } var bytes []byte for _, url := range urls { - bytes, err = util.Get(url) + bytes, _, err = util.Get(url) if err == nil { break } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 2df8a4bbf..271d5e5ee 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -97,12 +97,16 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool var err error var buffer bytes.Buffer + var shouldRetry bool for waitTime := time.Second; waitTime < ReadWaitTime; waitTime += waitTime / 2 { for _, urlString := range urlStrings { - err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, isFullChunk, offset, size, func(data []byte) { buffer.Write(data) }) + if !shouldRetry { + break + } if err != nil { glog.V(0).Infof("read %s failed, err: %v", urlString, err) buffer.Reset() @@ -110,7 +114,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool break } } - if err != nil { + if err != nil && shouldRetry{ glog.V(0).Infof("sleep for %v before retrying reading", waitTime) time.Sleep(waitTime) } else { diff --git a/weed/filer/stream.go b/weed/filer/stream.go index a41aebe22..1e1d5c7f3 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -174,10 +174,14 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return err } var buffer bytes.Buffer + var shouldRetry bool for _, urlString := range urlStrings { - err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + shouldRetry, err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) + if !shouldRetry { + break + } if err != nil { glog.V(1).Infof("read %s failed, err: %v", chunkView.FileId, err) buffer.Reset() diff --git a/weed/replication/repl_util/replication_utli.go b/weed/replication/repl_util/replication_utli.go index 9b18275b5..42777f4ad 100644 --- a/weed/replication/repl_util/replication_utli.go +++ b/weed/replication/repl_util/replication_utli.go @@ -19,7 +19,7 @@ func CopyFromChunkViews(chunkViews []*filer.ChunkView, filerSource *source.Filer var writeErr error for _, fileUrl := range fileUrls { - err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { + _, err = util.ReadUrlAsStream(fileUrl+"?readDeleted=true", nil, false, chunk.IsFullChunk(), chunk.Offset, int(chunk.Size), func(data []byte) { writeErr = writeFunc(data) }) if err != nil { diff --git a/weed/util/http_util.go b/weed/util/http_util.go index eef24b930..da0b3d849 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -67,14 +67,14 @@ func Post(url string, values url.Values) ([]byte, error) { // github.com/chrislusf/seaweedfs/unmaintained/repeated_vacuum/repeated_vacuum.go // may need increasing http.Client.Timeout -func Get(url string) ([]byte, error) { +func Get(url string) ([]byte, bool, error) { request, err := http.NewRequest("GET", url, nil) request.Header.Add("Accept-Encoding", "gzip") response, err := client.Do(request) if err != nil { - return nil, err + return nil, true, err } defer response.Body.Close() @@ -89,12 +89,13 @@ func Get(url string) ([]byte, error) { b, err := ioutil.ReadAll(reader) if response.StatusCode >= 400 { - return nil, fmt.Errorf("%s: %s", url, response.Status) + retryable := response.StatusCode >= 500 + return nil, retryable, fmt.Errorf("%s: %s", url, response.Status) } if err != nil { - return nil, err + return nil, false, err } - return b, nil + return b, false, nil } func Head(url string) (http.Header, error) { @@ -207,7 +208,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC if cipherKey != nil { var n int - err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { + _, err := readEncryptedUrl(fileUrl, cipherKey, isContentCompressed, isFullChunk, offset, size, func(data []byte) { n = copy(buf, data) }) return int64(n), err @@ -272,7 +273,7 @@ func ReadUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullC return n, err } -func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { +func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (retryable bool, err error) { if cipherKey != nil { return readEncryptedUrl(fileUrl, cipherKey, isContentGzipped, isFullChunk, offset, size, fn) @@ -280,7 +281,7 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is req, err := http.NewRequest("GET", fileUrl, nil) if err != nil { - return err + return false, err } if isFullChunk { @@ -291,11 +292,12 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is r, err := client.Do(req) if err != nil { - return err + return true, err } defer CloseResponse(r) if r.StatusCode >= 400 { - return fmt.Errorf("%s: %s", fileUrl, r.Status) + retryable = r.StatusCode >= 500 + return retryable, fmt.Errorf("%s: %s", fileUrl, r.Status) } var reader io.ReadCloser @@ -317,23 +319,23 @@ func ReadUrlAsStream(fileUrl string, cipherKey []byte, isContentGzipped bool, is m, err = reader.Read(buf) fn(buf[:m]) if err == io.EOF { - return nil + return false, nil } if err != nil { - return err + return false, err } } } -func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) error { - encryptedData, err := Get(fileUrl) +func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool, isFullChunk bool, offset int64, size int, fn func(data []byte)) (bool, error) { + encryptedData, retryable, err := Get(fileUrl) if err != nil { - return fmt.Errorf("fetch %s: %v", fileUrl, err) + return retryable, fmt.Errorf("fetch %s: %v", fileUrl, err) } decryptedData, err := Decrypt(encryptedData, CipherKey(cipherKey)) if err != nil { - return fmt.Errorf("decrypt %s: %v", fileUrl, err) + return false, fmt.Errorf("decrypt %s: %v", fileUrl, err) } if isContentCompressed { decryptedData, err = DecompressData(decryptedData) @@ -342,14 +344,14 @@ func readEncryptedUrl(fileUrl string, cipherKey []byte, isContentCompressed bool } } if len(decryptedData) < int(offset)+size { - return fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) + return false, fmt.Errorf("read decrypted %s size %d [%d, %d)", fileUrl, len(decryptedData), offset, int(offset)+size) } if isFullChunk { fn(decryptedData) } else { fn(decryptedData[int(offset) : int(offset)+size]) } - return nil + return false, nil } func ReadUrlAsReaderCloser(fileUrl string, rangeHeader string) (io.ReadCloser, error) { From 9b4f7fed14aa23115b896cc19c909b6c8f8db336 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 11:21:13 -0700 Subject: [PATCH 12/18] mount: report filer IO error related to https://github.com/chrislusf/seaweedfs/issues/1530 --- weed/filesys/dir.go | 11 +++++++-- weed/filesys/meta_cache/meta_cache.go | 6 +++++ weed/filesys/meta_cache/meta_cache_init.go | 4 ++-- weed/util/bounded_tree/bounded_tree.go | 26 ++++++++++++++-------- 4 files changed, 34 insertions(+), 13 deletions(-) diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 4dede3a8b..ae2ae3418 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -234,7 +234,11 @@ func (dir *Dir) Lookup(ctx context.Context, req *fuse.LookupRequest, resp *fuse. fullFilePath := util.NewFullPath(dir.FullPath(), req.Name) dirPath := util.FullPath(dir.FullPath()) - meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, util.FullPath(dirPath)) + visitErr := meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) + if visitErr != nil { + glog.Errorf("dir Lookup %s: %v", dirPath, visitErr) + return nil, fuse.EIO + } cachedEntry, cacheErr := dir.wfs.metaCache.FindEntry(context.Background(), fullFilePath) if cacheErr == filer_pb.ErrNotFound { return nil, fuse.ENOENT @@ -296,7 +300,10 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) { } dirPath := util.FullPath(dir.FullPath()) - meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath) + if err = meta_cache.EnsureVisited(dir.wfs.metaCache, dir.wfs, dirPath); err != nil { + glog.Errorf("dir ReadDirAll %s: %v", dirPath, err) + return nil, fuse.EIO + } listedEntries, listErr := dir.wfs.metaCache.ListDirectoryEntries(context.Background(), util.FullPath(dir.FullPath()), "", false, int(math.MaxInt32)) if listErr != nil { glog.Errorf("list meta cache: %v", listErr) diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 247f0ce81..5ae4c1440 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -2,6 +2,7 @@ package meta_cache import ( "context" + "fmt" "os" "sync" @@ -116,6 +117,11 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full mc.RLock() defer mc.RUnlock() + + if !mc.visitedBoundary.HasVisited(dirPath) { + return nil, fmt.Errorf("unsynchronized dir: %v", dirPath) + } + entries, err := mc.localStore.ListDirectoryEntries(ctx, dirPath, startFileName, includeStartFile, limit) if err != nil { return nil, err diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index 455a8772c..3e1719224 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -10,9 +10,9 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) { +func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error { - mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { + return mc.visitedBoundary.EnsureVisited(dirPath, func(path util.FullPath) (childDirectories []string, err error) { glog.V(4).Infof("ReadDirAllEntries %s ...", path) diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go index 0182aeba1..7d4dfb99a 100644 --- a/weed/util/bounded_tree/bounded_tree.go +++ b/weed/util/bounded_tree/bounded_tree.go @@ -34,7 +34,7 @@ type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err erro // No action if the directory has been visited before or does not exist. // A leaf node, which has no children, represents a directory not visited. // A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit. -func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) { +func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error){ t.Lock() defer t.Unlock() @@ -46,12 +46,17 @@ func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) { } components := p.Split() // fmt.Printf("components %v %d\n", components, len(components)) - if canDelete := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn); canDelete { + canDelete, err := t.ensureVisited(t.root, t.baseDir, components, 0, visitFn) + if err != nil { + return err + } + if canDelete { t.root = nil } + return nil } -func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool) { +func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, components []string, i int, visitFn VisitNodeFunc) (canDeleteNode bool, visitErr error) { // println("ensureVisited", currentPath, i) @@ -73,12 +78,12 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen children, err := visitFn(filerPath) if err != nil { glog.V(0).Infof("failed to visit %s: %v", currentPath, err) - return + return false, err } if len(children) == 0 { // fmt.Printf(" canDelete %v without children\n", currentPath) - return true + return true, nil } n.Children = make(map[string]*Node) @@ -103,19 +108,22 @@ func (t *BoundedTree) ensureVisited(n *Node, currentPath util.FullPath, componen } // fmt.Printf(" ensureVisited %v %v\n", currentPath, toVisitNode.Name) - - if canDelete := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn); canDelete { + canDelete, childVisitErr := t.ensureVisited(toVisitNode, currentPath.Child(components[i]), components, i+1, visitFn) + if childVisitErr != nil { + return false, childVisitErr + } + if canDelete { // fmt.Printf(" delete %v %v\n", currentPath, components[i]) delete(n.Children, components[i]) if len(n.Children) == 0 { // fmt.Printf(" canDelete %v\n", currentPath) - return true + return true, nil } } - return false + return false, nil } From aac4cb1f0ccead575c92f9df44820b16b487051b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 13:53:34 -0700 Subject: [PATCH 13/18] adds errror on read and write --- weed/filer/stream.go | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 1e1d5c7f3..363b07f14 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -2,6 +2,7 @@ package filer import ( "bytes" + "fmt" "io" "math" "strings" @@ -35,10 +36,14 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f data, err := retriedFetchChunkData(urlStrings, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size)) if err != nil { - return err + glog.Errorf("read chunk: %v", err) + return fmt.Errorf("read chunk: %v", err) + } + _, err = w.Write(data) + if err != nil { + glog.Errorf("write chunk: %v", err) + return fmt.Errorf("write chunk: %v", err) } - w.Write(data) - } return nil From c127da12194e5b6e8a51d9b51a9ed9700ba9ac0b Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 14:04:46 -0700 Subject: [PATCH 14/18] filer: linearize timeout for large chunk of data --- weed/util/net_timeout.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go index f057a8f5b..e8075c297 100644 --- a/weed/util/net_timeout.go +++ b/weed/util/net_timeout.go @@ -54,7 +54,8 @@ func (c *Conn) Read(b []byte) (count int, e error) { func (c *Conn) Write(b []byte) (count int, e error) { if c.WriteTimeout != 0 { - err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout)) + // minimum 4KB/s + err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(len(b)/40000+1))) if err != nil { return 0, err } From 28d4e1a51ba624fd4f5dbfbf386a6301de64b559 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 19:49:52 -0700 Subject: [PATCH 15/18] mount: retry for directory listing with filer related to https://github.com/chrislusf/seaweedfs/issues/1530 --- weed/filesys/meta_cache/meta_cache_init.go | 31 +++++++++++++++------- 1 file changed, 21 insertions(+), 10 deletions(-) diff --git a/weed/filesys/meta_cache/meta_cache_init.go b/weed/filesys/meta_cache/meta_cache_init.go index 3e1719224..f42d61230 100644 --- a/weed/filesys/meta_cache/meta_cache_init.go +++ b/weed/filesys/meta_cache/meta_cache_init.go @@ -3,6 +3,8 @@ package meta_cache import ( "context" "fmt" + "strings" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -16,19 +18,28 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.Full glog.V(4).Infof("ReadDirAllEntries %s ...", path) - err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error { - entry := filer.FromPbEntry(string(dirPath), pbEntry) - if err := mc.doInsertEntry(context.Background(), entry); err != nil { - glog.V(0).Infof("read %s: %v", entry.FullPath, err) - return err + for waitTime := time.Second; waitTime < filer.ReadWaitTime; waitTime += waitTime / 2 { + err = filer_pb.ReadDirAllEntries(client, dirPath, "", func(pbEntry *filer_pb.Entry, isLast bool) error { + entry := filer.FromPbEntry(string(dirPath), pbEntry) + if err := mc.doInsertEntry(context.Background(), entry); err != nil { + glog.V(0).Infof("read %s: %v", entry.FullPath, err) + return err + } + if entry.IsDirectory() { + childDirectories = append(childDirectories, entry.Name()) + } + return nil + }) + if err == nil { + break } - if entry.IsDirectory() { - childDirectories = append(childDirectories, entry.Name()) + if strings.Contains(err.Error(), "transport: ") { + glog.V(0).Infof("ReadDirAllEntries %s: %v. Retry in %v", path, err, waitTime) + time.Sleep(waitTime) + continue } - return nil - }) - if err != nil { err = fmt.Errorf("list %s: %v", dirPath, err) + break } return }) From 58fa5064911b8a191c30c35cbdfb259a14249292 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 19:50:22 -0700 Subject: [PATCH 16/18] minor --- weed/filer/filechunk_manifest.go | 2 +- weed/filesys/meta_cache/meta_cache.go | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 271d5e5ee..db1a51651 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -115,7 +115,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool } } if err != nil && shouldRetry{ - glog.V(0).Infof("sleep for %v before retrying reading", waitTime) + glog.V(0).Infof("retry reading in %v", waitTime) time.Sleep(waitTime) } else { break diff --git a/weed/filesys/meta_cache/meta_cache.go b/weed/filesys/meta_cache/meta_cache.go index 5ae4c1440..0dd129623 100644 --- a/weed/filesys/meta_cache/meta_cache.go +++ b/weed/filesys/meta_cache/meta_cache.go @@ -117,7 +117,6 @@ func (mc *MetaCache) ListDirectoryEntries(ctx context.Context, dirPath util.Full mc.RLock() defer mc.RUnlock() - if !mc.visitedBoundary.HasVisited(dirPath) { return nil, fmt.Errorf("unsynchronized dir: %v", dirPath) } From 0542911e297fa41c8495ea4a6f855435892db400 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 19:50:46 -0700 Subject: [PATCH 17/18] go fmt --- weed/filer/filechunk_manifest.go | 2 +- weed/util/bounded_tree/bounded_tree.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index db1a51651..f45448a6e 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -114,7 +114,7 @@ func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool break } } - if err != nil && shouldRetry{ + if err != nil && shouldRetry { glog.V(0).Infof("retry reading in %v", waitTime) time.Sleep(waitTime) } else { diff --git a/weed/util/bounded_tree/bounded_tree.go b/weed/util/bounded_tree/bounded_tree.go index 7d4dfb99a..0e8af2520 100644 --- a/weed/util/bounded_tree/bounded_tree.go +++ b/weed/util/bounded_tree/bounded_tree.go @@ -34,7 +34,7 @@ type VisitNodeFunc func(path util.FullPath) (childDirectories []string, err erro // No action if the directory has been visited before or does not exist. // A leaf node, which has no children, represents a directory not visited. // A non-leaf node or a non-existing node represents a directory already visited, or does not need to visit. -func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error){ +func (t *BoundedTree) EnsureVisited(p util.FullPath, visitFn VisitNodeFunc) (visitErr error) { t.Lock() defer t.Unlock() From 1069b325dd92b6a1b16a20290acc8129d5e19ef8 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 13 Oct 2020 20:26:03 -0700 Subject: [PATCH 18/18] shell: volumeServer.evacuate adds printout for ec volumes --- weed/shell/command_volume_server_evacuate.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/weed/shell/command_volume_server_evacuate.go b/weed/shell/command_volume_server_evacuate.go index 214783ee1..a82454cd3 100644 --- a/weed/shell/command_volume_server_evacuate.go +++ b/weed/shell/command_volume_server_evacuate.go @@ -9,6 +9,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" "io" + "os" "sort" ) @@ -151,6 +152,11 @@ func moveAwayOneEcVolume(commandEnv *CommandEnv, ecShardInfo *master_pb.VolumeEc for i := 0; i < len(otherNodes); i++ { emptyNode := otherNodes[i] + collectionPrefix := "" + if ecShardInfo.Collection != "" { + collectionPrefix = ecShardInfo.Collection + "_" + } + fmt.Fprintf(os.Stdout, "moving ec volume %s%d.%d %s => %s\n", collectionPrefix, ecShardInfo.Id, shardId, thisNode.info.Id, emptyNode.info.Id) err = moveMountedShardToEcNode(commandEnv, thisNode, ecShardInfo.Collection, needle.VolumeId(ecShardInfo.Id), shardId, emptyNode, applyChange) if err != nil { return