diff --git a/weed/command/mount.go b/weed/command/mount.go index 428e073f2..088e5104c 100644 --- a/weed/command/mount.go +++ b/weed/command/mount.go @@ -29,6 +29,7 @@ type MountOptions struct { readOnly *bool debug *bool debugPort *int + localSocket *string } var ( @@ -63,6 +64,7 @@ func init() { mountOptions.readOnly = cmdMount.Flag.Bool("readOnly", false, "read only") mountOptions.debug = cmdMount.Flag.Bool("debug", false, "serves runtime profiling data, e.g., http://localhost:/debug/pprof/goroutine?debug=2") mountOptions.debugPort = cmdMount.Flag.Int("debug.port", 6061, "http port for debugging") + mountOptions.localSocket = cmdFiler.Flag.String("localSocket", "", "default to /tmp/seaweedfs-mount-.sock") mountCpuProfile = cmdMount.Flag.String("cpuprofile", "", "cpu profile output file") mountMemProfile = cmdMount.Flag.String("memprofile", "", "memory profile output file") diff --git a/weed/command/mount_std.go b/weed/command/mount_std.go index d865e053f..7926c9cdc 100644 --- a/weed/command/mount_std.go +++ b/weed/command/mount_std.go @@ -12,9 +12,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/mount/unmount" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/hanwen/go-fuse/v2/fuse" + "google.golang.org/grpc/reflection" + "net" "net/http" "os" "os/user" @@ -98,6 +101,22 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { unmount.Unmount(dir) + // start on local unix socket + if *option.localSocket == "" { + mountDirHash := util.HashToInt32([]byte(dir)) + if mountDirHash < 0 { + mountDirHash = -mountDirHash + } + *option.localSocket = fmt.Sprintf("/tmp/seaweefs-mount-%d.sock", mountDirHash) + if err := os.Remove(*option.localSocket); err != nil && !os.IsNotExist(err) { + glog.Fatalf("Failed to remove %s, error: %s", *option.localSocket, err.Error()) + } + } + montSocketListener, err := net.Listen("unix", *option.localSocket) + if err != nil { + glog.Fatalf("Failed to listen on %s: %v", *option.localSocket, err) + } + // detect mount folder mode if *option.dirAutoCreate { os.MkdirAll(dir, os.FileMode(0777)&^umask) @@ -229,6 +248,11 @@ func RunMount(option *MountOptions, umask os.FileMode) bool { unmount.Unmount(dir) }) + grpcS := pb.NewGrpcServer() + mount_pb.RegisterSeaweedMountServer(grpcS, seaweedFileSystem) + reflection.Register(grpcS) + go grpcS.Serve(montSocketListener) + seaweedFileSystem.StartBackgroundTasks() fmt.Printf("This is SeaweedFS version %s %s %s\n", util.Version(), runtime.GOOS, runtime.GOARCH) diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 195a5bb27..169925427 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -6,6 +6,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/mount/meta_cache" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" @@ -59,6 +60,7 @@ type WFS struct { // https://dl.acm.org/doi/fullHtml/10.1145/3310148 // follow https://github.com/hanwen/go-fuse/blob/master/fuse/api.go fuse.RawFileSystem + mount_pb.UnimplementedSeaweedMountServer fs.Inode option *Option metaCache *meta_cache.MetaCache diff --git a/weed/mount/weedfs_grpc_server.go b/weed/mount/weedfs_grpc_server.go new file mode 100644 index 000000000..1227372d8 --- /dev/null +++ b/weed/mount/weedfs_grpc_server.go @@ -0,0 +1,13 @@ +package mount + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/mount_pb" +) + +func (wfs *WFS) Configure(ctx context.Context, request *mount_pb.ConfigureRequest) (*mount_pb.ConfigureResponse, error) { + glog.V(0).Infof("quota changed from %d to %d", wfs.option.Quota, request.CollectionCapacity) + wfs.option.Quota = request.GetCollectionCapacity() + return &mount_pb.ConfigureResponse{}, nil +} diff --git a/weed/mount/weedfs_quota.go b/weed/mount/weedfs_quota.go index f9307a6b3..ac3e58e62 100644 --- a/weed/mount/weedfs_quota.go +++ b/weed/mount/weedfs_quota.go @@ -10,12 +10,14 @@ import ( func (wfs *WFS) loopCheckQuota() { - if wfs.option.Quota <= 0 { - return - } - for { + time.Sleep(61 * time.Second) + + if wfs.option.Quota <= 0 { + continue + } + err := wfs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.StatisticsRequest{ @@ -47,7 +49,6 @@ func (wfs *WFS) loopCheckQuota() { glog.Warningf("read quota usage: %v", err) } - time.Sleep(61 * time.Second) } } diff --git a/weed/pb/Makefile b/weed/pb/Makefile index dc27172b6..954b4cb98 100644 --- a/weed/pb/Makefile +++ b/weed/pb/Makefile @@ -8,6 +8,7 @@ gen: protoc filer.proto --go_out=./filer_pb --go-grpc_out=./filer_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc remote.proto --go_out=./remote_pb --go-grpc_out=./remote_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc iam.proto --go_out=./iam_pb --go-grpc_out=./iam_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative + protoc mount.proto --go_out=./mount_pb --go-grpc_out=./mount_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative protoc messaging.proto --go_out=./messaging_pb --go-grpc_out=./messaging_pb --go_opt=paths=source_relative --go-grpc_opt=paths=source_relative # protoc filer.proto --java_out=../../other/java/client/src/main/java cp filer.proto ../../other/java/client/src/main/proto diff --git a/weed/pb/mount.proto b/weed/pb/mount.proto new file mode 100644 index 000000000..ec0847f12 --- /dev/null +++ b/weed/pb/mount.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package messaging_pb; + +option go_package = "github.com/chrislusf/seaweedfs/weed/pb/mount_pb"; +option java_package = "seaweedfs.client"; +option java_outer_classname = "MountProto"; + +////////////////////////////////////////////////// + +service SeaweedMount { + + rpc Configure (ConfigureRequest) returns (ConfigureResponse) { + } + +} + +////////////////////////////////////////////////// + +message ConfigureRequest { + int64 collection_capacity = 1; +} + +message ConfigureResponse { +} diff --git a/weed/pb/mount_pb/mount.pb.go b/weed/pb/mount_pb/mount.pb.go new file mode 100644 index 000000000..cbaf533fe --- /dev/null +++ b/weed/pb/mount_pb/mount.pb.go @@ -0,0 +1,208 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.26.0 +// protoc v3.17.3 +// source: mount.proto + +package mount_pb + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ConfigureRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + CollectionCapacity int64 `protobuf:"varint,1,opt,name=collection_capacity,json=collectionCapacity,proto3" json:"collection_capacity,omitempty"` +} + +func (x *ConfigureRequest) Reset() { + *x = ConfigureRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_mount_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ConfigureRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConfigureRequest) ProtoMessage() {} + +func (x *ConfigureRequest) ProtoReflect() protoreflect.Message { + mi := &file_mount_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConfigureRequest.ProtoReflect.Descriptor instead. +func (*ConfigureRequest) Descriptor() ([]byte, []int) { + return file_mount_proto_rawDescGZIP(), []int{0} +} + +func (x *ConfigureRequest) GetCollectionCapacity() int64 { + if x != nil { + return x.CollectionCapacity + } + return 0 +} + +type ConfigureResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ConfigureResponse) Reset() { + *x = ConfigureResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_mount_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ConfigureResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ConfigureResponse) ProtoMessage() {} + +func (x *ConfigureResponse) ProtoReflect() protoreflect.Message { + mi := &file_mount_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ConfigureResponse.ProtoReflect.Descriptor instead. +func (*ConfigureResponse) Descriptor() ([]byte, []int) { + return file_mount_proto_rawDescGZIP(), []int{1} +} + +var File_mount_proto protoreflect.FileDescriptor + +var file_mount_proto_rawDesc = []byte{ + 0x0a, 0x0b, 0x6d, 0x6f, 0x75, 0x6e, 0x74, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, 0x62, 0x22, 0x43, 0x0a, 0x10, 0x43, + 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x2f, 0x0a, 0x13, 0x63, 0x6f, 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x5f, 0x63, 0x61, + 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x03, 0x52, 0x12, 0x63, 0x6f, + 0x6c, 0x6c, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x43, 0x61, 0x70, 0x61, 0x63, 0x69, 0x74, 0x79, + 0x22, 0x13, 0x0a, 0x11, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x5e, 0x0a, 0x0c, 0x53, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, + 0x4d, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x4e, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, + 0x72, 0x65, 0x12, 0x1e, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x69, 0x6e, 0x67, 0x5f, 0x70, + 0x62, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x75, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x4f, 0x0a, 0x10, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, + 0x66, 0x73, 0x2e, 0x63, 0x6c, 0x69, 0x65, 0x6e, 0x74, 0x42, 0x0a, 0x4d, 0x6f, 0x75, 0x6e, 0x74, + 0x50, 0x72, 0x6f, 0x74, 0x6f, 0x5a, 0x2f, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, + 0x6d, 0x2f, 0x63, 0x68, 0x72, 0x69, 0x73, 0x6c, 0x75, 0x73, 0x66, 0x2f, 0x73, 0x65, 0x61, 0x77, + 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x77, 0x65, 0x65, 0x64, 0x2f, 0x70, 0x62, 0x2f, 0x6d, 0x6f, + 0x75, 0x6e, 0x74, 0x5f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_mount_proto_rawDescOnce sync.Once + file_mount_proto_rawDescData = file_mount_proto_rawDesc +) + +func file_mount_proto_rawDescGZIP() []byte { + file_mount_proto_rawDescOnce.Do(func() { + file_mount_proto_rawDescData = protoimpl.X.CompressGZIP(file_mount_proto_rawDescData) + }) + return file_mount_proto_rawDescData +} + +var file_mount_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_mount_proto_goTypes = []interface{}{ + (*ConfigureRequest)(nil), // 0: messaging_pb.ConfigureRequest + (*ConfigureResponse)(nil), // 1: messaging_pb.ConfigureResponse +} +var file_mount_proto_depIdxs = []int32{ + 0, // 0: messaging_pb.SeaweedMount.Configure:input_type -> messaging_pb.ConfigureRequest + 1, // 1: messaging_pb.SeaweedMount.Configure:output_type -> messaging_pb.ConfigureResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_mount_proto_init() } +func file_mount_proto_init() { + if File_mount_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_mount_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ConfigureRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_mount_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ConfigureResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_mount_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_mount_proto_goTypes, + DependencyIndexes: file_mount_proto_depIdxs, + MessageInfos: file_mount_proto_msgTypes, + }.Build() + File_mount_proto = out.File + file_mount_proto_rawDesc = nil + file_mount_proto_goTypes = nil + file_mount_proto_depIdxs = nil +} diff --git a/weed/pb/mount_pb/mount_grpc.pb.go b/weed/pb/mount_pb/mount_grpc.pb.go new file mode 100644 index 000000000..41737aa21 --- /dev/null +++ b/weed/pb/mount_pb/mount_grpc.pb.go @@ -0,0 +1,101 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package mount_pb + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// SeaweedMountClient is the client API for SeaweedMount service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type SeaweedMountClient interface { + Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) +} + +type seaweedMountClient struct { + cc grpc.ClientConnInterface +} + +func NewSeaweedMountClient(cc grpc.ClientConnInterface) SeaweedMountClient { + return &seaweedMountClient{cc} +} + +func (c *seaweedMountClient) Configure(ctx context.Context, in *ConfigureRequest, opts ...grpc.CallOption) (*ConfigureResponse, error) { + out := new(ConfigureResponse) + err := c.cc.Invoke(ctx, "/messaging_pb.SeaweedMount/Configure", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// SeaweedMountServer is the server API for SeaweedMount service. +// All implementations must embed UnimplementedSeaweedMountServer +// for forward compatibility +type SeaweedMountServer interface { + Configure(context.Context, *ConfigureRequest) (*ConfigureResponse, error) + mustEmbedUnimplementedSeaweedMountServer() +} + +// UnimplementedSeaweedMountServer must be embedded to have forward compatible implementations. +type UnimplementedSeaweedMountServer struct { +} + +func (UnimplementedSeaweedMountServer) Configure(context.Context, *ConfigureRequest) (*ConfigureResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Configure not implemented") +} +func (UnimplementedSeaweedMountServer) mustEmbedUnimplementedSeaweedMountServer() {} + +// UnsafeSeaweedMountServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to SeaweedMountServer will +// result in compilation errors. +type UnsafeSeaweedMountServer interface { + mustEmbedUnimplementedSeaweedMountServer() +} + +func RegisterSeaweedMountServer(s grpc.ServiceRegistrar, srv SeaweedMountServer) { + s.RegisterService(&SeaweedMount_ServiceDesc, srv) +} + +func _SeaweedMount_Configure_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ConfigureRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedMountServer).Configure(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/messaging_pb.SeaweedMount/Configure", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedMountServer).Configure(ctx, req.(*ConfigureRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// SeaweedMount_ServiceDesc is the grpc.ServiceDesc for SeaweedMount service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var SeaweedMount_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "messaging_pb.SeaweedMount", + HandlerType: (*SeaweedMountServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Configure", + Handler: _SeaweedMount_Configure_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "mount.proto", +}