You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

200 lines
5.9 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package s3api
  2. import (
  3. "encoding/xml"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  6. "golang.org/x/exp/slices"
  7. "io"
  8. "net/http"
  9. "strings"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. )
  // deleteMultipleObjectsLimit is the largest number of object keys accepted in
  // one multi-object delete request; longer lists are rejected with
  // ErrInvalidMaxDeleteObjects.
  const deleteMultipleObjectsLimit = 1000
  19. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  20. bucket, object := s3_constants.GetBucketAndObject(r)
  21. glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
  22. target := util.FullPath(fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object))
  23. dir, name := target.DirAndName()
  24. s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  25. err := doDeleteEntry(client, dir, name, true, false)
  26. if err != nil {
  27. // skip deletion error, usually the file is not found
  28. return nil
  29. }
  30. if s3a.option.AllowEmptyFolder {
  31. return nil
  32. }
  33. directoriesWithDeletion := make(map[string]int)
  34. lastSeparator := strings.LastIndex(object, "/")
  35. if lastSeparator > 0 {
  36. parentDirectoryPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object[:lastSeparator])
  37. directoriesWithDeletion[parentDirectoryPath]++
  38. // purge empty folders, only checking folders with deletions
  39. for len(directoriesWithDeletion) > 0 {
  40. directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
  41. }
  42. }
  43. return nil
  44. })
  45. w.WriteHeader(http.StatusNoContent)
  46. }
  // ObjectIdentifier names one object by its key. It is the element type of the
  // object list in a delete request and of the deleted list in the response; the
  // key is carried in the <Key> XML element.
  type ObjectIdentifier struct {
  	ObjectName string `xml:"Key"`
  }
  51. // DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
  52. type DeleteObjectsRequest struct {
  53. // Element to enable quiet mode for the request
  54. Quiet bool
  55. // List of objects to be deleted
  56. Objects []ObjectIdentifier `xml:"Object"`
  57. }
  // DeleteError reports one object that could not be deleted.
  type DeleteError struct {
  	// Code is the error code for the failure.
  	Code string
  	// Message is the human-readable error text.
  	Message string
  	// Key is the key of the object the error applies to.
  	Key string
  }
  64. // DeleteObjectsResponse container for multiple object deletes.
  65. type DeleteObjectsResponse struct {
  66. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
  67. // Collection of all deleted objects
  68. DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
  69. // Collection of errors deleting certain objects.
  70. Errors []DeleteError `xml:"Error,omitempty"`
  71. }
  72. // DeleteMultipleObjectsHandler - Delete multiple objects
  73. func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
  74. bucket, _ := s3_constants.GetBucketAndObject(r)
  75. glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket)
  76. deleteXMLBytes, err := io.ReadAll(r.Body)
  77. if err != nil {
  78. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  79. return
  80. }
  81. deleteObjects := &DeleteObjectsRequest{}
  82. if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
  83. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  84. return
  85. }
  86. if len(deleteObjects.Objects) > deleteMultipleObjectsLimit {
  87. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects)
  88. return
  89. }
  90. var deletedObjects []ObjectIdentifier
  91. var deleteErrors []DeleteError
  92. var auditLog *s3err.AccessLog
  93. directoriesWithDeletion := make(map[string]int)
  94. if s3err.Logger != nil {
  95. auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
  96. }
  97. s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  98. // delete file entries
  99. for _, object := range deleteObjects.Objects {
  100. if object.ObjectName == "" {
  101. continue
  102. }
  103. lastSeparator := strings.LastIndex(object.ObjectName, "/")
  104. parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
  105. if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
  106. entryName = object.ObjectName[lastSeparator+1:]
  107. parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
  108. }
  109. parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
  110. err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
  111. if err == nil {
  112. directoriesWithDeletion[parentDirectoryPath]++
  113. deletedObjects = append(deletedObjects, object)
  114. } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
  115. deletedObjects = append(deletedObjects, object)
  116. } else {
  117. delete(directoriesWithDeletion, parentDirectoryPath)
  118. deleteErrors = append(deleteErrors, DeleteError{
  119. Code: "",
  120. Message: err.Error(),
  121. Key: object.ObjectName,
  122. })
  123. }
  124. if auditLog != nil {
  125. auditLog.Key = entryName
  126. s3err.PostAccessLog(*auditLog)
  127. }
  128. }
  129. // purge empty folders, only checking folders with deletions
  130. for len(directoriesWithDeletion) > 0 {
  131. directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
  132. }
  133. return nil
  134. })
  135. deleteResp := DeleteObjectsResponse{}
  136. if !deleteObjects.Quiet {
  137. deleteResp.DeletedObjects = deletedObjects
  138. }
  139. deleteResp.Errors = deleteErrors
  140. writeSuccessResponseXML(w, r, deleteResp)
  141. }
  142. func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
  143. var allDirs []string
  144. for dir := range directoriesWithDeletion {
  145. allDirs = append(allDirs, dir)
  146. }
  147. slices.SortFunc(allDirs, func(a, b string) int {
  148. return len(b) - len(a)
  149. })
  150. newDirectoriesWithDeletion = make(map[string]int)
  151. for _, dir := range allDirs {
  152. parentDir, dirName := util.FullPath(dir).DirAndName()
  153. if parentDir == s3a.option.BucketsPath {
  154. continue
  155. }
  156. if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
  157. glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
  158. } else {
  159. newDirectoriesWithDeletion[parentDir]++
  160. }
  161. }
  162. return
  163. }