
Bootstrap persistent state for volume servers.

This PR implements logic to load and save persistent state information for the stores
associated with volume servers, and to report state changes back to masters
via heartbeat messages.

More work to follow!

See https://github.com/seaweedfs/seaweedfs/issues/7977 for details.
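
In short: each volume server store now persists a small VolumeServerState protobuf in its index folder and pushes updates onto a channel drained by the heartbeat loop. A minimal caller sketch against the Store APIs added in this diff (the enterMaintenance helper itself is hypothetical, not part of the PR):

package main

import "github.com/seaweedfs/seaweedfs/weed/storage"

// enterMaintenance is a hypothetical helper that flips the persisted
// maintenance flag on a store, using only APIs introduced in this PR.
func enterMaintenance(s *storage.Store) error {
	// LoadState() reads state.pb from the index folder and, on success,
	// queues the loaded state onto s.StateUpdateChan.
	if err := s.LoadState(); err != nil {
		return err
	}
	// Mutate and persist; SaveState() re-queues the updated state, which
	// doHeartbeatWithRetry() then forwards to the master in a heartbeat.
	s.State.Pb.Maintenance = true
	return s.SaveState()
}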
pull/7984/head
Lisandro Pin committed 6 days ago
parent commit 92429d2d91
  1. weed/pb/master.proto (5 changed lines)
  2. weed/pb/master_pb/master.pb.go (185 changed lines)
  3. weed/pb/volume_server.proto (10 changed lines)
  4. weed/pb/volume_server_pb/volume_server.pb.go (1133 changed lines)
  5. weed/pb/volume_server_pb/volume_server_grpc.pb.go (777 changed lines)
  6. weed/server/master_grpc_server.go (1 changed line)
  7. weed/server/volume_grpc_client_to_master.go (13 changed lines)
  8. weed/storage/disk_location.go (9 changed lines)
  9. weed/storage/store.go (68 changed lines)
  10. weed/storage/store_state.go (71 changed lines)

weed/pb/master.proto (5 changed lines)

@@ -4,6 +4,8 @@ package master_pb;
option go_package = "github.com/seaweedfs/seaweedfs/weed/pb/master_pb";
import "volume_server.proto";
//////////////////////////////////////////////////
service Seaweed {
@@ -84,6 +86,9 @@ message Heartbeat {
uint32 grpc_port = 20;
repeated string location_uuids = 21;
string id = 22; // volume server id, independent of ip:port for stable identification
// state flags
volume_server_pb.VolumeServerState state = 23;
}
message HeartbeatResponse {

weed/pb/master_pb/master.pb.go (185 changed lines)

@@ -11,6 +11,7 @@ import (
sync "sync"
unsafe "unsafe"
volume_server_pb "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
)
@@ -46,8 +47,10 @@ type Heartbeat struct {
GrpcPort uint32 `protobuf:"varint,20,opt,name=grpc_port,json=grpcPort,proto3" json:"grpc_port,omitempty"`
LocationUuids []string `protobuf:"bytes,21,rep,name=location_uuids,json=locationUuids,proto3" json:"location_uuids,omitempty"`
Id string `protobuf:"bytes,22,opt,name=id,proto3" json:"id,omitempty"` // volume server id, independent of ip:port for stable identification
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
// state flags
State *volume_server_pb.VolumeServerState `protobuf:"bytes,23,opt,name=state,proto3" json:"state,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *Heartbeat) Reset() {
@@ -213,6 +216,13 @@ func (x *Heartbeat) GetId() string {
return ""
}
func (x *Heartbeat) GetState() *volume_server_pb.VolumeServerState {
if x != nil {
return x.State
}
return nil
}
type HeartbeatResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
VolumeSizeLimit uint64 `protobuf:"varint,1,opt,name=volume_size_limit,json=volumeSizeLimit,proto3" json:"volume_size_limit,omitempty"`
@@ -4159,7 +4169,7 @@ var File_master_proto protoreflect.FileDescriptor
const file_master_proto_rawDesc = "" +
"\n" +
"\fmaster.proto\x12\tmaster_pb\"\xd0\a\n" +
"\fmaster.proto\x12\tmaster_pb\x1a\x13volume_server.proto\"\x8b\b\n" +
"\tHeartbeat\x12\x0e\n" +
"\x02ip\x18\x01 \x01(\tR\x02ip\x12\x12\n" +
"\x04port\x18\x02 \x01(\rR\x04port\x12\x1d\n" +
@@ -4185,7 +4195,8 @@ const file_master_proto_rawDesc = "" +
"\x11max_volume_counts\x18\x04 \x03(\v2).master_pb.Heartbeat.MaxVolumeCountsEntryR\x0fmaxVolumeCounts\x12\x1b\n" +
"\tgrpc_port\x18\x14 \x01(\rR\bgrpcPort\x12%\n" +
"\x0elocation_uuids\x18\x15 \x03(\tR\rlocationUuids\x12\x0e\n" +
"\x02id\x18\x16 \x01(\tR\x02id\x1aB\n" +
"\x02id\x18\x16 \x01(\tR\x02id\x129\n" +
"\x05state\x18\x17 \x01(\v2#.volume_server_pb.VolumeServerStateR\x05state\x1aB\n" +
"\x14MaxVolumeCountsEntry\x12\x10\n" +
"\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n" +
"\x05value\x18\x02 \x01(\rR\x05value:\x028\x01\"\xcd\x02\n" +
@@ -4634,6 +4645,7 @@ var file_master_proto_goTypes = []any{
(*LookupEcVolumeResponse_EcShardIdLocation)(nil), // 69: master_pb.LookupEcVolumeResponse.EcShardIdLocation
(*ListClusterNodesResponse_ClusterNode)(nil), // 70: master_pb.ListClusterNodesResponse.ClusterNode
(*RaftListClusterServersResponse_ClusterServers)(nil), // 71: master_pb.RaftListClusterServersResponse.ClusterServers
(*volume_server_pb.VolumeServerState)(nil), // 72: volume_server_pb.VolumeServerState
}
var file_master_proto_depIdxs = []int32{
2, // 0: master_pb.Heartbeat.volumes:type_name -> master_pb.VolumeInformationMessage
@@ -4643,88 +4655,89 @@ var file_master_proto_depIdxs = []int32{
4, // 4: master_pb.Heartbeat.new_ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage
4, // 5: master_pb.Heartbeat.deleted_ec_shards:type_name -> master_pb.VolumeEcShardInformationMessage
61, // 6: master_pb.Heartbeat.max_volume_counts:type_name -> master_pb.Heartbeat.MaxVolumeCountsEntry
5, // 7: master_pb.HeartbeatResponse.storage_backends:type_name -> master_pb.StorageBackend
62, // 8: master_pb.StorageBackend.properties:type_name -> master_pb.StorageBackend.PropertiesEntry
63, // 9: master_pb.SuperBlockExtra.erasure_coding:type_name -> master_pb.SuperBlockExtra.ErasureCoding
9, // 10: master_pb.KeepConnectedResponse.volume_location:type_name -> master_pb.VolumeLocation
10, // 11: master_pb.KeepConnectedResponse.cluster_node_update:type_name -> master_pb.ClusterNodeUpdate
64, // 12: master_pb.LookupVolumeResponse.volume_id_locations:type_name -> master_pb.LookupVolumeResponse.VolumeIdLocation
14, // 13: master_pb.AssignResponse.replicas:type_name -> master_pb.Location
14, // 14: master_pb.AssignResponse.location:type_name -> master_pb.Location
20, // 15: master_pb.CollectionListResponse.collections:type_name -> master_pb.Collection
2, // 16: master_pb.DiskInfo.volume_infos:type_name -> master_pb.VolumeInformationMessage
4, // 17: master_pb.DiskInfo.ec_shard_infos:type_name -> master_pb.VolumeEcShardInformationMessage
65, // 18: master_pb.DataNodeInfo.diskInfos:type_name -> master_pb.DataNodeInfo.DiskInfosEntry
26, // 19: master_pb.RackInfo.data_node_infos:type_name -> master_pb.DataNodeInfo
66, // 20: master_pb.RackInfo.diskInfos:type_name -> master_pb.RackInfo.DiskInfosEntry
27, // 21: master_pb.DataCenterInfo.rack_infos:type_name -> master_pb.RackInfo
67, // 22: master_pb.DataCenterInfo.diskInfos:type_name -> master_pb.DataCenterInfo.DiskInfosEntry
28, // 23: master_pb.TopologyInfo.data_center_infos:type_name -> master_pb.DataCenterInfo
68, // 24: master_pb.TopologyInfo.diskInfos:type_name -> master_pb.TopologyInfo.DiskInfosEntry
29, // 25: master_pb.VolumeListResponse.topology_info:type_name -> master_pb.TopologyInfo
69, // 26: master_pb.LookupEcVolumeResponse.shard_id_locations:type_name -> master_pb.LookupEcVolumeResponse.EcShardIdLocation
5, // 27: master_pb.GetMasterConfigurationResponse.storage_backends:type_name -> master_pb.StorageBackend
70, // 28: master_pb.ListClusterNodesResponse.cluster_nodes:type_name -> master_pb.ListClusterNodesResponse.ClusterNode
71, // 29: master_pb.RaftListClusterServersResponse.cluster_servers:type_name -> master_pb.RaftListClusterServersResponse.ClusterServers
14, // 30: master_pb.LookupVolumeResponse.VolumeIdLocation.locations:type_name -> master_pb.Location
25, // 31: master_pb.DataNodeInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
25, // 32: master_pb.RackInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
25, // 33: master_pb.DataCenterInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
25, // 34: master_pb.TopologyInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
14, // 35: master_pb.LookupEcVolumeResponse.EcShardIdLocation.locations:type_name -> master_pb.Location
0, // 36: master_pb.Seaweed.SendHeartbeat:input_type -> master_pb.Heartbeat
8, // 37: master_pb.Seaweed.KeepConnected:input_type -> master_pb.KeepConnectedRequest
12, // 38: master_pb.Seaweed.LookupVolume:input_type -> master_pb.LookupVolumeRequest
15, // 39: master_pb.Seaweed.Assign:input_type -> master_pb.AssignRequest
15, // 40: master_pb.Seaweed.StreamAssign:input_type -> master_pb.AssignRequest
18, // 41: master_pb.Seaweed.Statistics:input_type -> master_pb.StatisticsRequest
21, // 42: master_pb.Seaweed.CollectionList:input_type -> master_pb.CollectionListRequest
23, // 43: master_pb.Seaweed.CollectionDelete:input_type -> master_pb.CollectionDeleteRequest
30, // 44: master_pb.Seaweed.VolumeList:input_type -> master_pb.VolumeListRequest
32, // 45: master_pb.Seaweed.LookupEcVolume:input_type -> master_pb.LookupEcVolumeRequest
34, // 46: master_pb.Seaweed.VacuumVolume:input_type -> master_pb.VacuumVolumeRequest
36, // 47: master_pb.Seaweed.DisableVacuum:input_type -> master_pb.DisableVacuumRequest
38, // 48: master_pb.Seaweed.EnableVacuum:input_type -> master_pb.EnableVacuumRequest
40, // 49: master_pb.Seaweed.VolumeMarkReadonly:input_type -> master_pb.VolumeMarkReadonlyRequest
42, // 50: master_pb.Seaweed.GetMasterConfiguration:input_type -> master_pb.GetMasterConfigurationRequest
44, // 51: master_pb.Seaweed.ListClusterNodes:input_type -> master_pb.ListClusterNodesRequest
46, // 52: master_pb.Seaweed.LeaseAdminToken:input_type -> master_pb.LeaseAdminTokenRequest
48, // 53: master_pb.Seaweed.ReleaseAdminToken:input_type -> master_pb.ReleaseAdminTokenRequest
50, // 54: master_pb.Seaweed.Ping:input_type -> master_pb.PingRequest
56, // 55: master_pb.Seaweed.RaftListClusterServers:input_type -> master_pb.RaftListClusterServersRequest
52, // 56: master_pb.Seaweed.RaftAddServer:input_type -> master_pb.RaftAddServerRequest
54, // 57: master_pb.Seaweed.RaftRemoveServer:input_type -> master_pb.RaftRemoveServerRequest
58, // 58: master_pb.Seaweed.RaftLeadershipTransfer:input_type -> master_pb.RaftLeadershipTransferRequest
16, // 59: master_pb.Seaweed.VolumeGrow:input_type -> master_pb.VolumeGrowRequest
1, // 60: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse
11, // 61: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse
13, // 62: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse
17, // 63: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse
17, // 64: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse
19, // 65: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse
22, // 66: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse
24, // 67: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse
31, // 68: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse
33, // 69: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse
35, // 70: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse
37, // 71: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse
39, // 72: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse
41, // 73: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse
43, // 74: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse
45, // 75: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse
47, // 76: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse
49, // 77: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse
51, // 78: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse
57, // 79: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse
53, // 80: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse
55, // 81: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse
59, // 82: master_pb.Seaweed.RaftLeadershipTransfer:output_type -> master_pb.RaftLeadershipTransferResponse
60, // 83: master_pb.Seaweed.VolumeGrow:output_type -> master_pb.VolumeGrowResponse
60, // [60:84] is the sub-list for method output_type
36, // [36:60] is the sub-list for method input_type
36, // [36:36] is the sub-list for extension type_name
36, // [36:36] is the sub-list for extension extendee
0, // [0:36] is the sub-list for field type_name
72, // 7: master_pb.Heartbeat.state:type_name -> volume_server_pb.VolumeServerState
5, // 8: master_pb.HeartbeatResponse.storage_backends:type_name -> master_pb.StorageBackend
62, // 9: master_pb.StorageBackend.properties:type_name -> master_pb.StorageBackend.PropertiesEntry
63, // 10: master_pb.SuperBlockExtra.erasure_coding:type_name -> master_pb.SuperBlockExtra.ErasureCoding
9, // 11: master_pb.KeepConnectedResponse.volume_location:type_name -> master_pb.VolumeLocation
10, // 12: master_pb.KeepConnectedResponse.cluster_node_update:type_name -> master_pb.ClusterNodeUpdate
64, // 13: master_pb.LookupVolumeResponse.volume_id_locations:type_name -> master_pb.LookupVolumeResponse.VolumeIdLocation
14, // 14: master_pb.AssignResponse.replicas:type_name -> master_pb.Location
14, // 15: master_pb.AssignResponse.location:type_name -> master_pb.Location
20, // 16: master_pb.CollectionListResponse.collections:type_name -> master_pb.Collection
2, // 17: master_pb.DiskInfo.volume_infos:type_name -> master_pb.VolumeInformationMessage
4, // 18: master_pb.DiskInfo.ec_shard_infos:type_name -> master_pb.VolumeEcShardInformationMessage
65, // 19: master_pb.DataNodeInfo.diskInfos:type_name -> master_pb.DataNodeInfo.DiskInfosEntry
26, // 20: master_pb.RackInfo.data_node_infos:type_name -> master_pb.DataNodeInfo
66, // 21: master_pb.RackInfo.diskInfos:type_name -> master_pb.RackInfo.DiskInfosEntry
27, // 22: master_pb.DataCenterInfo.rack_infos:type_name -> master_pb.RackInfo
67, // 23: master_pb.DataCenterInfo.diskInfos:type_name -> master_pb.DataCenterInfo.DiskInfosEntry
28, // 24: master_pb.TopologyInfo.data_center_infos:type_name -> master_pb.DataCenterInfo
68, // 25: master_pb.TopologyInfo.diskInfos:type_name -> master_pb.TopologyInfo.DiskInfosEntry
29, // 26: master_pb.VolumeListResponse.topology_info:type_name -> master_pb.TopologyInfo
69, // 27: master_pb.LookupEcVolumeResponse.shard_id_locations:type_name -> master_pb.LookupEcVolumeResponse.EcShardIdLocation
5, // 28: master_pb.GetMasterConfigurationResponse.storage_backends:type_name -> master_pb.StorageBackend
70, // 29: master_pb.ListClusterNodesResponse.cluster_nodes:type_name -> master_pb.ListClusterNodesResponse.ClusterNode
71, // 30: master_pb.RaftListClusterServersResponse.cluster_servers:type_name -> master_pb.RaftListClusterServersResponse.ClusterServers
14, // 31: master_pb.LookupVolumeResponse.VolumeIdLocation.locations:type_name -> master_pb.Location
25, // 32: master_pb.DataNodeInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
25, // 33: master_pb.RackInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
25, // 34: master_pb.DataCenterInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
25, // 35: master_pb.TopologyInfo.DiskInfosEntry.value:type_name -> master_pb.DiskInfo
14, // 36: master_pb.LookupEcVolumeResponse.EcShardIdLocation.locations:type_name -> master_pb.Location
0, // 37: master_pb.Seaweed.SendHeartbeat:input_type -> master_pb.Heartbeat
8, // 38: master_pb.Seaweed.KeepConnected:input_type -> master_pb.KeepConnectedRequest
12, // 39: master_pb.Seaweed.LookupVolume:input_type -> master_pb.LookupVolumeRequest
15, // 40: master_pb.Seaweed.Assign:input_type -> master_pb.AssignRequest
15, // 41: master_pb.Seaweed.StreamAssign:input_type -> master_pb.AssignRequest
18, // 42: master_pb.Seaweed.Statistics:input_type -> master_pb.StatisticsRequest
21, // 43: master_pb.Seaweed.CollectionList:input_type -> master_pb.CollectionListRequest
23, // 44: master_pb.Seaweed.CollectionDelete:input_type -> master_pb.CollectionDeleteRequest
30, // 45: master_pb.Seaweed.VolumeList:input_type -> master_pb.VolumeListRequest
32, // 46: master_pb.Seaweed.LookupEcVolume:input_type -> master_pb.LookupEcVolumeRequest
34, // 47: master_pb.Seaweed.VacuumVolume:input_type -> master_pb.VacuumVolumeRequest
36, // 48: master_pb.Seaweed.DisableVacuum:input_type -> master_pb.DisableVacuumRequest
38, // 49: master_pb.Seaweed.EnableVacuum:input_type -> master_pb.EnableVacuumRequest
40, // 50: master_pb.Seaweed.VolumeMarkReadonly:input_type -> master_pb.VolumeMarkReadonlyRequest
42, // 51: master_pb.Seaweed.GetMasterConfiguration:input_type -> master_pb.GetMasterConfigurationRequest
44, // 52: master_pb.Seaweed.ListClusterNodes:input_type -> master_pb.ListClusterNodesRequest
46, // 53: master_pb.Seaweed.LeaseAdminToken:input_type -> master_pb.LeaseAdminTokenRequest
48, // 54: master_pb.Seaweed.ReleaseAdminToken:input_type -> master_pb.ReleaseAdminTokenRequest
50, // 55: master_pb.Seaweed.Ping:input_type -> master_pb.PingRequest
56, // 56: master_pb.Seaweed.RaftListClusterServers:input_type -> master_pb.RaftListClusterServersRequest
52, // 57: master_pb.Seaweed.RaftAddServer:input_type -> master_pb.RaftAddServerRequest
54, // 58: master_pb.Seaweed.RaftRemoveServer:input_type -> master_pb.RaftRemoveServerRequest
58, // 59: master_pb.Seaweed.RaftLeadershipTransfer:input_type -> master_pb.RaftLeadershipTransferRequest
16, // 60: master_pb.Seaweed.VolumeGrow:input_type -> master_pb.VolumeGrowRequest
1, // 61: master_pb.Seaweed.SendHeartbeat:output_type -> master_pb.HeartbeatResponse
11, // 62: master_pb.Seaweed.KeepConnected:output_type -> master_pb.KeepConnectedResponse
13, // 63: master_pb.Seaweed.LookupVolume:output_type -> master_pb.LookupVolumeResponse
17, // 64: master_pb.Seaweed.Assign:output_type -> master_pb.AssignResponse
17, // 65: master_pb.Seaweed.StreamAssign:output_type -> master_pb.AssignResponse
19, // 66: master_pb.Seaweed.Statistics:output_type -> master_pb.StatisticsResponse
22, // 67: master_pb.Seaweed.CollectionList:output_type -> master_pb.CollectionListResponse
24, // 68: master_pb.Seaweed.CollectionDelete:output_type -> master_pb.CollectionDeleteResponse
31, // 69: master_pb.Seaweed.VolumeList:output_type -> master_pb.VolumeListResponse
33, // 70: master_pb.Seaweed.LookupEcVolume:output_type -> master_pb.LookupEcVolumeResponse
35, // 71: master_pb.Seaweed.VacuumVolume:output_type -> master_pb.VacuumVolumeResponse
37, // 72: master_pb.Seaweed.DisableVacuum:output_type -> master_pb.DisableVacuumResponse
39, // 73: master_pb.Seaweed.EnableVacuum:output_type -> master_pb.EnableVacuumResponse
41, // 74: master_pb.Seaweed.VolumeMarkReadonly:output_type -> master_pb.VolumeMarkReadonlyResponse
43, // 75: master_pb.Seaweed.GetMasterConfiguration:output_type -> master_pb.GetMasterConfigurationResponse
45, // 76: master_pb.Seaweed.ListClusterNodes:output_type -> master_pb.ListClusterNodesResponse
47, // 77: master_pb.Seaweed.LeaseAdminToken:output_type -> master_pb.LeaseAdminTokenResponse
49, // 78: master_pb.Seaweed.ReleaseAdminToken:output_type -> master_pb.ReleaseAdminTokenResponse
51, // 79: master_pb.Seaweed.Ping:output_type -> master_pb.PingResponse
57, // 80: master_pb.Seaweed.RaftListClusterServers:output_type -> master_pb.RaftListClusterServersResponse
53, // 81: master_pb.Seaweed.RaftAddServer:output_type -> master_pb.RaftAddServerResponse
55, // 82: master_pb.Seaweed.RaftRemoveServer:output_type -> master_pb.RaftRemoveServerResponse
59, // 83: master_pb.Seaweed.RaftLeadershipTransfer:output_type -> master_pb.RaftLeadershipTransferResponse
60, // 84: master_pb.Seaweed.VolumeGrow:output_type -> master_pb.VolumeGrowResponse
61, // [61:85] is the sub-list for method output_type
37, // [37:61] is the sub-list for method input_type
37, // [37:37] is the sub-list for extension type_name
37, // [37:37] is the sub-list for extension extendee
0, // [0:37] is the sub-list for field type_name
}
func init() { file_master_proto_init() }

weed/pb/volume_server.proto (10 changed lines)

@@ -7,6 +7,14 @@ import "remote.proto";
//////////////////////////////////////////////////
// Persistent state for volume servers.
message VolumeServerState {
// Whether the server is in maintenance (i.e. read-only) mode.
bool maintenance = 1;
}
//////////////////////////////////////////////////
service VolumeServer {
//Experts only: takes multiple fid parameters. This function does not propagate deletes to replicas.
rpc BatchDelete (BatchDeleteRequest) returns (BatchDeleteResponse) {
@@ -45,6 +53,7 @@ service VolumeServer {
}
rpc VolumeStatus (VolumeStatusRequest) returns (VolumeStatusResponse) {
}
// TODO(issues/7977): add RPCs to control state flags
// copy the .idx .dat files, and mount this volume
rpc VolumeCopy (VolumeCopyRequest) returns (stream VolumeCopyResponse) {
@@ -569,6 +578,7 @@ message VolumeServerStatusRequest {
}
message VolumeServerStatusResponse {
// TODO(issues/7977): add volume server state to response
repeated DiskStatus disk_statuses = 1;
MemStatus memory_status = 2;
string version = 3;
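
The persisted state file is just this message serialized with proto.Marshal (see weed/storage/store_state.go below). A hedged sketch, not part of this commit, for decoding a state.pb by hand; the file path is hypothetical:

package main

import (
	"fmt"
	"os"

	"github.com/golang/protobuf/proto"
	"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

func main() {
	// Read the raw bytes written by State.Save().
	bin, err := os.ReadFile("/data/idx/state.pb") // hypothetical path
	if err != nil {
		fmt.Println(err)
		return
	}
	state := &volume_server_pb.VolumeServerState{}
	if err := proto.Unmarshal(bin, state); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Printf("maintenance mode: %v\n", state.Maintenance)
}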

weed/pb/volume_server_pb/volume_server.pb.go (1133 changed lines)
File diff suppressed because it is too large.

weed/pb/volume_server_pb/volume_server_grpc.pb.go (777 changed lines)
File diff suppressed because it is too large.

weed/server/master_grpc_server.go (1 changed line)

@@ -165,6 +165,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
// TODO(issues/7977): process status heartbeat updates from volume servers
message := &master_pb.VolumeLocation{
Url: dn.Url(),

weed/server/volume_grpc_client_to_master.go (13 changed lines)

@@ -212,6 +212,19 @@ func (vs *VolumeServer) doHeartbeatWithRetry(masterAddress pb.ServerAddress, grp
port := uint32(vs.store.Port)
for {
select {
case stateMessage := <-vs.store.StateUpdateChan:
stateBeat := &master_pb.Heartbeat{
Ip: ip,
Port: port,
DataCenter: dataCenter,
Rack: rack,
State: stateMessage,
}
glog.V(0).Infof("volume server %s:%d updates state to %v", vs.store.Ip, vs.store.Port, stateMessage)
if err = stream.Send(stateBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update state to master %s: %v", masterAddress, err)
return "", err
}
case volumeMessage := <-vs.store.NewVolumesChan:
deltaBeat := &master_pb.Heartbeat{
Ip: ip,

weed/storage/disk_location.go (9 changed lines)

@@ -19,6 +19,11 @@ import (
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
UUIDFileName = "vol_dir.uuid"
UUIDFileMod = 0644
)
type DiskLocation struct {
Directory string
DirectoryUuid string
@@ -42,7 +47,7 @@ type DiskLocation struct {
func GenerateDirUuid(dir string) (dirUuidString string, err error) {
glog.V(1).Infof("Getting uuid of volume directory:%s", dir)
fileName := dir + "/vol_dir.uuid"
fileName := filepath.Join(dir, UUIDFileName)
if !util.FileExists(fileName) {
dirUuidString, err = writeNewUuid(fileName)
} else {
@@ -62,7 +67,7 @@ func GenerateDirUuid(dir string) (dirUuidString string, err error) {
func writeNewUuid(fileName string) (string, error) {
dirUuid, _ := uuid.NewRandom()
dirUuidString := dirUuid.String()
if err := util.WriteFile(fileName, []byte(dirUuidString), 0644); err != nil {
if err := util.WriteFile(fileName, []byte(dirUuidString), UUIDFileMod); err != nil {
return "", fmt.Errorf("failed to write uuid to %s : %v", fileName, err)
}
return dirUuidString, nil

weed/storage/store.go (68 changed lines)

@@ -16,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
@@ -25,6 +26,7 @@ import (
const (
MAX_TTL_VOLUME_REMOVAL_DELAY = 10 // 10 minutes
HEARTBEAT_CHAN_SIZE = 1024
)
type ReadOption struct {
@@ -69,6 +71,8 @@ type Store struct {
rack string // optional information, overwriting master setting if exists
connected bool
NeedleMapKind NeedleMapKind
State *State
StateUpdateChan chan *volume_server_pb.VolumeServerState
NewVolumesChan chan master_pb.VolumeShortInformationMessage
DeletedVolumesChan chan master_pb.VolumeShortInformationMessage
NewEcShardsChan chan master_pb.VolumeEcShardInformationMessage
@@ -81,16 +85,31 @@ func (s *Store) String() (str string) {
return
}
func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int, publicUrl string, id string, dirnames []string, maxVolumeCounts []int32,
minFreeSpaces []util.MinFreeSpace, idxFolder string, needleMapKind NeedleMapKind, diskTypes []DiskType, ldbTimeout int64) (s *Store) {
s = &Store{grpcDialOption: grpcDialOption, Port: port, Ip: ip, GrpcPort: grpcPort, PublicUrl: publicUrl, Id: id, NeedleMapKind: needleMapKind}
s.Locations = make([]*DiskLocation, 0)
s.NewVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024)
s.DeletedVolumesChan = make(chan master_pb.VolumeShortInformationMessage, 1024)
s.NewEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024)
s.DeletedEcShardsChan = make(chan master_pb.VolumeEcShardInformationMessage, 1024)
func NewStore(
grpcDialOption grpc.DialOption,
ip string, port int, grpcPort int, publicUrl string, id string,
dirnames []string, maxVolumeCounts []int32, minFreeSpaces []util.MinFreeSpace,
idxFolder string,
needleMapKind NeedleMapKind,
diskTypes []DiskType,
ldbTimeout int64,
) (s *Store) {
s = &Store{
grpcDialOption: grpcDialOption,
Port: port,
Ip: ip,
GrpcPort: grpcPort,
PublicUrl: publicUrl,
Id: id,
NeedleMapKind: needleMapKind,
Locations: make([]*DiskLocation, 0),
StateUpdateChan: make(chan *volume_server_pb.VolumeServerState, HEARTBEAT_CHAN_SIZE),
NewVolumesChan: make(chan master_pb.VolumeShortInformationMessage, HEARTBEAT_CHAN_SIZE),
DeletedVolumesChan: make(chan master_pb.VolumeShortInformationMessage, HEARTBEAT_CHAN_SIZE),
NewEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, HEARTBEAT_CHAN_SIZE),
DeletedEcShardsChan: make(chan master_pb.VolumeEcShardInformationMessage, HEARTBEAT_CHAN_SIZE),
}
var wg sync.WaitGroup
for i := 0; i < len(dirnames); i++ {
@@ -130,8 +149,36 @@ func NewStore(grpcDialOption grpc.DialOption, ip string, port int, grpcPort int,
}
wg.Wait()
var err error
s.State, err = NewState(idxFolder)
if err != nil {
glog.Fatalf("failed to resolve state for volume %s: %v", id, err)
}
return
}
func (s *Store) LoadState() error {
err := s.State.Load()
if s.State.Pb != nil && err == nil {
s.StateUpdateChan <- s.State.Pb
}
return err
}
func (s *Store) SaveState() error {
if s.State.Pb == nil {
glog.Warningf("tried to save empty state for store %s", s.Id)
return nil
}
err := s.State.Save()
if s.State.Pb != nil && err == nil {
s.StateUpdateChan <- s.State.Pb
}
return err
}
func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMapKind NeedleMapKind, replicaPlacement string, ttlString string, preallocate int64, ver needle.Version, MemoryMapMaxSizeMb uint32, diskType DiskType, ldbTimeout int64) error {
rt, e := super_block.NewReplicaPlacementFromString(replicaPlacement)
if e != nil {
@@ -144,6 +191,7 @@ func (s *Store) AddVolume(volumeId needle.VolumeId, collection string, needleMap
e = s.addVolume(volumeId, collection, needleMapKind, rt, ttl, preallocate, ver, MemoryMapMaxSizeMb, diskType, ldbTimeout)
return e
}
func (s *Store) DeleteCollection(collection string) (e error) {
for _, location := range s.Locations {
e = location.DeleteCollectionFromDiskLocation(collection)

weed/storage/store_state.go (71 changed lines)

@@ -0,0 +1,71 @@
package storage
import (
"fmt"
"os"
"path/filepath"
"github.com/golang/protobuf/proto"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
const (
StateFileName = "state.pb"
StateFileMode = 0644
)
type State struct {
FilePath string
Pb *volume_server_pb.VolumeServerState
}
func NewState(dir string) (*State, error) {
state := &State{
FilePath: filepath.Join(dir, StateFileName),
Pb: nil,
}
err := state.Load()
return state, err
}
func (st *State) Load() error {
st.Pb = &volume_server_pb.VolumeServerState{}
if !util.FileExists(st.FilePath) {
glog.V(1).Infof("No preexisting store state at %s", st.FilePath)
return nil
}
binPb, err := os.ReadFile(st.FilePath)
if err != nil {
st.Pb = nil
return fmt.Errorf("failed to read store state from %s : %v", st.FilePath, err)
}
if err := proto.Unmarshal(binPb, st.Pb); err != nil {
st.Pb = nil
return fmt.Errorf("failed to parse store state from %s : %v", st.FilePath, err)
}
glog.V(1).Infof("Got store state from %s: %v", st.FilePath, st.Pb)
return nil
}
func (st *State) Save() error {
if st.Pb == nil {
st.Pb = &volume_server_pb.VolumeServerState{}
}
binPb, err := proto.Marshal(st.Pb)
if err != nil {
return fmt.Errorf("failed to serialize store state %v: %s", st.Pb, err)
}
if err := util.WriteFile(st.FilePath, binPb, StateFileMode); err != nil {
return fmt.Errorf("failed to write store state to %s : %v", st.FilePath, err)
}
glog.V(1).Infof("Saved store state %v to %s", st.Pb, st.FilePath)
return nil
}
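
A round-trip usage sketch of the new State type, assuming only the code above (the temp directory is illustrative):

package main

import (
	"fmt"
	"os"

	"github.com/seaweedfs/seaweedfs/weed/storage"
)

func main() {
	dir, _ := os.MkdirTemp("", "state-demo")
	defer os.RemoveAll(dir)

	// First call finds no state.pb yet and leaves an empty state in st.Pb.
	st, err := storage.NewState(dir)
	if err != nil {
		fmt.Println(err)
		return
	}

	// Mutate and persist; Save() writes dir/state.pb via proto.Marshal.
	st.Pb.Maintenance = true
	if err := st.Save(); err != nil {
		fmt.Println(err)
		return
	}

	// Reload from disk: the flag survives the round trip.
	if err := st.Load(); err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(st.Pb.Maintenance) // true
}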