Browse Source

Add volume server RPCs to read and update state flags. (#8186)

* Boostrap persistent state for volume servers.

This PR implements logic load/save persistent state information for storages
associated with volume servers, and reporting state changes back to masters
via heartbeat messages.

More work ensues!

See https://github.com/seaweedfs/seaweedfs/issues/7977 for details.

* Add volume server RPCs to read and update state flags.
pull/8187/head
Lisandro Pin 1 day ago
committed by GitHub
parent
commit
345ac950b6
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 21
      weed/pb/volume_server.proto
  2. 1020
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 170
      weed/pb/volume_server_pb/volume_server_grpc.pb.go
  4. 26
      weed/server/volume_grpc_state.go
  5. 16
      weed/storage/store_state.go

21
weed/pb/volume_server.proto

@ -53,7 +53,11 @@ service VolumeServer {
}
rpc VolumeStatus (VolumeStatusRequest) returns (VolumeStatusResponse) {
}
// TODO(issues/7977): add RPCs to control state flags
rpc GetState (GetStateRequest) returns (GetStateResponse) {
}
rpc SetState (SetStateRequest) returns (SetStateResponse) {
}
// copy the .idx .dat files, and mount this volume
rpc VolumeCopy (VolumeCopyRequest) returns (stream VolumeCopyResponse) {
@ -270,6 +274,21 @@ message VolumeStatusResponse {
uint64 file_deleted_count = 4;
}
message GetStateRequest {
}
message GetStateResponse {
VolumeServerState state = 1;
}
message SetStateRequest {
// SetState updates *all* volume server flags at once. Retrieve state with GetState(),
// modify individual flags as required, then call this RPC to update.
VolumeServerState state = 1;
}
message SetStateResponse {
VolumeServerState state = 1;
}
message VolumeCopyRequest {
uint32 volume_id = 1;
string collection = 2;

1020
weed/pb/volume_server_pb/volume_server.pb.go
File diff suppressed because it is too large
View File

170
weed/pb/volume_server_pb/volume_server_grpc.pb.go

@ -1,7 +1,7 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.5.1
// - protoc v6.33.4
// - protoc-gen-go-grpc v1.6.0
// - protoc v3.21.12
// source: volume_server.proto
package volume_server_pb
@ -35,6 +35,8 @@ const (
VolumeServer_VolumeMarkWritable_FullMethodName = "/volume_server_pb.VolumeServer/VolumeMarkWritable"
VolumeServer_VolumeConfigure_FullMethodName = "/volume_server_pb.VolumeServer/VolumeConfigure"
VolumeServer_VolumeStatus_FullMethodName = "/volume_server_pb.VolumeServer/VolumeStatus"
VolumeServer_GetState_FullMethodName = "/volume_server_pb.VolumeServer/GetState"
VolumeServer_SetState_FullMethodName = "/volume_server_pb.VolumeServer/SetState"
VolumeServer_VolumeCopy_FullMethodName = "/volume_server_pb.VolumeServer/VolumeCopy"
VolumeServer_ReadVolumeFileStatus_FullMethodName = "/volume_server_pb.VolumeServer/ReadVolumeFileStatus"
VolumeServer_CopyFile_FullMethodName = "/volume_server_pb.VolumeServer/CopyFile"
@ -86,6 +88,8 @@ type VolumeServerClient interface {
VolumeMarkWritable(ctx context.Context, in *VolumeMarkWritableRequest, opts ...grpc.CallOption) (*VolumeMarkWritableResponse, error)
VolumeConfigure(ctx context.Context, in *VolumeConfigureRequest, opts ...grpc.CallOption) (*VolumeConfigureResponse, error)
VolumeStatus(ctx context.Context, in *VolumeStatusRequest, opts ...grpc.CallOption) (*VolumeStatusResponse, error)
GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error)
SetState(ctx context.Context, in *SetStateRequest, opts ...grpc.CallOption) (*SetStateResponse, error)
// copy the .idx .dat files, and mount this volume
VolumeCopy(ctx context.Context, in *VolumeCopyRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[VolumeCopyResponse], error)
ReadVolumeFileStatus(ctx context.Context, in *ReadVolumeFileStatusRequest, opts ...grpc.CallOption) (*ReadVolumeFileStatusResponse, error)
@ -307,6 +311,26 @@ func (c *volumeServerClient) VolumeStatus(ctx context.Context, in *VolumeStatusR
return out, nil
}
func (c *volumeServerClient) GetState(ctx context.Context, in *GetStateRequest, opts ...grpc.CallOption) (*GetStateResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(GetStateResponse)
err := c.cc.Invoke(ctx, VolumeServer_GetState_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) SetState(ctx context.Context, in *SetStateRequest, opts ...grpc.CallOption) (*SetStateResponse, error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
out := new(SetStateResponse)
err := c.cc.Invoke(ctx, VolumeServer_SetState_FullMethodName, in, out, cOpts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *volumeServerClient) VolumeCopy(ctx context.Context, in *VolumeCopyRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[VolumeCopyResponse], error) {
cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
stream, err := c.cc.NewStream(ctx, &VolumeServer_ServiceDesc.Streams[2], VolumeServer_VolumeCopy_FullMethodName, cOpts...)
@ -683,6 +707,8 @@ type VolumeServerServer interface {
VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error)
VolumeConfigure(context.Context, *VolumeConfigureRequest) (*VolumeConfigureResponse, error)
VolumeStatus(context.Context, *VolumeStatusRequest) (*VolumeStatusResponse, error)
GetState(context.Context, *GetStateRequest) (*GetStateResponse, error)
SetState(context.Context, *SetStateRequest) (*SetStateResponse, error)
// copy the .idx .dat files, and mount this volume
VolumeCopy(*VolumeCopyRequest, grpc.ServerStreamingServer[VolumeCopyResponse]) error
ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error)
@ -727,136 +753,142 @@ type VolumeServerServer interface {
type UnimplementedVolumeServerServer struct{}
func (UnimplementedVolumeServerServer) BatchDelete(context.Context, *BatchDeleteRequest) (*BatchDeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method BatchDelete not implemented")
return nil, status.Error(codes.Unimplemented, "method BatchDelete not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCheck(context.Context, *VacuumVolumeCheckRequest) (*VacuumVolumeCheckResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCheck not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCompact(*VacuumVolumeCompactRequest, grpc.ServerStreamingServer[VacuumVolumeCompactResponse]) error {
return status.Errorf(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
return status.Error(codes.Unimplemented, "method VacuumVolumeCompact not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCommit(context.Context, *VacuumVolumeCommitRequest) (*VacuumVolumeCommitResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCommit not implemented")
}
func (UnimplementedVolumeServerServer) VacuumVolumeCleanup(context.Context, *VacuumVolumeCleanupRequest) (*VacuumVolumeCleanupResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
return nil, status.Error(codes.Unimplemented, "method VacuumVolumeCleanup not implemented")
}
func (UnimplementedVolumeServerServer) DeleteCollection(context.Context, *DeleteCollectionRequest) (*DeleteCollectionResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method DeleteCollection not implemented")
return nil, status.Error(codes.Unimplemented, "method DeleteCollection not implemented")
}
func (UnimplementedVolumeServerServer) AllocateVolume(context.Context, *AllocateVolumeRequest) (*AllocateVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method AllocateVolume not implemented")
return nil, status.Error(codes.Unimplemented, "method AllocateVolume not implemented")
}
func (UnimplementedVolumeServerServer) VolumeSyncStatus(context.Context, *VolumeSyncStatusRequest) (*VolumeSyncStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeSyncStatus not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeSyncStatus not implemented")
}
func (UnimplementedVolumeServerServer) VolumeIncrementalCopy(*VolumeIncrementalCopyRequest, grpc.ServerStreamingServer[VolumeIncrementalCopyResponse]) error {
return status.Errorf(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
return status.Error(codes.Unimplemented, "method VolumeIncrementalCopy not implemented")
}
func (UnimplementedVolumeServerServer) VolumeMount(context.Context, *VolumeMountRequest) (*VolumeMountResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeMount not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeMount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeUnmount(context.Context, *VolumeUnmountRequest) (*VolumeUnmountResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeUnmount not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeUnmount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeDelete(context.Context, *VolumeDeleteRequest) (*VolumeDeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeDelete not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeDelete not implemented")
}
func (UnimplementedVolumeServerServer) VolumeMarkReadonly(context.Context, *VolumeMarkReadonlyRequest) (*VolumeMarkReadonlyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeMarkReadonly not implemented")
}
func (UnimplementedVolumeServerServer) VolumeMarkWritable(context.Context, *VolumeMarkWritableRequest) (*VolumeMarkWritableResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeMarkWritable not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeMarkWritable not implemented")
}
func (UnimplementedVolumeServerServer) VolumeConfigure(context.Context, *VolumeConfigureRequest) (*VolumeConfigureResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeConfigure not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeConfigure not implemented")
}
func (UnimplementedVolumeServerServer) VolumeStatus(context.Context, *VolumeStatusRequest) (*VolumeStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeStatus not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeStatus not implemented")
}
func (UnimplementedVolumeServerServer) GetState(context.Context, *GetStateRequest) (*GetStateResponse, error) {
return nil, status.Error(codes.Unimplemented, "method GetState not implemented")
}
func (UnimplementedVolumeServerServer) SetState(context.Context, *SetStateRequest) (*SetStateResponse, error) {
return nil, status.Error(codes.Unimplemented, "method SetState not implemented")
}
func (UnimplementedVolumeServerServer) VolumeCopy(*VolumeCopyRequest, grpc.ServerStreamingServer[VolumeCopyResponse]) error {
return status.Errorf(codes.Unimplemented, "method VolumeCopy not implemented")
return status.Error(codes.Unimplemented, "method VolumeCopy not implemented")
}
func (UnimplementedVolumeServerServer) ReadVolumeFileStatus(context.Context, *ReadVolumeFileStatusRequest) (*ReadVolumeFileStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
return nil, status.Error(codes.Unimplemented, "method ReadVolumeFileStatus not implemented")
}
func (UnimplementedVolumeServerServer) CopyFile(*CopyFileRequest, grpc.ServerStreamingServer[CopyFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method CopyFile not implemented")
return status.Error(codes.Unimplemented, "method CopyFile not implemented")
}
func (UnimplementedVolumeServerServer) ReceiveFile(grpc.ClientStreamingServer[ReceiveFileRequest, ReceiveFileResponse]) error {
return status.Errorf(codes.Unimplemented, "method ReceiveFile not implemented")
return status.Error(codes.Unimplemented, "method ReceiveFile not implemented")
}
func (UnimplementedVolumeServerServer) ReadNeedleBlob(context.Context, *ReadNeedleBlobRequest) (*ReadNeedleBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleBlob not implemented")
return nil, status.Error(codes.Unimplemented, "method ReadNeedleBlob not implemented")
}
func (UnimplementedVolumeServerServer) ReadNeedleMeta(context.Context, *ReadNeedleMetaRequest) (*ReadNeedleMetaResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method ReadNeedleMeta not implemented")
return nil, status.Error(codes.Unimplemented, "method ReadNeedleMeta not implemented")
}
func (UnimplementedVolumeServerServer) WriteNeedleBlob(context.Context, *WriteNeedleBlobRequest) (*WriteNeedleBlobResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method WriteNeedleBlob not implemented")
return nil, status.Error(codes.Unimplemented, "method WriteNeedleBlob not implemented")
}
func (UnimplementedVolumeServerServer) ReadAllNeedles(*ReadAllNeedlesRequest, grpc.ServerStreamingServer[ReadAllNeedlesResponse]) error {
return status.Errorf(codes.Unimplemented, "method ReadAllNeedles not implemented")
return status.Error(codes.Unimplemented, "method ReadAllNeedles not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTailSender(*VolumeTailSenderRequest, grpc.ServerStreamingServer[VolumeTailSenderResponse]) error {
return status.Errorf(codes.Unimplemented, "method VolumeTailSender not implemented")
return status.Error(codes.Unimplemented, "method VolumeTailSender not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTailReceiver(context.Context, *VolumeTailReceiverRequest) (*VolumeTailReceiverResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeTailReceiver not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeTailReceiver not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsGenerate(context.Context, *VolumeEcShardsGenerateRequest) (*VolumeEcShardsGenerateResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsGenerate not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsRebuild(context.Context, *VolumeEcShardsRebuildRequest) (*VolumeEcShardsRebuildResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsRebuild not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsCopy(context.Context, *VolumeEcShardsCopyRequest) (*VolumeEcShardsCopyResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsCopy not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsDelete(context.Context, *VolumeEcShardsDeleteRequest) (*VolumeEcShardsDeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsDelete not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsMount(context.Context, *VolumeEcShardsMountRequest) (*VolumeEcShardsMountResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsMount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsUnmount(context.Context, *VolumeEcShardsUnmountRequest) (*VolumeEcShardsUnmountResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsUnmount not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardRead(*VolumeEcShardReadRequest, grpc.ServerStreamingServer[VolumeEcShardReadResponse]) error {
return status.Errorf(codes.Unimplemented, "method VolumeEcShardRead not implemented")
return status.Error(codes.Unimplemented, "method VolumeEcShardRead not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcBlobDelete(context.Context, *VolumeEcBlobDeleteRequest) (*VolumeEcBlobDeleteResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcBlobDelete not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsToVolume(context.Context, *VolumeEcShardsToVolumeRequest) (*VolumeEcShardsToVolumeResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsToVolume not implemented")
}
func (UnimplementedVolumeServerServer) VolumeEcShardsInfo(context.Context, *VolumeEcShardsInfoRequest) (*VolumeEcShardsInfoResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeEcShardsInfo not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTierMoveDatToRemote(*VolumeTierMoveDatToRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatToRemoteResponse]) error {
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatToRemote not implemented")
}
func (UnimplementedVolumeServerServer) VolumeTierMoveDatFromRemote(*VolumeTierMoveDatFromRemoteRequest, grpc.ServerStreamingServer[VolumeTierMoveDatFromRemoteResponse]) error {
return status.Errorf(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
return status.Error(codes.Unimplemented, "method VolumeTierMoveDatFromRemote not implemented")
}
func (UnimplementedVolumeServerServer) VolumeServerStatus(context.Context, *VolumeServerStatusRequest) (*VolumeServerStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerStatus not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeServerStatus not implemented")
}
func (UnimplementedVolumeServerServer) VolumeServerLeave(context.Context, *VolumeServerLeaveRequest) (*VolumeServerLeaveResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeServerLeave not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeServerLeave not implemented")
}
func (UnimplementedVolumeServerServer) FetchAndWriteNeedle(context.Context, *FetchAndWriteNeedleRequest) (*FetchAndWriteNeedleResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
return nil, status.Error(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
}
func (UnimplementedVolumeServerServer) Query(*QueryRequest, grpc.ServerStreamingServer[QueriedStripe]) error {
return status.Errorf(codes.Unimplemented, "method Query not implemented")
return status.Error(codes.Unimplemented, "method Query not implemented")
}
func (UnimplementedVolumeServerServer) VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
return nil, status.Error(codes.Unimplemented, "method VolumeNeedleStatus not implemented")
}
func (UnimplementedVolumeServerServer) Ping(context.Context, *PingRequest) (*PingResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Ping not implemented")
return nil, status.Error(codes.Unimplemented, "method Ping not implemented")
}
func (UnimplementedVolumeServerServer) mustEmbedUnimplementedVolumeServerServer() {}
func (UnimplementedVolumeServerServer) testEmbeddedByValue() {}
@ -869,7 +901,7 @@ type UnsafeVolumeServerServer interface {
}
func RegisterVolumeServerServer(s grpc.ServiceRegistrar, srv VolumeServerServer) {
// If the following call pancis, it indicates UnimplementedVolumeServerServer was
// If the following call panics, it indicates UnimplementedVolumeServerServer was
// embedded by pointer and is nil. This will cause panics if an
// unimplemented method is ever invoked, so we test this at initialization
// time to prevent it from happening at runtime later due to I/O.
@ -1153,6 +1185,42 @@ func _VolumeServer_VolumeStatus_Handler(srv interface{}, ctx context.Context, de
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_GetState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(GetStateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).GetState(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: VolumeServer_GetState_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).GetState(ctx, req.(*GetStateRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_SetState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(SetStateRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(VolumeServerServer).SetState(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: VolumeServer_SetState_FullMethodName,
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(VolumeServerServer).SetState(ctx, req.(*SetStateRequest))
}
return interceptor(ctx, in, info, handler)
}
func _VolumeServer_VolumeCopy_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(VolumeCopyRequest)
if err := stream.RecvMsg(m); err != nil {
@ -1653,6 +1721,14 @@ var VolumeServer_ServiceDesc = grpc.ServiceDesc{
MethodName: "VolumeStatus",
Handler: _VolumeServer_VolumeStatus_Handler,
},
{
MethodName: "GetState",
Handler: _VolumeServer_GetState_Handler,
},
{
MethodName: "SetState",
Handler: _VolumeServer_SetState_Handler,
},
{
MethodName: "ReadVolumeFileStatus",
Handler: _VolumeServer_ReadVolumeFileStatus_Handler,

26
weed/server/volume_grpc_state.go

@ -0,0 +1,26 @@
package weed_server
import (
"context"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)
// GetState returns a volume server's state flags.
func (vs *VolumeServer) GetState(ctx context.Context, req *volume_server_pb.GetStateRequest) (*volume_server_pb.GetStateResponse, error) {
resp := &volume_server_pb.GetStateResponse{
State: vs.store.State.Pb,
}
return resp, nil
}
// SetState updates state flags for volume servers.
func (vs *VolumeServer) SetState(ctx context.Context, req *volume_server_pb.SetStateRequest) (*volume_server_pb.SetStateResponse, error) {
err := vs.store.State.Update(req.State)
resp := &volume_server_pb.SetStateResponse{
State: vs.store.State.Pb,
}
return resp, err
}

16
weed/storage/store_state.go

@ -69,3 +69,19 @@ func (st *State) Save() error {
glog.V(1).Infof("Saved store state %v to %s", st.Pb, st.FilePath)
return nil
}
func (st *State) Update(state *volume_server_pb.VolumeServerState) error {
if state == nil {
return nil
}
origState := st.Pb
st.Pb = state
err := st.Save()
if err != nil {
// restore the original state upon save failures, to avoid skew between in-memory and disk state protos.
st.Pb = origState
}
return err
}
Loading…
Cancel
Save