You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 

155 lines
4.9 KiB

package meta_cache
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// EnsureVisited loads dirPath, and every ancestor of it that mc does not yet
// report as cached, into the meta cache.
//
// The walk starts at dirPath and moves to the parent directory until it hits a
// path for which mc.isCachedFn returns true or until it has queued mc.root.
// Each queued directory is then fetched by doEnsureVisited in its own
// goroutine. The error returned is whatever the errgroup's Wait reports; nil
// is returned immediately when nothing needed fetching.
func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
	// Walk upward, queuing directories until a cached one is found. A cached
	// directory is taken to imply that its ancestors are cached as well.
	var pending []util.FullPath
	for cur := dirPath; !mc.isCachedFn(cur); {
		pending = append(pending, cur)
		if cur == mc.root {
			break
		}
		parentDir, _ := cur.DirAndName()
		cur = util.FullPath(parentDir)
	}

	if len(pending) == 0 {
		return nil
	}

	// Fetch the queued directories concurrently. The group context is handed
	// to every fetch so that a failure in one can stop the others early.
	group, groupCtx := errgroup.WithContext(context.Background())
	for i := range pending {
		target := pending[i] // per-iteration copy for the closure below
		group.Go(func() error {
			return doEnsureVisited(groupCtx, mc, client, target)
		})
	}
	return group.Wait()
}
// batchInsertSize is the number of entries to accumulate before flushing to LevelDB.
// doEnsureVisited hands its pending batch to doBatchInsertEntries as soon as the
// batch holds this many entries, then starts a fresh batch.
// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency
// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency.
const batchInsertSize = 100
// doEnsureVisited lists the children of path through client and stores them
// in mc, bracketing the work with mc.BeginDirectoryBuild and
// mc.CompleteDirectoryBuild.
//
// The work runs inside mc.visitGroup.Do keyed by the path string, which is
// meant to collapse concurrent requests for the same directory into one.
// Entries are inserted in batches of batchInsertSize; entries for which
// IsHiddenSystemEntry returns true are skipped. If listing, the final flush,
// or completing the build fails, the directory's children are deleted and the
// build is aborted before the wrapped error is returned.
func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
	dir := string(path)

	visit := func() (interface{}, error) {
		// Nothing to do if the caller has already been canceled.
		if cancelErr := ctx.Err(); cancelErr != nil {
			return nil, cancelErr
		}
		// Re-check the cache: another caller may have populated it meanwhile.
		if mc.isCachedFn(path) {
			return nil, nil
		}

		glog.V(4).Infof("ReadDirAllEntries %s ...", path)

		// Build lifecycle calls (begin / abort / complete, plus the rollback
		// delete) deliberately run on a background context instead of ctx, so
		// that cancellation of ctx cannot cut them short.
		// NOTE(review): the original rationale refers to enqueueAndWait
		// returning early and triggering cleanup while the operation is still
		// queued; that helper is not visible here — confirm before changing.
		lifecycleCtx := context.Background()
		if beginErr := mc.BeginDirectoryBuild(lifecycleCtx, path); beginErr != nil {
			return nil, fmt.Errorf("begin build %s: %w", path, beginErr)
		}

		// settled is set once the build has been either rolled back or
		// published, which makes rollback a no-op from then on.
		settled := false
		rollback := func(reason string) {
			if settled {
				return
			}
			settled = true
			if deleteErr := mc.DeleteFolderChildren(lifecycleCtx, path); deleteErr != nil {
				glog.V(2).Infof("clear %s build %s: %v", reason, path, deleteErr)
			}
			if abortErr := mc.AbortDirectoryBuild(lifecycleCtx, path); abortErr != nil {
				glog.V(2).Infof("abort %s build %s: %v", reason, path, abortErr)
			}
		}
		// Safety net: roll back if we leave unsettled with ctx canceled.
		defer func() {
			if settled || ctx.Err() == nil {
				return
			}
			rollback("canceled")
		}()

		var (
			pending      []*filer.Entry // entries collected but not yet inserted
			snapshotTsNs int64
		)

		// onEntry receives each listed entry, queues the visible ones and
		// flushes the queue whenever it reaches batchInsertSize. isLast is
		// intentionally ignored: a hidden entry returns before the flush check,
		// so the remainder is flushed after the listing instead.
		onEntry := func(pbEntry *filer_pb.Entry, isLast bool) error {
			entry := filer.FromPbEntry(dir, pbEntry)
			if IsHiddenSystemEntry(dir, entry.Name()) {
				return nil
			}
			pending = append(pending, entry)
			if len(pending) < batchInsertSize {
				return nil
			}
			// NOTE(review): the original comment states no lock is needed
			// because the underlying LevelDB write is thread-safe — not
			// verifiable from this function.
			if insertErr := mc.doBatchInsertEntries(ctx, pending); insertErr != nil {
				return fmt.Errorf("batch insert for %s: %w", path, insertErr)
			}
			// Start a fresh slice so the flushed entries can be collected.
			pending = make([]*filer.Entry, 0, batchInsertSize)
			return nil
		}

		// listOnce is a single attempt: drop whatever a previous attempt left
		// behind (both in memory and in the store), then list from scratch.
		listOnce := func() error {
			pending = nil
			if clearErr := mc.DeleteFolderChildren(ctx, path); clearErr != nil {
				return fmt.Errorf("clear existing entries for %s: %w", path, clearErr)
			}
			var readErr error
			snapshotTsNs, readErr = filer_pb.ReadDirAllEntriesWithSnapshot(ctx, client, path, "", onEntry)
			return readErr
		}

		if listErr := util.Retry("ReadDirAllEntries", listOnce); listErr != nil {
			rollback("failed")
			return nil, fmt.Errorf("list %s: %w", path, listErr)
		}

		// Insert whatever is left over from the last partial batch.
		if len(pending) > 0 {
			if insertErr := mc.doBatchInsertEntries(ctx, pending); insertErr != nil {
				rollback("incomplete")
				return nil, fmt.Errorf("batch insert remaining for %s: %w", path, insertErr)
			}
		}

		if completeErr := mc.CompleteDirectoryBuild(lifecycleCtx, path, snapshotTsNs); completeErr != nil {
			rollback("unreplayed")
			return nil, fmt.Errorf("complete build for %s: %w", path, completeErr)
		}

		// Published successfully; the deferred safety net must not undo it.
		settled = true
		return nil, nil
	}

	_, err, _ := mc.visitGroup.Do(dir, visit)
	return err
}
// IsHiddenSystemEntry reports whether name is one of the entries that are
// skipped when they appear directly under the root directory "/": "topics"
// or "etc". It returns false for every other directory.
func IsHiddenSystemEntry(dir, name string) bool {
	if dir != "/" {
		return false
	}
	switch name {
	case "topics", "etc":
		return true
	}
	return false
}