From 0156e2975aa4aabe142301deb72cc2657eb79ee9 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Wed, 26 Feb 2020 16:46:01 -0800 Subject: [PATCH] mount: add mode to run external to SeaweedFS container cluster --- weed/command/mount.go | 30 ++++++++++++++++---------- weed/command/mount_std.go | 36 +++++++++++++++++--------------- weed/filer2/filer_client_util.go | 8 ++++--- weed/filesys/dirty_page.go | 1 + weed/filesys/wfs.go | 16 ++++++++++++++ weed/filesys/wfs_deletion.go | 9 ++++---- weed/server/webdav_server.go | 3 +++ weed/shell/command_fs_du.go | 3 +++ 8 files changed, 71 insertions(+), 35 deletions(-) diff --git a/weed/command/mount.go b/weed/command/mount.go index f09b285f7..4bdb3415a 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -7,17 +7,18 @@ import ( ) type MountOptions struct { - filer *string - filerMountRootPath *string - dir *string - dirListCacheLimit *int64 - collection *string - replication *string - ttlSec *int - chunkSizeLimitMB *int - dataCenter *string - allowOthers *bool - umaskString *string + filer *string + filerMountRootPath *string + dir *string + dirListCacheLimit *int64 + collection *string + replication *string + ttlSec *int + chunkSizeLimitMB *int + dataCenter *string + allowOthers *bool + umaskString *string + outsideContainerClusterMode *bool } var ( @@ -41,6 +42,7 @@ func init() { mountOptions.umaskString = cmdMount.Flag.String("umask", "022", "octal umask, e.g., 022, 0111") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") + mountOptions.outsideContainerClusterMode = cmdMount.Flag.Bool("outsideContainerClusterMode", false, "allows other users to access the file system") } var cmdMount = &Command{ @@ -58,6 +60,12 @@ var cmdMount = &Command{ On OS X, it requires OSXFUSE (http://osxfuse.github.com/). + If the SeaweedFS systemm runs in a container cluster, e.g. managed by kubernetes or docker compose, + the volume servers are not accessible by their own ip addresses. + In "outsideContainerClusterMode", the mount will use the filer ip address instead, assuming: + * All volume server containers are accessible through the same hostname or IP address as the filer. + * All volume server container ports are open external to the cluster. + `, } diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index e26b7c3f5..e8e3fb030 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -46,11 +46,12 @@ func runMount(cmd *Command, args []string) bool { *mountOptions.ttlSec, *mountOptions.dirListCacheLimit, os.FileMode(umask), + *mountOptions.outsideContainerClusterMode, ) } func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCenter string, chunkSizeLimitMB int, - allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode) bool { + allowOthers bool, ttlSec int, dirListCacheLimit int64, umask os.FileMode, outsideContainerClusterMode bool) bool { util.LoadConfiguration("security", false) @@ -164,22 +165,23 @@ func RunMount(filer, filerMountRootPath, dir, collection, replication, dataCente daemonize.SignalOutcome(nil) err = fs.Serve(c, filesys.NewSeaweedFileSystem(&filesys.Option{ - FilerGrpcAddress: filerGrpcAddress, - GrpcDialOption: grpcDialOption, - FilerMountRootPath: mountRoot, - Collection: collection, - Replication: replication, - TtlSec: int32(ttlSec), - ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, - DataCenter: dataCenter, - DirListCacheLimit: dirListCacheLimit, - EntryCacheTtl: 3 * time.Second, - MountUid: uid, - MountGid: gid, - MountMode: mountMode, - MountCtime: fileInfo.ModTime(), - MountMtime: time.Now(), - Umask: umask, + FilerGrpcAddress: filerGrpcAddress, + GrpcDialOption: grpcDialOption, + FilerMountRootPath: mountRoot, + Collection: collection, + Replication: replication, + TtlSec: int32(ttlSec), + ChunkSizeLimit: int64(chunkSizeLimitMB) * 1024 * 1024, + DataCenter: dataCenter, + DirListCacheLimit: dirListCacheLimit, + EntryCacheTtl: 3 * time.Second, + MountUid: uid, + MountGid: gid, + MountMode: mountMode, + MountCtime: fileInfo.ModTime(), + MountMtime: time.Now(), + Umask: umask, + OutsideContainerClusterMode: outsideContainerClusterMode, })) if err != nil { fuse.Unmount(dir) diff --git a/weed/filer2/filer_client_util.go b/weed/filer2/filer_client_util.go index 9e03d60c4..ab9db2992 100644 --- a/weed/filer2/filer_client_util.go +++ b/weed/filer2/filer_client_util.go @@ -23,6 +23,7 @@ func VolumeId(fileId string) string { type FilerClient interface { WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error + AdjustedUrl(hostAndPort string) string } func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, chunkViews []*ChunkView, baseOffset int64) (totalRead int64, err error) { @@ -67,9 +68,10 @@ func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, return } + volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url) var n int64 n, err = util.ReadUrl( - fmt.Sprintf("http://%s/%s", locations.Locations[0].Url, chunkView.FileId), + fmt.Sprintf("http://%s/%s", volumeServerAddress, chunkView.FileId), chunkView.Offset, int(chunkView.Size), buff[chunkView.LogicOffset-baseOffset:chunkView.LogicOffset-baseOffset+int64(chunkView.Size)], @@ -77,10 +79,10 @@ func ReadIntoBuffer(filerClient FilerClient, fullFilePath FullPath, buff []byte, if err != nil { - glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, locations.Locations[0].Url, chunkView.FileId, n, err) + glog.V(0).Infof("%v read http://%s/%v %v bytes: %v", fullFilePath, volumeServerAddress, chunkView.FileId, n, err) err = fmt.Errorf("failed to read http://%s/%s: %v", - locations.Locations[0].Url, chunkView.FileId, err) + volumeServerAddress, chunkView.FileId, err) return } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 9b0f96951..67e1d57ef 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -165,6 +165,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + host = pages.f.wfs.AdjustedUrl(host) pages.collection, pages.replication = resp.Collection, resp.Replication return nil diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 8f4225fb0..aa530f6aa 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -5,6 +5,7 @@ import ( "fmt" "math" "os" + "strings" "sync" "time" @@ -37,6 +38,9 @@ type Option struct { MountMode os.FileMode MountCtime time.Time MountMtime time.Time + + // whether the mount runs outside SeaweedFS containers + OutsideContainerClusterMode bool } var _ = fs.FS(&WFS{}) @@ -247,5 +251,17 @@ func (wfs *WFS) forgetNode(fullpath filer2.FullPath) { defer wfs.nodesLock.Unlock() delete(wfs.nodes, fullpath.AsInode()) +} + +func (wfs *WFS) AdjustedUrl(hostAndPort string) string { + if !wfs.option.OutsideContainerClusterMode { + return hostAndPort + } + commaIndex := strings.Index(hostAndPort, ":") + if commaIndex < 0 { + return hostAndPort + } + filerCommaIndex := strings.Index(wfs.option.FilerGrpcAddress, ":") + return fmt.Sprintf("%s:%s", wfs.option.FilerGrpcAddress[:filerCommaIndex], hostAndPort[commaIndex+1:]) } diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index f53e95d26..bf21b1808 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -3,11 +3,12 @@ package filesys import ( "context" + "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "google.golang.org/grpc" ) func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { @@ -21,12 +22,12 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { } wfs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { - deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) + wfs.deleteFileIds(wfs.option.GrpcDialOption, client, fileIds) return nil }) } -func deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { +func (wfs *WFS) deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { var vids []string for _, fileId := range fileIds { @@ -56,7 +57,7 @@ func deleteFileIds(grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerC } for _, loc := range locations.Locations { lr.Locations = append(lr.Locations, operation.Location{ - Url: loc.Url, + Url: wfs.AdjustedUrl(loc.Url), PublicUrl: loc.PublicUrl, }) } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 959e50128..ddd611724 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -104,6 +104,9 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } +func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go index a1e21bfa6..6c31ebdff 100644 --- a/weed/shell/command_fs_du.go +++ b/weed/shell/command_fs_du.go @@ -105,3 +105,6 @@ func (env *CommandEnv) getFilerClient(filerServer string, filerPort int64) *comm func (c *commandFilerClient) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { return c.env.withFilerClient(c.filerServer, c.filerPort, fn) } +func (c *commandFilerClient) AdjustedUrl(hostAndPort string) string { + return hostAndPort +}