diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index a2606839b..ec868aa9c 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -369,6 +369,13 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea if !found { return fmt.Errorf("VolumeEcShardRead not found ec volume id %d", req.VolumeId) } + + // Validate generation matches the request + requestedGeneration := req.Generation + if ecVolume.Generation != requestedGeneration { + return fmt.Errorf("VolumeEcShardRead volume %d generation mismatch: requested %d, found %d", + req.VolumeId, requestedGeneration, ecVolume.Generation) + } ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId)) if !found { return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId) diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go index 493ad660d..ee88524d7 100644 --- a/weed/storage/store_ec.go +++ b/weed/storage/store_ec.go @@ -219,7 +219,7 @@ func (s *Store) readOneEcShardInterval(needleId types.NeedleId, ecVolume *erasur // try reading directly if hasShardIdLocation { - _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset) + _, is_deleted, err = s.readRemoteEcShardInterval(sourceDataNodes, needleId, ecVolume.VolumeId, shardId, data, actualOffset, ecVolume.Generation) if err == nil { return } @@ -286,7 +286,7 @@ func (s *Store) cachedLookupEcShardLocations(ecVolume *erasure_coding.EcVolume) return } -func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64, generation uint32) (n int, is_deleted bool, err error) { if len(sourceDataNodes) == 0 { return 0, false, fmt.Errorf("failed to find ec shard %d.%d", vid, shardId) @@ -294,7 +294,7 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne for _, sourceDataNode := range sourceDataNodes { glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode) - n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset) + n, is_deleted, err = s.doReadRemoteEcShardInterval(sourceDataNode, needleId, vid, shardId, buf, offset, generation) if err == nil { return } @@ -304,17 +304,18 @@ func (s *Store) readRemoteEcShardInterval(sourceDataNodes []pb.ServerAddress, ne return } -func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) { +func (s *Store) doReadRemoteEcShardInterval(sourceDataNode pb.ServerAddress, needleId types.NeedleId, vid needle.VolumeId, shardId erasure_coding.ShardId, buf []byte, offset int64, generation uint32) (n int, is_deleted bool, err error) { err = operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy data slice shardReadClient, err := client.VolumeEcShardRead(context.Background(), &volume_server_pb.VolumeEcShardReadRequest{ - VolumeId: uint32(vid), - ShardId: uint32(shardId), - Offset: offset, - Size: int64(len(buf)), - FileKey: uint64(needleId), + VolumeId: uint32(vid), + ShardId: uint32(shardId), + Offset: offset, + Size: int64(len(buf)), + FileKey: uint64(needleId), + Generation: generation, // pass generation to read from correct EC volume }) if err != nil { return fmt.Errorf("failed to start reading ec shard %d.%d from %s: %v", vid, shardId, sourceDataNode, err) @@ -372,7 +373,7 @@ func (s *Store) recoverOneRemoteEcShardInterval(needleId types.NeedleId, ecVolum go func(shardId erasure_coding.ShardId, locations []pb.ServerAddress) { defer wg.Done() data := make([]byte, len(buf)) - nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset) + nRead, isDeleted, readErr := s.readRemoteEcShardInterval(locations, needleId, ecVolume.VolumeId, shardId, data, offset, ecVolume.Generation) if readErr != nil { glog.V(3).Infof("recover: readRemoteEcShardInterval %d.%d %d bytes from %+v: %v", ecVolume.VolumeId, shardId, nRead, locations, readErr) forgetShardId(ecVolume, shardId)