105 lines
3.2 KiB

  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/operation"
  7. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  9. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  10. "github.com/chrislusf/seaweedfs/weed/storage/types"
  11. )
  12. func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
  13. count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n)
  14. if err != nil {
  15. return 0, err
  16. }
  17. if cookie != n.Cookie {
  18. return 0, fmt.Errorf("unexpected cookie %x", cookie)
  19. }
  20. if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
  21. return 0, err
  22. }
  23. return int64(count), nil
  24. }
  25. func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
  26. _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
  27. if len(intervals) == 0 {
  28. return erasure_coding.NotFoundError
  29. }
  30. shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  31. hasDeletionSuccess := false
  32. err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
  33. if err == nil {
  34. hasDeletionSuccess = true
  35. }
  36. for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
  37. if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
  38. hasDeletionSuccess = true
  39. }
  40. }
  41. if hasDeletionSuccess {
  42. return nil
  43. }
  44. return err
  45. }
  46. func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
  47. ecVolume.ShardLocationsLock.RLock()
  48. sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
  49. ecVolume.ShardLocationsLock.RUnlock()
  50. if !hasShardLocations {
  51. return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
  52. }
  53. for _, sourceDataNode := range sourceDataNodes {
  54. glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
  55. err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
  56. if err != nil {
  57. return err
  58. }
  59. glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err)
  60. }
  61. return nil
  62. }
  63. func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode string, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
  64. return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  65. // copy data slice
  66. _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
  67. VolumeId: uint32(vid),
  68. Collection: collection,
  69. FileKey: uint64(needleId),
  70. Version: uint32(version),
  71. })
  72. if err != nil {
  73. return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err)
  74. }
  75. return nil
  76. })
  77. }