From f4cdfcc5fde50cf61d525f6b7fb0f951c5898e45 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 19 Dec 2025 00:15:39 -0800 Subject: [PATCH] Add cluster.raft.leader.transfer command for graceful leader change (#7819) * proto: add RaftLeadershipTransfer RPC for forced leader change Add new gRPC RPC and messages for leadership transfer: - RaftLeadershipTransferRequest: optional target_id and target_address - RaftLeadershipTransferResponse: previous_leader and new_leader This enables graceful leadership transfer before master maintenance, reducing errors in filers during planned maintenance windows. Ref: https://github.com/seaweedfs/seaweedfs/issues/7527 * proto: regenerate Go files for RaftLeadershipTransfer Generated from master.proto changes. * master: implement RaftLeadershipTransfer gRPC handler Add gRPC handler for leadership transfer with support for: - Transfer to any eligible follower (when target_id is empty) - Transfer to a specific server (when target_id and target_address are provided) Uses hashicorp/raft LeadershipTransfer() and LeadershipTransferToServer() APIs. Returns the previous and new leader in the response. * shell: add cluster.raft.leader.transfer command Add weed shell command for graceful leadership transfer: - Displays current cluster status before transfer - Supports auto-selection of target (any eligible follower) - Supports targeted transfer with -id and -address flags - Provides clear feedback on success/failure with troubleshooting tips Usage: cluster.raft.leader.transfer cluster.raft.leader.transfer -id -address * master: add unit tests for raft gRPC handlers Add tests covering: - RaftLeadershipTransfer with no raft initialized - RaftLeadershipTransfer with target_id but no address - RaftListClusterServers with no raft initialized - RaftAddServer with no raft initialized - RaftRemoveServer with no raft initialized These tests verify error handling when raft is not configured. * shell: add tests for cluster.raft.leader.transfer command Add tests covering: - Command name and help text validation - HasTag returns false for ResourceHeavy - Validation of -id without -address - Argument parsing with unknown flags * master: clarify that leadership transfer requires -raftHashicorp The default raft implementation (seaweedfs/raft, a goraft fork) does not support graceful leadership transfer. This feature is only available when using hashicorp raft (-raftHashicorp=true). Update error messages and help text to make this requirement clear: - gRPC handler returns specific error for goraft users - Shell command help text notes the requirement - Added test for goraft case * test: use strings.Contains instead of custom helper Replace custom contains/containsHelper functions with the standard library strings.Contains for better maintainability. * shell: return flag parsing errors instead of swallowing them - Return the error from flag.Parse() instead of returning nil - Update test to explicitly assert error for unknown flags * test: document integration test scenarios for Raft leadership transfer Add comments explaining: - Why these unit tests only cover 'Raft not initialized' scenarios - What integration tests should cover (with multi-master cluster) - hashicorp/raft uses concrete types that cannot be easily mocked * fix: address reviewer feedback on tests and leader routing - Remove misleading tests that couldn't properly validate their documented behavior without a real Raft cluster: - TestRaftLeadershipTransfer_GoraftNotSupported - TestRaftLeadershipTransfer_ValidationTargetIdWithoutAddress - Change WithClient(false) to WithClient(true) for RaftLeadershipTransfer RPC to ensure the request is routed to the current leader * Improve cluster.raft.transferLeader command - Rename command from cluster.raft.leader.transfer to cluster.raft.transferLeader - Add symmetric validation: -id and -address must be specified together - Handle case where same leader is re-elected after transfer - Add test for -address without -id validation - Add docker compose file for 5-master raft cluster testing --- weed/pb/master.proto | 11 + weed/pb/master_pb/master.pb.go | 250 +++++++++++++----- weed/pb/master_pb/master_grpc.pb.go | 38 +++ weed/server/master_grpc_server_raft.go | 47 ++++ weed/server/master_grpc_server_raft_test.go | 109 ++++++++ .../command_cluster_raft_leader_transfer.go | 144 ++++++++++ ...mmand_cluster_raft_leader_transfer_test.go | 89 +++++++ 7 files changed, 621 insertions(+), 67 deletions(-) create mode 100644 weed/server/master_grpc_server_raft_test.go create mode 100644 weed/shell/command_cluster_raft_leader_transfer.go create mode 100644 weed/shell/command_cluster_raft_leader_transfer_test.go diff --git a/weed/pb/master.proto b/weed/pb/master.proto index afbf31de9..bf93bd104 100644 --- a/weed/pb/master.proto +++ b/weed/pb/master.proto @@ -51,6 +51,8 @@ service Seaweed { } rpc RaftRemoveServer (RaftRemoveServerRequest) returns (RaftRemoveServerResponse) { } + rpc RaftLeadershipTransfer (RaftLeadershipTransferRequest) returns (RaftLeadershipTransferResponse) { + } rpc VolumeGrow (VolumeGrowRequest) returns (VolumeGrowResponse) { } } @@ -443,5 +445,14 @@ message RaftListClusterServersResponse { repeated ClusterServers cluster_servers = 1; } +message RaftLeadershipTransferRequest { + string target_id = 1; // Optional: target server ID. If empty, transfers to any eligible follower + string target_address = 2; // Optional: target server address. Required if target_id is specified +} +message RaftLeadershipTransferResponse { + string previous_leader = 1; + string new_leader = 2; +} + message VolumeGrowResponse { } \ No newline at end of file diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index 41d46fad1..8f0a54351 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -3690,6 +3690,110 @@ func (x *RaftListClusterServersResponse) GetClusterServers() []*RaftListClusterS return nil } +type RaftLeadershipTransferRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + TargetId string `protobuf:"bytes,1,opt,name=target_id,json=targetId,proto3" json:"target_id,omitempty"` // Optional: target server ID. If empty, transfers to any eligible follower + TargetAddress string `protobuf:"bytes,2,opt,name=target_address,json=targetAddress,proto3" json:"target_address,omitempty"` // Optional: target server address. Required if target_id is specified + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RaftLeadershipTransferRequest) Reset() { + *x = RaftLeadershipTransferRequest{} + mi := &file_master_proto_msgTypes[58] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RaftLeadershipTransferRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RaftLeadershipTransferRequest) ProtoMessage() {} + +func (x *RaftLeadershipTransferRequest) ProtoReflect() protoreflect.Message { + mi := &file_master_proto_msgTypes[58] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RaftLeadershipTransferRequest.ProtoReflect.Descriptor instead. +func (*RaftLeadershipTransferRequest) Descriptor() ([]byte, []int) { + return file_master_proto_rawDescGZIP(), []int{58} +} + +func (x *RaftLeadershipTransferRequest) GetTargetId() string { + if x != nil { + return x.TargetId + } + return "" +} + +func (x *RaftLeadershipTransferRequest) GetTargetAddress() string { + if x != nil { + return x.TargetAddress + } + return "" +} + +type RaftLeadershipTransferResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + PreviousLeader string `protobuf:"bytes,1,opt,name=previous_leader,json=previousLeader,proto3" json:"previous_leader,omitempty"` + NewLeader string `protobuf:"bytes,2,opt,name=new_leader,json=newLeader,proto3" json:"new_leader,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *RaftLeadershipTransferResponse) Reset() { + *x = RaftLeadershipTransferResponse{} + mi := &file_master_proto_msgTypes[59] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *RaftLeadershipTransferResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*RaftLeadershipTransferResponse) ProtoMessage() {} + +func (x *RaftLeadershipTransferResponse) ProtoReflect() protoreflect.Message { + mi := &file_master_proto_msgTypes[59] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use RaftLeadershipTransferResponse.ProtoReflect.Descriptor instead. +func (*RaftLeadershipTransferResponse) Descriptor() ([]byte, []int) { + return file_master_proto_rawDescGZIP(), []int{59} +} + +func (x *RaftLeadershipTransferResponse) GetPreviousLeader() string { + if x != nil { + return x.PreviousLeader + } + return "" +} + +func (x *RaftLeadershipTransferResponse) GetNewLeader() string { + if x != nil { + return x.NewLeader + } + return "" +} + type VolumeGrowResponse struct { state protoimpl.MessageState `protogen:"open.v1"` unknownFields protoimpl.UnknownFields @@ -3698,7 +3802,7 @@ type VolumeGrowResponse struct { func (x *VolumeGrowResponse) Reset() { *x = VolumeGrowResponse{} - mi := &file_master_proto_msgTypes[58] + mi := &file_master_proto_msgTypes[60] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3710,7 +3814,7 @@ func (x *VolumeGrowResponse) String() string { func (*VolumeGrowResponse) ProtoMessage() {} func (x *VolumeGrowResponse) ProtoReflect() protoreflect.Message { - mi := &file_master_proto_msgTypes[58] + mi := &file_master_proto_msgTypes[60] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3723,7 +3827,7 @@ func (x *VolumeGrowResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use VolumeGrowResponse.ProtoReflect.Descriptor instead. func (*VolumeGrowResponse) Descriptor() ([]byte, []int) { - return file_master_proto_rawDescGZIP(), []int{58} + return file_master_proto_rawDescGZIP(), []int{60} } type SuperBlockExtra_ErasureCoding struct { @@ -3737,7 +3841,7 @@ type SuperBlockExtra_ErasureCoding struct { func (x *SuperBlockExtra_ErasureCoding) Reset() { *x = SuperBlockExtra_ErasureCoding{} - mi := &file_master_proto_msgTypes[61] + mi := &file_master_proto_msgTypes[63] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3749,7 +3853,7 @@ func (x *SuperBlockExtra_ErasureCoding) String() string { func (*SuperBlockExtra_ErasureCoding) ProtoMessage() {} func (x *SuperBlockExtra_ErasureCoding) ProtoReflect() protoreflect.Message { - mi := &file_master_proto_msgTypes[61] + mi := &file_master_proto_msgTypes[63] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3798,7 +3902,7 @@ type LookupVolumeResponse_VolumeIdLocation struct { func (x *LookupVolumeResponse_VolumeIdLocation) Reset() { *x = LookupVolumeResponse_VolumeIdLocation{} - mi := &file_master_proto_msgTypes[62] + mi := &file_master_proto_msgTypes[64] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3810,7 +3914,7 @@ func (x *LookupVolumeResponse_VolumeIdLocation) String() string { func (*LookupVolumeResponse_VolumeIdLocation) ProtoMessage() {} func (x *LookupVolumeResponse_VolumeIdLocation) ProtoReflect() protoreflect.Message { - mi := &file_master_proto_msgTypes[62] + mi := &file_master_proto_msgTypes[64] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3864,7 +3968,7 @@ type LookupEcVolumeResponse_EcShardIdLocation struct { func (x *LookupEcVolumeResponse_EcShardIdLocation) Reset() { *x = LookupEcVolumeResponse_EcShardIdLocation{} - mi := &file_master_proto_msgTypes[67] + mi := &file_master_proto_msgTypes[69] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3876,7 +3980,7 @@ func (x *LookupEcVolumeResponse_EcShardIdLocation) String() string { func (*LookupEcVolumeResponse_EcShardIdLocation) ProtoMessage() {} func (x *LookupEcVolumeResponse_EcShardIdLocation) ProtoReflect() protoreflect.Message { - mi := &file_master_proto_msgTypes[67] + mi := &file_master_proto_msgTypes[69] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3919,7 +4023,7 @@ type ListClusterNodesResponse_ClusterNode struct { func (x *ListClusterNodesResponse_ClusterNode) Reset() { *x = ListClusterNodesResponse_ClusterNode{} - mi := &file_master_proto_msgTypes[68] + mi := &file_master_proto_msgTypes[70] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -3931,7 +4035,7 @@ func (x *ListClusterNodesResponse_ClusterNode) String() string { func (*ListClusterNodesResponse_ClusterNode) ProtoMessage() {} func (x *ListClusterNodesResponse_ClusterNode) ProtoReflect() protoreflect.Message { - mi := &file_master_proto_msgTypes[68] + mi := &file_master_proto_msgTypes[70] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -3994,7 +4098,7 @@ type RaftListClusterServersResponse_ClusterServers struct { func (x *RaftListClusterServersResponse_ClusterServers) Reset() { *x = RaftListClusterServersResponse_ClusterServers{} - mi := &file_master_proto_msgTypes[69] + mi := &file_master_proto_msgTypes[71] ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) ms.StoreMessageInfo(mi) } @@ -4006,7 +4110,7 @@ func (x *RaftListClusterServersResponse_ClusterServers) String() string { func (*RaftListClusterServersResponse_ClusterServers) ProtoMessage() {} func (x *RaftListClusterServersResponse_ClusterServers) ProtoReflect() protoreflect.Message { - mi := &file_master_proto_msgTypes[69] + mi := &file_master_proto_msgTypes[71] if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { @@ -4405,8 +4509,15 @@ const file_master_proto_rawDesc = "" + "\x02id\x18\x01 \x01(\tR\x02id\x12\x18\n" + "\aaddress\x18\x02 \x01(\tR\aaddress\x12\x1a\n" + "\bsuffrage\x18\x03 \x01(\tR\bsuffrage\x12\x1a\n" + - "\bisLeader\x18\x04 \x01(\bR\bisLeader\"\x14\n" + - "\x12VolumeGrowResponse2\xd5\x0f\n" + + "\bisLeader\x18\x04 \x01(\bR\bisLeader\"c\n" + + "\x1dRaftLeadershipTransferRequest\x12\x1b\n" + + "\ttarget_id\x18\x01 \x01(\tR\btargetId\x12%\n" + + "\x0etarget_address\x18\x02 \x01(\tR\rtargetAddress\"h\n" + + "\x1eRaftLeadershipTransferResponse\x12'\n" + + "\x0fprevious_leader\x18\x01 \x01(\tR\x0epreviousLeader\x12\x1d\n" + + "\n" + + "new_leader\x18\x02 \x01(\tR\tnewLeader\"\x14\n" + + "\x12VolumeGrowResponse2\xc6\x10\n" + "\aSeaweed\x12I\n" + "\rSendHeartbeat\x12\x14.master_pb.Heartbeat\x1a\x1c.master_pb.HeartbeatResponse\"\x00(\x010\x01\x12X\n" + "\rKeepConnected\x12\x1f.master_pb.KeepConnectedRequest\x1a .master_pb.KeepConnectedResponse\"\x00(\x010\x01\x12Q\n" + @@ -4431,7 +4542,8 @@ const file_master_proto_rawDesc = "" + "\x04Ping\x12\x16.master_pb.PingRequest\x1a\x17.master_pb.PingResponse\"\x00\x12o\n" + "\x16RaftListClusterServers\x12(.master_pb.RaftListClusterServersRequest\x1a).master_pb.RaftListClusterServersResponse\"\x00\x12T\n" + "\rRaftAddServer\x12\x1f.master_pb.RaftAddServerRequest\x1a .master_pb.RaftAddServerResponse\"\x00\x12]\n" + - "\x10RaftRemoveServer\x12\".master_pb.RaftRemoveServerRequest\x1a#.master_pb.RaftRemoveServerResponse\"\x00\x12K\n" + + "\x10RaftRemoveServer\x12\".master_pb.RaftRemoveServerRequest\x1a#.master_pb.RaftRemoveServerResponse\"\x00\x12o\n" + + "\x16RaftLeadershipTransfer\x12(.master_pb.RaftLeadershipTransferRequest\x1a).master_pb.RaftLeadershipTransferResponse\"\x00\x12K\n" + "\n" + "VolumeGrow\x12\x1c.master_pb.VolumeGrowRequest\x1a\x1d.master_pb.VolumeGrowResponse\"\x00B2Z0github.com/seaweedfs/seaweedfs/weed/pb/master_pbb\x06proto3" @@ -4447,7 +4559,7 @@ func file_master_proto_rawDescGZIP() []byte { return file_master_proto_rawDescData } -var file_master_proto_msgTypes = make([]protoimpl.MessageInfo, 70) +var file_master_proto_msgTypes = make([]protoimpl.MessageInfo, 72) var file_master_proto_goTypes = []any{ (*Heartbeat)(nil), // 0: master_pb.Heartbeat (*HeartbeatResponse)(nil), // 1: master_pb.HeartbeatResponse @@ -4507,18 +4619,20 @@ var file_master_proto_goTypes = []any{ (*RaftRemoveServerResponse)(nil), // 55: master_pb.RaftRemoveServerResponse (*RaftListClusterServersRequest)(nil), // 56: master_pb.RaftListClusterServersRequest (*RaftListClusterServersResponse)(nil), // 57: master_pb.RaftListClusterServersResponse - (*VolumeGrowResponse)(nil), // 58: master_pb.VolumeGrowResponse - nil, // 59: master_pb.Heartbeat.MaxVolumeCountsEntry - nil, // 60: master_pb.StorageBackend.PropertiesEntry - (*SuperBlockExtra_ErasureCoding)(nil), // 61: master_pb.SuperBlockExtra.ErasureCoding - (*LookupVolumeResponse_VolumeIdLocation)(nil), // 62: master_pb.LookupVolumeResponse.VolumeIdLocation - nil, // 63: master_pb.DataNodeInfo.DiskInfosEntry - nil, // 64: master_pb.RackInfo.DiskInfosEntry - nil, // 65: master_pb.DataCenterInfo.DiskInfosEntry - nil, // 66: master_pb.TopologyInfo.DiskInfosEntry - (*LookupEcVolumeResponse_EcShardIdLocation)(nil), // 67: master_pb.LookupEcVolumeResponse.EcShardIdLocation - (*ListClusterNodesResponse_ClusterNode)(nil), // 68: master_pb.ListClusterNodesResponse.ClusterNode - (*RaftListClusterServersResponse_ClusterServers)(nil), // 69: master_pb.RaftListClusterServersResponse.ClusterServers + (*RaftLeadershipTransferRequest)(nil), // 58: master_pb.RaftLeadershipTransferRequest + (*RaftLeadershipTransferResponse)(nil), // 59: master_pb.RaftLeadershipTransferResponse + (*VolumeGrowResponse)(nil), // 60: master_pb.VolumeGrowResponse + nil, // 61: master_pb.Heartbeat.MaxVolumeCountsEntry + nil, // 62: master_pb.StorageBackend.PropertiesEntry + (*SuperBlockExtra_ErasureCoding)(nil), // 63: master_pb.SuperBlockExtra.ErasureCoding + (*LookupVolumeResponse_VolumeIdLocation)(nil), // 64: master_pb.LookupVolumeResponse.VolumeIdLocation + nil, // 65: master_pb.DataNodeInfo.DiskInfosEntry + nil, // 66: master_pb.RackInfo.DiskInfosEntry + nil, // 67: master_pb.DataCenterInfo.DiskInfosEntry + nil, // 68: master_pb.TopologyInfo.DiskInfosEntry + (*LookupEcVolumeResponse_EcShardIdLocation)(nil), // 69: master_pb.LookupEcVolumeResponse.EcShardIdLocation + (*ListClusterNodesResponse_ClusterNode)(nil), // 70: master_pb.ListClusterNodesResponse.ClusterNode + (*RaftListClusterServersResponse_ClusterServers)(nil), // 71: master_pb.RaftListClusterServersResponse.ClusterServers } var file_master_proto_depIdxs = []int32{ 2, // 0: master_pb.Heartbeat.volumes:type_name -> master_pb.VolumeInformationMessage @@ -4527,30 +4641,30 @@ var file_master_proto_depIdxs = []int32{ 4, // 3: master_pb.Heartbeat.ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage 4, // 4: master_pb.Heartbeat.new_ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage 4, // 5: master_pb.Heartbeat.deleted_ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage - 59, // 6: master_pb.Heartbeat.max_volume_counts:type_name -> master_pb.Heartbeat.MaxVolumeCountsEntry + 61, // 6: master_pb.Heartbeat.max_volume_counts:type_name -> master_pb.Heartbeat.MaxVolumeCountsEntry 5, // 7: master_pb.HeartbeatResponse.storage_backends:type_name -> master_pb.StorageBackend - 60, // 8: master_pb.StorageBackend.properties:type_name -> master_pb.StorageBackend.PropertiesEntry - 61, // 9: master_pb.SuperBlockExtra.erasure_coding:type_name -> master_pb.SuperBlockExtra.ErasureCoding + 62, // 8: master_pb.StorageBackend.properties:type_name -> master_pb.StorageBackend.PropertiesEntry + 63, // 9: master_pb.SuperBlockExtra.erasure_coding:type_name -> master_pb.SuperBlockExtra.ErasureCoding 9, // 10: master_pb.KeepConnectedResponse.volume_location:type_name -> master_pb.VolumeLocation 10, // 11: master_pb.KeepConnectedResponse.cluster_node_update:type_name -> master_pb.ClusterNodeUpdate - 62, // 12: master_pb.LookupVolumeResponse.volume_id_locations:type_name -> master_pb.LookupVolumeResponse.VolumeIdLocation + 64, // 12: master_pb.LookupVolumeResponse.volume_id_locations:type_name -> master_pb.LookupVolumeResponse.VolumeIdLocation 14, // 13: master_pb.AssignResponse.replicas:type_name -> master_pb.Location 14, // 14: master_pb.AssignResponse.location:type_name -> master_pb.Location 20, // 15: master_pb.CollectionListResponse.collections:type_name -> master_pb.Collection 2, // 16: master_pb.DiskInfo.volume_infos:type_name -> master_pb.VolumeInformationMessage 4, // 17: master_pb.DiskInfo.ec_shard_infos:type_name -> master_pb.VolumeEcShardInformationMessage - 63, // 18: master_pb.DataNodeInfo.diskInfos:type_name -> master_pb.DataNodeInfo.DiskInfosEntry + 65, // 18: master_pb.DataNodeInfo.diskInfos:type_name -> master_pb.DataNodeInfo.DiskInfosEntry 26, // 19: master_pb.RackInfo.data_node_infos:type_name -> master_pb.DataNodeInfo - 64, // 20: master_pb.RackInfo.diskInfos:type_name -> master_pb.RackInfo.DiskInfosEntry + 66, // 20: master_pb.RackInfo.diskInfos:type_name -> master_pb.RackInfo.DiskInfosEntry 27, // 21: master_pb.DataCenterInfo.rack_infos:type_name -> master_pb.RackInfo - 65, // 22: master_pb.DataCenterInfo.diskInfos:type_name -> master_pb.DataCenterInfo.DiskInfosEntry + 67, // 22: master_pb.DataCenterInfo.diskInfos:type_name -> master_pb.DataCenterInfo.DiskInfosEntry 28, // 23: master_pb.TopologyInfo.data_center_infos:type_name -> master_pb.DataCenterInfo - 66, // 24: master_pb.TopologyInfo.diskInfos:type_name -> master_pb.TopologyInfo.DiskInfosEntry + 68, // 24: master_pb.TopologyInfo.diskInfos:type_name -> master_pb.TopologyInfo.DiskInfosEntry 29, // 25: master_pb.VolumeListResponse.topology_info:type_name -> master_pb.TopologyInfo - 67, // 26: master_pb.LookupEcVolumeResponse.shard_id_locations:type_name -> master_pb.LookupEcVolumeResponse.EcShardIdLocation + 69, // 26: master_pb.LookupEcVolumeResponse.shard_id_locations:type_name -> master_pb.LookupEcVolumeResponse.EcShardIdLocation 5, // 27: master_pb.GetMasterConfigurationResponse.storage_backends:type_name -> master_pb.StorageBackend - 68, // 28: master_pb.ListClusterNodesResponse.cluster_nodes:type_name -> master_pb.ListClusterNodesResponse.ClusterNode - 69, // 29: master_pb.RaftListClusterServersResponse.cluster_servers:type_name -> master_pb.RaftListClusterServersResponse.ClusterServers + 70, // 28: master_pb.ListClusterNodesResponse.cluster_nodes:type_name -> master_pb.ListClusterNodesResponse.ClusterNode + 71, // 29: master_pb.RaftListClusterServersResponse.cluster_servers:type_name -> master_pb.RaftListClusterServersResponse.ClusterServers 14, // 30: master_pb.LookupVolumeResponse.VolumeIdLocation.locations:type_name -> master_pb.Location 25, // 31: master_pb.DataNodeInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo 25, // 32: master_pb.RackInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo @@ -4579,32 +4693,34 @@ var file_master_proto_depIdxs = []int32{ 56, // 55: master_pb.Seaweed.RaftListClusterServers:input_type -> master_pb.RaftListClusterServersRequest 52, // 56: master_pb.Seaweed.RaftAddServer:input_type -> master_pb.RaftAddServerRequest 54, // 57: master_pb.Seaweed.RaftRemoveServer:input_type -> master_pb.RaftRemoveServerRequest - 16, // 58: master_pb.Seaweed.VolumeGrow:input_type -> master_pb.VolumeGrowRequest - 1, // 59: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse - 11, // 60: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse - 13, // 61: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse - 17, // 62: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse - 17, // 63: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse - 19, // 64: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse - 22, // 65: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse - 24, // 66: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse - 31, // 67: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse - 33, // 68: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse - 35, // 69: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse - 37, // 70: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse - 39, // 71: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse - 41, // 72: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse - 43, // 73: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse - 45, // 74: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse - 47, // 75: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse - 49, // 76: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse - 51, // 77: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse - 57, // 78: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse - 53, // 79: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse - 55, // 80: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse - 58, // 81: master_pb.Seaweed.VolumeGrow:output_type -> master_pb.VolumeGrowResponse - 59, // [59:82] is the sub-list for method output_type - 36, // [36:59] is the sub-list for method input_type + 58, // 58: master_pb.Seaweed.RaftLeadershipTransfer:input_type -> master_pb.RaftLeadershipTransferRequest + 16, // 59: master_pb.Seaweed.VolumeGrow:input_type -> master_pb.VolumeGrowRequest + 1, // 60: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse + 11, // 61: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse + 13, // 62: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse + 17, // 63: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse + 17, // 64: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse + 19, // 65: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse + 22, // 66: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse + 24, // 67: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse + 31, // 68: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse + 33, // 69: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse + 35, // 70: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse + 37, // 71: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse + 39, // 72: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse + 41, // 73: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse + 43, // 74: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse + 45, // 75: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse + 47, // 76: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse + 49, // 77: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse + 51, // 78: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse + 57, // 79: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse + 53, // 80: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse + 55, // 81: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse + 59, // 82: master_pb.Seaweed.RaftLeadershipTransfer:output_type -> master_pb.RaftLeadershipTransferResponse + 60, // 83: master_pb.Seaweed.VolumeGrow:output_type -> master_pb.VolumeGrowResponse + 60, // [60:84] is the sub-list for method output_type + 36, // [36:60] is the sub-list for method input_type 36, // [36:36] is the sub-list for extension type_name 36, // [36:36] is the sub-list for extension extendee 0, // [0:36] is the sub-list for field type_name @@ -4621,7 +4737,7 @@ func file_master_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_master_proto_rawDesc), len(file_master_proto_rawDesc)), NumEnums: 0, - NumMessages: 70, + NumMessages: 72, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/master_pb/master_grpc.pb.go b/weed/pb/master_pb/master_grpc.pb.go index 3062c5a5a..ad8ac920d 100644 --- a/weed/pb/master_pb/master_grpc.pb.go +++ b/weed/pb/master_pb/master_grpc.pb.go @@ -41,6 +41,7 @@ const ( Seaweed_RaftListClusterServers_FullMethodName = "/master_pb.Seaweed/RaftListClusterServers" Seaweed_RaftAddServer_FullMethodName = "/master_pb.Seaweed/RaftAddServer" Seaweed_RaftRemoveServer_FullMethodName = "/master_pb.Seaweed/RaftRemoveServer" + Seaweed_RaftLeadershipTransfer_FullMethodName = "/master_pb.Seaweed/RaftLeadershipTransfer" Seaweed_VolumeGrow_FullMethodName = "/master_pb.Seaweed/VolumeGrow" ) @@ -70,6 +71,7 @@ type SeaweedClient interface { RaftListClusterServers(ctx context.Context, in *RaftListClusterServersRequest, opts ...grpc.CallOption) (*RaftListClusterServersResponse, error) RaftAddServer(ctx context.Context, in *RaftAddServerRequest, opts ...grpc.CallOption) (*RaftAddServerResponse, error) RaftRemoveServer(ctx context.Context, in *RaftRemoveServerRequest, opts ...grpc.CallOption) (*RaftRemoveServerResponse, error) + RaftLeadershipTransfer(ctx context.Context, in *RaftLeadershipTransferRequest, opts ...grpc.CallOption) (*RaftLeadershipTransferResponse, error) VolumeGrow(ctx context.Context, in *VolumeGrowRequest, opts ...grpc.CallOption) (*VolumeGrowResponse, error) } @@ -310,6 +312,16 @@ func (c *seaweedClient) RaftRemoveServer(ctx context.Context, in *RaftRemoveServ return out, nil } +func (c *seaweedClient) RaftLeadershipTransfer(ctx context.Context, in *RaftLeadershipTransferRequest, opts ...grpc.CallOption) (*RaftLeadershipTransferResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(RaftLeadershipTransferResponse) + err := c.cc.Invoke(ctx, Seaweed_RaftLeadershipTransfer_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *seaweedClient) VolumeGrow(ctx context.Context, in *VolumeGrowRequest, opts ...grpc.CallOption) (*VolumeGrowResponse, error) { cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) out := new(VolumeGrowResponse) @@ -346,6 +358,7 @@ type SeaweedServer interface { RaftListClusterServers(context.Context, *RaftListClusterServersRequest) (*RaftListClusterServersResponse, error) RaftAddServer(context.Context, *RaftAddServerRequest) (*RaftAddServerResponse, error) RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error) + RaftLeadershipTransfer(context.Context, *RaftLeadershipTransferRequest) (*RaftLeadershipTransferResponse, error) VolumeGrow(context.Context, *VolumeGrowRequest) (*VolumeGrowResponse, error) mustEmbedUnimplementedSeaweedServer() } @@ -423,6 +436,9 @@ func (UnimplementedSeaweedServer) RaftAddServer(context.Context, *RaftAddServerR func (UnimplementedSeaweedServer) RaftRemoveServer(context.Context, *RaftRemoveServerRequest) (*RaftRemoveServerResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method RaftRemoveServer not implemented") } +func (UnimplementedSeaweedServer) RaftLeadershipTransfer(context.Context, *RaftLeadershipTransferRequest) (*RaftLeadershipTransferResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method RaftLeadershipTransfer not implemented") +} func (UnimplementedSeaweedServer) VolumeGrow(context.Context, *VolumeGrowRequest) (*VolumeGrowResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method VolumeGrow not implemented") } @@ -810,6 +826,24 @@ func _Seaweed_RaftRemoveServer_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _Seaweed_RaftLeadershipTransfer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(RaftLeadershipTransferRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(SeaweedServer).RaftLeadershipTransfer(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: Seaweed_RaftLeadershipTransfer_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(SeaweedServer).RaftLeadershipTransfer(ctx, req.(*RaftLeadershipTransferRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Seaweed_VolumeGrow_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(VolumeGrowRequest) if err := dec(in); err != nil { @@ -911,6 +945,10 @@ var Seaweed_ServiceDesc = grpc.ServiceDesc{ MethodName: "RaftRemoveServer", Handler: _Seaweed_RaftRemoveServer_Handler, }, + { + MethodName: "RaftLeadershipTransfer", + Handler: _Seaweed_RaftLeadershipTransfer_Handler, + }, { MethodName: "VolumeGrow", Handler: _Seaweed_VolumeGrow_Handler, diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go index e6454704e..4aab7fe6e 100644 --- a/weed/server/master_grpc_server_raft.go +++ b/weed/server/master_grpc_server_raft.go @@ -90,3 +90,50 @@ func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.Raf } return resp, nil } + +func (ms *MasterServer) RaftLeadershipTransfer(ctx context.Context, req *master_pb.RaftLeadershipTransferRequest) (*master_pb.RaftLeadershipTransferResponse, error) { + resp := &master_pb.RaftLeadershipTransferResponse{} + + ms.Topo.RaftServerAccessLock.RLock() + defer ms.Topo.RaftServerAccessLock.RUnlock() + + // Leadership transfer is only supported with hashicorp raft (-raftHashicorp=true) + // The default seaweedfs/raft (goraft) implementation does not support this feature + if ms.Topo.HashicorpRaft == nil { + if ms.Topo.RaftServer != nil { + return nil, fmt.Errorf("leadership transfer requires -raftHashicorp=true; the default raft implementation does not support this feature") + } + return nil, fmt.Errorf("raft not initialized (single master mode)") + } + + if ms.Topo.HashicorpRaft.State() != raft.Leader { + leaderAddr, _ := ms.Topo.HashicorpRaft.LeaderWithID() + return nil, fmt.Errorf("this server is not the leader; current leader is %s", leaderAddr) + } + + // Record previous leader + _, previousLeaderId := ms.Topo.HashicorpRaft.LeaderWithID() + resp.PreviousLeader = string(previousLeaderId) + + var future raft.Future + if req.TargetId != "" && req.TargetAddress != "" { + // Transfer to specific server + future = ms.Topo.HashicorpRaft.LeadershipTransferToServer( + raft.ServerID(req.TargetId), + raft.ServerAddress(req.TargetAddress), + ) + } else { + // Transfer to any eligible follower + future = ms.Topo.HashicorpRaft.LeadershipTransfer() + } + + if err := future.Error(); err != nil { + return nil, fmt.Errorf("leadership transfer failed: %v", err) + } + + // Get new leader info + _, newLeaderId := ms.Topo.HashicorpRaft.LeaderWithID() + resp.NewLeader = string(newLeaderId) + + return resp, nil +} diff --git a/weed/server/master_grpc_server_raft_test.go b/weed/server/master_grpc_server_raft_test.go new file mode 100644 index 000000000..954929aa0 --- /dev/null +++ b/weed/server/master_grpc_server_raft_test.go @@ -0,0 +1,109 @@ +package weed_server + +// These tests cover the Raft gRPC handlers in scenarios where Raft is not initialized +// (single master mode). Testing with an initialized Raft cluster requires integration +// tests with a multi-master setup, as hashicorp/raft uses concrete types that cannot +// be easily mocked. +// +// Integration tests for RaftLeadershipTransfer should cover: +// - Successful leadership transfer to any follower (auto-selection) +// - Successful leadership transfer to a specific target server +// - Error when caller is not the current leader +// - Error when target server is not a voting member +// - Error when target server is unreachable +// +// These scenarios are best tested with test/multi_master/ integration tests +// using a real 3-node master cluster with -raftHashicorp=true. + +import ( + "context" + "strings" + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +func TestRaftLeadershipTransfer_NoRaft(t *testing.T) { + // Test case: raft not initialized (single master mode) + ms := &MasterServer{ + Topo: topology.NewTopology("test", nil, 0, 0, false), + } + + ctx := context.Background() + req := &master_pb.RaftLeadershipTransferRequest{} + + _, err := ms.RaftLeadershipTransfer(ctx, req) + if err == nil { + t.Error("expected error when raft is not initialized") + } + + expectedMsg := "single master mode" + if err != nil && !strings.Contains(err.Error(), expectedMsg) { + t.Errorf("expected error message to contain %q, got %q", expectedMsg, err.Error()) + } +} + +func TestRaftListClusterServers_NoRaft(t *testing.T) { + // Test case: raft not initialized returns empty response + ms := &MasterServer{ + Topo: topology.NewTopology("test", nil, 0, 0, false), + } + + ctx := context.Background() + req := &master_pb.RaftListClusterServersRequest{} + + resp, err := ms.RaftListClusterServers(ctx, req) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp == nil { + t.Error("expected non-nil response") + } + if len(resp.ClusterServers) != 0 { + t.Errorf("expected empty cluster servers, got %d", len(resp.ClusterServers)) + } +} + +func TestRaftAddServer_NoRaft(t *testing.T) { + // Test case: raft not initialized returns empty response + ms := &MasterServer{ + Topo: topology.NewTopology("test", nil, 0, 0, false), + } + + ctx := context.Background() + req := &master_pb.RaftAddServerRequest{ + Id: "test-server", + Address: "localhost:19333", + Voter: true, + } + + resp, err := ms.RaftAddServer(ctx, req) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp == nil { + t.Error("expected non-nil response") + } +} + +func TestRaftRemoveServer_NoRaft(t *testing.T) { + // Test case: raft not initialized returns empty response + ms := &MasterServer{ + Topo: topology.NewTopology("test", nil, 0, 0, false), + } + + ctx := context.Background() + req := &master_pb.RaftRemoveServerRequest{ + Id: "test-server", + Force: true, + } + + resp, err := ms.RaftRemoveServer(ctx, req) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if resp == nil { + t.Error("expected non-nil response") + } +} diff --git a/weed/shell/command_cluster_raft_leader_transfer.go b/weed/shell/command_cluster_raft_leader_transfer.go new file mode 100644 index 000000000..a8bef4a2e --- /dev/null +++ b/weed/shell/command_cluster_raft_leader_transfer.go @@ -0,0 +1,144 @@ +package shell + +import ( + "context" + "flag" + "fmt" + "io" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" +) + +func init() { + Commands = append(Commands, &commandRaftLeaderTransfer{}) +} + +type commandRaftLeaderTransfer struct{} + +func (c *commandRaftLeaderTransfer) Name() string { + return "cluster.raft.transferLeader" +} + +func (c *commandRaftLeaderTransfer) Help() string { + return `transfer raft leadership to another master server + + This command initiates a graceful leadership transfer from the current + leader to another server. Use this before performing maintenance on + the current leader to reduce errors in filers and other components. + + Examples: + # Transfer to any eligible follower (auto-selection) + cluster.raft.transferLeader + + # Transfer to a specific server + cluster.raft.transferLeader -id -address + + Notes: + - Requires hashicorp raft (-raftHashicorp=true on master) + - This command must be sent to the current leader + - The target server must be a voting member of the raft cluster + - Use 'cluster.raft.ps' to list available servers and identify the leader +` +} + +func (c *commandRaftLeaderTransfer) HasTag(CommandTag) bool { + return false +} + +func (c *commandRaftLeaderTransfer) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error { + leaderTransferCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError) + targetId := leaderTransferCommand.String("id", "", "target server id (must be used with -address)") + targetAddress := leaderTransferCommand.String("address", "", "target server grpc address (must be used with -id)") + + if err := leaderTransferCommand.Parse(args); err != nil { + return err + } + + // Validate: id and address must be specified together + if *targetId != "" && *targetAddress == "" { + return fmt.Errorf("-address is required when -id is specified") + } + if *targetAddress != "" && *targetId == "" { + return fmt.Errorf("-id is required when -address is specified") + } + + // First, show current cluster status + fmt.Fprintf(writer, "Checking current raft cluster status...\n") + + var currentLeader string + err := commandEnv.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + resp, err := client.RaftListClusterServers(ctx, &master_pb.RaftListClusterServersRequest{}) + if err != nil { + return fmt.Errorf("failed to list cluster servers: %v", err) + } + + if len(resp.ClusterServers) == 0 { + fmt.Fprintf(writer, "No raft cluster configured (single master mode)\n") + return fmt.Errorf("leadership transfer not available in single master mode") + } + + fmt.Fprintf(writer, "Raft cluster has %d servers:\n", len(resp.ClusterServers)) + for _, server := range resp.ClusterServers { + suffix := "" + if server.IsLeader { + suffix = " <- current leader" + currentLeader = server.Id + } + fmt.Fprintf(writer, " %s %s [%s]%s\n", server.Id, server.Address, server.Suffrage, suffix) + } + return nil + }) + if err != nil { + return err + } + + if currentLeader == "" { + return fmt.Errorf("no leader found in cluster") + } + + // Perform the transfer + targetDesc := "any eligible follower" + if *targetId != "" { + targetDesc = fmt.Sprintf("server %s (%s)", *targetId, *targetAddress) + } + fmt.Fprintf(writer, "\nTransferring leadership from %s to %s...\n", currentLeader, targetDesc) + + err = commandEnv.MasterClient.WithClient(true, func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + resp, err := client.RaftLeadershipTransfer(ctx, &master_pb.RaftLeadershipTransferRequest{ + TargetId: *targetId, + TargetAddress: *targetAddress, + }) + if err != nil { + return fmt.Errorf("leadership transfer failed: %v", err) + } + + if resp.PreviousLeader != resp.NewLeader { + fmt.Fprintf(writer, "Leadership successfully transferred.\n") + fmt.Fprintf(writer, " Previous leader: %s\n", resp.PreviousLeader) + fmt.Fprintf(writer, " New leader: %s\n", resp.NewLeader) + } else { + fmt.Fprintf(writer, "Leadership transfer initiated, but the same leader was re-elected.\n") + fmt.Fprintf(writer, " Current leader: %s\n", resp.NewLeader) + } + return nil + }) + + if err != nil { + fmt.Fprintf(writer, "\nLeadership transfer failed: %v\n", err) + fmt.Fprintf(writer, "\nTroubleshooting:\n") + fmt.Fprintf(writer, " - Ensure you are connected to the current leader\n") + fmt.Fprintf(writer, " - Ensure target server is a voting member (use 'cluster.raft.ps')\n") + fmt.Fprintf(writer, " - Ensure target server is healthy and reachable\n") + return err + } + + return nil +} + diff --git a/weed/shell/command_cluster_raft_leader_transfer_test.go b/weed/shell/command_cluster_raft_leader_transfer_test.go new file mode 100644 index 000000000..8280489f7 --- /dev/null +++ b/weed/shell/command_cluster_raft_leader_transfer_test.go @@ -0,0 +1,89 @@ +package shell + +import ( + "bytes" + "strings" + "testing" +) + +func TestRaftLeaderTransfer_Name(t *testing.T) { + cmd := &commandRaftLeaderTransfer{} + expected := "cluster.raft.transferLeader" + if cmd.Name() != expected { + t.Errorf("expected name %q, got %q", expected, cmd.Name()) + } +} + +func TestRaftLeaderTransfer_Help(t *testing.T) { + cmd := &commandRaftLeaderTransfer{} + help := cmd.Help() + + // Verify help text contains key information + expectedPhrases := []string{ + "transfer raft leadership", + "cluster.raft.transferLeader", + "-id", + "-address", + "cluster.raft.ps", + "-raftHashicorp", + } + + for _, phrase := range expectedPhrases { + if !strings.Contains(help, phrase) { + t.Errorf("help text should contain %q", phrase) + } + } +} + +func TestRaftLeaderTransfer_HasTag(t *testing.T) { + cmd := &commandRaftLeaderTransfer{} + // The command should not have any special tags + if cmd.HasTag(ResourceHeavy) { + t.Error("expected HasTag to return false for ResourceHeavy") + } +} + +func TestRaftLeaderTransfer_ValidateTargetIdWithoutAddress(t *testing.T) { + cmd := &commandRaftLeaderTransfer{} + var buf bytes.Buffer + + // Create a mock command environment - this will fail because no master client + // but we can verify argument parsing + err := cmd.Do([]string{"-id", "test-server"}, nil, &buf) + + // Should fail because -address is required when -id is specified + if err == nil { + t.Error("expected error when -id is specified without -address") + } + if err != nil && !strings.Contains(err.Error(), "-address is required") { + t.Errorf("expected error about missing -address, got: %v", err) + } +} + +func TestRaftLeaderTransfer_ValidateTargetAddressWithoutId(t *testing.T) { + cmd := &commandRaftLeaderTransfer{} + var buf bytes.Buffer + + // Verify argument parsing - address without id should fail + err := cmd.Do([]string{"-address", "localhost:19333"}, nil, &buf) + + // Should fail because -id is required when -address is specified + if err == nil { + t.Error("expected error when -address is specified without -id") + } + if err != nil && !strings.Contains(err.Error(), "-id is required") { + t.Errorf("expected error about missing -id, got: %v", err) + } +} + +func TestRaftLeaderTransfer_UnknownFlag(t *testing.T) { + cmd := &commandRaftLeaderTransfer{} + var buf bytes.Buffer + + // Unknown flag should return an error + err := cmd.Do([]string{"-unknown-flag"}, nil, &buf) + if err == nil { + t.Error("expected error for unknown flag") + } +} +