diff --git a/weed/admin/dash/worker_grpc_server.go b/weed/admin/dash/worker_grpc_server.go index 78ba6d7de..58dfad46b 100644 --- a/weed/admin/dash/worker_grpc_server.go +++ b/weed/admin/dash/worker_grpc_server.go @@ -631,3 +631,36 @@ func findClientAddress(ctx context.Context) string { } return pr.Addr.String() } + +// GetMasterAddresses returns master server addresses to worker +func (s *WorkerGrpcServer) GetMasterAddresses(ctx context.Context, req *worker_pb.GetMasterAddressesRequest) (*worker_pb.GetMasterAddressesResponse, error) { + glog.V(1).Infof("Worker %s requesting master addresses", req.WorkerId) + + // Get master addresses from admin server + if s.adminServer.masterClient == nil { + return nil, fmt.Errorf("admin server has no master client configured") + } + + // Get current master leader and all master addresses + masterAddresses := s.adminServer.masterClient.GetMasters(ctx) + if len(masterAddresses) == 0 { + return nil, fmt.Errorf("no master addresses available") + } + + // Try to get the current leader + leader := s.adminServer.masterClient.GetMaster(ctx) + + // Convert pb.ServerAddress slice to string slice + masterAddressStrings := make([]string, len(masterAddresses)) + for i, addr := range masterAddresses { + masterAddressStrings[i] = string(addr) + } + + response := &worker_pb.GetMasterAddressesResponse{ + MasterAddresses: masterAddressStrings, + PrimaryMaster: string(leader), + } + + glog.V(1).Infof("Returning %d master addresses to worker %s, leader: %s", len(masterAddresses), req.WorkerId, leader) + return response, nil +} diff --git a/weed/pb/grpc_client_server.go b/weed/pb/grpc_client_server.go index 26cdb4f37..e5c18683f 100644 --- a/weed/pb/grpc_client_server.go +++ b/weed/pb/grpc_client_server.go @@ -94,7 +94,7 @@ func GrpcDial(ctx context.Context, address string, waitForReady bool, opts ...gr options = append(options, opt) } } - return grpc.DialContext(ctx, address, options...) + return grpc.NewClient(address, options...) } func getOrCreateConnection(address string, waitForReady bool, opts ...grpc.DialOption) (*versionedGrpcClient, error) { diff --git a/weed/pb/worker.proto b/weed/pb/worker.proto index 39e3a6fc8..ede828743 100644 --- a/weed/pb/worker.proto +++ b/weed/pb/worker.proto @@ -8,6 +8,9 @@ option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb"; service WorkerService { // WorkerStream maintains a bidirectional stream for worker communication rpc WorkerStream(stream WorkerMessage) returns (stream AdminMessage); + + // GetMasterAddresses returns master server addresses for worker tasks + rpc GetMasterAddresses(GetMasterAddressesRequest) returns (GetMasterAddressesResponse); } // WorkerMessage represents messages from worker to admin @@ -385,4 +388,15 @@ message TaskStateFile { MaintenanceTaskData task = 1; int64 last_updated = 2; string admin_version = 3; +} + +// GetMasterAddressesRequest sent by worker to get master server addresses +message GetMasterAddressesRequest { + string worker_id = 1; // Worker identification +} + +// GetMasterAddressesResponse returns master addresses to worker +message GetMasterAddressesResponse { + repeated string master_addresses = 1; // List of available master addresses + string primary_master = 2; // Primary master address (if applicable) } \ No newline at end of file diff --git a/weed/pb/worker_pb/worker.pb.go b/weed/pb/worker_pb/worker.pb.go index 118dc08c2..e55ce6c5e 100644 --- a/weed/pb/worker_pb/worker.pb.go +++ b/weed/pb/worker_pb/worker.pb.go @@ -3197,6 +3197,104 @@ func (x *TaskStateFile) GetAdminVersion() string { return "" } +// GetMasterAddressesRequest sent by worker to get master server addresses +type GetMasterAddressesRequest struct { + state protoimpl.MessageState `protogen:"open.v1"` + WorkerId string `protobuf:"bytes,1,opt,name=worker_id,json=workerId,proto3" json:"worker_id,omitempty"` // Worker identification + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetMasterAddressesRequest) Reset() { + *x = GetMasterAddressesRequest{} + mi := &file_worker_proto_msgTypes[34] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetMasterAddressesRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetMasterAddressesRequest) ProtoMessage() {} + +func (x *GetMasterAddressesRequest) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[34] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetMasterAddressesRequest.ProtoReflect.Descriptor instead. +func (*GetMasterAddressesRequest) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{34} +} + +func (x *GetMasterAddressesRequest) GetWorkerId() string { + if x != nil { + return x.WorkerId + } + return "" +} + +// GetMasterAddressesResponse returns master addresses to worker +type GetMasterAddressesResponse struct { + state protoimpl.MessageState `protogen:"open.v1"` + MasterAddresses []string `protobuf:"bytes,1,rep,name=master_addresses,json=masterAddresses,proto3" json:"master_addresses,omitempty"` // List of available master addresses + PrimaryMaster string `protobuf:"bytes,2,opt,name=primary_master,json=primaryMaster,proto3" json:"primary_master,omitempty"` // Primary master address (if applicable) + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache +} + +func (x *GetMasterAddressesResponse) Reset() { + *x = GetMasterAddressesResponse{} + mi := &file_worker_proto_msgTypes[35] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) +} + +func (x *GetMasterAddressesResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetMasterAddressesResponse) ProtoMessage() {} + +func (x *GetMasterAddressesResponse) ProtoReflect() protoreflect.Message { + mi := &file_worker_proto_msgTypes[35] + if x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetMasterAddressesResponse.ProtoReflect.Descriptor instead. +func (*GetMasterAddressesResponse) Descriptor() ([]byte, []int) { + return file_worker_proto_rawDescGZIP(), []int{35} +} + +func (x *GetMasterAddressesResponse) GetMasterAddresses() []string { + if x != nil { + return x.MasterAddresses + } + return nil +} + +func (x *GetMasterAddressesResponse) GetPrimaryMaster() string { + if x != nil { + return x.PrimaryMaster + } + return "" +} + var File_worker_proto protoreflect.FileDescriptor const file_worker_proto_rawDesc = "" + @@ -3525,9 +3623,15 @@ const file_worker_proto_rawDesc = "" + "\rTaskStateFile\x122\n" + "\x04task\x18\x01 \x01(\v2\x1e.worker_pb.MaintenanceTaskDataR\x04task\x12!\n" + "\flast_updated\x18\x02 \x01(\x03R\vlastUpdated\x12#\n" + - "\radmin_version\x18\x03 \x01(\tR\fadminVersion2V\n" + + "\radmin_version\x18\x03 \x01(\tR\fadminVersion\"8\n" + + "\x19GetMasterAddressesRequest\x12\x1b\n" + + "\tworker_id\x18\x01 \x01(\tR\bworkerId\"n\n" + + "\x1aGetMasterAddressesResponse\x12)\n" + + "\x10master_addresses\x18\x01 \x03(\tR\x0fmasterAddresses\x12%\n" + + "\x0eprimary_master\x18\x02 \x01(\tR\rprimaryMaster2\xb9\x01\n" + "\rWorkerService\x12E\n" + - "\fWorkerStream\x12\x18.worker_pb.WorkerMessage\x1a\x17.worker_pb.AdminMessage(\x010\x01B2Z0github.com/seaweedfs/seaweedfs/weed/pb/worker_pbb\x06proto3" + "\fWorkerStream\x12\x18.worker_pb.WorkerMessage\x1a\x17.worker_pb.AdminMessage(\x010\x01\x12a\n" + + "\x12GetMasterAddresses\x12$.worker_pb.GetMasterAddressesRequest\x1a%.worker_pb.GetMasterAddressesResponseB2Z0github.com/seaweedfs/seaweedfs/weed/pb/worker_pbb\x06proto3" var ( file_worker_proto_rawDescOnce sync.Once @@ -3541,51 +3645,53 @@ func file_worker_proto_rawDescGZIP() []byte { return file_worker_proto_rawDescData } -var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 43) +var file_worker_proto_msgTypes = make([]protoimpl.MessageInfo, 45) var file_worker_proto_goTypes = []any{ - (*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage - (*AdminMessage)(nil), // 1: worker_pb.AdminMessage - (*WorkerRegistration)(nil), // 2: worker_pb.WorkerRegistration - (*RegistrationResponse)(nil), // 3: worker_pb.RegistrationResponse - (*WorkerHeartbeat)(nil), // 4: worker_pb.WorkerHeartbeat - (*HeartbeatResponse)(nil), // 5: worker_pb.HeartbeatResponse - (*TaskRequest)(nil), // 6: worker_pb.TaskRequest - (*TaskAssignment)(nil), // 7: worker_pb.TaskAssignment - (*TaskParams)(nil), // 8: worker_pb.TaskParams - (*VacuumTaskParams)(nil), // 9: worker_pb.VacuumTaskParams - (*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams - (*TaskSource)(nil), // 11: worker_pb.TaskSource - (*TaskTarget)(nil), // 12: worker_pb.TaskTarget - (*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams - (*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams - (*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate - (*TaskComplete)(nil), // 16: worker_pb.TaskComplete - (*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation - (*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown - (*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown - (*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest - (*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse - (*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata - (*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry - (*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig - (*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy - (*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy - (*ErasureCodingTaskConfig)(nil), // 27: worker_pb.ErasureCodingTaskConfig - (*EcVacuumTaskConfig)(nil), // 28: worker_pb.EcVacuumTaskConfig - (*MaintenanceTaskData)(nil), // 29: worker_pb.MaintenanceTaskData - (*TaskAssignmentRecord)(nil), // 30: worker_pb.TaskAssignmentRecord - (*TaskCreationMetrics)(nil), // 31: worker_pb.TaskCreationMetrics - (*VolumeHealthMetrics)(nil), // 32: worker_pb.VolumeHealthMetrics - (*TaskStateFile)(nil), // 33: worker_pb.TaskStateFile - nil, // 34: worker_pb.WorkerRegistration.MetadataEntry - nil, // 35: worker_pb.TaskAssignment.MetadataEntry - nil, // 36: worker_pb.TaskUpdate.MetadataEntry - nil, // 37: worker_pb.TaskComplete.ResultMetadataEntry - nil, // 38: worker_pb.TaskLogMetadata.CustomDataEntry - nil, // 39: worker_pb.TaskLogEntry.FieldsEntry - nil, // 40: worker_pb.MaintenancePolicy.TaskPoliciesEntry - nil, // 41: worker_pb.MaintenanceTaskData.TagsEntry - nil, // 42: worker_pb.TaskCreationMetrics.AdditionalDataEntry + (*WorkerMessage)(nil), // 0: worker_pb.WorkerMessage + (*AdminMessage)(nil), // 1: worker_pb.AdminMessage + (*WorkerRegistration)(nil), // 2: worker_pb.WorkerRegistration + (*RegistrationResponse)(nil), // 3: worker_pb.RegistrationResponse + (*WorkerHeartbeat)(nil), // 4: worker_pb.WorkerHeartbeat + (*HeartbeatResponse)(nil), // 5: worker_pb.HeartbeatResponse + (*TaskRequest)(nil), // 6: worker_pb.TaskRequest + (*TaskAssignment)(nil), // 7: worker_pb.TaskAssignment + (*TaskParams)(nil), // 8: worker_pb.TaskParams + (*VacuumTaskParams)(nil), // 9: worker_pb.VacuumTaskParams + (*ErasureCodingTaskParams)(nil), // 10: worker_pb.ErasureCodingTaskParams + (*TaskSource)(nil), // 11: worker_pb.TaskSource + (*TaskTarget)(nil), // 12: worker_pb.TaskTarget + (*BalanceTaskParams)(nil), // 13: worker_pb.BalanceTaskParams + (*ReplicationTaskParams)(nil), // 14: worker_pb.ReplicationTaskParams + (*TaskUpdate)(nil), // 15: worker_pb.TaskUpdate + (*TaskComplete)(nil), // 16: worker_pb.TaskComplete + (*TaskCancellation)(nil), // 17: worker_pb.TaskCancellation + (*WorkerShutdown)(nil), // 18: worker_pb.WorkerShutdown + (*AdminShutdown)(nil), // 19: worker_pb.AdminShutdown + (*TaskLogRequest)(nil), // 20: worker_pb.TaskLogRequest + (*TaskLogResponse)(nil), // 21: worker_pb.TaskLogResponse + (*TaskLogMetadata)(nil), // 22: worker_pb.TaskLogMetadata + (*TaskLogEntry)(nil), // 23: worker_pb.TaskLogEntry + (*MaintenanceConfig)(nil), // 24: worker_pb.MaintenanceConfig + (*MaintenancePolicy)(nil), // 25: worker_pb.MaintenancePolicy + (*TaskPolicy)(nil), // 26: worker_pb.TaskPolicy + (*ErasureCodingTaskConfig)(nil), // 27: worker_pb.ErasureCodingTaskConfig + (*EcVacuumTaskConfig)(nil), // 28: worker_pb.EcVacuumTaskConfig + (*MaintenanceTaskData)(nil), // 29: worker_pb.MaintenanceTaskData + (*TaskAssignmentRecord)(nil), // 30: worker_pb.TaskAssignmentRecord + (*TaskCreationMetrics)(nil), // 31: worker_pb.TaskCreationMetrics + (*VolumeHealthMetrics)(nil), // 32: worker_pb.VolumeHealthMetrics + (*TaskStateFile)(nil), // 33: worker_pb.TaskStateFile + (*GetMasterAddressesRequest)(nil), // 34: worker_pb.GetMasterAddressesRequest + (*GetMasterAddressesResponse)(nil), // 35: worker_pb.GetMasterAddressesResponse + nil, // 36: worker_pb.WorkerRegistration.MetadataEntry + nil, // 37: worker_pb.TaskAssignment.MetadataEntry + nil, // 38: worker_pb.TaskUpdate.MetadataEntry + nil, // 39: worker_pb.TaskComplete.ResultMetadataEntry + nil, // 40: worker_pb.TaskLogMetadata.CustomDataEntry + nil, // 41: worker_pb.TaskLogEntry.FieldsEntry + nil, // 42: worker_pb.MaintenancePolicy.TaskPoliciesEntry + nil, // 43: worker_pb.MaintenanceTaskData.TagsEntry + nil, // 44: worker_pb.TaskCreationMetrics.AdditionalDataEntry } var file_worker_proto_depIdxs = []int32{ 2, // 0: worker_pb.WorkerMessage.registration:type_name -> worker_pb.WorkerRegistration @@ -3601,37 +3707,39 @@ var file_worker_proto_depIdxs = []int32{ 17, // 10: worker_pb.AdminMessage.task_cancellation:type_name -> worker_pb.TaskCancellation 19, // 11: worker_pb.AdminMessage.admin_shutdown:type_name -> worker_pb.AdminShutdown 20, // 12: worker_pb.AdminMessage.task_log_request:type_name -> worker_pb.TaskLogRequest - 34, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry + 36, // 13: worker_pb.WorkerRegistration.metadata:type_name -> worker_pb.WorkerRegistration.MetadataEntry 8, // 14: worker_pb.TaskAssignment.params:type_name -> worker_pb.TaskParams - 35, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry + 37, // 15: worker_pb.TaskAssignment.metadata:type_name -> worker_pb.TaskAssignment.MetadataEntry 11, // 16: worker_pb.TaskParams.sources:type_name -> worker_pb.TaskSource 12, // 17: worker_pb.TaskParams.targets:type_name -> worker_pb.TaskTarget 9, // 18: worker_pb.TaskParams.vacuum_params:type_name -> worker_pb.VacuumTaskParams 10, // 19: worker_pb.TaskParams.erasure_coding_params:type_name -> worker_pb.ErasureCodingTaskParams 13, // 20: worker_pb.TaskParams.balance_params:type_name -> worker_pb.BalanceTaskParams 14, // 21: worker_pb.TaskParams.replication_params:type_name -> worker_pb.ReplicationTaskParams - 36, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry - 37, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry + 38, // 22: worker_pb.TaskUpdate.metadata:type_name -> worker_pb.TaskUpdate.MetadataEntry + 39, // 23: worker_pb.TaskComplete.result_metadata:type_name -> worker_pb.TaskComplete.ResultMetadataEntry 22, // 24: worker_pb.TaskLogResponse.metadata:type_name -> worker_pb.TaskLogMetadata 23, // 25: worker_pb.TaskLogResponse.log_entries:type_name -> worker_pb.TaskLogEntry - 38, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry - 39, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry + 40, // 26: worker_pb.TaskLogMetadata.custom_data:type_name -> worker_pb.TaskLogMetadata.CustomDataEntry + 41, // 27: worker_pb.TaskLogEntry.fields:type_name -> worker_pb.TaskLogEntry.FieldsEntry 25, // 28: worker_pb.MaintenanceConfig.policy:type_name -> worker_pb.MaintenancePolicy - 40, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry + 42, // 29: worker_pb.MaintenancePolicy.task_policies:type_name -> worker_pb.MaintenancePolicy.TaskPoliciesEntry 27, // 30: worker_pb.TaskPolicy.erasure_coding_config:type_name -> worker_pb.ErasureCodingTaskConfig 28, // 31: worker_pb.TaskPolicy.ec_vacuum_config:type_name -> worker_pb.EcVacuumTaskConfig 8, // 32: worker_pb.MaintenanceTaskData.typed_params:type_name -> worker_pb.TaskParams 30, // 33: worker_pb.MaintenanceTaskData.assignment_history:type_name -> worker_pb.TaskAssignmentRecord - 41, // 34: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry + 43, // 34: worker_pb.MaintenanceTaskData.tags:type_name -> worker_pb.MaintenanceTaskData.TagsEntry 31, // 35: worker_pb.MaintenanceTaskData.creation_metrics:type_name -> worker_pb.TaskCreationMetrics 32, // 36: worker_pb.TaskCreationMetrics.volume_metrics:type_name -> worker_pb.VolumeHealthMetrics - 42, // 37: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry + 44, // 37: worker_pb.TaskCreationMetrics.additional_data:type_name -> worker_pb.TaskCreationMetrics.AdditionalDataEntry 29, // 38: worker_pb.TaskStateFile.task:type_name -> worker_pb.MaintenanceTaskData 26, // 39: worker_pb.MaintenancePolicy.TaskPoliciesEntry.value:type_name -> worker_pb.TaskPolicy 0, // 40: worker_pb.WorkerService.WorkerStream:input_type -> worker_pb.WorkerMessage - 1, // 41: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage - 41, // [41:42] is the sub-list for method output_type - 40, // [40:41] is the sub-list for method input_type + 34, // 41: worker_pb.WorkerService.GetMasterAddresses:input_type -> worker_pb.GetMasterAddressesRequest + 1, // 42: worker_pb.WorkerService.WorkerStream:output_type -> worker_pb.AdminMessage + 35, // 43: worker_pb.WorkerService.GetMasterAddresses:output_type -> worker_pb.GetMasterAddressesResponse + 42, // [42:44] is the sub-list for method output_type + 40, // [40:42] is the sub-list for method input_type 40, // [40:40] is the sub-list for extension type_name 40, // [40:40] is the sub-list for extension extendee 0, // [0:40] is the sub-list for field type_name @@ -3675,7 +3783,7 @@ func file_worker_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: unsafe.Slice(unsafe.StringData(file_worker_proto_rawDesc), len(file_worker_proto_rawDesc)), NumEnums: 0, - NumMessages: 43, + NumMessages: 45, NumExtensions: 0, NumServices: 1, }, diff --git a/weed/pb/worker_pb/worker_grpc.pb.go b/weed/pb/worker_pb/worker_grpc.pb.go index 85bad96f4..b9a6d531f 100644 --- a/weed/pb/worker_pb/worker_grpc.pb.go +++ b/weed/pb/worker_pb/worker_grpc.pb.go @@ -19,7 +19,8 @@ import ( const _ = grpc.SupportPackageIsVersion9 const ( - WorkerService_WorkerStream_FullMethodName = "/worker_pb.WorkerService/WorkerStream" + WorkerService_WorkerStream_FullMethodName = "/worker_pb.WorkerService/WorkerStream" + WorkerService_GetMasterAddresses_FullMethodName = "/worker_pb.WorkerService/GetMasterAddresses" ) // WorkerServiceClient is the client API for WorkerService service. @@ -30,6 +31,8 @@ const ( type WorkerServiceClient interface { // WorkerStream maintains a bidirectional stream for worker communication WorkerStream(ctx context.Context, opts ...grpc.CallOption) (grpc.BidiStreamingClient[WorkerMessage, AdminMessage], error) + // GetMasterAddresses returns master server addresses for worker tasks + GetMasterAddresses(ctx context.Context, in *GetMasterAddressesRequest, opts ...grpc.CallOption) (*GetMasterAddressesResponse, error) } type workerServiceClient struct { @@ -53,6 +56,16 @@ func (c *workerServiceClient) WorkerStream(ctx context.Context, opts ...grpc.Cal // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type WorkerService_WorkerStreamClient = grpc.BidiStreamingClient[WorkerMessage, AdminMessage] +func (c *workerServiceClient) GetMasterAddresses(ctx context.Context, in *GetMasterAddressesRequest, opts ...grpc.CallOption) (*GetMasterAddressesResponse, error) { + cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...) + out := new(GetMasterAddressesResponse) + err := c.cc.Invoke(ctx, WorkerService_GetMasterAddresses_FullMethodName, in, out, cOpts...) + if err != nil { + return nil, err + } + return out, nil +} + // WorkerServiceServer is the server API for WorkerService service. // All implementations must embed UnimplementedWorkerServiceServer // for forward compatibility. @@ -61,6 +74,8 @@ type WorkerService_WorkerStreamClient = grpc.BidiStreamingClient[WorkerMessage, type WorkerServiceServer interface { // WorkerStream maintains a bidirectional stream for worker communication WorkerStream(grpc.BidiStreamingServer[WorkerMessage, AdminMessage]) error + // GetMasterAddresses returns master server addresses for worker tasks + GetMasterAddresses(context.Context, *GetMasterAddressesRequest) (*GetMasterAddressesResponse, error) mustEmbedUnimplementedWorkerServiceServer() } @@ -74,6 +89,9 @@ type UnimplementedWorkerServiceServer struct{} func (UnimplementedWorkerServiceServer) WorkerStream(grpc.BidiStreamingServer[WorkerMessage, AdminMessage]) error { return status.Errorf(codes.Unimplemented, "method WorkerStream not implemented") } +func (UnimplementedWorkerServiceServer) GetMasterAddresses(context.Context, *GetMasterAddressesRequest) (*GetMasterAddressesResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method GetMasterAddresses not implemented") +} func (UnimplementedWorkerServiceServer) mustEmbedUnimplementedWorkerServiceServer() {} func (UnimplementedWorkerServiceServer) testEmbeddedByValue() {} @@ -102,13 +120,36 @@ func _WorkerService_WorkerStream_Handler(srv interface{}, stream grpc.ServerStre // This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name. type WorkerService_WorkerStreamServer = grpc.BidiStreamingServer[WorkerMessage, AdminMessage] +func _WorkerService_GetMasterAddresses_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(GetMasterAddressesRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(WorkerServiceServer).GetMasterAddresses(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: WorkerService_GetMasterAddresses_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(WorkerServiceServer).GetMasterAddresses(ctx, req.(*GetMasterAddressesRequest)) + } + return interceptor(ctx, in, info, handler) +} + // WorkerService_ServiceDesc is the grpc.ServiceDesc for WorkerService service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) var WorkerService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "worker_pb.WorkerService", HandlerType: (*WorkerServiceServer)(nil), - Methods: []grpc.MethodDesc{}, + Methods: []grpc.MethodDesc{ + { + MethodName: "GetMasterAddresses", + Handler: _WorkerService_GetMasterAddresses_Handler, + }, + }, Streams: []grpc.StreamDesc{ { StreamName: "WorkerStream", diff --git a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go index fedf98f67..2c32ec01e 100644 --- a/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go +++ b/weed/worker/tasks/ec_vacuum/ec_vacuum_task.go @@ -25,6 +25,12 @@ import ( "google.golang.org/grpc" ) +// Compile-time interface compliance checks +var ( + _ types.TaskWithGrpcDial = (*EcVacuumTask)(nil) + _ types.TaskWithAdminAddress = (*EcVacuumTask)(nil) +) + // EcVacuumTask represents an EC vacuum task that collects, decodes, and re-encodes EC volumes type EcVacuumTask struct { *base.BaseTask @@ -33,6 +39,7 @@ type EcVacuumTask struct { sourceNodes map[pb.ServerAddress]erasure_coding.ShardBits tempDir string grpcDialOption grpc.DialOption + adminAddress string // admin server address for API calls masterAddress pb.ServerAddress // master server address for activation RPC cleanupGracePeriod time.Duration // grace period before cleaning up old generation (1 minute default) topologyTaskID string // links to ActiveTopology task for capacity tracking @@ -1172,6 +1179,11 @@ func (t *EcVacuumTask) SetGrpcDialOption(option grpc.DialOption) { t.grpcDialOption = option } +// SetAdminAddress sets the admin server address for API calls +func (t *EcVacuumTask) SetAdminAddress(address string) { + t.adminAddress = address +} + // SetMasterAddress sets the master server address for generation activation func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) { t.masterAddress = address @@ -1181,3 +1193,56 @@ func (t *EcVacuumTask) SetMasterAddress(address pb.ServerAddress) { func (t *EcVacuumTask) SetCleanupGracePeriod(period time.Duration) { t.cleanupGracePeriod = period } + +// fetchMasterAddressFromAdmin gets master addresses from admin server +func (t *EcVacuumTask) fetchMasterAddressFromAdmin() error { + // Use admin address provided by worker + if t.adminAddress == "" { + return fmt.Errorf("admin server address not provided by worker - cannot fetch master addresses") + } + + // Convert admin HTTP address to gRPC address (HTTP port + 10000) + grpcAddress := pb.ServerToGrpcAddress(t.adminAddress) + + t.LogInfo("Fetching master address from admin server", map[string]interface{}{ + "admin_address": grpcAddress, + }) + + // Create gRPC connection to admin server + conn, err := grpc.NewClient(grpcAddress, t.grpcDialOption) + if err != nil { + return fmt.Errorf("failed to connect to admin server at %s: %w", grpcAddress, err) + } + defer conn.Close() + + // Create worker service client + client := worker_pb.NewWorkerServiceClient(conn) + + // Call GetMasterAddresses API + resp, err := client.GetMasterAddresses(context.Background(), &worker_pb.GetMasterAddressesRequest{ + WorkerId: t.ID(), // Use task ID as worker ID for logging + }) + if err != nil { + return fmt.Errorf("failed to get master addresses from admin: %w", err) + } + + if len(resp.MasterAddresses) == 0 { + return fmt.Errorf("no master addresses returned from admin server") + } + + // Use primary master if available, otherwise first address + masterAddress := resp.PrimaryMaster + if masterAddress == "" && len(resp.MasterAddresses) > 0 { + masterAddress = resp.MasterAddresses[0] + } + + t.masterAddress = pb.ServerAddress(masterAddress) + + t.LogInfo("Successfully obtained master address from admin server", map[string]interface{}{ + "master_address": masterAddress, + "available_masters": resp.MasterAddresses, + "primary_master": resp.PrimaryMaster, + }) + + return nil +} diff --git a/weed/worker/tasks/ec_vacuum/safety_checks.go b/weed/worker/tasks/ec_vacuum/safety_checks.go index 3ccaacdf0..56764372c 100644 --- a/weed/worker/tasks/ec_vacuum/safety_checks.go +++ b/weed/worker/tasks/ec_vacuum/safety_checks.go @@ -11,8 +11,11 @@ import ( // performSafetyChecks performs comprehensive safety verification before cleanup func (t *EcVacuumTask) performSafetyChecks() error { + // Get master address from admin server if not already set if t.masterAddress == "" { - return fmt.Errorf("CRITICAL: cannot perform safety checks - master address not set") + if err := t.fetchMasterAddressFromAdmin(); err != nil { + return fmt.Errorf("CRITICAL: cannot perform safety checks - failed to get master address: %w", err) + } } // Safety Check 1: Verify master connectivity and volume existence diff --git a/weed/worker/types/task.go b/weed/worker/types/task.go index 9106a63e3..330e499f0 100644 --- a/weed/worker/types/task.go +++ b/weed/worker/types/task.go @@ -9,6 +9,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" + "google.golang.org/grpc" ) // Task defines the core task interface that all tasks must implement @@ -37,6 +38,18 @@ type TaskWithLogging interface { Logger } +// TaskWithGrpcDial defines tasks that need gRPC dial options +type TaskWithGrpcDial interface { + Task + SetGrpcDialOption(option grpc.DialOption) +} + +// TaskWithAdminAddress defines tasks that need admin server address +type TaskWithAdminAddress interface { + Task + SetAdminAddress(address string) +} + // Logger defines standard logging interface type Logger interface { Info(msg string, args ...interface{}) diff --git a/weed/worker/worker.go b/weed/worker/worker.go index d18d84193..ccebbf011 100644 --- a/weed/worker/worker.go +++ b/weed/worker/worker.go @@ -14,7 +14,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/pb/worker_pb" "github.com/seaweedfs/seaweedfs/weed/worker/tasks" "github.com/seaweedfs/seaweedfs/weed/worker/types" - "google.golang.org/grpc" // Import task packages to trigger their auto-registration _ "github.com/seaweedfs/seaweedfs/weed/worker/tasks/ec_vacuum" @@ -448,11 +447,17 @@ func (w *Worker) executeTask(task *types.TaskInput) { } // Pass worker's gRPC dial option to task if it supports it - if grpcTask, ok := taskInstance.(interface{ SetGrpcDialOption(grpc.DialOption) }); ok { + if grpcTask, ok := taskInstance.(types.TaskWithGrpcDial); ok { grpcTask.SetGrpcDialOption(w.config.GrpcDialOption) glog.V(2).Infof("Set gRPC dial option for task %s", task.ID) } + // Pass worker's admin server address to task if it supports it + if adminTask, ok := taskInstance.(types.TaskWithAdminAddress); ok { + adminTask.SetAdminAddress(w.config.AdminServer) + glog.V(2).Infof("Set admin server address for task %s", task.ID) + } + // Task execution uses the new unified Task interface glog.V(2).Infof("Executing task %s in working directory: %s", task.ID, taskWorkingDir)