Browse Source

VolumeEcShardRead to read from correct (vid, generation) EcVolume

add-ec-vacuum
chrislu 5 months ago
parent
commit
f00dc46607
  1. 7
      weed/server/volume_grpc_erasure_coding.go
  2. 21
      weed/storage/store_ec.go

7
weed/server/volume_grpc_erasure_coding.go

@ -369,6 +369,13 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
if !found {
return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId)
}
// Validate generation matches the request
requestedGeneration := req.Generation
if ecVolume.Generation != requestedGeneration {
return fmt.Errorf("VolumeEcShardRead volume %d generation mismatch: requested %d, found %d",
req.VolumeId, requestedGeneration, ecVolume.Generation)
}
ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
if !found {
return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)

21
weed/storage/store_ec.go

@ -219,7 +219,7 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur
// try reading directly
if hasShardIdLocation {
_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset)
_, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset, ecVolume.Generation)
if err == nil {
return
}
@ -286,7 +286,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume)
return
}
func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64, generation uint32) (n int, is_deleted bool, err error) {
if len(sourceDataNodes) == 0 {
return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId)
@ -294,7 +294,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne
for _, sourceDataNode := range sourceDataNodes {
glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset)
n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset, generation)
if err == nil {
return
}
@ -304,17 +304,18 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne
return
}
func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64, generation uint32) (n int, is_deleted bool, err error) {
err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy data slice
shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{
VolumeId: uint32(vid),
ShardId: uint32(shardId),
Offset: offset,
Size: int64(len(buf)),
FileKey: uint64(needleId),
VolumeId: uint32(vid),
ShardId: uint32(shardId),
Offset: offset,
Size: int64(len(buf)),
FileKey: uint64(needleId),
Generation: generation, // pass generation to read from correct EC volume
})
if err != nil {
return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err)
@ -372,7 +373,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum
go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) {
defer wg.Done()
data := make([]byte, len(buf))
nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset)
nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset, ecVolume.Generation)
if readErr != nil {
glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr)
forgetShardId(ecVolume, shardId)

Loading…
Cancel
Save