121 lines
3.2 KiB

6 years ago
  1. package weed_server
  2. import (
  3. "context"
  4. "net/http"
  5. "time"
  6. "github.com/seaweedfs/seaweedfs/weed/operation"
  7. "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
  8. "github.com/seaweedfs/seaweedfs/weed/storage/needle"
  9. )
  10. func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
  11. resp := &volume_server_pb.BatchDeleteResponse{}
  12. now := uint64(time.Now().Unix())
  13. for _, fid := range req.FileIds {
  14. vid, id_cookie, err := operation.ParseFileId(fid)
  15. if err != nil {
  16. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  17. FileId: fid,
  18. Status: http.StatusBadRequest,
  19. Error: err.Error()})
  20. continue
  21. }
  22. n := new(needle.Needle)
  23. volumeId, _ := needle.NewVolumeId(vid)
  24. ecVolume, isEcVolume := vs.store.FindEcVolume(volumeId)
  25. if req.SkipCookieCheck {
  26. n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie)
  27. if err != nil {
  28. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  29. FileId: fid,
  30. Status: http.StatusBadRequest,
  31. Error: err.Error()})
  32. continue
  33. }
  34. } else {
  35. n.ParsePath(id_cookie)
  36. cookie := n.Cookie
  37. if !isEcVolume {
  38. if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
  39. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  40. FileId: fid,
  41. Status: http.StatusNotFound,
  42. Error: err.Error(),
  43. })
  44. continue
  45. }
  46. } else {
  47. if _, err := vs.store.ReadEcShardNeedle(volumeId, n, nil); err != nil {
  48. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  49. FileId: fid,
  50. Status: http.StatusNotFound,
  51. Error: err.Error(),
  52. })
  53. continue
  54. }
  55. }
  56. if n.Cookie != cookie {
  57. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  58. FileId: fid,
  59. Status: http.StatusBadRequest,
  60. Error: "File Random Cookie does not match.",
  61. })
  62. break
  63. }
  64. }
  65. if n.IsChunkedManifest() {
  66. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  67. FileId: fid,
  68. Status: http.StatusNotAcceptable,
  69. Error: "ChunkManifest: not allowed in batch delete mode.",
  70. })
  71. continue
  72. }
  73. n.LastModified = now
  74. if !isEcVolume {
  75. if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
  76. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  77. FileId: fid,
  78. Status: http.StatusInternalServerError,
  79. Error: err.Error()},
  80. )
  81. } else if size == 0 {
  82. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  83. FileId: fid,
  84. Status: http.StatusNotModified},
  85. )
  86. } else {
  87. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  88. FileId: fid,
  89. Status: http.StatusAccepted,
  90. Size: uint32(size)},
  91. )
  92. }
  93. } else {
  94. if size, err := vs.store.DeleteEcShardNeedle(ecVolume, n, n.Cookie); err != nil {
  95. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  96. FileId: fid,
  97. Status: http.StatusInternalServerError,
  98. Error: err.Error()},
  99. )
  100. } else {
  101. resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
  102. FileId: fid,
  103. Status: http.StatusAccepted,
  104. Size: uint32(size)},
  105. )
  106. }
  107. }
  108. }
  109. return resp, nil
  110. }