You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

199 lines
5.8 KiB

10 months ago
10 months ago
10 months ago
10 months ago
10 months ago
  1. package s3api
  2. import (
  3. "encoding/xml"
  4. "fmt"
  5. "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
  6. "golang.org/x/exp/slices"
  7. "io"
  8. "net/http"
  9. "strings"
  10. "github.com/seaweedfs/seaweedfs/weed/filer"
  11. "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
  12. "github.com/seaweedfs/seaweedfs/weed/glog"
  13. "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
  14. "github.com/seaweedfs/seaweedfs/weed/util"
  15. )
  16. const (
  17. deleteMultipleObjectsLimit = 1000
  18. )
  19. func (s3a *S3ApiServer) DeleteObjectHandler(w http.ResponseWriter, r *http.Request) {
  20. bucket, object := s3_constants.GetBucketAndObject(r)
  21. glog.V(3).Infof("DeleteObjectHandler %s %s", bucket, object)
  22. object = urlPathEscape(removeDuplicateSlashes(object))
  23. s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  24. err := doDeleteEntry(client, s3a.option.BucketsPath+"/"+bucket, object, true, false)
  25. if err != nil {
  26. // skip deletion error, usually the file is not found
  27. return nil
  28. }
  29. if s3a.option.AllowEmptyFolder {
  30. return nil
  31. }
  32. directoriesWithDeletion := make(map[string]int)
  33. lastSeparator := strings.LastIndex(object, "/")
  34. if lastSeparator > 0 {
  35. parentDirectoryPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object[:lastSeparator])
  36. directoriesWithDeletion[parentDirectoryPath]++
  37. // purge empty folders, only checking folders with deletions
  38. for len(directoriesWithDeletion) > 0 {
  39. directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
  40. }
  41. }
  42. return nil
  43. })
  44. w.WriteHeader(http.StatusNoContent)
  45. }
  46. // / ObjectIdentifier carries key name for the object to delete.
  47. type ObjectIdentifier struct {
  48. ObjectName string `xml:"Key"`
  49. }
  50. // DeleteObjectsRequest - xml carrying the object key names which needs to be deleted.
  51. type DeleteObjectsRequest struct {
  52. // Element to enable quiet mode for the request
  53. Quiet bool
  54. // List of objects to be deleted
  55. Objects []ObjectIdentifier `xml:"Object"`
  56. }
  57. // DeleteError structure.
  58. type DeleteError struct {
  59. Code string
  60. Message string
  61. Key string
  62. }
  63. // DeleteObjectsResponse container for multiple object deletes.
  64. type DeleteObjectsResponse struct {
  65. XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DeleteResult" json:"-"`
  66. // Collection of all deleted objects
  67. DeletedObjects []ObjectIdentifier `xml:"Deleted,omitempty"`
  68. // Collection of errors deleting certain objects.
  69. Errors []DeleteError `xml:"Error,omitempty"`
  70. }
  71. // DeleteMultipleObjectsHandler - Delete multiple objects
  72. func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *http.Request) {
  73. bucket, _ := s3_constants.GetBucketAndObject(r)
  74. glog.V(3).Infof("DeleteMultipleObjectsHandler %s", bucket)
  75. deleteXMLBytes, err := io.ReadAll(r.Body)
  76. if err != nil {
  77. s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
  78. return
  79. }
  80. deleteObjects := &DeleteObjectsRequest{}
  81. if err := xml.Unmarshal(deleteXMLBytes, deleteObjects); err != nil {
  82. s3err.WriteErrorResponse(w, r, s3err.ErrMalformedXML)
  83. return
  84. }
  85. if len(deleteObjects.Objects) > deleteMultipleObjectsLimit {
  86. s3err.WriteErrorResponse(w, r, s3err.ErrInvalidMaxDeleteObjects)
  87. return
  88. }
  89. var deletedObjects []ObjectIdentifier
  90. var deleteErrors []DeleteError
  91. var auditLog *s3err.AccessLog
  92. directoriesWithDeletion := make(map[string]int)
  93. if s3err.Logger != nil {
  94. auditLog = s3err.GetAccessLog(r, http.StatusNoContent, s3err.ErrNone)
  95. }
  96. s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
  97. // delete file entries
  98. for _, object := range deleteObjects.Objects {
  99. if object.ObjectName == "" {
  100. continue
  101. }
  102. lastSeparator := strings.LastIndex(object.ObjectName, "/")
  103. parentDirectoryPath, entryName, isDeleteData, isRecursive := "", object.ObjectName, true, false
  104. if lastSeparator > 0 && lastSeparator+1 < len(object.ObjectName) {
  105. entryName = object.ObjectName[lastSeparator+1:]
  106. parentDirectoryPath = "/" + object.ObjectName[:lastSeparator]
  107. }
  108. parentDirectoryPath = fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, parentDirectoryPath)
  109. err := doDeleteEntry(client, parentDirectoryPath, entryName, isDeleteData, isRecursive)
  110. if err == nil {
  111. directoriesWithDeletion[parentDirectoryPath]++
  112. deletedObjects = append(deletedObjects, object)
  113. } else if strings.Contains(err.Error(), filer.MsgFailDelNonEmptyFolder) {
  114. deletedObjects = append(deletedObjects, object)
  115. } else {
  116. delete(directoriesWithDeletion, parentDirectoryPath)
  117. deleteErrors = append(deleteErrors, DeleteError{
  118. Code: "",
  119. Message: err.Error(),
  120. Key: object.ObjectName,
  121. })
  122. }
  123. if auditLog != nil {
  124. auditLog.Key = entryName
  125. s3err.PostAccessLog(*auditLog)
  126. }
  127. }
  128. // purge empty folders, only checking folders with deletions
  129. for len(directoriesWithDeletion) > 0 {
  130. directoriesWithDeletion = s3a.doDeleteEmptyDirectories(client, directoriesWithDeletion)
  131. }
  132. return nil
  133. })
  134. deleteResp := DeleteObjectsResponse{}
  135. if !deleteObjects.Quiet {
  136. deleteResp.DeletedObjects = deletedObjects
  137. }
  138. deleteResp.Errors = deleteErrors
  139. writeSuccessResponseXML(w, r, deleteResp)
  140. }
  141. func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerClient, directoriesWithDeletion map[string]int) (newDirectoriesWithDeletion map[string]int) {
  142. var allDirs []string
  143. for dir := range directoriesWithDeletion {
  144. allDirs = append(allDirs, dir)
  145. }
  146. slices.SortFunc(allDirs, func(a, b string) int {
  147. return len(b) - len(a)
  148. })
  149. newDirectoriesWithDeletion = make(map[string]int)
  150. for _, dir := range allDirs {
  151. parentDir, dirName := util.FullPath(dir).DirAndName()
  152. if parentDir == s3a.option.BucketsPath {
  153. continue
  154. }
  155. if err := doDeleteEntry(client, parentDir, dirName, false, false); err != nil {
  156. glog.V(4).Infof("directory %s has %d deletion but still not empty: %v", dir, directoriesWithDeletion[dir], err)
  157. } else {
  158. newDirectoriesWithDeletion[parentDir]++
  159. }
  160. }
  161. return
  162. }