Browse Source

allow deleting only older empty dir without recursion (#4430)

pull/4432/head
Konstantin Lebedev 2 years ago
committed by GitHub
parent
commit
d75a7b7f62
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 11
      weed/pb/filer_pb/filer_pb_helper.go
  2. 15
      weed/s3api/s3api_objects_list_handlers.go

11
weed/pb/filer_pb/filer_pb_helper.go

@ -6,6 +6,7 @@ import (
"fmt" "fmt"
"os" "os"
"strings" "strings"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/storage/needle" "github.com/seaweedfs/seaweedfs/weed/storage/needle"
@ -13,6 +14,8 @@ import (
"google.golang.org/protobuf/proto" "google.golang.org/protobuf/proto"
) )
const cutoffTimeNewEmptyDir = 3
func (entry *Entry) IsInRemoteOnly() bool { func (entry *Entry) IsInRemoteOnly() bool {
return len(entry.GetChunks()) == 0 && entry.RemoteEntry != nil && entry.RemoteEntry.RemoteSize > 0 return len(entry.GetChunks()) == 0 && entry.RemoteEntry != nil && entry.RemoteEntry.RemoteSize > 0
} }
@ -28,6 +31,10 @@ func (entry *Entry) FileMode() (fileMode os.FileMode) {
return return
} }
func (entry *Entry) IsOlderDir() bool {
return entry.IsDirectory && entry.Attributes != nil && entry.Attributes.Mime == "" && entry.Attributes.GetCrtime() <= time.Now().Unix()-cutoffTimeNewEmptyDir
}
func ToFileIdObject(fileIdStr string) (*FileId, error) { func ToFileIdObject(fileIdStr string) (*FileId, error) {
t, err := needle.ParseFileIdFromString(fileIdStr) t, err := needle.ParseFileIdFromString(fileIdStr)
if err != nil { if err != nil {
@ -143,18 +150,22 @@ var ErrNotFound = errors.New("filer: no entry is found in filer store")
func IsEmpty(event *SubscribeMetadataResponse) bool { func IsEmpty(event *SubscribeMetadataResponse) bool {
return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry == nil return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry == nil
} }
func IsCreate(event *SubscribeMetadataResponse) bool { func IsCreate(event *SubscribeMetadataResponse) bool {
return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil return event.EventNotification.NewEntry != nil && event.EventNotification.OldEntry == nil
} }
func IsUpdate(event *SubscribeMetadataResponse) bool { func IsUpdate(event *SubscribeMetadataResponse) bool {
return event.EventNotification.NewEntry != nil && return event.EventNotification.NewEntry != nil &&
event.EventNotification.OldEntry != nil && event.EventNotification.OldEntry != nil &&
event.Directory == event.EventNotification.NewParentPath && event.Directory == event.EventNotification.NewParentPath &&
event.EventNotification.NewEntry.Name == event.EventNotification.OldEntry.Name event.EventNotification.NewEntry.Name == event.EventNotification.OldEntry.Name
} }
func IsDelete(event *SubscribeMetadataResponse) bool { func IsDelete(event *SubscribeMetadataResponse) bool {
return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry != nil return event.EventNotification.NewEntry == nil && event.EventNotification.OldEntry != nil
} }
func IsRename(event *SubscribeMetadataResponse) bool { func IsRename(event *SubscribeMetadataResponse) bool {
return event.EventNotification.NewEntry != nil && return event.EventNotification.NewEntry != nil &&
event.EventNotification.OldEntry != nil && event.EventNotification.OldEntry != nil &&

15
weed/s3api/s3api_objects_list_handlers.go

@ -18,8 +18,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
) )
const cutoffTimeNewEmptyDir = 3
type ListBucketResultV2 struct { type ListBucketResultV2 struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"` XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ListBucketResult"`
Name string `xml:"Name"` Name string `xml:"Name"`
@ -391,7 +389,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d
// println("doListFilerEntries2 nextMarker", nextMarker) // println("doListFilerEntries2 nextMarker", nextMarker)
} else { } else {
var isEmpty bool var isEmpty bool
if !s3a.option.AllowEmptyFolder && !entry.IsDirectoryKeyObject() {
if !s3a.option.AllowEmptyFolder && entry.IsOlderDir() {
if isEmpty, err = s3a.ensureDirectoryAllEmpty(client, dir, entry.Name); err != nil { if isEmpty, err = s3a.ensureDirectoryAllEmpty(client, dir, entry.Name); err != nil {
glog.Errorf("check empty folder %s: %v", dir, err) glog.Errorf("check empty folder %s: %v", dir, err)
} }
@ -447,16 +445,11 @@ func (s3a *S3ApiServer) ensureDirectoryAllEmpty(filerClient filer_pb.SeaweedFile
var startFrom string var startFrom string
var isExhausted bool var isExhausted bool
var foundEntry bool var foundEntry bool
cutOffTimeAtSec := time.Now().Unix() + cutoffTimeNewEmptyDir
for fileCounter == 0 && !isExhausted && err == nil { for fileCounter == 0 && !isExhausted && err == nil {
err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error { err = filer_pb.SeaweedList(filerClient, currentDir, "", func(entry *filer_pb.Entry, isLast bool) error {
foundEntry = true foundEntry = true
if entry.IsDirectory {
if entry.Attributes != nil && cutOffTimeAtSec >= entry.Attributes.GetCrtime() {
fileCounter++
} else {
subDirs = append(subDirs, entry.Name)
}
if entry.IsOlderDir() {
subDirs = append(subDirs, entry.Name)
} else { } else {
fileCounter++ fileCounter++
} }
@ -489,7 +482,7 @@ func (s3a *S3ApiServer) ensureDirectoryAllEmpty(filerClient filer_pb.SeaweedFile
} }
glog.V(1).Infof("deleting empty folder %s", currentDir) glog.V(1).Infof("deleting empty folder %s", currentDir)
if err = doDeleteEntry(filerClient, parentDir, name, true, true); err != nil {
if err = doDeleteEntry(filerClient, parentDir, name, true, false); err != nil {
return return
} }

Loading…
Cancel
Save