Browse Source

use iterative approach with a queue to avoid recursive WithFilerClient calls

pull/7426/head
chrislu 2 months ago
parent
commit
629b520edf
  1. 62
      weed/s3api/filer_util.go

62
weed/s3api/filer_util.go

@ -112,43 +112,51 @@ func (s3a *S3ApiServer) updateEntry(parentDirectoryPath string, newEntry *filer_
} }
func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error { func (s3a *S3ApiServer) updateEntriesTTL(parentDirectoryPath string, ttlSec int32) error {
var updateErrors []error
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
// Use iterative approach with a queue to avoid recursive WithFilerClient calls
// which would create a new connection for each subdirectory
return s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
var updateErrors []error
dirsToProcess := []string{parentDirectoryPath}
ctx := context.Background() ctx := context.Background()
if listErr := filer_pb.SeaweedList(ctx, client, parentDirectoryPath, "", func(entry *filer_pb.Entry, isLast bool) error {
if entry.IsDirectory {
if err := s3a.updateEntriesTTL(fmt.Sprintf("%s/%s", strings.TrimRight(parentDirectoryPath, "/"), entry.Name), ttlSec); err != nil {
updateErrors = append(updateErrors, fmt.Errorf("dir %s: %w", entry.Name, err))
for len(dirsToProcess) > 0 {
dir := dirsToProcess[0]
dirsToProcess = dirsToProcess[1:]
if listErr := filer_pb.SeaweedList(ctx, client, dir, "", func(entry *filer_pb.Entry, isLast bool) error {
if entry.IsDirectory {
// Add subdirectory to queue for processing
dirsToProcess = append(dirsToProcess, fmt.Sprintf("%s/%s", strings.TrimRight(dir, "/"), entry.Name))
return nil
}
if entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
if entry.Attributes.TtlSec == ttlSec {
return nil
}
entry.Attributes.TtlSec = ttlSec
if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
Directory: dir,
Entry: entry,
}); err != nil {
updateErrors = append(updateErrors, fmt.Errorf("file %s/%s: %w", dir, entry.Name, err))
} }
return nil return nil
}, "", false, math.MaxInt32); listErr != nil {
updateErrors = append(updateErrors, fmt.Errorf("list entries in %s: %w", dir, listErr))
} }
if entry.Attributes == nil {
entry.Attributes = &filer_pb.FuseAttributes{}
}
if entry.Extended == nil {
entry.Extended = make(map[string][]byte)
}
entry.Extended[s3_constants.SeaweedFSExpiresS3] = []byte("true")
if entry.Attributes.TtlSec == ttlSec {
return nil
}
entry.Attributes.TtlSec = ttlSec
if err := filer_pb.UpdateEntry(ctx, client, &filer_pb.UpdateEntryRequest{
Directory: parentDirectoryPath,
Entry: entry,
}); err != nil {
updateErrors = append(updateErrors, fmt.Errorf("file %s: %w", entry.Name, err))
}
return nil
}, "", false, math.MaxInt32); listErr != nil {
return fmt.Errorf("list entries in %s: %w", parentDirectoryPath, listErr)
} }
if len(updateErrors) > 0 { if len(updateErrors) > 0 {
return fmt.Errorf("failed to update %d entries: %v", len(updateErrors), updateErrors[0]) return fmt.Errorf("failed to update %d entries: %v", len(updateErrors), updateErrors[0])
} }
return nil return nil
}) })
return err
} }
func (s3a *S3ApiServer) getCollectionName(bucket string) string { func (s3a *S3ApiServer) getCollectionName(bucket string) string {

Loading…
Cancel
Save