From 2e78a522ab3892debf9aefeb978177678ae52a83 Mon Sep 17 00:00:00 2001 From: askeipx Date: Wed, 24 Aug 2022 11:18:21 +0500 Subject: [PATCH] remove old raft servers if they don't answer to pings for too long (#3398) * remove old raft servers if they don't answer to pings for too long add ping durations as options rename ping fields fix some todos get masters through masterclient raft remove server from leader use raft servers to ping them CheckMastersAlive for hashicorp raft only * prepare blocking ping * pass waitForReady as param * pass waitForReady through all functions * waitForReady works * refactor * remove unneeded params * rollback unneeded changes * fix --- weed/cluster/master_client.go | 3 +- weed/command/filer_copy.go | 2 +- weed/command/upload.go | 7 ++-- weed/mount/wfs_filer_client.go | 2 +- weed/mq/broker/broker_server.go | 5 ++- weed/operation/grpc_client.go | 4 +- weed/pb/grpc_client_server.go | 37 ++++++++++--------- .../replication/sink/filersink/fetch_write.go | 5 ++- weed/replication/source/filer_source.go | 2 +- weed/s3api/s3api_handlers.go | 5 ++- weed/server/filer_grpc_server_admin.go | 5 ++- weed/server/master_grpc_server_admin.go | 9 +++-- weed/server/master_server.go | 33 +++++++++++++++-- weed/server/volume_grpc_admin.go | 7 ++-- weed/server/volume_grpc_client_to_master.go | 2 +- weed/server/volume_grpc_copy.go | 7 ++-- weed/server/webdav_server.go | 2 +- weed/shell/command_cluster_check.go | 7 ++-- weed/shell/command_collection_delete.go | 3 +- weed/wdclient/masterclient.go | 11 +++--- 20 files changed, 99 insertions(+), 59 deletions(-) diff --git a/weed/cluster/master_client.go b/weed/cluster/master_client.go index d1e769c1d..15009e132 100644 --- a/weed/cluster/master_client.go +++ b/weed/cluster/master_client.go @@ -2,6 +2,7 @@ package cluster import ( "context" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" @@ -10,7 +11,7 @@ import ( func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) { - if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ ClientType: clientType, FilerGroup: filerGroup, diff --git a/weed/command/filer_copy.go b/weed/command/filer_copy.go index 977e2a6b8..db2d8c42c 100644 --- a/weed/command/filer_copy.go +++ b/weed/command/filer_copy.go @@ -572,7 +572,7 @@ func (worker *FileCopyWorker) WithFilerClient(streamingMode bool, fn func(filer_ err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress, worker.options.grpcDialOption) + }, filerGrpcAddress, false, worker.options.grpcDialOption) return } diff --git a/weed/command/upload.go b/weed/command/upload.go index 389a72552..1f03f7b5a 100644 --- a/weed/command/upload.go +++ b/weed/command/upload.go @@ -4,11 +4,12 @@ import ( "context" "encoding/json" "fmt" + "os" + "path/filepath" + "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "google.golang.org/grpc" - "os" - "path/filepath" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/security" @@ -130,7 +131,7 @@ func runUpload(cmd *Command, args []string) bool { } func readMasterConfiguration(grpcDialOption grpc.DialOption, masterAddress pb.ServerAddress) (replication string, err error) { - err = pb.WithMasterClient(false, masterAddress, grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, masterAddress, grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", masterAddress, err) diff --git a/weed/mount/wfs_filer_client.go b/weed/mount/wfs_filer_client.go index 020970df7..e991d8b39 100644 --- a/weed/mount/wfs_filer_client.go +++ b/weed/mount/wfs_filer_client.go @@ -25,7 +25,7 @@ func (wfs *WFS) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFile err = pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress, wfs.option.GrpcDialOption) + }, filerGrpcAddress, false, wfs.option.GrpcDialOption) if err != nil { glog.V(0).Infof("WithFilerClient %d %v: %v", x, filerGrpcAddress, err) diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index c107dbe45..89afb6e4d 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -1,11 +1,12 @@ package broker import ( + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" "google.golang.org/grpc" - "time" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -100,7 +101,7 @@ func (broker *MessageQueueBroker) GetDataCenter() string { func (broker *MessageQueueBroker) withMasterClient(streamingMode bool, master pb.ServerAddress, fn func(client master_pb.SeaweedClient) error) error { - return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, master, broker.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go index c06e501a5..c1f2bba82 100644 --- a/weed/operation/grpc_client.go +++ b/weed/operation/grpc_client.go @@ -13,7 +13,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer pb.ServerAddress, g return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, volumeServer.ToGrpcAddress(), grpcDialOption) + }, volumeServer.ToGrpcAddress(), false, grpcDialOption) } @@ -22,6 +22,6 @@ func WithMasterServerClient(streamingMode bool, masterServer pb.ServerAddress, g return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterServer.ToGrpcAddress(), grpcDialOption) + }, masterServer.ToGrpcAddress(), false, grpcDialOption) } diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index a78ed0ca4..f3cca7fba 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -3,9 +3,6 @@ package pb import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/glog" - "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "github.com/seaweedfs/seaweedfs/weed/util" "math/rand" "net/http" "strconv" @@ -13,6 +10,10 @@ import ( "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -65,15 +66,17 @@ func NewGrpcServer(opts ...grpc.ServerOption) *grpc.Server { return grpc.NewServer(options...) } -func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*grpc.ClientConn, error) { +func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...grpc.DialOption) (*grpc.ClientConn, error) { // opts = append(opts, grpc.WithBlock()) // opts = append(opts, grpc.WithTimeout(time.Duration(5*time.Second))) var options []grpc.DialOption + options = append(options, // grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions( grpc.MaxCallSendMsgSize(Max_Message_Size), grpc.MaxCallRecvMsgSize(Max_Message_Size), + grpc.WaitForReady(waitForReady), ), grpc.WithKeepaliveParams(keepalive.ClientParameters{ Time: 30 * time.Second, // client ping server if no activity for this long @@ -88,7 +91,7 @@ func GrpcDial(ctx context.Context, address string, opts ...grpc.DialOption) (*gr return grpc.DialContext(ctx, address, options...) } -func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedGrpcClient, error) { +func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialOption) (*versionedGrpcClient, error) { grpcClientsLock.Lock() defer grpcClientsLock.Unlock() @@ -99,7 +102,7 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG } ctx := context.Background() - grpcConnection, err := GrpcDial(ctx, address, opts...) + grpcConnection, err := GrpcDial(ctx, address, waitForReady, opts...) if err != nil { return nil, fmt.Errorf("fail to dial %s: %v", address, err) } @@ -115,10 +118,10 @@ func getOrCreateConnection(address string, opts ...grpc.DialOption) (*versionedG } // WithGrpcClient In streamingMode, always use a fresh connection. Otherwise, try to reuse an existing connection. -func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, opts ...grpc.DialOption) error { +func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address string, waitForReady bool, opts ...grpc.DialOption) error { if !streamingMode { - vgc, err := getOrCreateConnection(address, opts...) + vgc, err := getOrCreateConnection(address, waitForReady, opts...) if err != nil { return fmt.Errorf("getOrCreateConnection %s: %v", address, err) } @@ -138,7 +141,7 @@ func WithGrpcClient(streamingMode bool, fn func(*grpc.ClientConn) error, address } return executionErr } else { - grpcConnection, err := GrpcDial(context.Background(), address, opts...) + grpcConnection, err := GrpcDial(context.Background(), address, waitForReady, opts...) if err != nil { return fmt.Errorf("fail to dial %s: %v", address, err) } @@ -200,11 +203,11 @@ func GrpcAddressToServerAddress(grpcAddress string) (serverAddress string) { return util.JoinHostPort(host, port) } -func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, fn func(client master_pb.SeaweedClient) error) error { +func WithMasterClient(streamingMode bool, master ServerAddress, grpcDialOption grpc.DialOption, waitForReady bool, fn func(client master_pb.SeaweedClient) error) error { return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, master.ToGrpcAddress(), grpcDialOption) + }, master.ToGrpcAddress(), waitForReady, grpcDialOption) } @@ -212,7 +215,7 @@ func WithVolumeServerClient(streamingMode bool, volumeServer ServerAddress, grpc return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := volume_server_pb.NewVolumeServerClient(grpcConnection) return fn(client) - }, volumeServer.ToGrpcAddress(), grpcDialOption) + }, volumeServer.ToGrpcAddress(), false, grpcDialOption) } @@ -220,7 +223,7 @@ func WithBrokerClient(streamingMode bool, broker ServerAddress, grpcDialOption g return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) - }, broker.ToGrpcAddress(), grpcDialOption) + }, broker.ToGrpcAddress(), false, grpcDialOption) } @@ -230,7 +233,7 @@ func WithOneOfGrpcMasterClients(streamingMode bool, masterGrpcAddresses map[stri err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := master_pb.NewSeaweedClient(grpcConnection) return fn(client) - }, masterGrpcAddress.ToGrpcAddress(), grpcDialOption) + }, masterGrpcAddress.ToGrpcAddress(), false, grpcDialOption) if err == nil { return nil } @@ -244,7 +247,7 @@ func WithBrokerGrpcClient(streamingMode bool, brokerGrpcAddress string, grpcDial return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := mq_pb.NewSeaweedMessagingClient(grpcConnection) return fn(client) - }, brokerGrpcAddress, grpcDialOption) + }, brokerGrpcAddress, false, grpcDialOption) } @@ -259,7 +262,7 @@ func WithGrpcFilerClient(streamingMode bool, filerGrpcAddress ServerAddress, grp return WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerGrpcAddress.ToGrpcAddress(), grpcDialOption) + }, filerGrpcAddress.ToGrpcAddress(), false, grpcDialOption) } @@ -269,7 +272,7 @@ func WithOneOfGrpcFilerClients(streamingMode bool, filerAddresses []ServerAddres err = WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, filerAddress.ToGrpcAddress(), grpcDialOption) + }, filerAddress.ToGrpcAddress(), false, grpcDialOption) if err == nil { return nil } diff --git a/weed/replication/sink/filersink/fetch_write.go b/weed/replication/sink/filersink/fetch_write.go index d1a5d7ebd..c0321039b 100644 --- a/weed/replication/sink/filersink/fetch_write.go +++ b/weed/replication/sink/filersink/fetch_write.go @@ -2,9 +2,10 @@ package filersink import ( "fmt" - "github.com/seaweedfs/seaweedfs/weed/util" "sync" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -113,7 +114,7 @@ func (fs *FilerSink) WithFilerClient(streamingMode bool, fn func(filer_pb.Seawee return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.grpcAddress, fs.grpcDialOption) + }, fs.grpcAddress, false, fs.grpcDialOption) } diff --git a/weed/replication/source/filer_source.go b/weed/replication/source/filer_source.go index 6c69b735c..2da883ba6 100644 --- a/weed/replication/source/filer_source.go +++ b/weed/replication/source/filer_source.go @@ -131,7 +131,7 @@ func (fs *FilerSource) WithFilerClient(streamingMode bool, fn func(filer_pb.Seaw return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.grpcAddress, fs.grpcDialOption) + }, fs.grpcAddress, false, fs.grpcDialOption) } diff --git a/weed/s3api/s3api_handlers.go b/weed/s3api/s3api_handlers.go index 4e1dd09a7..b85ff485d 100644 --- a/weed/s3api/s3api_handlers.go +++ b/weed/s3api/s3api_handlers.go @@ -3,9 +3,10 @@ package s3api import ( "encoding/base64" "fmt" + "net/http" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "google.golang.org/grpc" - "net/http" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" @@ -18,7 +19,7 @@ func (s3a *S3ApiServer) WithFilerClient(streamingMode bool, fn func(filer_pb.Sea return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, s3a.option.Filer.ToGrpcAddress(), s3a.option.GrpcDialOption) + }, s3a.option.Filer.ToGrpcAddress(), false, s3a.option.GrpcDialOption) } diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 50c52c650..32cb2830d 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -3,6 +3,8 @@ package weed_server import ( "context" "fmt" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -10,7 +12,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "time" ) func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { @@ -66,7 +67,7 @@ func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (res }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 1dd89ad60..fb2c5bd50 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -3,15 +3,16 @@ package weed_server import ( "context" "fmt" + "math/rand" + "sync" + "time" + "github.com/seaweedfs/raft" "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "math/rand" - "sync" - "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -175,7 +176,7 @@ func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (r }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/master_server.go b/weed/server/master_server.go index fbc27e610..9adcafc6f 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,8 +1,8 @@ package weed_server import ( + "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "net/http" "net/http/httputil" "net/url" @@ -12,6 +12,8 @@ import ( "sync" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -242,7 +244,6 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { } func (ms *MasterServer) startAdminScripts() { - v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") if adminScripts == "" { @@ -342,8 +343,10 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF peerAddress := pb.ServerAddress(update.Address) peerName := string(peerAddress) - isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader - if update.IsAdd && isLeader { + if ms.Topo.HashicorpRaft.State() != hashicorpRaft.Leader { + return + } + if update.IsAdd { raftServerFound := false for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { if string(server.ID) == peerName { @@ -356,5 +359,27 @@ func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startF hashicorpRaft.ServerID(peerName), hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) } + } else { + pb.WithMasterClient(false, peerAddress, ms.grpcDialOption, true, func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.TODO(), time.Minute*72) + defer cancel() + if _, err := client.Ping(ctx, &master_pb.PingRequest{Target: string(peerAddress), TargetType: cluster.MasterType}); err != nil { + glog.V(0).Infof("master %s didn't respond to pings. remove raft server", peerName) + if err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }); err != nil { + glog.Warningf("failed removing old raft server: %v", err) + return err + } + } else { + glog.V(0).Infof("master %s successfully responded to ping", peerName) + } + + return nil + }) } } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index c570ae2df..aace63fd8 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,13 +3,14 @@ package weed_server import ( "context" "fmt" + "path/filepath" + "time" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/util" - "path/filepath" - "time" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -280,7 +281,7 @@ func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequ }) } if req.TargetType == cluster.MasterType { - pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) if pingResp != nil { resp.RemoteTimeNs = pingResp.StartTimeNs diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index c00524577..e55d821a8 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -94,7 +94,7 @@ func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOpti ctx, cancel := context.WithCancel(context.Background()) defer cancel() - grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption) + grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), false, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err) } diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index d9928ed18..be152d246 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,13 +3,14 @@ package weed_server import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" - "github.com/seaweedfs/seaweedfs/weed/storage/backend" "io" "math" "os" "time" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/storage/backend" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/operation" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -81,7 +82,7 @@ func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stre }() var preallocateSize int64 - if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index eaa373dd0..b874ee9a2 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -127,7 +127,7 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption) + }, fs.option.Filer.ToGrpcAddress(), false, fs.option.GrpcDialOption) } func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { diff --git a/weed/shell/command_cluster_check.go b/weed/shell/command_cluster_check.go index fef371b49..2cabf91b8 100644 --- a/weed/shell/command_cluster_check.go +++ b/weed/shell/command_cluster_check.go @@ -4,12 +4,13 @@ import ( "context" "flag" "fmt" + "io" + "github.com/seaweedfs/seaweedfs/weed/cluster" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" - "io" ) func init() { @@ -97,7 +98,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i for _, master := range masters { for _, volumeServer := range volumeServers { fmt.Fprintf(writer, "checking master %s to volume server %s ... ", string(master), string(volumeServer)) - err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + err := pb.WithMasterClient(false, master, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error { pong, err := client.Ping(context.Background(), &master_pb.PingRequest{ Target: string(volumeServer), TargetType: cluster.VolumeServerType, @@ -120,7 +121,7 @@ func (c *commandClusterCheck) Do(args []string, commandEnv *CommandEnv, writer i continue } fmt.Fprintf(writer, "checking master %s to %s ... ", string(sourceMaster), string(targetMaster)) - err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, func(client master_pb.SeaweedClient) error { + err := pb.WithMasterClient(false, sourceMaster, commandEnv.option.GrpcDialOption, false, func(client master_pb.SeaweedClient) error { pong, err := client.Ping(context.Background(), &master_pb.PingRequest{ Target: string(targetMaster), TargetType: cluster.MasterType, diff --git a/weed/shell/command_collection_delete.go b/weed/shell/command_collection_delete.go index 78a5b7157..936f35b46 100644 --- a/weed/shell/command_collection_delete.go +++ b/weed/shell/command_collection_delete.go @@ -4,8 +4,9 @@ import ( "context" "flag" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "io" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) func init() { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 391dd9199..2583bda80 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -3,10 +3,11 @@ package wdclient import ( "context" "fmt" - "github.com/seaweedfs/seaweedfs/weed/stats" "math/rand" "time" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" @@ -52,7 +53,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(fileId string) (fullUrls []stri if err == nil && len(fullUrls) > 0 { return } - err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + err = pb.WithMasterClient(false, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { resp, err := client.LookupVolume(context.Background(), &master_pb.LookupVolumeRequest{ VolumeOrFileIds: []string{fileId}, }) @@ -114,7 +115,7 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres if master == myMasterAddress { continue } - if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) defer cancel() resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) @@ -150,7 +151,7 @@ func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryConnectToMaster(master pb.ServerAddress) (nextHintedLeader pb.ServerAddress) { glog.V(1).Infof("%s.%s masterClient Connecting to master %v", mc.FilerGroup, mc.clientType, master) stats.MasterClientConnectCounter.WithLabelValues("total").Inc() - gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + gprcErr := pb.WithMasterClient(true, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -271,7 +272,7 @@ func (mc *MasterClient) WithClient(streamingMode bool, fn func(client master_pb. for mc.currentMaster == "" { time.Sleep(3 * time.Second) } - return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, func(client master_pb.SeaweedClient) error { + return pb.WithMasterClient(streamingMode, mc.currentMaster, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { return fn(client) }) })