From 2cda4289f42df9d9fa6b0186d8e7cd0b0ed6d659 Mon Sep 17 00:00:00 2001 From: Lisandro Pin Date: Fri, 6 Feb 2026 19:58:43 +0100 Subject: [PATCH] Add a version token on RPCs to read/update volume server states. (#8191) * Add a version token on `GetState()`/`SetState()` RPCs for volume server states. * Make state version a property of `VolumeServerState` instead of an in-memory counter. Also extend state atomicity to reads, instead of just writes. --- weed/pb/volume_server.proto | 4 +- weed/pb/volume_server_pb/volume_server.pb.go | 20 +++-- weed/server/volume_grpc_admin.go | 2 +- weed/server/volume_grpc_state.go | 6 +- weed/server/volume_server.go | 2 +- weed/server/volume_server_test.go | 7 +- weed/storage/store.go | 10 +-- weed/storage/store_state.go | 93 ++++++++++++++------ 8 files changed, 97 insertions(+), 47 deletions(-) diff --git a/weed/pb/volume_server.proto b/weed/pb/volume_server.proto index a16ff31ec..df961cedc 100644 --- a/weed/pb/volume_server.proto +++ b/weed/pb/volume_server.proto @@ -9,8 +9,10 @@ import "remote.proto"; // Persistent state for volume servers. message VolumeServerState { - // Whether the server is in maintenance (i.e. read-only) mode. + // whether the server is in maintenance (i.e. read-only) mode. bool maintenance = 1; + // incremental version counter + uint32 version = 2; } ////////////////////////////////////////////////// diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index f91e76b1e..9467d8d4a 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -77,8 +77,10 @@ func (VolumeScrubMode) EnumDescriptor() ([]byte, []int) { // Persistent state for volume servers. type VolumeServerState struct { state protoimpl.MessageState `protogen:"open.v1"` - // Whether the server is in maintenance (i.e. read-only) mode. 
- Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"` + // whether the server is in maintenance (i.e. read-only) mode. + Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"` + // incremental version counter + Version uint32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } @@ -120,6 +122,13 @@ func (x *VolumeServerState) GetMaintenance() bool { return false } +func (x *VolumeServerState) GetVersion() uint32 { + if x != nil { + return x.Version + } + return 0 +} + type BatchDeleteRequest struct { state protoimpl.MessageState `protogen:"open.v1"` FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds,proto3" json:"file_ids,omitempty"` @@ -1858,7 +1867,7 @@ func (x *GetStateResponse) GetState() *VolumeServerState { type SetStateRequest struct { state protoimpl.MessageState `protogen:"open.v1"` - // SetState updates *all* volume server flags at once. Retrieve state with GetState(), + // SetState updates *all* volume server flags at once. Retrieve state/version with GetState(), // modify individual flags as required, then call this RPC to update. 
State *VolumeServerState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"` unknownFields protoimpl.UnknownFields @@ -6690,9 +6699,10 @@ var File_volume_server_proto protoreflect.FileDescriptor const file_volume_server_proto_rawDesc = "" + "\n" + - "\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"5\n" + + "\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"O\n" + "\x11VolumeServerState\x12 \n" + - "\vmaintenance\x18\x01 \x01(\bR\vmaintenance\"[\n" + + "\vmaintenance\x18\x01 \x01(\bR\vmaintenance\x12\x18\n" + + "\aversion\x18\x02 \x01(\rR\aversion\"[\n" + "\x12BatchDeleteRequest\x12\x19\n" + "\bfile_ids\x18\x01 \x03(\tR\afileIds\x12*\n" + "\x11skip_cookie_check\x18\x02 \x01(\bR\x0fskipCookieCheck\"O\n" + diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 584b1a1d2..963ec8773 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -273,7 +273,7 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) { resp := &volume_server_pb.VolumeServerStatusResponse{ - State: vs.store.State.Pb, + State: vs.store.State.Proto(), MemoryStatus: stats.MemStat(), Version: version.Version(), DataCenter: vs.dataCenter, diff --git a/weed/server/volume_grpc_state.go b/weed/server/volume_grpc_state.go index 0f5f0e92e..52bc823d4 100644 --- a/weed/server/volume_grpc_state.go +++ b/weed/server/volume_grpc_state.go @@ -9,7 +9,7 @@ import ( // GetState returns a volume server's state flags. 
func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetStateRequest) (*volume_server_pb.GetStateResponse, error) { resp := &volume_server_pb.GetStateResponse{ - State: vs.store.State.Pb, + State: vs.store.State.Proto(), } return resp, nil @@ -17,9 +17,9 @@ func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetS // SetState updates state flags for volume servers. func (vs *VolumeServer) SetState(ctx context.Context, req *volume_server_pb.SetStateRequest) (*volume_server_pb.SetStateResponse, error) { - err := vs.store.State.Update(req.State) + err := vs.store.State.Update(req.GetState()) resp := &volume_server_pb.SetStateResponse{ - State: vs.store.State.Pb, + State: vs.store.State.Proto(), } return resp, err diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 23676c3f8..6bc3a6898 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -178,7 +178,7 @@ func (vs *VolumeServer) MaintenanceMode() bool { if vs.store == nil { return false } - return vs.store.State.Pb.GetMaintenance() + return vs.store.State.Proto().GetMaintenance() } // Checks if a volume server is in maintenance mode, and returns an error explaining why. 
diff --git a/weed/server/volume_server_test.go b/weed/server/volume_server_test.go index 1115a4b4a..ac1ad774e 100644 --- a/weed/server/volume_server_test.go +++ b/weed/server/volume_server_test.go @@ -42,11 +42,8 @@ func TestMaintenanceMode(t *testing.T) { t.Run(tc.name, func(t *testing.T) { vs := VolumeServer{ store: &storage.Store{ - Id: "test_1234", - State: &storage.State{ - FilePath: "/some/path.pb", - Pb: tc.pb, - }, + Id: "test_1234", + State: storage.NewStateFromProto("/some/path.pb", tc.pb), }, } diff --git a/weed/storage/store.go b/weed/storage/store.go index 389980667..3e5bcd3cf 100644 --- a/weed/storage/store.go +++ b/weed/storage/store.go @@ -160,9 +160,9 @@ func NewStore( func (s *Store) LoadState() error { err := s.State.Load() - if s.State.Pb != nil && err == nil { + if s.State.Proto() != nil && err == nil { select { - case s.StateUpdateChan <- s.State.Pb: + case s.StateUpdateChan <- s.State.Proto(): default: glog.V(2).Infof("StateUpdateChan full during LoadState, state will be reported in heartbeat") } @@ -171,15 +171,15 @@ func (s *Store) LoadState() error { } func (s *Store) SaveState() error { - if s.State.Pb == nil { + if s.State.Proto() == nil { glog.Warningf("tried to save empty state for store %s", s.Id) return nil } err := s.State.Save() - if s.State.Pb != nil && err == nil { + if s.State.Proto() != nil && err == nil { select { - case s.StateUpdateChan <- s.State.Pb: + case s.StateUpdateChan <- s.State.Proto(): default: glog.V(2).Infof("StateUpdateChan full during SaveState, state will be reported in heartbeat") } diff --git a/weed/storage/store_state.go b/weed/storage/store_state.go index 7b0a2fc3a..2bac4fae6 100644 --- a/weed/storage/store_state.go +++ b/weed/storage/store_state.go @@ -4,6 +4,7 @@ import ( "fmt" "os" "path/filepath" + "sync" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" @@ -17,70 +18,110 @@ const ( ) type State struct { - FilePath string - Pb 
*volume_server_pb.VolumeServerState + filePath string + pb *volume_server_pb.VolumeServerState + + mu sync.Mutex } func NewState(dir string) (*State, error) { state := &State{ - FilePath: filepath.Join(dir, StateFileName), - Pb: nil, + filePath: filepath.Join(dir, StateFileName), + pb: nil, } err := state.Load() return state, err } +func NewStateFromProto(filePath string, state *volume_server_pb.VolumeServerState) *State { + pb := &volume_server_pb.VolumeServerState{} + proto.Merge(pb, state) + + return &State{ + filePath: filePath, + pb: pb, + } +} + +func (st *State) Proto() *volume_server_pb.VolumeServerState { + st.mu.Lock() + defer st.mu.Unlock() + + return st.pb +} + func (st *State) Load() error { - st.Pb = &volume_server_pb.VolumeServerState{} + st.mu.Lock() + defer st.mu.Unlock() + + st.pb = &volume_server_pb.VolumeServerState{} - if !util.FileExists(st.FilePath) { - glog.V(1).Infof("No preexisting store state at %s", st.FilePath) + if !util.FileExists(st.filePath) { + glog.V(1).Infof("No preexisting store state at %s", st.filePath) return nil } - binPb, err := os.ReadFile(st.FilePath) + binPb, err := os.ReadFile(st.filePath) if err != nil { - st.Pb = nil - return fmt.Errorf("failed to read store state from %s : %v", st.FilePath, err) + st.pb = nil + return fmt.Errorf("failed to read store state from %s : %v", st.filePath, err) } - if err := proto.Unmarshal(binPb, st.Pb); err != nil { - st.Pb = nil - return fmt.Errorf("failed to parse store state from %s : %v", st.FilePath, err) + if err := proto.Unmarshal(binPb, st.pb); err != nil { + st.pb = nil + return fmt.Errorf("failed to parse store state from %s : %v", st.filePath, err) } - glog.V(1).Infof("Got store state from %s: %v", st.FilePath, st.Pb) + glog.V(1).Infof("Got store state from %s: %v", st.filePath, st.pb) return nil } -func (st *State) Save() error { - if st.Pb == nil { - st.Pb = &volume_server_pb.VolumeServerState{} +func (st *State) save(locking bool) error { + if locking { + st.mu.Lock() + 
defer st.mu.Unlock() + } + + if st.pb == nil { + st.pb = &volume_server_pb.VolumeServerState{} } - binPb, err := proto.Marshal(st.Pb) + binPb, err := proto.Marshal(st.pb) if err != nil { - return fmt.Errorf("failed to serialize store state %v: %s", st.Pb, err) + return fmt.Errorf("failed to serialize store state %v: %s", st.pb, err) } - if err := util.WriteFile(st.FilePath, binPb, StateFileMode); err != nil { - return fmt.Errorf("failed to write store state to %s : %v", st.FilePath, err) + if err := util.WriteFile(st.filePath, binPb, StateFileMode); err != nil { + return fmt.Errorf("failed to write store state to %s : %v", st.filePath, err) } - glog.V(1).Infof("Saved store state %v to %s", st.Pb, st.FilePath) + glog.V(1).Infof("Saved store state %v to %s", st.pb, st.filePath) return nil } +func (st *State) Save() error { + return st.save(true) +} + func (st *State) Update(state *volume_server_pb.VolumeServerState) error { + st.mu.Lock() + defer st.mu.Unlock() + if state == nil { return nil } + if got, want := st.pb.GetVersion(), state.GetVersion(); got != want { + return fmt.Errorf("version mismatch for VolumeServerState (got %d, want %d)", got, want) + } + + origState := st.pb + st.pb = &volume_server_pb.VolumeServerState{} + proto.Merge(st.pb, state) + st.pb.Version = st.pb.GetVersion() + 1 - origState := st.Pb - st.Pb = state - err := st.Save() + err := st.save(false) if err != nil { // restore the original state upon save failures, to avoid skew between in-memory and disk state protos. - st.Pb = origState + st.pb = origState } return err