diff --git a/weed/pb/volume_server_pb/volume_server.pb.go b/weed/pb/volume_server_pb/volume_server.pb.go index 9cfc43765..661eb8595 100644 --- a/weed/pb/volume_server_pb/volume_server.pb.go +++ b/weed/pb/volume_server_pb/volume_server.pb.go @@ -4994,13 +4994,14 @@ func (*VolumeServerStatusRequest) Descriptor() ([]byte, []int) { } type VolumeServerStatusResponse struct { - state protoimpl.MessageState `protogen:"open.v1"` - DiskStatuses []*DiskStatus `protobuf:"bytes,1,rep,name=disk_statuses,json=diskStatuses,proto3" json:"disk_statuses,omitempty"` - MemoryStatus *MemStatus `protobuf:"bytes,2,opt,name=memory_status,json=memoryStatus,proto3" json:"memory_status,omitempty"` - Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` - DataCenter string `protobuf:"bytes,4,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` - Rack string `protobuf:"bytes,5,opt,name=rack,proto3" json:"rack,omitempty"` - State *VolumeServerState `protobuf:"bytes,6,opt,name=state,proto3" json:"state,omitempty"` + state protoimpl.MessageState `protogen:"open.v1"` + // TODO(issues/7977): add volume server state to response + DiskStatuses []*DiskStatus `protobuf:"bytes,1,rep,name=disk_statuses,json=diskStatuses,proto3" json:"disk_statuses,omitempty"` + MemoryStatus *MemStatus `protobuf:"bytes,2,opt,name=memory_status,json=memoryStatus,proto3" json:"memory_status,omitempty"` + Version string `protobuf:"bytes,3,opt,name=version,proto3" json:"version,omitempty"` + DataCenter string `protobuf:"bytes,4,opt,name=data_center,json=dataCenter,proto3" json:"data_center,omitempty"` + Rack string `protobuf:"bytes,5,opt,name=rack,proto3" json:"rack,omitempty"` + State *VolumeServerState `protobuf:"bytes,6,opt,name=state,proto3" json:"state,omitempty"` unknownFields protoimpl.UnknownFields sizeCache protoimpl.SizeCache } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index ce1bf731d..584b1a1d2 100644 --- 
a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -39,9 +39,12 @@ func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server } func (vs *VolumeServer) AllocateVolume(ctx context.Context, req *volume_server_pb.AllocateVolumeRequest) (*volume_server_pb.AllocateVolumeResponse, error) { - resp := &volume_server_pb.AllocateVolumeResponse{} + if err := vs.CheckMaintenanceMode(); err != nil { + return resp, err + } + err := vs.store.AddVolume( needle.VolumeId(req.VolumeId), req.Collection, @@ -98,9 +101,12 @@ func (vs *VolumeServer) VolumeUnmount(ctx context.Context, req *volume_server_pb } func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb.VolumeDeleteRequest) (*volume_server_pb.VolumeDeleteResponse, error) { - resp := &volume_server_pb.VolumeDeleteResponse{} + if err := vs.CheckMaintenanceMode(); err != nil { + return resp, err + } + err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId), req.OnlyEmpty) if err != nil { @@ -114,9 +120,12 @@ func (vs *VolumeServer) VolumeDelete(ctx context.Context, req *volume_server_pb. 
} func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_pb.VolumeConfigureRequest) (*volume_server_pb.VolumeConfigureResponse, error) { - resp := &volume_server_pb.VolumeConfigureResponse{} + if err := vs.CheckMaintenanceMode(); err != nil { + return resp, err + } + // check replication format if _, err := super_block.NewReplicaPlacementFromString(req.Replication); err != nil { resp.Error = fmt.Sprintf("volume configure replication %v: %v", req, err) @@ -154,9 +163,12 @@ func (vs *VolumeServer) VolumeConfigure(ctx context.Context, req *volume_server_ } func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_server_pb.VolumeMarkReadonlyRequest) (*volume_server_pb.VolumeMarkReadonlyResponse, error) { - resp := &volume_server_pb.VolumeMarkReadonlyResponse{} + if err := vs.CheckMaintenanceMode(); err != nil { + return resp, err + } + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("volume %d not found", req.VolumeId) @@ -210,9 +222,12 @@ func (vs *VolumeServer) notifyMasterVolumeReadonly(v *storage.Volume, isReadOnly } func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) { - resp := &volume_server_pb.VolumeMarkWritableResponse{} + if err := vs.CheckMaintenanceMode(); err != nil { + return resp, err + } + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("volume %d not found", req.VolumeId) diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index db67ae9f5..c6e4e02d3 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -11,9 +11,12 @@ import ( ) func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) { - resp := 
&volume_server_pb.BatchDeleteResponse{} + if err := vs.CheckMaintenanceMode(); err != nil { + return resp, err + } + now := uint64(time.Now().Unix()) for _, fid := range req.FileIds { diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 92f1edcb0..5537cf720 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -26,6 +26,9 @@ const BufferSizeLimit = 1024 * 1024 * 2 // VolumeCopy copy the .idx .dat .vif files, and mount the volume func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error { + if err := vs.CheckMaintenanceMode(); err != nil { + return err + } v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { @@ -446,6 +449,10 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v // ReceiveFile receives a file stream from client and writes it to storage func (vs *VolumeServer) ReceiveFile(stream volume_server_pb.VolumeServer_ReceiveFileServer) error { + if err := vs.CheckMaintenanceMode(); err != nil { + return err + } + var fileInfo *volume_server_pb.ReceiveFileInfo var targetFile *os.File var filePath string diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 97abba98f..bd4bba887 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -40,6 +40,9 @@ Steps to apply erasure coding to .dat .idx files // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } glog.V(0).Infof("VolumeEcShardsGenerate: %v", req) @@ -131,9 +134,11 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ // 
VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } glog.V(0).Infof("VolumeEcShardsRebuild: %v", req) - baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) var rebuiltShardIds []uint32 @@ -173,6 +178,9 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s // VolumeEcShardsCopy copy the .ecx and some ec data slices func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } glog.V(0).Infof("VolumeEcShardsCopy: %v", req) @@ -249,6 +257,9 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // VolumeEcShardsDelete local delete the .ecx and some ec data slices if not needed // the shard should not be mounted before calling this. 
func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_server_pb.VolumeEcShardsDeleteRequest) (*volume_server_pb.VolumeEcShardsDeleteResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } bName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) @@ -445,6 +456,9 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea } func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } glog.V(0).Infof("VolumeEcBlobDelete: %v", req) @@ -475,6 +489,9 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } glog.V(0).Infof("VolumeEcShardsToVolume: %v", req) diff --git a/weed/server/volume_grpc_read_write.go b/weed/server/volume_grpc_read_write.go index 1c0ddf5dd..1398075b1 100644 --- a/weed/server/volume_grpc_read_write.go +++ b/weed/server/volume_grpc_read_write.go @@ -55,7 +55,12 @@ func (vs *VolumeServer) ReadNeedleMeta(ctx context.Context, req *volume_server_p } func (vs *VolumeServer) WriteNeedleBlob(ctx context.Context, req *volume_server_pb.WriteNeedleBlobRequest) (resp *volume_server_pb.WriteNeedleBlobResponse, err error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } + resp = &volume_server_pb.WriteNeedleBlobResponse{} + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("not found volume id %d", req.VolumeId) diff --git 
a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 0dffb49cf..de562662e 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -15,6 +15,10 @@ import ( ) func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } + resp = &volume_server_pb.FetchAndWriteNeedleResponse{} v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index cf3998e26..08aae06af 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -16,7 +16,6 @@ import ( ) func (vs *VolumeServer) VolumeTailSender(req *volume_server_pb.VolumeTailSenderRequest, stream volume_server_pb.VolumeServer_VolumeTailSenderServer) error { - v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return fmt.Errorf("not found volume id %d", req.VolumeId) diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go index 914f352ee..4f8ac028b 100644 --- a/weed/server/volume_grpc_tier_upload.go +++ b/weed/server/volume_grpc_tier_upload.go @@ -12,6 +12,9 @@ import ( // VolumeTierMoveDatToRemote copy dat file to a remote tier func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTierMoveDatToRemoteRequest, stream volume_server_pb.VolumeServer_VolumeTierMoveDatToRemoteServer) error { + if err := vs.CheckMaintenanceMode(); err != nil { + return err + } // find existing volume v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index 990611052..0b32c1d5d 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -34,6 +34,10 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx 
context.Context, req *volume_serve } func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error { + if err := vs.CheckMaintenanceMode(); err != nil { + return err + } + start := time.Now() defer func(start time.Time) { stats.VolumeServerVacuumingHistogram.WithLabelValues("compact").Observe(time.Since(start).Seconds()) @@ -76,6 +80,10 @@ func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCo } func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_server_pb.VacuumVolumeCommitRequest) (*volume_server_pb.VacuumVolumeCommitResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } + start := time.Now() defer func(start time.Time) { stats.VolumeServerVacuumingHistogram.WithLabelValues("commit").Observe(time.Since(start).Seconds()) @@ -98,9 +106,11 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv } func (vs *VolumeServer) VacuumVolumeCleanup(ctx context.Context, req *volume_server_pb.VacuumVolumeCleanupRequest) (*volume_server_pb.VacuumVolumeCleanupResponse, error) { + if err := vs.CheckMaintenanceMode(); err != nil { + return nil, err + } resp := &volume_server_pb.VacuumVolumeCleanupResponse{} - err := vs.store.CommitCleanupVolume(needle.VolumeId(req.VolumeId)) if err != nil { diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 0647c4196..23676c3f8 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,6 +1,7 @@ package weed_server import ( + "fmt" "net/http" "sync" "time" @@ -171,3 +172,19 @@ func (vs *VolumeServer) Reload() { v := util.GetViper() vs.guard.UpdateWhiteList(append(vs.whiteList, util.StringSplit(v.GetString("guard.white_list"), ",")...)) } + +// MaintenanceMode reports whether the volume server is in maintenance (i.e. read-only) mode. 
+func (vs *VolumeServer) MaintenanceMode() bool { + if vs.store == nil { + return false + } + return vs.store.State.Pb.GetMaintenance() +} + +// CheckMaintenanceMode returns a descriptive error if the volume server is in maintenance mode, or nil otherwise. +func (vs *VolumeServer) CheckMaintenanceMode() error { + if !vs.MaintenanceMode() { + return nil + } + return fmt.Errorf("volume server %s is in maintenance mode", vs.store.Id) +} diff --git a/weed/server/volume_server_test.go b/weed/server/volume_server_test.go new file mode 100644 index 000000000..1115a4b4a --- /dev/null +++ b/weed/server/volume_server_test.go @@ -0,0 +1,72 @@ +package weed_server + +import ( + "testing" + + "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/storage" +) + +func TestMaintenanceMode(t *testing.T) { + testCases := []struct { + name string + pb *volume_server_pb.VolumeServerState + want bool + wantCheckErr string + }{ + { + name: "non-initialized state", + pb: nil, + want: false, + wantCheckErr: "", + }, + { + name: "maintenance mode disabled", + pb: &volume_server_pb.VolumeServerState{ + Maintenance: false, + }, + want: false, + wantCheckErr: "", + }, + { + name: "maintenance mode enabled", + pb: &volume_server_pb.VolumeServerState{ + Maintenance: true, + }, + want: true, + wantCheckErr: "volume server test_1234 is in maintenance mode", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + vs := VolumeServer{ + store: &storage.Store{ + Id: "test_1234", + State: &storage.State{ + FilePath: "/some/path.pb", + Pb: tc.pb, + }, + }, + } + + if got, want := vs.MaintenanceMode(), tc.want; got != want { + t.Errorf("MaintenanceMode() returned %v, want %v", got, want) + } + + err, wantErrStr := vs.CheckMaintenanceMode(), tc.wantCheckErr + if err != nil { + if wantErrStr == "" { + t.Errorf("CheckMaintenanceMode() returned error %v, want nil", err) + } + if errStr := err.Error(); errStr != wantErrStr { + t.Errorf("CheckMaintenanceMode() 
returned error %q, want %q", errStr, wantErrStr) + } + } else { + if wantErrStr != "" { + t.Errorf("CheckMaintenanceMode() returned no error, want %q", wantErrStr) + } + } + }) + } +}