You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

108 lines
3.3 KiB

  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/pb"
  6. "github.com/seaweedfs/seaweedfs/weed/glog"
  7. "github.com/seaweedfs/seaweedfs/weed/operation"
  8. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/seaweedfs/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  11. "github.com/seaweedfs/seaweedfs/weed/storage/types"
  12. )
  13. func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
  14. count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
  15. if err != nil {
  16. return 0, err
  17. }
  18. if cookie != n.Cookie {
  19. return 0, fmt.Errorf("unexpected cookie %x", cookie)
  20. }
  21. if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
  22. return 0, err
  23. }
  24. return int64(count), nil
  25. }
  26. func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
  27. _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
  28. if err != nil {
  29. return err
  30. }
  31. if len(intervals) == 0 {
  32. return erasure_coding.NotFoundError
  33. }
  34. shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  35. hasDeletionSuccess := false
  36. err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
  37. if err == nil {
  38. hasDeletionSuccess = true
  39. }
  40. for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
  41. if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
  42. hasDeletionSuccess = true
  43. }
  44. }
  45. if hasDeletionSuccess {
  46. return nil
  47. }
  48. return err
  49. }
  50. func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
  51. ecVolume.ShardLocationsLock.RLock()
  52. sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
  53. ecVolume.ShardLocationsLock.RUnlock()
  54. if !hasShardLocations {
  55. return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
  56. }
  57. for _, sourceDataNode := range sourceDataNodes {
  58. glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
  59. err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
  60. if err != nil {
  61. return err
  62. }
  63. glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err)
  64. }
  65. return nil
  66. }
  67. func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
  68. return operation.WithVolumeServerClient(false, sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  69. // copy data slice
  70. _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
  71. VolumeId: uint32(vid),
  72. Collection: collection,
  73. FileKey: uint64(needleId),
  74. Version: uint32(version),
  75. })
  76. if err != nil {
  77. return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err)
  78. }
  79. return nil
  80. })
  81. }