157 lines
4.0 KiB

6 years ago
6 years ago
6 years ago
  1. package operation
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "net/http"
  7. "strings"
  8. "sync"
  9. "google.golang.org/grpc"
  10. "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
  11. )
  12. type DeleteResult struct {
  13. Fid string `json:"fid"`
  14. Size int `json:"size"`
  15. Status int `json:"status"`
  16. Error string `json:"error,omitempty"`
  17. }
  18. func ParseFileId(fid string) (vid string, key_cookie string, err error) {
  19. commaIndex := strings.Index(fid, ",")
  20. if commaIndex <= 0 {
  21. return "", "", errors.New("Wrong fid format.")
  22. }
  23. return fid[:commaIndex], fid[commaIndex+1:], nil
  24. }
  25. // DeleteFiles batch deletes a list of fileIds
  26. func DeleteFiles(master string, usePublicUrl bool, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
  27. lookupFunc := func(vids []string) (results map[string]LookupResult, err error) {
  28. results, err = LookupVolumeIds(master, grpcDialOption, vids)
  29. if err == nil && usePublicUrl {
  30. for _, result := range results {
  31. for _, loc := range result.Locations {
  32. loc.Url = loc.PublicUrl
  33. }
  34. }
  35. }
  36. return
  37. }
  38. return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
  39. }
  40. func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
  41. var ret []*volume_server_pb.DeleteResult
  42. vid_to_fileIds := make(map[string][]string)
  43. var vids []string
  44. for _, fileId := range fileIds {
  45. vid, _, err := ParseFileId(fileId)
  46. if err != nil {
  47. ret = append(ret, &volume_server_pb.DeleteResult{
  48. FileId: fileId,
  49. Status: http.StatusBadRequest,
  50. Error: err.Error()},
  51. )
  52. continue
  53. }
  54. if _, ok := vid_to_fileIds[vid]; !ok {
  55. vid_to_fileIds[vid] = make([]string, 0)
  56. vids = append(vids, vid)
  57. }
  58. vid_to_fileIds[vid] = append(vid_to_fileIds[vid], fileId)
  59. }
  60. lookupResults, err := lookupFunc(vids)
  61. if err != nil {
  62. return ret, err
  63. }
  64. server_to_fileIds := make(map[string][]string)
  65. for vid, result := range lookupResults {
  66. if result.Error != "" {
  67. ret = append(ret, &volume_server_pb.DeleteResult{
  68. FileId: vid,
  69. Status: http.StatusBadRequest,
  70. Error: err.Error()},
  71. )
  72. continue
  73. }
  74. for _, location := range result.Locations {
  75. if _, ok := server_to_fileIds[location.Url]; !ok {
  76. server_to_fileIds[location.Url] = make([]string, 0)
  77. }
  78. server_to_fileIds[location.Url] = append(
  79. server_to_fileIds[location.Url], vid_to_fileIds[vid]...)
  80. }
  81. }
  82. resultChan := make(chan []*volume_server_pb.DeleteResult, len(server_to_fileIds))
  83. var wg sync.WaitGroup
  84. for server, fidList := range server_to_fileIds {
  85. wg.Add(1)
  86. go func(server string, fidList []string) {
  87. defer wg.Done()
  88. if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList, true); deleteErr != nil {
  89. err = deleteErr
  90. } else if deleteResults != nil {
  91. resultChan <- deleteResults
  92. }
  93. }(server, fidList)
  94. }
  95. wg.Wait()
  96. close(resultChan)
  97. for result := range resultChan {
  98. ret = append(ret, result...)
  99. }
  100. return ret, err
  101. }
  102. // DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
  103. func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string, includeCookie bool) (ret []*volume_server_pb.DeleteResult, err error) {
  104. err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
  105. req := &volume_server_pb.BatchDeleteRequest{
  106. FileIds: fileIds,
  107. SkipCookieCheck: !includeCookie,
  108. }
  109. resp, err := volumeServerClient.BatchDelete(context.Background(), req)
  110. // fmt.Printf("deleted %v %v: %v\n", fileIds, err, resp)
  111. if err != nil {
  112. return err
  113. }
  114. ret = append(ret, resp.Results...)
  115. return nil
  116. })
  117. if err != nil {
  118. return
  119. }
  120. for _, result := range ret {
  121. if result.Error != "" && result.Error != "not found" {
  122. return nil, fmt.Errorf("delete fileId %s: %v", result.FileId, result.Error)
  123. }
  124. }
  125. return
  126. }