From 299312c8057c5b96f67a8ac825ee026fe01dd8fc Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Tue, 5 Jun 2018 23:37:41 -0700 Subject: [PATCH] use separate filer grpc port --- weed/command/filer.go | 24 ++++++++------- weed/command/filer_copy.go | 60 +++++++++++++++++++++++++------------- weed/command/mount.go | 2 ++ weed/command/mount_std.go | 23 ++++++++++++++- weed/filesys/dirty_page.go | 2 +- weed/filesys/file.go | 6 ++-- weed/filesys/wfs.go | 10 +++---- 7 files changed, 86 insertions(+), 41 deletions(-) diff --git a/weed/command/filer.go b/weed/command/filer.go index 1bd1493bd..2d4696828 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -10,7 +10,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/server" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/soheilhy/cmux" "google.golang.org/grpc" "google.golang.org/grpc/reflection" "strings" @@ -24,6 +23,7 @@ type FilerOptions struct { masters *string ip *string port *int + grpcPort *int publicPort *int collection *string defaultReplicaPlacement *string @@ -39,6 +39,7 @@ func init() { f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this collection") f.ip = cmdFiler.Flag.String("ip", "", "filer server http listen ip address") f.port = cmdFiler.Flag.Int("port", 8888, "filer server http listen port") + f.grpcPort = cmdFiler.Flag.Int("port.grpc", 0, "filer grpc server listen port, default to http port + 10000") f.publicPort = cmdFiler.Flag.Int("port.public", 0, "port opened to public") f.defaultReplicaPlacement = cmdFiler.Flag.String("defaultReplicaPlacement", "000", "default replication type if not specified") f.redirectOnRead = cmdFiler.Flag.Bool("redirectOnRead", false, "whether proxy or redirect to volume server during file GET request") @@ -119,21 +120,22 @@ func (fo *FilerOptions) start() { glog.Fatalf("Filer listener error: %v", e) } - m := cmux.New(filerListener) - grpcL := m.Match(cmux.HTTP2HeaderField("content-type", "application/grpc")) - httpL := m.Match(cmux.Any()) - - // Create your protocol servers. + // starting grpc server + grpcPort := *f.grpcPort + if grpcPort == 0 { + grpcPort = *f.port + 10000 + } + grpcL, err := util.NewListener(":"+strconv.Itoa(grpcPort), 0) + if err != nil { + glog.Fatalf("failed to listen on grpc port %d: %v", grpcPort, err) + } grpcS := grpc.NewServer() filer_pb.RegisterSeaweedFilerServer(grpcS, fs) reflection.Register(grpcS) - - httpS := &http.Server{Handler: defaultMux} - go grpcS.Serve(grpcL) - go httpS.Serve(httpL) - if err := m.Serve(); err != nil { + httpS := &http.Server{Handler: defaultMux} + if err := httpS.Serve(filerListener); err != nil { glog.Fatalf("Filer Fail to serve: %v", e) } diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 904aac76c..6bc3d4119 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -25,13 +25,14 @@ var ( ) type CopyOptions struct { - master *string - include *string - replication *string - collection *string - ttl *string - maxMB *int - secretKey *string + filerGrpcPort *int + master *string + include *string + replication *string + collection *string + ttl *string + maxMB *int + secretKey *string secret security.Secret } @@ -45,6 +46,7 @@ func init() { copy.collection = cmdCopy.Flag.String("collection", "", "optional collection name") copy.ttl = cmdCopy.Flag.String("ttl", "", "time to live, e.g.: 1m, 1h, 1d, 1M, 1y") copy.maxMB = cmdCopy.Flag.Int("maxMB", 0, "split files larger than the limit") + copy.filerGrpcPort = cmdCopy.Flag.Int("filer.port.grpc", 0, "filer grpc server listen port, default to filer port + 10000") copy.secretKey = cmdCopy.Flag.String("secure.secret", "", "secret to encrypt Json Web Token(JWT)") } @@ -87,15 +89,33 @@ func runCopy(cmd *Command, args []string) bool { urlPath = urlPath + "/" } + if filerUrl.Port() == "" { + fmt.Printf("The filer port should be specified.\n") + return false + } + + filerPort, parseErr := strconv.ParseUint(filerUrl.Port(), 10, 64) + if parseErr != nil { + fmt.Printf("The filer port parse error: %v\n", parseErr) + return false + } + + filerGrpcPort := filerPort + 10000 + if *copy.filerGrpcPort != 0 { + filerGrpcPort = uint64(*copy.filerGrpcPort) + } + + filerGrpcAddress := fmt.Sprintf("%s:%d", filerUrl.Hostname(), filerGrpcPort) + for _, fileOrDir := range fileOrDirs { - if !doEachCopy(fileOrDir, filerUrl.Host, urlPath) { + if !doEachCopy(fileOrDir, filerUrl.Host, filerGrpcAddress, urlPath) { return false } } return true } -func doEachCopy(fileOrDir string, host string, path string) bool { +func doEachCopy(fileOrDir string, filerAddress, filerGrpcAddress string, path string) bool { f, err := os.Open(fileOrDir) if err != nil { fmt.Printf("Failed to open file %s: %v\n", fileOrDir, err) @@ -113,7 +133,7 @@ func doEachCopy(fileOrDir string, host string, path string) bool { if mode.IsDir() { files, _ := ioutil.ReadDir(fileOrDir) for _, subFileOrDir := range files { - if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), host, path+fi.Name()+"/") { + if !doEachCopy(fileOrDir+"/"+subFileOrDir.Name(), filerAddress, filerGrpcAddress, path+fi.Name()+"/") { return false } } @@ -135,13 +155,13 @@ func doEachCopy(fileOrDir string, host string, path string) bool { } if chunkCount == 1 { - return uploadFileAsOne(host, path, f, fi) + return uploadFileAsOne(filerAddress, filerGrpcAddress, path, f, fi) } - return uploadFileInChunks(host, path, f, fi, chunkCount, chunkSize) + return uploadFileInChunks(filerAddress, filerGrpcAddress, path, f, fi, chunkCount, chunkSize) } -func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo) bool { +func uploadFileAsOne(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo) bool { // upload the file content fileName := filepath.Base(f.Name()) @@ -183,10 +203,10 @@ func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileIn Mtime: time.Now().UnixNano(), }) - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) } - if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: urlFolder, Entry: &filer_pb.Entry{ @@ -209,14 +229,14 @@ func uploadFileAsOne(filerUrl string, urlFolder string, f *os.File, fi os.FileIn } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err) + fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) return false } return true } -func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { +func uploadFileInChunks(filerAddress, filerGrpcAddress string, urlFolder string, f *os.File, fi os.FileInfo, chunkCount int, chunkSize int64) bool { fileName := filepath.Base(f.Name()) mimeType := detectMimeType(f) @@ -259,7 +279,7 @@ func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.Fil fmt.Printf("uploaded %s-%d to %s [%d,%d)\n", fileName, i+1, targetUrl, i*chunkSize, i*chunkSize+int64(uploadResult.Size)) } - if err := withFilerClient(filerUrl, func(client filer_pb.SeaweedFilerClient) error { + if err := withFilerClient(filerGrpcAddress, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.CreateEntryRequest{ Directory: urlFolder, Entry: &filer_pb.Entry{ @@ -282,11 +302,11 @@ func uploadFileInChunks(filerUrl string, urlFolder string, f *os.File, fi os.Fil } return nil }); err != nil { - fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerUrl, urlFolder, fileName, err) + fmt.Printf("upload data %v to http://%s%s%s: %v\n", fileName, filerAddress, urlFolder, fileName, err) return false } - fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerUrl, urlFolder, fileName) + fmt.Printf("copied %s => http://%s%s%s\n", fileName, filerAddress, urlFolder, fileName) return true } diff --git a/weed/command/mount.go b/weed/command/mount.go index 6ba3b3697..df215674f 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -2,6 +2,7 @@ package command type MountOptions struct { filer *string + filerGrpcPort *int dir *string collection *string replication *string @@ -15,6 +16,7 @@ var ( func init() { cmdMount.Run = runMount // break init cycle mountOptions.filer = cmdMount.Flag.String("filer", "localhost:8888", "weed filer location") + mountOptions.filerGrpcPort = cmdMount.Flag.Int("filer.grpc.port", 0, "filer grpc server listen port, default to http port + 10000") mountOptions.dir = cmdMount.Flag.String("dir", ".", "mount weed filer to this directory") mountOptions.collection = cmdMount.Flag.String("collection", "", "collection to create the files") mountOptions.replication = cmdMount.Flag.String("replication", "000", "replication to create to files") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index d8b6884ff..f64dccb54 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -11,6 +11,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/filesys" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "strings" + "strconv" ) func runMount(cmd *Command, args []string) bool { @@ -51,8 +53,27 @@ func runMount(cmd *Command, args []string) bool { c.Close() }) + hostnameAndPort := strings.Split(*mountOptions.filer, ":") + if len(hostnameAndPort) != 2 { + fmt.Printf("The filer should have hostname:port format: %v\n", hostnameAndPort) + return false + } + + filerPort, parseErr := strconv.ParseUint(hostnameAndPort[1], 10, 64) + if parseErr != nil { + fmt.Printf("The filer filer port parse error: %v\n", parseErr) + return false + } + + filerGrpcPort := filerPort + 10000 + if *mountOptions.filerGrpcPort != 0 { + filerGrpcPort = uint64(*copy.filerGrpcPort) + } + + filerAddress := fmt.Sprintf("%s:%d", hostnameAndPort[0], filerGrpcPort) + err = fs.Serve(c, filesys.NewSeaweedFileSystem( - *mountOptions.filer, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB)) + filerAddress, *mountOptions.collection, *mountOptions.replication, *mountOptions.chunkSizeLimitMB)) if err != nil { fuse.Unmount(*mountOptions.dir) } diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 996eb0abb..ca8c29b7a 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -133,7 +133,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte return nil }); err != nil { - return nil, fmt.Errorf("filer assign volume: %v", err) + return nil, fmt.Errorf("filerGrpcAddress assign volume: %v", err) } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) diff --git a/weed/filesys/file.go b/weed/filesys/file.go index 1fb7d53b1..625fd4f74 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -26,7 +26,7 @@ type File struct { isOpen bool } -func (file *File) Attr(context context.Context, attr *fuse.Attr) error { +func (file *File) Attr(ctx context.Context, attr *fuse.Attr) error { fullPath := filepath.Join(file.dir.Path, file.Name) @@ -45,7 +45,7 @@ func (file *File) Attr(context context.Context, attr *fuse.Attr) error { ParentDir: file.dir.Path, } - resp, err := client.GetEntryAttributes(context, request) + resp, err := client.GetEntryAttributes(ctx, request) if err != nil { glog.V(0).Infof("file attr read file %v: %v", request, err) return err @@ -129,7 +129,7 @@ func (file *File) Setattr(ctx context.Context, req *fuse.SetattrRequest, resp *f func (file *File) Fsync(ctx context.Context, req *fuse.FsyncRequest) error { // fsync works at OS level - // write the file chunks to the filer + // write the file chunks to the filerGrpcAddress glog.V(3).Infof("%s/%s fsync file %+v", file.dir.Path, file.Name, req) return nil diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 4b9e20b95..ac7333695 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -9,16 +9,16 @@ import ( ) type WFS struct { - filer string + filerGrpcAddress string listDirectoryEntriesCache *ccache.Cache collection string replication string chunkSizeLimit int64 } -func NewSeaweedFileSystem(filer string, collection string, replication string, chunkSizeLimitMB int) *WFS { +func NewSeaweedFileSystem(filerGrpcAddress string, collection string, replication string, chunkSizeLimitMB int) *WFS { return &WFS{ - filer: filer, + filerGrpcAddress: filerGrpcAddress, listDirectoryEntriesCache: ccache.New(ccache.Configure().MaxSize(6000).ItemsToPrune(100)), collection: collection, replication: replication, @@ -32,9 +32,9 @@ func (wfs *WFS) Root() (fs.Node, error) { func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { - grpcConnection, err := grpc.Dial(wfs.filer, grpc.WithInsecure()) + grpcConnection, err := grpc.Dial(wfs.filerGrpcAddress, grpc.WithInsecure()) if err != nil { - return fmt.Errorf("fail to dial %s: %v", wfs.filer, err) + return fmt.Errorf("fail to dial %s: %v", wfs.filerGrpcAddress, err) } defer grpcConnection.Close()