
Implement RPC skeleton for regular/EC volumes scrubbing. (#8187)

* Implement RPC skeleton for regular/EC volumes scrubbing.

See https://github.com/seaweedfs/seaweedfs/issues/8018 for details.

* Minor proto improvements for `ScrubVolume()`, `ScrubEcVolume()`:

  - Add fields for scrubbing details in `ScrubVolumeResponse` and `ScrubEcVolumeResponse`,
    instead of reporting these through RPC errors.
  - Return a list of broken shards when scrubbing EC volumes, via `EcShardInfo`.
Lisandro Pin · committed by GitHub · commit ff5a8f0579 · branch pull/8189/head
7 changed files:

  1. weed/pb/volume_server.proto (38 lines changed)
  2. weed/pb/volume_server_pb/volume_server.pb.go (938 lines changed)
  3. weed/pb/volume_server_pb/volume_server_grpc.pb.go (78 lines changed)
  4. weed/server/volume_grpc_erasure_coding.go (1 line changed)
  5. weed/server/volume_grpc_scrub.go (133 lines changed)
  6. weed/storage/disk_location.go (33 lines changed)
  7. weed/storage/disk_location_test.go (23 lines changed)

weed/pb/volume_server.proto (38 lines changed)

@@ -120,6 +120,12 @@ service VolumeServer {
    rpc FetchAndWriteNeedle (FetchAndWriteNeedleRequest) returns (FetchAndWriteNeedleResponse) {
    }

    // scrubbing
    rpc ScrubVolume (ScrubVolumeRequest) returns (ScrubVolumeResponse) {
    }
    rpc ScrubEcVolume (ScrubEcVolumeRequest) returns (ScrubEcVolumeResponse) {
    }

    // <experimental> query
    rpc Query (QueryRequest) returns (stream QueriedStripe) {
    }
@@ -496,6 +502,7 @@ message EcShardInfo {
    uint32 shard_id = 1;
    int64 size = 2;
    string collection = 3;
    uint32 volume_id = 4;
}
message ReadVolumeFileStatusRequest {
@@ -632,6 +639,37 @@ message FetchAndWriteNeedleResponse {
    string e_tag = 1;
}

enum VolumeScrubMode {
    UNKNOWN = 0;
    INDEX = 1;
    FULL = 2;
}

message ScrubVolumeRequest {
    VolumeScrubMode mode = 1;
    // optional list of volume IDs to scrub. if empty, all volumes for the server are scrubbed.
    repeated uint32 volume_ids = 2;
}

message ScrubVolumeResponse {
    uint64 total_volumes = 1;
    uint64 total_files = 2;
    repeated uint32 broken_volume_ids = 3;
    repeated string details = 4;
}

message ScrubEcVolumeRequest {
    VolumeScrubMode mode = 1;
    // optional list of volume IDs to scrub. if empty, all EC volumes for the server are scrubbed.
    repeated uint32 volume_ids = 2;
}

message ScrubEcVolumeResponse {
    uint64 total_volumes = 1;
    uint64 total_files = 2;
    repeated uint32 broken_volume_ids = 3;
    repeated EcShardInfo broken_shard_infos = 4;
    repeated string details = 5;
}

// select on volume servers
message QueryRequest {
    repeated string selections = 1;
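
For orientation, a minimal client-side sketch of driving the new RPC once the stubs below are regenerated. The server address, port, and credentials here are hypothetical; only the generated names (`NewVolumeServerClient`, `ScrubVolumeRequest`, `VolumeScrubMode_INDEX`) come from this commit:

package main

import (
    "context"
    "fmt"
    "log"

    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"

    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
)

func main() {
    // Hypothetical volume server gRPC address; adjust for your deployment.
    conn, err := grpc.NewClient("localhost:18080",
        grpc.WithTransportCredentials(insecure.NewCredentials()))
    if err != nil {
        log.Fatal(err)
    }
    defer conn.Close()

    client := volume_server_pb.NewVolumeServerClient(conn)
    // An empty volume_ids list means "scrub every volume on this server".
    resp, err := client.ScrubVolume(context.Background(), &volume_server_pb.ScrubVolumeRequest{
        Mode: volume_server_pb.VolumeScrubMode_INDEX,
    })
    if err != nil {
        log.Fatal(err) // RPC-level failure: unknown volume ID, unsupported mode, transport error
    }
    fmt.Printf("scrubbed %d volumes / %d files; broken volumes: %v\n",
        resp.GetTotalVolumes(), resp.GetTotalFiles(), resp.GetBrokenVolumeIds())
}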

weed/pb/volume_server_pb/volume_server.pb.go (938 lines changed)
File diff suppressed because it is too large.

weed/pb/volume_server_pb/volume_server_grpc.pb.go (78 lines changed)

@@ -62,6 +62,8 @@ const (
    VolumeServer_VolumeServerStatus_FullMethodName  = "/volume_server_pb.VolumeServer/VolumeServerStatus"
    VolumeServer_VolumeServerLeave_FullMethodName   = "/volume_server_pb.VolumeServer/VolumeServerLeave"
    VolumeServer_FetchAndWriteNeedle_FullMethodName = "/volume_server_pb.VolumeServer/FetchAndWriteNeedle"
    VolumeServer_ScrubVolume_FullMethodName         = "/volume_server_pb.VolumeServer/ScrubVolume"
    VolumeServer_ScrubEcVolume_FullMethodName       = "/volume_server_pb.VolumeServer/ScrubEcVolume"
    VolumeServer_Query_FullMethodName               = "/volume_server_pb.VolumeServer/Query"
    VolumeServer_VolumeNeedleStatus_FullMethodName  = "/volume_server_pb.VolumeServer/VolumeNeedleStatus"
    VolumeServer_Ping_FullMethodName                = "/volume_server_pb.VolumeServer/Ping"
@@ -119,6 +121,9 @@ type VolumeServerClient interface {
    VolumeServerLeave(ctx context.Context, in *VolumeServerLeaveRequest, opts ...grpc.CallOption) (*VolumeServerLeaveResponse, error)
    // remote storage
    FetchAndWriteNeedle(ctx context.Context, in *FetchAndWriteNeedleRequest, opts ...grpc.CallOption) (*FetchAndWriteNeedleResponse, error)
    // scrubbing
    ScrubVolume(ctx context.Context, in *ScrubVolumeRequest, opts ...grpc.CallOption) (*ScrubVolumeResponse, error)
    ScrubEcVolume(ctx context.Context, in *ScrubEcVolumeRequest, opts ...grpc.CallOption) (*ScrubEcVolumeResponse, error)
    // <experimental> query
    Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[QueriedStripe], error)
    VolumeNeedleStatus(ctx context.Context, in *VolumeNeedleStatusRequest, opts ...grpc.CallOption) (*VolumeNeedleStatusResponse, error)
@@ -647,6 +652,26 @@ func (c *volumeServerClient) FetchAndWriteNeedle(ctx context.Context, in *FetchA
    return out, nil
}

func (c *volumeServerClient) ScrubVolume(ctx context.Context, in *ScrubVolumeRequest, opts ...grpc.CallOption) (*ScrubVolumeResponse, error) {
    cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
    out := new(ScrubVolumeResponse)
    err := c.cc.Invoke(ctx, VolumeServer_ScrubVolume_FullMethodName, in, out, cOpts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

func (c *volumeServerClient) ScrubEcVolume(ctx context.Context, in *ScrubEcVolumeRequest, opts ...grpc.CallOption) (*ScrubEcVolumeResponse, error) {
    cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
    out := new(ScrubEcVolumeResponse)
    err := c.cc.Invoke(ctx, VolumeServer_ScrubEcVolume_FullMethodName, in, out, cOpts...)
    if err != nil {
        return nil, err
    }
    return out, nil
}

func (c *volumeServerClient) Query(ctx context.Context, in *QueryRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[QueriedStripe], error) {
    cOpts := append([]grpc.CallOption{grpc.StaticMethod()}, opts...)
    stream, err := c.cc.NewStream(ctx, &VolumeServer_ServiceDesc.Streams[10], VolumeServer_Query_FullMethodName, cOpts...)
@@ -738,6 +763,9 @@ type VolumeServerServer interface {
    VolumeServerLeave(context.Context, *VolumeServerLeaveRequest) (*VolumeServerLeaveResponse, error)
    // remote storage
    FetchAndWriteNeedle(context.Context, *FetchAndWriteNeedleRequest) (*FetchAndWriteNeedleResponse, error)
    // scrubbing
    ScrubVolume(context.Context, *ScrubVolumeRequest) (*ScrubVolumeResponse, error)
    ScrubEcVolume(context.Context, *ScrubEcVolumeRequest) (*ScrubEcVolumeResponse, error)
    // <experimental> query
    Query(*QueryRequest, grpc.ServerStreamingServer[QueriedStripe]) error
    VolumeNeedleStatus(context.Context, *VolumeNeedleStatusRequest) (*VolumeNeedleStatusResponse, error)
@@ -881,6 +909,12 @@ func (UnimplementedVolumeServerServer) VolumeServerLeave(context.Context, *Volum
func (UnimplementedVolumeServerServer) FetchAndWriteNeedle(context.Context, *FetchAndWriteNeedleRequest) (*FetchAndWriteNeedleResponse, error) {
    return nil, status.Error(codes.Unimplemented, "method FetchAndWriteNeedle not implemented")
}

func (UnimplementedVolumeServerServer) ScrubVolume(context.Context, *ScrubVolumeRequest) (*ScrubVolumeResponse, error) {
    return nil, status.Error(codes.Unimplemented, "method ScrubVolume not implemented")
}

func (UnimplementedVolumeServerServer) ScrubEcVolume(context.Context, *ScrubEcVolumeRequest) (*ScrubEcVolumeResponse, error) {
    return nil, status.Error(codes.Unimplemented, "method ScrubEcVolume not implemented")
}

func (UnimplementedVolumeServerServer) Query(*QueryRequest, grpc.ServerStreamingServer[QueriedStripe]) error {
    return status.Error(codes.Unimplemented, "method Query not implemented")
}
@@ -1611,6 +1645,42 @@ func _VolumeServer_FetchAndWriteNeedle_Handler(srv interface{}, ctx context.Cont
    return interceptor(ctx, in, info, handler)
}

func _VolumeServer_ScrubVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(ScrubVolumeRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(VolumeServerServer).ScrubVolume(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: VolumeServer_ScrubVolume_FullMethodName,
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(VolumeServerServer).ScrubVolume(ctx, req.(*ScrubVolumeRequest))
    }
    return interceptor(ctx, in, info, handler)
}

func _VolumeServer_ScrubEcVolume_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(ScrubEcVolumeRequest)
    if err := dec(in); err != nil {
        return nil, err
    }
    if interceptor == nil {
        return srv.(VolumeServerServer).ScrubEcVolume(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
        Server:     srv,
        FullMethod: VolumeServer_ScrubEcVolume_FullMethodName,
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
        return srv.(VolumeServerServer).ScrubEcVolume(ctx, req.(*ScrubEcVolumeRequest))
    }
    return interceptor(ctx, in, info, handler)
}

func _VolumeServer_Query_Handler(srv interface{}, stream grpc.ServerStream) error {
    m := new(QueryRequest)
    if err := stream.RecvMsg(m); err != nil {
@@ -1797,6 +1867,14 @@ var VolumeServer_ServiceDesc = grpc.ServiceDesc{
            MethodName: "FetchAndWriteNeedle",
            Handler:    _VolumeServer_FetchAndWriteNeedle_Handler,
        },
        {
            MethodName: "ScrubVolume",
            Handler:    _VolumeServer_ScrubVolume_Handler,
        },
        {
            MethodName: "ScrubEcVolume",
            Handler:    _VolumeServer_ScrubEcVolume_Handler,
        },
        {
            MethodName: "VolumeNeedleStatus",
            Handler:    _VolumeServer_VolumeNeedleStatus_Handler,
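
Because these are plain unary additions to the existing service, they need no extra wiring beyond the usual registration. A minimal sketch, where grpcServer and volumeServer stand in for the instances the volume server already creates at startup:

    grpcServer := grpc.NewServer()
    // Registering the implementation exposes ScrubVolume and ScrubEcVolume
    // along with every other VolumeServer method.
    volume_server_pb.RegisterVolumeServerServer(grpcServer, volumeServer)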

weed/server/volume_grpc_erasure_coding.go (1 line changed)

@@ -570,6 +570,7 @@ func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_serv
            ShardId:    uint32(si.Id),
            Size:       int64(si.Size),
            Collection: v.Collection,
            VolumeId:   uint32(v.VolumeId),
        }
        ecShardInfos = append(ecShardInfos, ecShardInfo)
    }
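
With `volume_id` now populated, callers can attribute broken shards back to their owning EC volume. A hypothetical post-processing snippet, assuming resp holds a ScrubEcVolumeResponse:

    // Group broken shard IDs by their owning EC volume.
    byVolume := make(map[uint32][]uint32)
    for _, si := range resp.GetBrokenShardInfos() {
        byVolume[si.GetVolumeId()] = append(byVolume[si.GetVolumeId()], si.GetShardId())
    }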

weed/server/volume_grpc_scrub.go (133 lines changed)

@@ -0,0 +1,133 @@
package weed_server

import (
    "context"
    "fmt"

    "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
    "github.com/seaweedfs/seaweedfs/weed/storage"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
)

func (vs *VolumeServer) ScrubVolume(ctx context.Context, req *volume_server_pb.ScrubVolumeRequest) (*volume_server_pb.ScrubVolumeResponse, error) {
    vids := []needle.VolumeId{}
    if len(req.GetVolumeIds()) == 0 {
        for _, l := range vs.store.Locations {
            vids = append(vids, l.VolumeIds()...)
        }
    } else {
        for _, vid := range req.GetVolumeIds() {
            vids = append(vids, needle.VolumeId(vid))
        }
    }

    var details []string
    var totalVolumes, totalFiles uint64
    var brokenVolumeIds []uint32
    for _, vid := range vids {
        v := vs.store.GetVolume(vid)
        if v == nil {
            return nil, fmt.Errorf("volume id %d not found", vid)
        }

        var files uint64
        var serrs []error
        switch m := req.GetMode(); m {
        case volume_server_pb.VolumeScrubMode_INDEX:
            files, serrs = scrubVolumeIndex(ctx, v)
        case volume_server_pb.VolumeScrubMode_FULL:
            files, serrs = scrubVolumeFull(ctx, v)
        default:
            return nil, fmt.Errorf("unsupported volume scrub mode %d", m)
        }

        totalVolumes += 1
        totalFiles += files
        if len(serrs) != 0 {
            brokenVolumeIds = append(brokenVolumeIds, uint32(vid))
            for _, err := range serrs {
                details = append(details, err.Error())
            }
        }
    }

    res := &volume_server_pb.ScrubVolumeResponse{
        TotalVolumes:    totalVolumes,
        TotalFiles:      totalFiles,
        BrokenVolumeIds: brokenVolumeIds,
        Details:         details,
    }
    return res, nil
}

func scrubVolumeIndex(ctx context.Context, v *storage.Volume) (uint64, []error) {
    return 0, []error{fmt.Errorf("scrubVolumeIndex(): not implemented")}
}

func scrubVolumeFull(ctx context.Context, v *storage.Volume) (uint64, []error) {
    return 0, []error{fmt.Errorf("scrubVolumeFull(): not implemented")}
}

func (vs *VolumeServer) ScrubEcVolume(ctx context.Context, req *volume_server_pb.ScrubEcVolumeRequest) (*volume_server_pb.ScrubEcVolumeResponse, error) {
    vids := []needle.VolumeId{}
    if len(req.GetVolumeIds()) == 0 {
        for _, l := range vs.store.Locations {
            vids = append(vids, l.EcVolumeIds()...)
        }
    } else {
        for _, vid := range req.GetVolumeIds() {
            vids = append(vids, needle.VolumeId(vid))
        }
    }

    var details []string
    var totalVolumes, totalFiles uint64
    var brokenVolumeIds []uint32
    var brokenShardInfos []*volume_server_pb.EcShardInfo
    for _, vid := range vids {
        v, found := vs.store.FindEcVolume(vid)
        if !found {
            return nil, fmt.Errorf("EC volume id %d not found", vid)
        }

        var files uint64
        var shardInfos []*volume_server_pb.EcShardInfo
        var serrs []error
        switch m := req.GetMode(); m {
        case volume_server_pb.VolumeScrubMode_INDEX:
            files, shardInfos, serrs = scrubEcVolumeIndex(v)
        case volume_server_pb.VolumeScrubMode_FULL:
            files, shardInfos, serrs = scrubEcVolumeFull(ctx, v)
        default:
            return nil, fmt.Errorf("unsupported EC volume scrub mode %d", m)
        }

        totalVolumes += 1
        totalFiles += files
        if len(serrs) != 0 || len(shardInfos) != 0 {
            brokenVolumeIds = append(brokenVolumeIds, uint32(vid))
            brokenShardInfos = append(brokenShardInfos, shardInfos...)
            for _, err := range serrs {
                details = append(details, err.Error())
            }
        }
    }

    res := &volume_server_pb.ScrubEcVolumeResponse{
        TotalVolumes:     totalVolumes,
        TotalFiles:       totalFiles,
        BrokenVolumeIds:  brokenVolumeIds,
        BrokenShardInfos: brokenShardInfos,
        Details:          details,
    }
    return res, nil
}

func scrubEcVolumeIndex(ecv *erasure_coding.EcVolume) (uint64, []*volume_server_pb.EcShardInfo, []error) {
    return 0, nil, []error{fmt.Errorf("scrubEcVolumeIndex(): not implemented")}
}

func scrubEcVolumeFull(ctx context.Context, v *erasure_coding.EcVolume) (uint64, []*volume_server_pb.EcShardInfo, []error) {
    return 0, nil, []error{fmt.Errorf("scrubEcVolumeFull(): not implemented")}
}
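
Until the scrub helpers land, every scrubbed volume is reported as broken with a "not implemented" detail string, while RPC errors stay reserved for lookup and validation failures. A hedged caller-side sketch of the two channels, reusing the hypothetical client and ctx from the earlier example:

    resp, err := client.ScrubEcVolume(ctx, &volume_server_pb.ScrubEcVolumeRequest{
        Mode: volume_server_pb.VolumeScrubMode_FULL,
    })
    if err != nil {
        log.Fatal(err) // only unknown volume IDs or an unsupported mode surface here
    }
    // Corruption findings arrive in the response body instead. Note that
    // details is a flat list of messages, not keyed per volume.
    for _, vid := range resp.GetBrokenVolumeIds() {
        fmt.Printf("EC volume %d failed scrubbing\n", vid)
    }
    for _, d := range resp.GetDetails() {
        fmt.Println("detail:", d)
    }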

weed/storage/disk_location.go (33 lines changed)

@@ -5,6 +5,7 @@ import (
    "os"
    "path/filepath"
    "runtime"
    "slices"
    "strconv"
    "strings"
    "sync"
@@ -395,6 +396,38 @@ func (l *DiskLocation) FindVolume(vid needle.VolumeId) (*Volume, bool) {
    return v, ok
}

// Returns all regular volume IDs stored at this location.
func (l *DiskLocation) VolumeIds() []needle.VolumeId {
    l.volumesLock.RLock()
    defer l.volumesLock.RUnlock()

    vids := make([]needle.VolumeId, len(l.volumes))
    i := 0
    for vid := range l.volumes {
        vids[i] = vid
        i++
    }
    slices.Sort(vids)
    return vids
}

// Returns all EC volume IDs stored at this location.
func (l *DiskLocation) EcVolumeIds() []needle.VolumeId {
    l.ecVolumesLock.RLock()
    defer l.ecVolumesLock.RUnlock()

    vids := make([]needle.VolumeId, len(l.ecVolumes))
    i := 0
    for vid := range l.ecVolumes {
        vids[i] = vid
        i++
    }
    slices.Sort(vids)
    return vids
}

func (l *DiskLocation) VolumesLen() int {
    l.volumesLock.RLock()
    defer l.volumesLock.RUnlock()
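
As an aside, on Go 1.23+ the collect-and-sort pattern above can be written with the standard library's iterator helpers. A sketch of the alternative (the method name volumeIdsSorted is hypothetical; this is not what the commit uses):

    import (
        "maps"
        "slices"
    )

    // Equivalent to VolumeIds() on Go 1.23+: sort the map keys in one expression.
    func (l *DiskLocation) volumeIdsSorted() []needle.VolumeId {
        l.volumesLock.RLock()
        defer l.volumesLock.RUnlock()
        return slices.Sorted(maps.Keys(l.volumes))
    }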

weed/storage/disk_location_test.go (23 lines changed)

@@ -1,10 +1,12 @@
package storage

import (
    "reflect"
    "testing"
    "time"

    "github.com/seaweedfs/seaweedfs/weed/storage/backend"
    "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
    "github.com/seaweedfs/seaweedfs/weed/storage/needle"
    "github.com/seaweedfs/seaweedfs/weed/storage/types"
    "github.com/seaweedfs/seaweedfs/weed/util"
@@ -75,5 +77,26 @@ func TestUnUsedSpace(t *testing.T) {
    if unUsedSpace != 0 {
        t.Errorf("unUsedSpace incorrect: %d != %d", unUsedSpace, 0)
    }
}

func TestResolveVolumeIDs(t *testing.T) {
    l := DiskLocation{
        volumes: map[needle.VolumeId]*Volume{
            0: &Volume{},
            1: &Volume{},
            2: &Volume{},
        },
        ecVolumes: map[needle.VolumeId]*erasure_coding.EcVolume{
            3: &erasure_coding.EcVolume{},
            4: &erasure_coding.EcVolume{},
            5: &erasure_coding.EcVolume{},
        },
    }

    if got, want := l.VolumeIds(), []needle.VolumeId{0, 1, 2}; !reflect.DeepEqual(got, want) {
        t.Errorf("wanted volume IDs %v, got %v", want, got)
    }
    if got, want := l.EcVolumeIds(), []needle.VolumeId{3, 4, 5}; !reflect.DeepEqual(got, want) {
        t.Errorf("wanted EC volume IDs %v, got %v", want, got)
    }
}