diff --git a/telemetry/proto/telemetry.pb.go b/telemetry/proto/telemetry.pb.go index 8aa6ab626..1e00fadf5 100644 --- a/telemetry/proto/telemetry.pb.go +++ b/telemetry/proto/telemetry.pb.go @@ -1,17 +1,17 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.34.2 -// protoc v5.29.3 -// source: telemetry.proto +// protoc-gen-go v1.36.6 +// protoc v6.33.1 +// source: telemetry/proto/telemetry.proto package proto import ( - reflect "reflect" - sync "sync" - protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" ) const ( @@ -23,12 +23,9 @@ const ( // TelemetryData represents cluster-level telemetry information type TelemetryData struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache - unknownFields protoimpl.UnknownFields - + state protoimpl.MessageState `protogen:"open.v1"` // Unique cluster identifier (generated in-memory) - ClusterId string `protobuf:"bytes,1,opt,name=cluster_id,json=clusterId,proto3" json:"cluster_id,omitempty"` + TopologyId string `protobuf:"bytes,1,opt,name=topology_id,json=topologyId,proto3" json:"topology_id,omitempty"` // SeaweedFS version Version string `protobuf:"bytes,2,opt,name=version,proto3" json:"version,omitempty"` // Operating system (e.g., "linux/amd64") @@ -44,16 +41,16 @@ type TelemetryData struct { // Number of broker servers in the cluster BrokerCount int32 `protobuf:"varint,10,opt,name=broker_count,json=brokerCount,proto3" json:"broker_count,omitempty"` // Unix timestamp when the data was collected - Timestamp int64 `protobuf:"varint,11,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + Timestamp int64 `protobuf:"varint,11,opt,name=timestamp,proto3" json:"timestamp,omitempty"` + unknownFields protoimpl.UnknownFields + sizeCache protoimpl.SizeCache } func (x *TelemetryData) Reset() { *x = TelemetryData{} - if protoimpl.UnsafeEnabled { - mi := &file_telemetry_proto_msgTypes[0] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_telemetry_proto_telemetry_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *TelemetryData) String() string { @@ -63,8 +60,8 @@ func (x *TelemetryData) String() string { func (*TelemetryData) ProtoMessage() {} func (x *TelemetryData) ProtoReflect() protoreflect.Message { - mi := &file_telemetry_proto_msgTypes[0] - if protoimpl.UnsafeEnabled && x != nil { + mi := &file_telemetry_proto_telemetry_proto_msgTypes[0] + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -76,12 +73,12 @@ func (x *TelemetryData) ProtoReflect() protoreflect.Message { // Deprecated: Use TelemetryData.ProtoReflect.Descriptor instead. func (*TelemetryData) Descriptor() ([]byte, []int) { - return file_telemetry_proto_rawDescGZIP(), []int{0} + return file_telemetry_proto_telemetry_proto_rawDescGZIP(), []int{0} } -func (x *TelemetryData) GetClusterId() string { +func (x *TelemetryData) GetTopologyId() string { if x != nil { - return x.ClusterId + return x.TopologyId } return "" } @@ -144,20 +141,17 @@ func (x *TelemetryData) GetTimestamp() int64 { // TelemetryRequest is sent from SeaweedFS clusters to the telemetry server type TelemetryRequest struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Data *TelemetryData `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` unknownFields protoimpl.UnknownFields - - Data *TelemetryData `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` + sizeCache protoimpl.SizeCache } func (x *TelemetryRequest) Reset() { *x = TelemetryRequest{} - if protoimpl.UnsafeEnabled { - mi := &file_telemetry_proto_msgTypes[1] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_telemetry_proto_telemetry_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *TelemetryRequest) String() string { @@ -167,8 +161,8 @@ func (x *TelemetryRequest) String() string { func (*TelemetryRequest) ProtoMessage() {} func (x *TelemetryRequest) ProtoReflect() protoreflect.Message { - mi := &file_telemetry_proto_msgTypes[1] - if protoimpl.UnsafeEnabled && x != nil { + mi := &file_telemetry_proto_telemetry_proto_msgTypes[1] + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -180,7 +174,7 @@ func (x *TelemetryRequest) ProtoReflect() protoreflect.Message { // Deprecated: Use TelemetryRequest.ProtoReflect.Descriptor instead. func (*TelemetryRequest) Descriptor() ([]byte, []int) { - return file_telemetry_proto_rawDescGZIP(), []int{1} + return file_telemetry_proto_telemetry_proto_rawDescGZIP(), []int{1} } func (x *TelemetryRequest) GetData() *TelemetryData { @@ -192,21 +186,18 @@ func (x *TelemetryRequest) GetData() *TelemetryData { // TelemetryResponse is returned by the telemetry server type TelemetryResponse struct { - state protoimpl.MessageState - sizeCache protoimpl.SizeCache + state protoimpl.MessageState `protogen:"open.v1"` + Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` + Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` unknownFields protoimpl.UnknownFields - - Success bool `protobuf:"varint,1,opt,name=success,proto3" json:"success,omitempty"` - Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"` + sizeCache protoimpl.SizeCache } func (x *TelemetryResponse) Reset() { *x = TelemetryResponse{} - if protoimpl.UnsafeEnabled { - mi := &file_telemetry_proto_msgTypes[2] - ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) - ms.StoreMessageInfo(mi) - } + mi := &file_telemetry_proto_telemetry_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) } func (x *TelemetryResponse) String() string { @@ -216,8 +207,8 @@ func (x *TelemetryResponse) String() string { func (*TelemetryResponse) ProtoMessage() {} func (x *TelemetryResponse) ProtoReflect() protoreflect.Message { - mi := &file_telemetry_proto_msgTypes[2] - if protoimpl.UnsafeEnabled && x != nil { + mi := &file_telemetry_proto_telemetry_proto_msgTypes[2] + if x != nil { ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) if ms.LoadMessageInfo() == nil { ms.StoreMessageInfo(mi) @@ -229,7 +220,7 @@ func (x *TelemetryResponse) ProtoReflect() protoreflect.Message { // Deprecated: Use TelemetryResponse.ProtoReflect.Descriptor instead. func (*TelemetryResponse) Descriptor() ([]byte, []int) { - return file_telemetry_proto_rawDescGZIP(), []int{2} + return file_telemetry_proto_telemetry_proto_rawDescGZIP(), []int{2} } func (x *TelemetryResponse) GetSuccess() bool { @@ -246,66 +237,49 @@ func (x *TelemetryResponse) GetMessage() string { return "" } -var File_telemetry_proto protoreflect.FileDescriptor - -var file_telemetry_proto_rawDesc = []byte{ - 0x0a, 0x0f, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x12, 0x09, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x22, 0xce, 0x02, 0x0a, - 0x0d, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61, 0x12, 0x1d, - 0x0a, 0x0a, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x09, 0x63, 0x6c, 0x75, 0x73, 0x74, 0x65, 0x72, 0x49, 0x64, 0x12, 0x18, 0x0a, - 0x07, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, - 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x12, 0x0e, 0x0a, 0x02, 0x6f, 0x73, 0x18, 0x03, 0x20, - 0x01, 0x28, 0x09, 0x52, 0x02, 0x6f, 0x73, 0x12, 0x2e, 0x0a, 0x13, 0x76, 0x6f, 0x6c, 0x75, 0x6d, - 0x65, 0x5f, 0x73, 0x65, 0x72, 0x76, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x06, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x11, 0x76, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x53, 0x65, 0x72, 0x76, - 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, 0x28, 0x0a, 0x10, 0x74, 0x6f, 0x74, 0x61, 0x6c, - 0x5f, 0x64, 0x69, 0x73, 0x6b, 0x5f, 0x62, 0x79, 0x74, 0x65, 0x73, 0x18, 0x07, 0x20, 0x01, 0x28, - 0x04, 0x52, 0x0e, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x44, 0x69, 0x73, 0x6b, 0x42, 0x79, 0x74, 0x65, - 0x73, 0x12, 0x2c, 0x0a, 0x12, 0x74, 0x6f, 0x74, 0x61, 0x6c, 0x5f, 0x76, 0x6f, 0x6c, 0x75, 0x6d, - 0x65, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x08, 0x20, 0x01, 0x28, 0x05, 0x52, 0x10, 0x74, - 0x6f, 0x74, 0x61, 0x6c, 0x56, 0x6f, 0x6c, 0x75, 0x6d, 0x65, 0x43, 0x6f, 0x75, 0x6e, 0x74, 0x12, - 0x1f, 0x0a, 0x0b, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x09, - 0x20, 0x01, 0x28, 0x05, 0x52, 0x0a, 0x66, 0x69, 0x6c, 0x65, 0x72, 0x43, 0x6f, 0x75, 0x6e, 0x74, - 0x12, 0x21, 0x0a, 0x0c, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x5f, 0x63, 0x6f, 0x75, 0x6e, 0x74, - 0x18, 0x0a, 0x20, 0x01, 0x28, 0x05, 0x52, 0x0b, 0x62, 0x72, 0x6f, 0x6b, 0x65, 0x72, 0x43, 0x6f, - 0x75, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, - 0x18, 0x0b, 0x20, 0x01, 0x28, 0x03, 0x52, 0x09, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, - 0x70, 0x4a, 0x04, 0x08, 0x04, 0x10, 0x05, 0x4a, 0x04, 0x08, 0x05, 0x10, 0x06, 0x22, 0x40, 0x0a, - 0x10, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, - 0x74, 0x12, 0x2c, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, - 0x18, 0x2e, 0x74, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x2e, 0x54, 0x65, 0x6c, 0x65, - 0x6d, 0x65, 0x74, 0x72, 0x79, 0x44, 0x61, 0x74, 0x61, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x22, - 0x47, 0x0a, 0x11, 0x54, 0x65, 0x6c, 0x65, 0x6d, 0x65, 0x74, 0x72, 0x79, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x18, - 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x07, 0x73, 0x75, 0x63, 0x63, 0x65, 0x73, 0x73, 0x12, 0x18, - 0x0a, 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, - 0x07, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, - 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, - 0x2f, 0x73, 0x65, 0x61, 0x77, 0x65, 0x65, 0x64, 0x66, 0x73, 0x2f, 0x74, 0x65, 0x6c, 0x65, 0x6d, - 0x65, 0x74, 0x72, 0x79, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, -} +var File_telemetry_proto_telemetry_proto protoreflect.FileDescriptor + +const file_telemetry_proto_telemetry_proto_rawDesc = "" + + "\n" + + "\x1ftelemetry/proto/telemetry.proto\x12\ttelemetry\"\xd0\x02\n" + + "\rTelemetryData\x12\x1f\n" + + "\vtopology_id\x18\x01 \x01(\tR\n" + + "topologyId\x12\x18\n" + + "\aversion\x18\x02 \x01(\tR\aversion\x12\x0e\n" + + "\x02os\x18\x03 \x01(\tR\x02os\x12.\n" + + "\x13volume_server_count\x18\x06 \x01(\x05R\x11volumeServerCount\x12(\n" + + "\x10total_disk_bytes\x18\a \x01(\x04R\x0etotalDiskBytes\x12,\n" + + "\x12total_volume_count\x18\b \x01(\x05R\x10totalVolumeCount\x12\x1f\n" + + "\vfiler_count\x18\t \x01(\x05R\n" + + "filerCount\x12!\n" + + "\fbroker_count\x18\n" + + " \x01(\x05R\vbrokerCount\x12\x1c\n" + + "\ttimestamp\x18\v \x01(\x03R\ttimestampJ\x04\b\x04\x10\x05J\x04\b\x05\x10\x06\"@\n" + + "\x10TelemetryRequest\x12,\n" + + "\x04data\x18\x01 \x01(\v2\x18.telemetry.TelemetryDataR\x04data\"G\n" + + "\x11TelemetryResponse\x12\x18\n" + + "\asuccess\x18\x01 \x01(\bR\asuccess\x12\x18\n" + + "\amessage\x18\x02 \x01(\tR\amessageB0Z.github.com/seaweedfs/seaweedfs/telemetry/protob\x06proto3" var ( - file_telemetry_proto_rawDescOnce sync.Once - file_telemetry_proto_rawDescData = file_telemetry_proto_rawDesc + file_telemetry_proto_telemetry_proto_rawDescOnce sync.Once + file_telemetry_proto_telemetry_proto_rawDescData []byte ) -func file_telemetry_proto_rawDescGZIP() []byte { - file_telemetry_proto_rawDescOnce.Do(func() { - file_telemetry_proto_rawDescData = protoimpl.X.CompressGZIP(file_telemetry_proto_rawDescData) +func file_telemetry_proto_telemetry_proto_rawDescGZIP() []byte { + file_telemetry_proto_telemetry_proto_rawDescOnce.Do(func() { + file_telemetry_proto_telemetry_proto_rawDescData = protoimpl.X.CompressGZIP(unsafe.Slice(unsafe.StringData(file_telemetry_proto_telemetry_proto_rawDesc), len(file_telemetry_proto_telemetry_proto_rawDesc))) }) - return file_telemetry_proto_rawDescData + return file_telemetry_proto_telemetry_proto_rawDescData } -var file_telemetry_proto_msgTypes = make([]protoimpl.MessageInfo, 3) -var file_telemetry_proto_goTypes = []any{ +var file_telemetry_proto_telemetry_proto_msgTypes = make([]protoimpl.MessageInfo, 3) +var file_telemetry_proto_telemetry_proto_goTypes = []any{ (*TelemetryData)(nil), // 0: telemetry.TelemetryData (*TelemetryRequest)(nil), // 1: telemetry.TelemetryRequest (*TelemetryResponse)(nil), // 2: telemetry.TelemetryResponse } -var file_telemetry_proto_depIdxs = []int32{ +var file_telemetry_proto_telemetry_proto_depIdxs = []int32{ 0, // 0: telemetry.TelemetryRequest.data:type_name -> telemetry.TelemetryData 1, // [1:1] is the sub-list for method output_type 1, // [1:1] is the sub-list for method input_type @@ -314,65 +288,26 @@ var file_telemetry_proto_depIdxs = []int32{ 0, // [0:1] is the sub-list for field type_name } -func init() { file_telemetry_proto_init() } -func file_telemetry_proto_init() { - if File_telemetry_proto != nil { +func init() { file_telemetry_proto_telemetry_proto_init() } +func file_telemetry_proto_telemetry_proto_init() { + if File_telemetry_proto_telemetry_proto != nil { return } - if !protoimpl.UnsafeEnabled { - file_telemetry_proto_msgTypes[0].Exporter = func(v any, i int) any { - switch v := v.(*TelemetryData); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_telemetry_proto_msgTypes[1].Exporter = func(v any, i int) any { - switch v := v.(*TelemetryRequest); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - file_telemetry_proto_msgTypes[2].Exporter = func(v any, i int) any { - switch v := v.(*TelemetryResponse); i { - case 0: - return &v.state - case 1: - return &v.sizeCache - case 2: - return &v.unknownFields - default: - return nil - } - } - } type x struct{} out := protoimpl.TypeBuilder{ File: protoimpl.DescBuilder{ GoPackagePath: reflect.TypeOf(x{}).PkgPath(), - RawDescriptor: file_telemetry_proto_rawDesc, + RawDescriptor: unsafe.Slice(unsafe.StringData(file_telemetry_proto_telemetry_proto_rawDesc), len(file_telemetry_proto_telemetry_proto_rawDesc)), NumEnums: 0, NumMessages: 3, NumExtensions: 0, NumServices: 0, }, - GoTypes: file_telemetry_proto_goTypes, - DependencyIndexes: file_telemetry_proto_depIdxs, - MessageInfos: file_telemetry_proto_msgTypes, + GoTypes: file_telemetry_proto_telemetry_proto_goTypes, + DependencyIndexes: file_telemetry_proto_telemetry_proto_depIdxs, + MessageInfos: file_telemetry_proto_telemetry_proto_msgTypes, }.Build() - File_telemetry_proto = out.File - file_telemetry_proto_rawDesc = nil - file_telemetry_proto_goTypes = nil - file_telemetry_proto_depIdxs = nil + File_telemetry_proto_telemetry_proto = out.File + file_telemetry_proto_telemetry_proto_goTypes = nil + file_telemetry_proto_telemetry_proto_depIdxs = nil } diff --git a/telemetry/proto/telemetry.proto b/telemetry/proto/telemetry.proto index 12bbdd4f6..ef1426dc5 100644 --- a/telemetry/proto/telemetry.proto +++ b/telemetry/proto/telemetry.proto @@ -7,7 +7,7 @@ option go_package = "github.com/seaweedfs/seaweedfs/telemetry/proto"; // TelemetryData represents cluster-level telemetry information message TelemetryData { // Unique cluster identifier (generated in-memory) - string cluster_id = 1; + string topology_id = 1; // SeaweedFS version string version = 2; diff --git a/weed/pb/master_pb/master.pb.go b/weed/pb/master_pb/master.pb.go index a197a972b..8e271f9ea 100644 --- a/weed/pb/master_pb/master.pb.go +++ b/weed/pb/master_pb/master.pb.go @@ -7,13 +7,12 @@ package master_pb import ( - reflect "reflect" - sync "sync" - unsafe "unsafe" - volume_server_pb "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" protoreflect "google.golang.org/protobuf/reflect/protoreflect" protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" + unsafe "unsafe" ) const ( diff --git a/weed/pb/master_pb/master_grpc.pb.go b/weed/pb/master_pb/master_grpc.pb.go index 719e8bb38..441c2ffb1 100644 --- a/weed/pb/master_pb/master_grpc.pb.go +++ b/weed/pb/master_pb/master_grpc.pb.go @@ -8,7 +8,6 @@ package master_pb import ( context "context" - grpc "google.golang.org/grpc" codes "google.golang.org/grpc/codes" status "google.golang.org/grpc/status" diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 10b54d58f..ee6acd90d 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -77,6 +77,8 @@ type MasterServer struct { grpcDialOption grpc.DialOption + topologyIdGenLock sync.Mutex + MasterClient *wdclient.MasterClient adminLocks *AdminLocks @@ -209,6 +211,9 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { if ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infof("[%s] %s becomes leader.", ms.Topo.RaftServer.Name(), ms.Topo.RaftServer.Leader()) ms.Topo.LastLeaderChangeTime = time.Now() + if ms.Topo.RaftServer.Leader() == ms.Topo.RaftServer.Name() { + go ms.ensureTopologyId() + } } }) raftServerName = fmt.Sprintf("[%s]", ms.Topo.RaftServer.Name()) @@ -236,6 +241,42 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { } } +func (ms *MasterServer) ensureTopologyId() { + ms.topologyIdGenLock.Lock() + defer ms.topologyIdGenLock.Unlock() + + // Send a no-op command to ensure all previous logs are applied (barrier) + // This handles the case where log replay is still in progress + glog.V(1).Infof("ensureTopologyId: sending barrier command") + for { + if !ms.Topo.IsLeader() { + glog.V(1).Infof("lost leadership while sending barrier command for topologyId") + return + } + if _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), ms.Topo.GetTopologyId())); err != nil { + glog.Errorf("failed to sync raft for topologyId: %v, retrying in 1s", err) + time.Sleep(time.Second) + continue + } + break + } + glog.V(1).Infof("ensureTopologyId: barrier command completed") + + if !ms.Topo.IsLeader() { + return + } + + currentId := ms.Topo.GetTopologyId() + glog.V(1).Infof("ensureTopologyId: current TopologyId after barrier: %s", currentId) + + EnsureTopologyId(ms.Topo, func() bool { + return ms.Topo.IsLeader() + }, func(topologyId string) error { + _, err := ms.Topo.RaftServer.Do(topology.NewMaxVolumeIdCommand(ms.Topo.GetMaxVolumeId(), topologyId)) + return err + }) +} + func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { diff --git a/weed/server/raft_common.go b/weed/server/raft_common.go new file mode 100644 index 000000000..09aeaaac0 --- /dev/null +++ b/weed/server/raft_common.go @@ -0,0 +1,46 @@ +package weed_server + +import ( + "time" + + "github.com/google/uuid" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/topology" +) + +// EnsureTopologyId ensures that a TopologyId is generated and persisted if it's currently missing. +// It uses the provided checkLeaderFn to verify leadership and persistFn to save the new ID. +func EnsureTopologyId(topo *topology.Topology, checkLeaderFn func() bool, persistFn func(string) error) { + if topo.GetTopologyId() != "" { + return + } + + topologyId := uuid.New().String() + for { + if !checkLeaderFn() { + glog.V(0).Infof("lost leadership while saving topologyId") + return + } + + // Another concurrent operation may have set the ID between generation and now. + if latestId := topo.GetTopologyId(); latestId != "" { + glog.V(1).Infof("topologyId was set concurrently to %s, aborting generation", latestId) + return + } + + if err := persistFn(topologyId); err != nil { + glog.Errorf("failed to save topologyId, will retry: %v", err) + time.Sleep(time.Second) + continue + } + + // Verify that the topology ID was actually applied as expected. + appliedId := topo.GetTopologyId() + if appliedId != "" && appliedId != topologyId { + glog.V(0).Infof("TopologyId generation race: expected %s, but current TopologyId is %s", topologyId, appliedId) + } else { + glog.V(0).Infof("TopologyId generated: %s", topologyId) + } + break + } +} diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go index 348b243c2..33cf25412 100644 --- a/weed/server/raft_hashicorp.go +++ b/weed/server/raft_hashicorp.go @@ -4,6 +4,7 @@ package weed_server // https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 import ( + "encoding/json" "fmt" "math/rand/v2" "os" @@ -17,10 +18,12 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" "github.com/hashicorp/raft" + hashicorpRaft "github.com/hashicorp/raft" boltdb "github.com/hashicorp/raft-boltdb/v2" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/topology" "google.golang.org/grpc" ) @@ -72,6 +75,17 @@ func (s *RaftServer) monitorLeaderLoop(updatePeers bool) { s.topo.DoBarrier() + EnsureTopologyId(s.topo, func() bool { + return s.RaftHashicorp.State() == hashicorpRaft.Leader + }, func(topologyId string) error { + command := topology.NewMaxVolumeIdCommand(s.topo.GetMaxVolumeId(), topologyId) + b, err := json.Marshal(command) + if err != nil { + return err + } + return s.RaftHashicorp.Apply(b, 5*time.Second).Error() + }) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() } else { s.topo.BarrierReset() diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 4d2209dc0..f05dd97e5 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -54,6 +54,7 @@ var _ hashicorpRaft.FSM = &StateMachine{} func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), + TopologyId: s.topo.GetTopologyId(), } glog.V(1).Infof("Save raft state %+v", state) return json.Marshal(state) @@ -67,6 +68,10 @@ func (s StateMachine) Recovery(data []byte) error { } glog.V(1).Infof("Recovery raft state %+v", state) s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + if state.TopologyId != "" { + s.topo.SetTopologyId(state.TopologyId) + glog.V(0).Infof("Recovered TopologyId: %s", state.TopologyId) + } return nil } @@ -78,6 +83,14 @@ func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { return err } s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + if state.TopologyId != "" { + prevTopologyId := s.topo.GetTopologyId() + s.topo.SetTopologyId(state.TopologyId) + // Log when recovering TopologyId from Raft log replay, or setting it for the first time. + if prevTopologyId == "" { + glog.V(0).Infof("Set TopologyId from raft log: %s", state.TopologyId) + } + } glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) return nil @@ -86,6 +99,7 @@ func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { return &topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), + TopologyId: s.topo.GetTopologyId(), }, nil } @@ -118,9 +132,9 @@ func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { transporter := raft.NewGrpcTransporter(option.GrpcDialOption) glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) - // always clear previous log to avoid server is promotable - os.RemoveAll(path.Join(s.dataDir, "log")) if !option.RaftResumeState { + // clear previous log to ensure fresh start + os.RemoveAll(path.Join(s.dataDir, "log")) // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) diff --git a/weed/telemetry/client.go b/weed/telemetry/client.go index 684ae4bae..c1ac74ce5 100644 --- a/weed/telemetry/client.go +++ b/weed/telemetry/client.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "net/http" + "sync" "time" "github.com/google/uuid" @@ -17,6 +18,8 @@ type Client struct { enabled bool instanceID string httpClient *http.Client + topologyId string + sync.RWMutex } // NewClient creates a new telemetry client @@ -31,6 +34,12 @@ func NewClient(url string, enabled bool) *Client { } } +func (c *Client) SetTopologyId(topologyId string) { + c.Lock() + defer c.Unlock() + c.topologyId = topologyId +} + // IsEnabled returns whether telemetry is enabled func (c *Client) IsEnabled() bool { return c.enabled && c.url != "" @@ -42,10 +51,20 @@ func (c *Client) SendTelemetry(data *proto.TelemetryData) error { return nil } - // Set the cluster ID - data.ClusterId = c.instanceID + // Work on a copy to avoid mutating the caller's TelemetryData + clonedData, ok := protobuf.Clone(data).(*proto.TelemetryData) + if !ok { + return fmt.Errorf("failed to clone telemetry data") + } + + // Set the topology ID + c.RLock() + if c.topologyId != "" { + clonedData.TopologyId = c.topologyId + } + c.RUnlock() - return c.sendProtobuf(data) + return c.sendProtobuf(clonedData) } // SendTelemetryAsync sends telemetry data asynchronously diff --git a/weed/telemetry/collector.go b/weed/telemetry/collector.go index 423e82fc6..8f1fc8859 100644 --- a/weed/telemetry/collector.go +++ b/weed/telemetry/collector.go @@ -62,6 +62,10 @@ func (c *Collector) CollectAndSendAsync() { return } + if c.topo != nil { + c.client.SetTopologyId(c.topo.GetTopologyId()) + } + go func() { data := c.collectData() c.client.SendTelemetryAsync(data) diff --git a/weed/topology/cluster_commands.go b/weed/topology/cluster_commands.go index 3e4b44648..dfa3bf6cc 100644 --- a/weed/topology/cluster_commands.go +++ b/weed/topology/cluster_commands.go @@ -12,11 +12,13 @@ import ( type MaxVolumeIdCommand struct { MaxVolumeId needle.VolumeId `json:"maxVolumeId"` + TopologyId string `json:"topologyId"` } -func NewMaxVolumeIdCommand(value needle.VolumeId) *MaxVolumeIdCommand { +func NewMaxVolumeIdCommand(value needle.VolumeId, topologyId string) *MaxVolumeIdCommand { return &MaxVolumeIdCommand{ MaxVolumeId: value, + TopologyId: topologyId, } } @@ -29,7 +31,18 @@ func (c *MaxVolumeIdCommand) Apply(server raft.Server) (interface{}, error) { topo := server.Context().(*Topology) before := topo.GetMaxVolumeId() topo.UpAdjustMaxVolumeId(c.MaxVolumeId) - + if c.TopologyId != "" { + prevTopologyId := topo.GetTopologyId() + topo.SetTopologyId(c.TopologyId) + // Log when TopologyId is set for the first time, with different messages for leader and follower. + if prevTopologyId == "" { + if server.State() == raft.Leader { + glog.V(0).Infof("TopologyId generated and applied on leader: %s", c.TopologyId) + } else { + glog.V(0).Infof("TopologyId applied on follower: %s", c.TopologyId) + } + } + } glog.V(1).Infoln("max volume id", before, "==>", topo.GetMaxVolumeId()) return nil, nil diff --git a/weed/topology/topology.go b/weed/topology/topology.go index bbae97d72..db7cb17d7 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -57,6 +57,9 @@ type Topology struct { UuidAccessLock sync.RWMutex UuidMap map[string][]string + topologyId string + topologyIdLock sync.RWMutex + LastLeaderChangeTime time.Time } @@ -234,11 +237,11 @@ func (t *Topology) NextVolumeId() (needle.VolumeId, error) { defer t.RaftServerAccessLock.RUnlock() if t.RaftServer != nil { - if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next)); err != nil { + if _, err := t.RaftServer.Do(NewMaxVolumeIdCommand(next, t.GetTopologyId())); err != nil { return 0, err } } else if t.HashicorpRaft != nil { - b, err := json.Marshal(NewMaxVolumeIdCommand(next)) + b, err := json.Marshal(NewMaxVolumeIdCommand(next, t.GetTopologyId())) if err != nil { return 0, fmt.Errorf("failed marshal NewMaxVolumeIdCommand: %+v", err) } @@ -468,3 +471,24 @@ func (t *Topology) EnableVacuum() { glog.V(0).Infof("EnableVacuum") t.isDisableVacuum = false } + +func (t *Topology) GetTopologyId() string { + t.topologyIdLock.RLock() + defer t.topologyIdLock.RUnlock() + return t.topologyId +} + +func (t *Topology) SetTopologyId(topologyId string) { + t.topologyIdLock.Lock() + defer t.topologyIdLock.Unlock() + if topologyId == "" { + return + } + if t.topologyId == "" { + t.topologyId = topologyId + return + } + if t.topologyId != topologyId { + glog.Fatalf("Split-brain detected! Current TopologyId is %s, but received %s. Stopping to prevent data corruption.", t.topologyId, topologyId) + } +}