
Add a version token on RPCs to read/update volume server states. (#8191)

* Add a version token on `GetState()`/`SetState()` RPCs for volume server states.

* Make state version a property of `VolumeServerState` instead of an in-memory counter.

Also extend state atomicity to reads, instead of just writes.
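The version field turns the GetState()/SetState() round trip into an optimistic compare-and-swap: a client reads the current state (including its version), flips the flags it cares about, and sends the same version back; the server applies the update only if its own version still matches, and bumps the counter on success. Below is a minimal client-side sketch of that flow, assuming the generated volume_server_pb gRPC client; the package name, helper name and nil-state guard are illustrative, not part of this change.

package example

import (
    "context"

    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

// toggleMaintenance flips the maintenance flag through the versioned state RPCs.
// The version returned by GetState() is echoed back untouched; the volume server
// rejects the SetState() call if its state version moved in the meantime.
func toggleMaintenance(ctx context.Context, client volume_server_pb.VolumeServerClient) error {
    getResp, err := client.GetState(ctx, &volume_server_pb.GetStateRequest{})
    if err != nil {
        return err
    }
    state := getResp.GetState()
    if state == nil { // defensive: a fresh server may report an empty state
        state = &volume_server_pb.VolumeServerState{}
    }
    state.Maintenance = !state.GetMaintenance()
    _, err = client.SetState(ctx, &volume_server_pb.SetStateRequest{State: state})
    return err
}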
Commit 2cda4289f4 by Lisandro Pin, committed via GitHub 2 days ago.
8 changed files:

1. weed/pb/volume_server.proto (4 lines changed)
2. weed/pb/volume_server_pb/volume_server.pb.go (20 lines changed)
3. weed/server/volume_grpc_admin.go (2 lines changed)
4. weed/server/volume_grpc_state.go (6 lines changed)
5. weed/server/volume_server.go (2 lines changed)
6. weed/server/volume_server_test.go (7 lines changed)
7. weed/storage/store.go (10 lines changed)
8. weed/storage/store_state.go (93 lines changed)

weed/pb/volume_server.proto

@@ -9,8 +9,10 @@ import "remote.proto";
 // Persistent state for volume servers.
 message VolumeServerState {
-    // Whether the server is in maintenance (i.e. read-only) mode.
+    // whether the server is in maintenance (i.e. read-only) mode.
     bool maintenance = 1;
+    // incremental version counter
+    uint32 version = 2;
 }
 //////////////////////////////////////////////////

weed/pb/volume_server_pb/volume_server.pb.go

@@ -77,8 +77,10 @@ func (VolumeScrubMode) EnumDescriptor() ([]byte, []int) {
 // Persistent state for volume servers.
 type VolumeServerState struct {
     state protoimpl.MessageState `protogen:"open.v1"`
-    // Whether the server is in maintenance (i.e. read-only) mode.
-    Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"`
+    // whether the server is in maintenance (i.e. read-only) mode.
+    Maintenance bool `protobuf:"varint,1,opt,name=maintenance,proto3" json:"maintenance,omitempty"`
+    // incremental version counter
+    Version uint32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"`
     unknownFields protoimpl.UnknownFields
     sizeCache protoimpl.SizeCache
 }
@@ -120,6 +122,13 @@ func (x *VolumeServerState) GetMaintenance() bool {
     return false
 }
+func (x *VolumeServerState) GetVersion() uint32 {
+    if x != nil {
+        return x.Version
+    }
+    return 0
+}
 type BatchDeleteRequest struct {
     state protoimpl.MessageState `protogen:"open.v1"`
     FileIds []string `protobuf:"bytes,1,rep,name=file_ids,json=fileIds,proto3" json:"file_ids,omitempty"`
@@ -1858,7 +1867,7 @@ func (x *GetStateResponse) GetState() *VolumeServerState {
 type SetStateRequest struct {
     state protoimpl.MessageState `protogen:"open.v1"`
-    // SetState updates *all* volume server flags at once. Retrieve state with GetState(),
+    // SetState updates *all* volume server flags at once. Retrieve state/version with GetState(),
     // modify individual flags as required, then call this RPC to update.
     State *VolumeServerState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
     unknownFields protoimpl.UnknownFields
@@ -6690,9 +6699,10 @@ var File_volume_server_proto protoreflect.FileDescriptor
 const file_volume_server_proto_rawDesc = "" +
     "\n" +
-    "\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"5\n" +
+    "\x13volume_server.proto\x12\x10volume_server_pb\x1a\fremote.proto\"O\n" +
     "\x11VolumeServerState\x12 \n" +
-    "\vmaintenance\x18\x01 \x01(\bR\vmaintenance\"[\n" +
+    "\vmaintenance\x18\x01 \x01(\bR\vmaintenance\x12\x18\n" +
+    "\aversion\x18\x02 \x01(\rR\aversion\"[\n" +
     "\x12BatchDeleteRequest\x12\x19\n" +
     "\bfile_ids\x18\x01 \x03(\tR\afileIds\x12*\n" +
     "\x11skip_cookie_check\x18\x02 \x01(\bR\x0fskipCookieCheck\"O\n" +

weed/server/volume_grpc_admin.go

@@ -273,7 +273,7 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.
 func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
     resp := &volume_server_pb.VolumeServerStatusResponse{
-        State: vs.store.State.Pb,
+        State: vs.store.State.Proto(),
         MemoryStatus: stats.MemStat(),
         Version: version.Version(),
         DataCenter: vs.dataCenter,

weed/server/volume_grpc_state.go

@@ -9,7 +9,7 @@ import (
 // GetState returns a volume server's state flags.
 func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetStateRequest) (*volume_server_pb.GetStateResponse, error) {
     resp := &volume_server_pb.GetStateResponse{
-        State: vs.store.State.Pb,
+        State: vs.store.State.Proto(),
     }
     return resp, nil
@@ -17,9 +17,9 @@ func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetS
 // SetState updates state flags for volume servers.
 func (vs *VolumeServer) SetState(ctx context.Context, req *volume_server_pb.SetStateRequest) (*volume_server_pb.SetStateResponse, error) {
-    err := vs.store.State.Update(req.State)
+    err := vs.store.State.Update(req.GetState())
     resp := &volume_server_pb.SetStateResponse{
-        State: vs.store.State.Pb,
+        State: vs.store.State.Proto(),
     }
     return resp, err
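Since SetState() now fails when the submitted version is stale, a caller that races with other writers can recover by re-reading and retrying. A rough retry-loop sketch under the same assumptions as the client example near the top of this page (imports omitted; the helper name and retry bound are illustrative, only GetState/SetState are actual RPCs):

// setMaintenance keeps retrying the versioned update until it wins or gives up.
func setMaintenance(ctx context.Context, client volume_server_pb.VolumeServerClient, on bool) error {
    const maxAttempts = 3 // illustrative bound, not part of this change
    var lastErr error
    for i := 0; i < maxAttempts; i++ {
        getResp, err := client.GetState(ctx, &volume_server_pb.GetStateRequest{})
        if err != nil {
            return err
        }
        state := getResp.GetState()
        if state == nil {
            state = &volume_server_pb.VolumeServerState{}
        }
        state.Maintenance = on
        if _, lastErr = client.SetState(ctx, &volume_server_pb.SetStateRequest{State: state}); lastErr == nil {
            return nil
        }
        // A failure here may be a version mismatch caused by a concurrent writer; re-read and try again.
    }
    return lastErr
}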

weed/server/volume_server.go

@@ -178,7 +178,7 @@ func (vs *VolumeServer) MaintenanceMode() bool {
     if vs.store == nil {
         return false
     }
-    return vs.store.State.Pb.GetMaintenance()
+    return vs.store.State.Proto().GetMaintenance()
 }
 // Checks if a volume server is in maintenance mode, and returns an error explaining why.

weed/server/volume_server_test.go

@@ -42,11 +42,8 @@ func TestMaintenanceMode(t *testing.T) {
         t.Run(tc.name, func(t *testing.T) {
             vs := VolumeServer{
                 store: &storage.Store{
-                    Id: "test_1234",
-                    State: &storage.State{
-                        FilePath: "/some/path.pb",
-                        Pb: tc.pb,
-                    },
+                    Id:    "test_1234",
+                    State: storage.NewStateFromProto("/some/path.pb", tc.pb),
                 },
             }

weed/storage/store.go

@@ -160,9 +160,9 @@ func NewStore(
 func (s *Store) LoadState() error {
     err := s.State.Load()
-    if s.State.Pb != nil && err == nil {
+    if s.State.Proto() != nil && err == nil {
         select {
-        case s.StateUpdateChan <- s.State.Pb:
+        case s.StateUpdateChan <- s.State.Proto():
         default:
             glog.V(2).Infof("StateUpdateChan full during LoadState, state will be reported in heartbeat")
         }
@@ -171,15 +171,15 @@ func (s *Store) LoadState() error {
 }
 func (s *Store) SaveState() error {
-    if s.State.Pb == nil {
+    if s.State.Proto() == nil {
         glog.Warningf("tried to save empty state for store %s", s.Id)
         return nil
     }
     err := s.State.Save()
-    if s.State.Pb != nil && err == nil {
+    if s.State.Proto() != nil && err == nil {
         select {
-        case s.StateUpdateChan <- s.State.Pb:
+        case s.StateUpdateChan <- s.State.Proto():
         default:
             glog.V(2).Infof("StateUpdateChan full during SaveState, state will be reported in heartbeat")
         }

weed/storage/store_state.go

@@ -4,6 +4,7 @@ import (
     "fmt"
     "os"
     "path/filepath"
+    "sync"
     "github.com/seaweedfs/seaweedfs/weed/glog"
     "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
@@ -17,70 +18,110 @@ const (
 )
 type State struct {
-    FilePath string
-    Pb       *volume_server_pb.VolumeServerState
+    filePath string
+    pb       *volume_server_pb.VolumeServerState
+    mu       sync.Mutex
 }
 func NewState(dir string) (*State, error) {
     state := &State{
-        FilePath: filepath.Join(dir, StateFileName),
-        Pb:       nil,
+        filePath: filepath.Join(dir, StateFileName),
+        pb:       nil,
     }
     err := state.Load()
     return state, err
 }
+func NewStateFromProto(filePath string, state *volume_server_pb.VolumeServerState) *State {
+    pb := &volume_server_pb.VolumeServerState{}
+    proto.Merge(pb, state)
+    return &State{
+        filePath: filePath,
+        pb:       pb,
+    }
+}
+func (st *State) Proto() *volume_server_pb.VolumeServerState {
+    st.mu.Lock()
+    defer st.mu.Unlock()
+    return st.pb
+}
 func (st *State) Load() error {
-    st.Pb = &volume_server_pb.VolumeServerState{}
+    st.mu.Lock()
+    defer st.mu.Unlock()
+    st.pb = &volume_server_pb.VolumeServerState{}
-    if !util.FileExists(st.FilePath) {
-        glog.V(1).Infof("No preexisting store state at %s", st.FilePath)
+    if !util.FileExists(st.filePath) {
+        glog.V(1).Infof("No preexisting store state at %s", st.filePath)
         return nil
     }
-    binPb, err := os.ReadFile(st.FilePath)
+    binPb, err := os.ReadFile(st.filePath)
     if err != nil {
-        st.Pb = nil
-        return fmt.Errorf("failed to read store state from %s : %v", st.FilePath, err)
+        st.pb = nil
+        return fmt.Errorf("failed to read store state from %s : %v", st.filePath, err)
     }
-    if err := proto.Unmarshal(binPb, st.Pb); err != nil {
-        st.Pb = nil
-        return fmt.Errorf("failed to parse store state from %s : %v", st.FilePath, err)
+    if err := proto.Unmarshal(binPb, st.pb); err != nil {
+        st.pb = nil
+        return fmt.Errorf("failed to parse store state from %s : %v", st.filePath, err)
     }
-    glog.V(1).Infof("Got store state from %s: %v", st.FilePath, st.Pb)
+    glog.V(1).Infof("Got store state from %s: %v", st.filePath, st.pb)
     return nil
 }
-func (st *State) Save() error {
-    if st.Pb == nil {
-        st.Pb = &volume_server_pb.VolumeServerState{}
+func (st *State) save(locking bool) error {
+    if locking {
+        st.mu.Lock()
+        defer st.mu.Unlock()
+    }
+    if st.pb == nil {
+        st.pb = &volume_server_pb.VolumeServerState{}
     }
-    binPb, err := proto.Marshal(st.Pb)
+    binPb, err := proto.Marshal(st.pb)
     if err != nil {
-        return fmt.Errorf("failed to serialize store state %v: %s", st.Pb, err)
+        return fmt.Errorf("failed to serialize store state %v: %s", st.pb, err)
     }
-    if err := util.WriteFile(st.FilePath, binPb, StateFileMode); err != nil {
-        return fmt.Errorf("failed to write store state to %s : %v", st.FilePath, err)
+    if err := util.WriteFile(st.filePath, binPb, StateFileMode); err != nil {
+        return fmt.Errorf("failed to write store state to %s : %v", st.filePath, err)
     }
-    glog.V(1).Infof("Saved store state %v to %s", st.Pb, st.FilePath)
+    glog.V(1).Infof("Saved store state %v to %s", st.pb, st.filePath)
     return nil
 }
+func (st *State) Save() error {
+    return st.save(true)
+}
 func (st *State) Update(state *volume_server_pb.VolumeServerState) error {
+    st.mu.Lock()
+    defer st.mu.Unlock()
     if state == nil {
         return nil
     }
+    if got, want := st.pb.GetVersion(), state.GetVersion(); got != want {
+        return fmt.Errorf("version mismatch for VolumeServerState (got %d, want %d)", got, want)
+    }
-    origState := st.Pb
-    st.Pb = state
-    err := st.Save()
+    origState := st.pb
+    st.pb = &volume_server_pb.VolumeServerState{}
+    proto.Merge(st.pb, state)
+    st.pb.Version = st.pb.GetVersion() + 1
+    err := st.save(false)
     if err != nil {
         // restore the original state upon save failures, to avoid skew between in-memory and disk state protos.
-        st.Pb = origState
+        st.pb = origState
     }
     return err
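A short test-style sketch of the Update() semantics introduced above: an update carrying the current version is applied and bumps the stored version, while a replay with a stale version is rejected. The temp-dir setup and test name are illustrative; NewState, Proto and Update are the functions changed in this file.

package storage

import (
    "testing"

    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

func TestStateUpdateVersionToken(t *testing.T) {
    st, err := NewState(t.TempDir())
    if err != nil {
        t.Fatalf("NewState: %v", err)
    }

    // An update that echoes the current version (0 for a fresh state) is accepted,
    // and Update() bumps the stored version.
    update := &volume_server_pb.VolumeServerState{Maintenance: true, Version: st.Proto().GetVersion()}
    if err := st.Update(update); err != nil {
        t.Fatalf("Update with matching version: %v", err)
    }
    if got := st.Proto().GetVersion(); got != 1 {
        t.Errorf("version after update = %d, want 1", got)
    }

    // Replaying the same, now stale, version must be rejected.
    stale := &volume_server_pb.VolumeServerState{Maintenance: false, Version: 0}
    if err := st.Update(stale); err == nil {
        t.Error("Update with stale version: want error, got nil")
    }
}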
