|
|
@ -4,7 +4,6 @@ import ( |
|
|
"context" |
|
|
"context" |
|
|
"errors" |
|
|
"errors" |
|
|
"fmt" |
|
|
"fmt" |
|
|
"math" |
|
|
|
|
|
"strings" |
|
|
"strings" |
|
|
|
|
|
|
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
"github.com/seaweedfs/seaweedfs/weed/glog" |
|
|
@ -115,49 +114,17 @@ func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int3 |
|
|
// Use iterative approach with a queue to avoid recursive WithFilerClient calls
|
|
|
// Use iterative approach with a queue to avoid recursive WithFilerClient calls
|
|
|
// which would create a new connection for each subdirectory
|
|
|
// which would create a new connection for each subdirectory
|
|
|
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|
|
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { |
|
|
|
|
|
ctx := context.Background() |
|
|
var updateErrors []error |
|
|
var updateErrors []error |
|
|
dirsToProcess := []string{parentDirectoryPath} |
|
|
dirsToProcess := []string{parentDirectoryPath} |
|
|
ctx := context.Background() |
|
|
|
|
|
|
|
|
|
|
|
for len(dirsToProcess) > 0 { |
|
|
for len(dirsToProcess) > 0 { |
|
|
dir := dirsToProcess[0] |
|
|
dir := dirsToProcess[0] |
|
|
dirsToProcess = dirsToProcess[1:] |
|
|
dirsToProcess = dirsToProcess[1:] |
|
|
|
|
|
|
|
|
if listErr := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { |
|
|
|
|
|
if entry.IsDirectory { |
|
|
|
|
|
// Add subdirectory to queue for processing
|
|
|
|
|
|
dirsToProcess = append(dirsToProcess, string(util.NewFullPath(dir, entry.Name))) |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
if entry.Attributes == nil { |
|
|
|
|
|
entry.Attributes = &filer_pb.FuseAttributes{} |
|
|
|
|
|
} |
|
|
|
|
|
if entry.Extended == nil { |
|
|
|
|
|
entry.Extended = make(map[string][]byte) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check if both TTL and S3 expiry flag are already set correctly
|
|
|
|
|
|
flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true" |
|
|
|
|
|
if entry.Attributes.TtlSec == ttlSec && flagAlreadySet { |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Set the S3 expiry flag
|
|
|
|
|
|
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") |
|
|
|
|
|
// Update TTL if needed
|
|
|
|
|
|
if entry.Attributes.TtlSec != ttlSec { |
|
|
|
|
|
entry.Attributes.TtlSec = ttlSec |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ |
|
|
|
|
|
Directory: dir, |
|
|
|
|
|
Entry: entry, |
|
|
|
|
|
}); err != nil { |
|
|
|
|
|
updateErrors = append(updateErrors, fmt.Errorf("file %s/%s: %w", dir, entry.Name, err)) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
}, "", false, math.MaxInt32); listErr != nil { |
|
|
|
|
|
updateErrors = append(updateErrors, fmt.Errorf("list entries in %s: %w", dir, listErr)) |
|
|
|
|
|
|
|
|
// Process directory in paginated batches
|
|
|
|
|
|
if err := s3a.processDirectoryTTL(ctx, client, dir, ttlSec, &dirsToProcess, &updateErrors); err != nil { |
|
|
|
|
|
updateErrors = append(updateErrors, err) |
|
|
} |
|
|
} |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
@ -168,6 +135,85 @@ func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int3 |
|
|
}) |
|
|
}) |
|
|
} |
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// processDirectoryTTL processes a single directory in paginated batches
|
|
|
|
|
|
func (s3a *S3ApiServer) processDirectoryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, |
|
|
|
|
|
dir string, ttlSec int32, dirsToProcess *[]string, updateErrors *[]error) error { |
|
|
|
|
|
|
|
|
|
|
|
const batchSize = 1024 // Same as filer.PaginationSize
|
|
|
|
|
|
startFrom := "" |
|
|
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
|
lastEntryName, entryCount, err := s3a.processTTLBatch(ctx, client, dir, ttlSec, startFrom, batchSize, dirsToProcess, updateErrors) |
|
|
|
|
|
if err != nil { |
|
|
|
|
|
return fmt.Errorf("list entries in %s: %w", dir, err) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// If we got fewer entries than batch size, we've reached the end
|
|
|
|
|
|
if entryCount < batchSize { |
|
|
|
|
|
break |
|
|
|
|
|
} |
|
|
|
|
|
startFrom = lastEntryName |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// processTTLBatch processes a single batch of entries
|
|
|
|
|
|
func (s3a *S3ApiServer) processTTLBatch(ctx context.Context, client filer_pb.SeaweedFilerClient, |
|
|
|
|
|
dir string, ttlSec int32, startFrom string, batchSize uint32, |
|
|
|
|
|
dirsToProcess *[]string, updateErrors *[]error) (lastEntry string, count int, err error) { |
|
|
|
|
|
|
|
|
|
|
|
err = filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error { |
|
|
|
|
|
lastEntry = entry.Name |
|
|
|
|
|
count++ |
|
|
|
|
|
|
|
|
|
|
|
if entry.IsDirectory { |
|
|
|
|
|
*dirsToProcess = append(*dirsToProcess, string(util.NewFullPath(dir, entry.Name))) |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Update entry TTL and S3 expiry flag
|
|
|
|
|
|
if updateErr := s3a.updateEntryTTL(ctx, client, dir, entry, ttlSec); updateErr != nil { |
|
|
|
|
|
*updateErrors = append(*updateErrors, updateErr) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
}, startFrom, false, batchSize) |
|
|
|
|
|
|
|
|
|
|
|
return lastEntry, count, err |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// updateEntryTTL updates a single entry's TTL and S3 expiry flag
|
|
|
|
|
|
func (s3a *S3ApiServer) updateEntryTTL(ctx context.Context, client filer_pb.SeaweedFilerClient, |
|
|
|
|
|
dir string, entry *filer_pb.Entry, ttlSec int32) error { |
|
|
|
|
|
|
|
|
|
|
|
if entry.Attributes == nil { |
|
|
|
|
|
entry.Attributes = &filer_pb.FuseAttributes{} |
|
|
|
|
|
} |
|
|
|
|
|
if entry.Extended == nil { |
|
|
|
|
|
entry.Extended = make(map[string][]byte) |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Check if both TTL and S3 expiry flag are already set correctly
|
|
|
|
|
|
flagAlreadySet := string(entry.Extended[s3_constants.SeaweedFSExpiresS3]) == "true" |
|
|
|
|
|
if entry.Attributes.TtlSec == ttlSec && flagAlreadySet { |
|
|
|
|
|
return nil // Already up to date
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
// Set the S3 expiry flag
|
|
|
|
|
|
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true") |
|
|
|
|
|
// Update TTL if needed
|
|
|
|
|
|
if entry.Attributes.TtlSec != ttlSec { |
|
|
|
|
|
entry.Attributes.TtlSec = ttlSec |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{ |
|
|
|
|
|
Directory: dir, |
|
|
|
|
|
Entry: entry, |
|
|
|
|
|
}); err != nil { |
|
|
|
|
|
return fmt.Errorf("file %s/%s: %w", dir, entry.Name, err) |
|
|
|
|
|
} |
|
|
|
|
|
return nil |
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
func (s3a *S3ApiServer) getCollectionName(bucket string) string { |
|
|
func (s3a *S3ApiServer) getCollectionName(bucket string) string { |
|
|
if s3a.option.FilerGroup != "" { |
|
|
if s3a.option.FilerGroup != "" { |
|
|
return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket) |
|
|
return fmt.Sprintf("%s_%s", s3a.option.FilerGroup, bucket) |
|
|
|