diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index bd36c69e4..3b20da9f4 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -4,7 +4,6 @@ import (
 	"context"
 	"errors"
 	"fmt"
-	"math"
 	"strings"
 
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -115,49 +114,17 @@ func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int3
 	// Use iterative approach with a queue to avoid recursive WithFilerClient calls
 	// which would create a new connection for each subdirectory
 	return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+		ctx := context.Background()
 		var updateErrors []error
 		dirsToProcess := []string{parentDirectoryPath}
-		ctx := context.Background()
 
 		for len(dirsToProcess) > 0 {
			dir := dirsToProcess[0]
 			dirsToProcess = dirsToProcess[1:]
-			if listErr := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
-				if entry.IsDirectory {
-					// Add subdirectory to queue for processing
-					dirsToProcess = append(dirsToProcess, string(util.NewFullPath(dir, entry.Name)))
-					return nil
-				}
-				if entry.Attributes == nil {
-					entry.Attributes = &filer_pb.FuseAttributes{}
-				}
-				if entry.Extended == nil {
-					entry.Extended = make(map[string][]byte)
-				}
-
-				// Check if both TTL and S3 expiry flag are already set correctly
-				flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true"
-				if entry.Attributes.TtlSec == ttlSec && flagAlreadySet {
-					return nil
-				}
-
-				// Set the S3 expiry flag
-				entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
-				// Update TTL if needed
-				if entry.Attributes.TtlSec != ttlSec {
-					entry.Attributes.TtlSec = ttlSec
-				}
-
-				if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
-					Directory: dir,
-					Entry:     entry,
-				}); err != nil {
-					updateErrors = append(updateErrors, fmt.Errorf("file %s/%s: %w", dir, entry.Name, err))
-				}
-				return nil
-			}, "", false, math.MaxInt32); listErr != nil {
-				updateErrors = append(updateErrors, fmt.Errorf("list entries in %s: %w", dir, listErr))
+			// Process directory in paginated batches
+			if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil {
+				updateErrors = append(updateErrors, err)
 			}
 		}
 
 		if len(updateErrors) > 0 {
@@ -168,6 +135,85 @@ func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int3
 	})
 }
 
+// processDirectoryTTL processes a single directory in paginated batches
+func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
+	dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error {
+
+	const batchSize = 1024 // Same as filer.PaginationSize
+	startFrom := ""
+
+	for {
+		lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors)
+		if err != nil {
+			return fmt.Errorf("list entries in %s: %w", dir, err)
+		}
+
+		// If we got fewer entries than batch size, we've reached the end
+		if entryCount < batchSize {
+			break
+		}
+		startFrom = lastEntryName
+	}
+	return nil
+}
+
+// processTTLBatch processes a single batch of entries
+func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient,
+	dir string, ttlSec int32, startFrom string, batchSize uint32,
+	dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) {
+
+	err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
+		lastEntry = entry.Name
+		count++
+
+		if entry.IsDirectory {
+			*dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name)))
+			return nil
+		}
+
+		// Update entry TTL and S3 expiry flag
+		if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil {
+			*updateErrors = append(*updateErrors, updateErr)
+		}
+		return nil
+	}, startFrom, false, batchSize)
+
+	return lastEntry, count, err
+}
+
+// updateEntryTTL updates a single entry's TTL and S3 expiry flag
+func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient,
+	dir string, entry *filer_pb.Entry, ttlSec int32) error {
+
+	if entry.Attributes == nil {
+		entry.Attributes = &filer_pb.FuseAttributes{}
+	}
+	if entry.Extended == nil {
+		entry.Extended = make(map[string][]byte)
+	}
+
+	// Check if both TTL and S3 expiry flag are already set correctly
+	flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true"
+	if entry.Attributes.TtlSec == ttlSec && flagAlreadySet {
+		return nil // Already up to date
+	}
+
+	// Set the S3 expiry flag
+	entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
+	// Update TTL if needed
+	if entry.Attributes.TtlSec != ttlSec {
+		entry.Attributes.TtlSec = ttlSec
+	}
+
+	if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
+		Directory: dir,
+		Entry:     entry,
+	}); err != nil {
+		return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err)
+	}
+	return nil
+}
+
 func (s3a *S3ApiServer) getCollectionName(bucket string) string {
 	if s3a.option.FilerGroup != "" {
 		return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket)