106 lines
3.2 KiB

  1. package storage
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/chrislusf/seaweedfs/weed/pb"
  6. "github.com/chrislusf/seaweedfs/weed/glog"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  9. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  10. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  11. "github.com/chrislusf/seaweedfs/weed/storage/types"
  12. )
  13. func (s *Store) DeleteEcShardNeedle(ecVolume *erasure_coding.EcVolume, n *needle.Needle, cookie types.Cookie) (int64, error) {
  14. count, err := s.ReadEcShardNeedle(ecVolume.VolumeId, n, nil)
  15. if err != nil {
  16. return 0, err
  17. }
  18. if cookie != n.Cookie {
  19. return 0, fmt.Errorf("unexpected cookie %x", cookie)
  20. }
  21. if err = s.doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume, n.Id); err != nil {
  22. return 0, err
  23. }
  24. return int64(count), nil
  25. }
  26. func (s *Store) doDeleteNeedleFromAtLeastOneRemoteEcShards(ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
  27. _, _, intervals, err := ecVolume.LocateEcShardNeedle(needleId, ecVolume.Version)
  28. if len(intervals) == 0 {
  29. return erasure_coding.NotFoundError
  30. }
  31. shardId, _ := intervals[0].ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
  32. hasDeletionSuccess := false
  33. err = s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId)
  34. if err == nil {
  35. hasDeletionSuccess = true
  36. }
  37. for shardId = erasure_coding.DataShardsCount; shardId < erasure_coding.TotalShardsCount; shardId++ {
  38. if parityDeletionError := s.doDeleteNeedleFromRemoteEcShardServers(shardId, ecVolume, needleId); parityDeletionError == nil {
  39. hasDeletionSuccess = true
  40. }
  41. }
  42. if hasDeletionSuccess {
  43. return nil
  44. }
  45. return err
  46. }
  47. func (s *Store) doDeleteNeedleFromRemoteEcShardServers(shardId erasure_coding.ShardId, ecVolume *erasure_coding.EcVolume, needleId types.NeedleId) error {
  48. ecVolume.ShardLocationsLock.RLock()
  49. sourceDataNodes, hasShardLocations := ecVolume.ShardLocations[shardId]
  50. ecVolume.ShardLocationsLock.RUnlock()
  51. if !hasShardLocations {
  52. return fmt.Errorf("ec shard %d.%d not located", ecVolume.VolumeId, shardId)
  53. }
  54. for _, sourceDataNode := range sourceDataNodes {
  55. glog.V(4).Infof("delete from remote ec shard %d.%d from %s", ecVolume.VolumeId, shardId, sourceDataNode)
  56. err := s.doDeleteNeedleFromRemoteEcShard(sourceDataNode, ecVolume.VolumeId, ecVolume.Collection, ecVolume.Version, needleId)
  57. if err != nil {
  58. return err
  59. }
  60. glog.V(1).Infof("delete from remote ec shard %d.%d from %s: %v", ecVolume.VolumeId, shardId, sourceDataNode, err)
  61. }
  62. return nil
  63. }
  64. func (s *Store) doDeleteNeedleFromRemoteEcShard(sourceDataNode pb.ServerAddress, vid needle.VolumeId, collection string, version needle.Version, needleId types.NeedleId) error {
  65. return operation.WithVolumeServerClient(sourceDataNode, s.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
  66. // copy data slice
  67. _, err := client.VolumeEcBlobDelete(context.Background(), &volume_server_pb.VolumeEcBlobDeleteRequest{
  68. VolumeId: uint32(vid),
  69. Collection: collection,
  70. FileKey: uint64(needleId),
  71. Version: uint32(version),
  72. })
  73. if err != nil {
  74. return fmt.Errorf("failed to delete from ec shard %d on %s: %v", vid, sourceDataNode, err)
  75. }
  76. return nil
  77. })
  78. }