diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 98c41362a..71cba85cd 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -108,6 +108,8 @@ func (s *WorkerGrpcServer) StartWithTLS(port int) error { go s.cleanupRoutine() go s.activeLogFetchLoop() + pb.ServeGrpcOnLocalSocket(grpcServer, port) + // Start serving in a goroutine go func() { if err := s.grpcServer.Serve(listener); err != nil { diff --git a/weed/command/filer.go b/weed/command/filer.go index e76759907..d6a990ebd 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -421,6 +421,7 @@ func (fo *FilerOptions) startFiler() { go grpcS.Serve(grpcLocalL) } go grpcS.Serve(grpcL) + pb.ServeGrpcOnLocalSocket(grpcS, grpcPort) if runtime.GOOS != "windows" { localSocket := *fo.localSocket diff --git a/weed/command/master.go b/weed/command/master.go index dd880acd4..ad81fe7a0 100644 --- a/weed/command/master.go +++ b/weed/command/master.go @@ -254,6 +254,7 @@ func startMaster(masterOption MasterOptions, masterWhiteList []string) { go grpcS.Serve(grpcLocalL) } go grpcS.Serve(grpcL) + pb.ServeGrpcOnLocalSocket(grpcS, grpcPort) // For multi-master mode with non-Hashicorp raft, wait and check if we should join if !*masterOption.raftHashicorp && !isSingleMaster { diff --git a/weed/command/mini.go b/weed/command/mini.go index 6c83c7788..b5a3bd367 100644 --- a/weed/command/mini.go +++ b/weed/command/mini.go @@ -823,6 +823,16 @@ func runMini(cmd *Command, args []string) bool { miniS3Options.filer = &filerAddress miniWebDavOptions.filer = &filerAddress + // Register Unix socket paths for gRPC services so local inter-service + // communication goes through Unix sockets instead of TCP. + pb.RegisterLocalGrpcSocket(*miniMasterOptions.portGrpc, fmt.Sprintf("/tmp/seaweedfs-master-grpc-%d.sock", *miniMasterOptions.portGrpc)) + pb.RegisterLocalGrpcSocket(*miniOptions.v.portGrpc, fmt.Sprintf("/tmp/seaweedfs-volume-grpc-%d.sock", *miniOptions.v.portGrpc)) + pb.RegisterLocalGrpcSocket(*miniFilerOptions.portGrpc, fmt.Sprintf("/tmp/seaweedfs-filer-grpc-%d.sock", *miniFilerOptions.portGrpc)) + if *miniS3Options.portGrpc > 0 { + pb.RegisterLocalGrpcSocket(*miniS3Options.portGrpc, fmt.Sprintf("/tmp/seaweedfs-s3-grpc-%d.sock", *miniS3Options.portGrpc)) + } + pb.RegisterLocalGrpcSocket(*miniAdminOptions.grpcPort, fmt.Sprintf("/tmp/seaweedfs-admin-grpc-%d.sock", *miniAdminOptions.grpcPort)) + go stats_collect.StartMetricsServer(*miniMetricsHttpIp, *miniMetricsHttpPort) if *miniMasterOptions.volumeSizeLimitMB > util.VolumeSizeLimitGB*1000 { diff --git a/weed/command/s3.go b/weed/command/s3.go index cf4900654..081e84f13 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -376,6 +376,7 @@ func (s3opt *S3Options) startS3Server() bool { go grpcS.Serve(grpcLocalL) } go grpcS.Serve(grpcL) + pb.ServeGrpcOnLocalSocket(grpcS, grpcPort) if *s3opt.tlsPrivateKey != "" { // Check for port conflict when both HTTP and HTTPS are enabled on the same port diff --git a/weed/command/volume.go b/weed/command/volume.go index 3d9f4ce71..aa456a0ad 100644 --- a/weed/command/volume.go +++ b/weed/command/volume.go @@ -415,6 +415,7 @@ func (v VolumeServerOptions) startGrpcService(vs volume_server_pb.VolumeServerSe glog.Fatalf("start gRPC service failed, %s", err) } }() + pb.ServeGrpcOnLocalSocket(grpcS, grpcPort) return grpcS } diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index a1356d9d3..4b7c0852d 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -4,7 +4,9 @@ import ( "context" "fmt" "math/rand/v2" + "net" "net/http" + "os" "strconv" "strings" "sync" @@ -42,6 +44,12 @@ var ( // cache grpc connections grpcClients = make(map[string]*versionedGrpcClient) grpcClientsLock sync.Mutex + + // localGrpcSockets maps gRPC port numbers to Unix socket paths. + // When registered (by mini mode), gRPC clients connect via Unix socket + // instead of TCP for local services. + localGrpcSockets = make(map[int]string) + localGrpcSocketsLock sync.RWMutex ) type versionedGrpcClient struct { @@ -55,6 +63,59 @@ func init() { http.DefaultTransport.(*http.Transport).MaxIdleConns = 1024 } +// RegisterLocalGrpcSocket registers a Unix socket path for a gRPC port. +// When a gRPC client dials an address on this port, it uses the Unix socket. +func RegisterLocalGrpcSocket(grpcPort int, socketPath string) { + localGrpcSocketsLock.Lock() + defer localGrpcSocketsLock.Unlock() + localGrpcSockets[grpcPort] = socketPath +} + +// GetLocalGrpcSocket returns the Unix socket path for a gRPC port, or empty if not registered. +func GetLocalGrpcSocket(grpcPort int) string { + localGrpcSocketsLock.RLock() + defer localGrpcSocketsLock.RUnlock() + return localGrpcSockets[grpcPort] +} + +// resolveLocalGrpcSocket extracts the port from a gRPC address and returns +// the registered Unix socket path, if any. +func resolveLocalGrpcSocket(address string) string { + _, portStr, err := net.SplitHostPort(address) + if err != nil { + return "" + } + port, err := strconv.Atoi(portStr) + if err != nil { + return "" + } + return GetLocalGrpcSocket(port) +} + +// ServeGrpcOnLocalSocket starts serving a gRPC server on a Unix socket +// if one is registered for the given port. +func ServeGrpcOnLocalSocket(grpcServer *grpc.Server, grpcPort int) { + socketPath := GetLocalGrpcSocket(grpcPort) + if socketPath == "" { + return + } + if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) { + glog.Warningf("Failed to remove old gRPC socket %s: %v", socketPath, err) + } + listener, err := net.Listen("unix", socketPath) + if err != nil { + glog.Errorf("Failed to listen on gRPC Unix socket %s: %v", socketPath, err) + return + } + glog.V(0).Infof("gRPC also listening on Unix socket %s", socketPath) + go func() { + if err := grpcServer.Serve(listener); err != nil && err != grpc.ErrServerStopped { + glog.Errorf("gRPC Unix socket server error on %s: %v", socketPath, err) + } + os.Remove(socketPath) + }() +} + func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { var options []grpc.ServerOption options = append(options, @@ -97,6 +158,14 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) { var options []grpc.DialOption + // Route through Unix socket if one is registered for this address's port + if socketPath := resolveLocalGrpcSocket(address); socketPath != "" { + options = append(options, grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) { + var d net.Dialer + return d.DialContext(ctx, "unix", socketPath) + })) + } + options = append(options, grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(Max_Message_Size),