Browse Source

Implement a `VolumeEcStatus()` RPC for volume servers. (#8006)

Just like `VolumeStatus()`, this call allows inspecting details for
a given EC volume - including number of files and their total size.
pull/8265/head
Lisandro Pin 2 days ago
committed by GitHub
parent
commit
1a5679a5eb
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 3
      weed/pb/volume_server.proto
  2. 43
      weed/pb/volume_server_pb/volume_server.pb.go
  3. 2
      weed/pb/volume_server_pb/volume_server_grpc.pb.go
  4. 53
      weed/server/volume_grpc_erasure_coding.go
  5. 10
      weed/storage/erasure_coding/ec_shard.go
  6. 7
      weed/storage/erasure_coding/ec_volume.go
  7. 11
      weed/storage/types/needle_types.go

3
weed/pb/volume_server.proto

@ -498,6 +498,9 @@ message VolumeEcShardsInfoRequest {
}
message VolumeEcShardsInfoResponse {
repeated EcShardInfo ec_shard_infos = 1;
uint64 volume_size = 2;
uint64 file_count = 3;
uint64 file_deleted_count = 4;
}
message EcShardInfo {

43
weed/pb/volume_server_pb/volume_server.pb.go

@ -1867,7 +1867,7 @@ func (x *GetStateResponse) GetState() *VolumeServerState {
type SetStateRequest struct {
state protoimpl.MessageState `protogen:"open.v1"`
// SetState updates *all* volume server flags at once. Retrieve state/version with GetState(),
// SetState updates *all* volume server flags at once. Retrieve state with GetState(),
// modify individual flags as required, then call this RPC to update.
State *VolumeServerState `protobuf:"bytes,1,opt,name=state,proto3" json:"state,omitempty"`
unknownFields protoimpl.UnknownFields
@ -4162,10 +4162,13 @@ func (x *VolumeEcShardsInfoRequest) GetVolumeId() uint32 {
}
type VolumeEcShardsInfoResponse struct {
state protoimpl.MessageState `protogen:"open.v1"`
EcShardInfos []*EcShardInfo `protobuf:"bytes,1,rep,name=ec_shard_infos,json=ecShardInfos,proto3" json:"ec_shard_infos,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
state protoimpl.MessageState `protogen:"open.v1"`
EcShardInfos []*EcShardInfo `protobuf:"bytes,1,rep,name=ec_shard_infos,json=ecShardInfos,proto3" json:"ec_shard_infos,omitempty"`
VolumeSize uint64 `protobuf:"varint,2,opt,name=volume_size,json=volumeSize,proto3" json:"volume_size,omitempty"`
FileCount uint64 `protobuf:"varint,3,opt,name=file_count,json=fileCount,proto3" json:"file_count,omitempty"`
FileDeletedCount uint64 `protobuf:"varint,4,opt,name=file_deleted_count,json=fileDeletedCount,proto3" json:"file_deleted_count,omitempty"`
unknownFields protoimpl.UnknownFields
sizeCache protoimpl.SizeCache
}
func (x *VolumeEcShardsInfoResponse) Reset() {
@ -4205,6 +4208,27 @@ func (x *VolumeEcShardsInfoResponse) GetEcShardInfos() []*EcShardInfo {
return nil
}
func (x *VolumeEcShardsInfoResponse) GetVolumeSize() uint64 {
if x != nil {
return x.VolumeSize
}
return 0
}
func (x *VolumeEcShardsInfoResponse) GetFileCount() uint64 {
if x != nil {
return x.FileCount
}
return 0
}
func (x *VolumeEcShardsInfoResponse) GetFileDeletedCount() uint64 {
if x != nil {
return x.FileDeletedCount
}
return 0
}
type EcShardInfo struct {
state protoimpl.MessageState `protogen:"open.v1"`
ShardId uint32 `protobuf:"varint,1,opt,name=shard_id,json=shardId,proto3" json:"shard_id,omitempty"`
@ -6981,9 +7005,14 @@ const file_volume_server_proto_rawDesc = "" +
"collection\" \n" +
"\x1eVolumeEcShardsToVolumeResponse\"8\n" +
"\x19VolumeEcShardsInfoRequest\x12\x1b\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\"a\n" +
"\tvolume_id\x18\x01 \x01(\rR\bvolumeId\"\xcf\x01\n" +
"\x1aVolumeEcShardsInfoResponse\x12C\n" +
"\x0eec_shard_infos\x18\x01 \x03(\v2\x1d.volume_server_pb.EcShardInfoR\fecShardInfos\"y\n" +
"\x0eec_shard_infos\x18\x01 \x03(\v2\x1d.volume_server_pb.EcShardInfoR\fecShardInfos\x12\x1f\n" +
"\vvolume_size\x18\x02 \x01(\x04R\n" +
"volumeSize\x12\x1d\n" +
"\n" +
"file_count\x18\x03 \x01(\x04R\tfileCount\x12,\n" +
"\x12file_deleted_count\x18\x04 \x01(\x04R\x10fileDeletedCount\"y\n" +
"\vEcShardInfo\x12\x19\n" +
"\bshard_id\x18\x01 \x01(\rR\ashardId\x12\x12\n" +
"\x04size\x18\x02 \x01(\x03R\x04size\x12\x1e\n" +

2
weed/pb/volume_server_pb/volume_server_grpc.pb.go

@ -1,6 +1,6 @@
// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.6.0
// - protoc-gen-go-grpc v1.6.1
// - protoc v3.21.12
// source: volume_server.proto

53
weed/server/volume_grpc_erasure_coding.go

@ -559,26 +559,43 @@ func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_
func (vs *VolumeServer) VolumeEcShardsInfo(ctx context.Context, req *volume_server_pb.VolumeEcShardsInfoRequest) (*volume_server_pb.VolumeEcShardsInfoResponse, error) {
glog.V(0).Infof("VolumeEcShardsInfo: volume %d", req.VolumeId)
var ecShardInfos []*volume_server_pb.EcShardInfo
glog.V(0).Infof("VolumeEcStatus: %v", req)
// Find the EC volume
for _, location := range vs.store.Locations {
if v, found := location.FindEcVolume(needle.VolumeId(req.VolumeId)); found {
// Get shard details from the EC volume
for _, si := range erasure_coding.ShardsInfoFromVolume(v).AsSlice() {
ecShardInfo := &volume_server_pb.EcShardInfo{
ShardId: uint32(si.Id),
Size: int64(si.Size),
Collection: v.Collection,
VolumeId: uint32(v.VolumeId),
}
ecShardInfos = append(ecShardInfos, ecShardInfo)
}
break
vid := needle.VolumeId(req.GetVolumeId())
ecv, found := vs.store.FindEcVolume(vid)
if !found {
return nil, fmt.Errorf("VolumeEcStatus: EC volume %d not found", vid)
}
shardInfos := make([]*volume_server_pb.EcShardInfo, len(ecv.Shards))
for i, s := range ecv.Shards {
shardInfos[i] = s.ToEcShardInfo()
}
var files, filesDeleted, totalSize uint64
err := ecv.WalkIndex(func(_ types.NeedleId, _ types.Offset, size types.Size) error {
// deleted files are counted when computing EC volume sizes. this aligns with VolumeStatus(),
// which reports the raw data backend file size, regardless of deleted files.
totalSize += uint64(size.Raw())
if size.IsDeleted() {
filesDeleted++
} else {
files++
}
return nil
})
if err != nil {
return nil, err
}
return &volume_server_pb.VolumeEcShardsInfoResponse{
EcShardInfos: ecShardInfos,
}, nil
res := &volume_server_pb.VolumeEcShardsInfoResponse{
EcShardInfos: shardInfos,
FileCount: files,
FileDeletedCount: filesDeleted,
VolumeSize: totalSize,
}
return res, nil
}

10
weed/storage/erasure_coding/ec_shard.go

@ -8,6 +8,7 @@ import (
"strconv"
"strings"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
@ -128,11 +129,18 @@ func (shard *EcVolumeShard) Destroy() {
}
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
n, err := shard.ecdFile.ReadAt(buf, offset)
if err == io.EOF && n == len(buf) {
err = nil
}
return n, err
}
func (shard *EcVolumeShard) ToEcShardInfo() *volume_server_pb.EcShardInfo {
return &volume_server_pb.EcShardInfo{
ShardId: uint32(shard.ShardId),
Size: int64(shard.Size()),
Collection: shard.Collection,
VolumeId: uint32(shard.VolumeId),
}
}

7
weed/storage/erasure_coding/ec_volume.go

@ -333,6 +333,13 @@ func (ev *EcVolume) IsTimeToDestroy() bool {
return ev.ExpireAtSec > 0 && time.Now().Unix() > (int64(ev.ExpireAtSec)+destroyDelaySeconds)
}
func (ev *EcVolume) WalkIndex(processNeedleFn func(key types.NeedleId, offset types.Offset, size types.Size) error) error {
if ev.ecxFile == nil {
return fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)
}
return idx.WalkIndexFile(ev.ecxFile, 0, processNeedleFn)
}
func (ev *EcVolume) CheckIndex() (int64, []error) {
if ev.ecxFile == nil {
return 0, []error{fmt.Errorf("no ECX file associated with EC volume %v", ev.VolumeId)}

11
weed/storage/types/needle_types.go

@ -30,6 +30,17 @@ func (s Size) IsValid() bool {
return s > 0 && s != TombstoneFileSize
}
// Raw returns the raw storage size for a needle, regardless of its deleted status.
func (s Size) Raw() uint32 {
if s == TombstoneFileSize {
return 0
}
if s < 0 {
return uint32((-1) * s)
}
return uint32(s)
}
type OffsetLower struct {
b3 byte
b2 byte

Loading…
Cancel
Save