diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index d2c86598e..93dce59d8 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -4,6 +4,7 @@ import ( "fmt" "net" "strings" + "time" "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/glog" @@ -172,6 +173,7 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } }() + ticker := time.NewTicker(5 * time.Second) for { select { case message := <-messageChan: @@ -179,6 +181,10 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ glog.V(0).Infof("=> client %v: %+v", clientName, message) return err } + case <-ticker.C: + if !ms.Topo.IsLeader() { + return raft.NotLeaderError + } case <-stopChan: return nil } diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3a70fed5d..ae0819d2d 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -4,12 +4,18 @@ import ( "context" "fmt" + "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/topology" ) func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupVolumeRequest) (*master_pb.LookupVolumeResponse, error) { + + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + resp := &master_pb.LookupVolumeResponse{} volumeLocations := ms.lookupVolumeId(req.VolumeIds, req.Collection) @@ -33,6 +39,10 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest) (*master_pb.AssignResponse, error) { + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + if req.Count == 0 { req.Count = 1 } @@ -87,6 +97,10 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.StatisticsRequest) (*master_pb.StatisticsResponse, error) { + if !ms.Topo.IsLeader() { + return nil, raft.NotLeaderError + } + if req.Replication == "" { req.Replication = ms.defaultReplicaPlacement }