
mount: improve EnsureVisited performance with dedup, parallelism, and batching (#7697)

* mount: add singleflight to deduplicate concurrent EnsureVisited calls

Previously, when multiple goroutines accessed the same uncached directory
simultaneously, they would all make redundant network requests to the filer.
This change uses singleflight.Group to ensure only one goroutine fetches the
directory entries while the others wait for and share the result.

This fixes a race condition where concurrent lookups or readdir operations
on the same uncached directory would:
1. Make duplicate network requests to the filer
2. Insert duplicate entries into LevelDB cache
3. Waste CPU and network bandwidth
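
As a reference point, a minimal sketch of the singleflight pattern this relies on
(the key "/some/dir" and the fetch closure are illustrative stand-ins, not code
from this change):

    package main

    import (
        "fmt"
        "sync"

        "golang.org/x/sync/singleflight"
    )

    func main() {
        var group singleflight.Group
        var wg sync.WaitGroup

        // Five goroutines ask for the same uncached directory at once.
        for i := 0; i < 5; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                // Do runs the function at most once per in-flight key; concurrent
                // callers block and then share the same result and error.
                v, err, shared := group.Do("/some/dir", func() (interface{}, error) {
                    // stand-in for the expensive filer round trip
                    return "entries for /some/dir", nil
                })
                fmt.Println(v, err, "shared:", shared)
            }()
        }
        wg.Wait()
    }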

* mount: fetch parent directories in parallel during EnsureVisited

Previously, when accessing a deep path like /a/b/c/d, the parent directories
were fetched serially from target to root. This change:

1. Collects all uncached directories from target to root first
2. Fetches them all in parallel using errgroup
3. Relies on singleflight (from previous commit) for deduplication

This reduces latency when accessing deep uncached paths, especially in
high-latency network environments where parallel requests can significantly
improve performance.
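
A rough sketch of the collect-then-fan-out shape described above, using errgroup
(fetchDir, isCached, and the string-based paths are simplified placeholders, not
the actual mount code):

    package main

    import (
        "fmt"
        "path/filepath"

        "golang.org/x/sync/errgroup"
    )

    // fetchDir stands in for one filer request (doEnsureVisited on a single directory).
    func fetchDir(dir string) error {
        fmt.Println("fetching", dir)
        return nil
    }

    func ensureVisited(dirPath, root string, isCached func(string) bool) error {
        // 1. Walk from the target up to the root, collecting uncached ancestors.
        var uncached []string
        for p := dirPath; ; p = filepath.Dir(p) {
            if isCached(p) {
                break // a cached directory implies its ancestors are cached too
            }
            uncached = append(uncached, p)
            if p == root {
                break
            }
        }

        // 2. Fetch all of them in parallel; Wait returns the first error, if any.
        var g errgroup.Group
        for _, dir := range uncached {
            dir := dir // capture for the closure (pre-Go 1.22 loop semantics)
            g.Go(func() error { return fetchDir(dir) })
        }
        return g.Wait()
    }

    func main() {
        _ = ensureVisited("/a/b/c/d", "/", func(string) bool { return false })
    }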

* mount: add batch inserts for LevelDB meta cache

When populating the meta cache from filer, entries were inserted one-by-one
into LevelDB. This change:

1. Adds BatchInsertEntries method to LevelDBStore that uses LevelDB's
   native batch write API
2. Updates MetaCache to keep a direct reference to the LevelDB store
   for batch operations
3. Modifies doEnsureVisited to collect entries and insert them in
   batches of 100 entries

Batch writes are more efficient because they:
- Reduce the number of individual write operations
- Reduce disk syncs
- Improve throughput for large directories
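
As an illustration of the underlying mechanism, a standalone sketch using the
goleveldb batch API (the database path, keys, and values here are made up):

    package main

    import (
        "fmt"
        "log"

        "github.com/syndtr/goleveldb/leveldb"
    )

    func main() {
        db, err := leveldb.OpenFile("/tmp/meta-cache-demo", nil)
        if err != nil {
            log.Fatal(err)
        }
        defer db.Close()

        // Accumulate many puts in one Batch, then commit them with a single
        // Write call: one write operation instead of N individual ones.
        batch := new(leveldb.Batch)
        for i := 0; i < 100; i++ {
            key := []byte(fmt.Sprintf("/dir/file-%03d", i))
            batch.Put(key, []byte("serialized entry"))
        }
        if err := db.Write(batch, nil); err != nil {
            log.Fatal(err)
        }
    }

The whole batch is committed by the single Write call, which is where the
reduction in write operations and syncs comes from.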

* mount: fix potential nil dereference in MarkChildrenCached

Add missing check for inode existence in inode2path map before accessing
the InodeEntry. This prevents a potential nil pointer dereference if the
inode exists in path2inode but not in inode2path (which could happen due
to race conditions or bugs).

This follows the same pattern used in IsChildrenCached which properly
checks for existence before accessing the entry.
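
For context, a tiny sketch of the failure mode and the comma-ok guard (the map
and entry type are illustrative, not the actual InodeEntry):

    package main

    import "fmt"

    type inodeEntry struct{ isChildrenCached bool }

    func main() {
        inode2path := map[uint64]*inodeEntry{}

        // Without the guard, a missing key yields a nil *inodeEntry and the
        // field write below would panic with a nil pointer dereference:
        //   entry := inode2path[42]
        //   entry.isChildrenCached = true
        entry, found := inode2path[42]
        if !found {
            fmt.Println("inode 42 not found, skipping")
            return
        }
        entry.isChildrenCached = true
    }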

* mount: fix batch flush when last entry is hidden

The previous batch insert implementation relied on the isLast flag to flush
remaining entries. However, if the last entry is a hidden system entry
(like 'topics' or 'etc' in root), the callback returns early and the
remaining entries in the batch are never flushed.

Fix by:
1. Only flush when batch reaches threshold inside the callback
2. Flush any remaining entries after ReadDirAllEntries completes
3. Use error wrapping instead of logging+returning to avoid duplicate logs
4. Create new slice after flush to allow GC of flushed entries
5. Add documentation for batchInsertSize constant

This ensures all entries are properly inserted regardless of whether
the last entry is hidden, and prevents memory retention issues.
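
The flush pattern, reduced to a self-contained sketch (processAll, flushThreshold,
and the string items are placeholders for the real entry handling):

    package main

    import "fmt"

    const flushThreshold = 3 // plays the role of batchInsertSize

    func processAll(items []string, isHidden func(string) bool, flush func([]string) error) error {
        var batch []string
        for _, item := range items {
            if isHidden(item) {
                continue // a trailing hidden item must not strand buffered entries
            }
            batch = append(batch, item)
            // Flush only when the threshold is reached inside the loop...
            if len(batch) >= flushThreshold {
                if err := flush(batch); err != nil {
                    return fmt.Errorf("flush: %w", err)
                }
                batch = make([]string, 0, flushThreshold) // fresh slice so flushed items can be GC'd
            }
        }
        // ...and flush whatever remains once iteration is complete.
        if len(batch) > 0 {
            return flush(batch)
        }
        return nil
    }

    func main() {
        items := []string{"a", "b", "c", "d", "topics"} // the last item is "hidden"
        err := processAll(items,
            func(s string) bool { return s == "topics" },
            func(b []string) error { fmt.Println("flushing", b); return nil })
        fmt.Println("err:", err)
    }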

* mount: add context support for cancellation in EnsureVisited

Thread context.Context through the batch insert call chain to enable
proper cancellation and timeout support:

1. Use errgroup.WithContext() so if one fetch fails, others are cancelled
2. Add context parameter to BatchInsertEntries for consistency with InsertEntry
3. Pass context to ReadDirAllEntries for cancellation during network calls
4. Check context cancellation before starting work in doEnsureVisited
5. Use %w for error wrapping to preserve error types for inspection

This prevents unnecessary work when one directory fetch fails and makes
the batch operations consistent with the existing context-aware APIs.
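
A small sketch of the cancellation behavior this enables (the two fake fetches
and their paths are illustrative only):

    package main

    import (
        "context"
        "errors"
        "fmt"
        "time"

        "golang.org/x/sync/errgroup"
    )

    func main() {
        // errgroup.WithContext hands every worker a context that is cancelled
        // as soon as any worker returns a non-nil error.
        g, ctx := errgroup.WithContext(context.Background())

        g.Go(func() error {
            return errors.New("fetch of /a/b failed") // first failure cancels ctx
        })

        g.Go(func() error {
            // Cheap cancellation check before doing real work, mirroring the
            // ctx.Err() guard at the top of doEnsureVisited.
            select {
            case <-ctx.Done():
                return ctx.Err()
            case <-time.After(time.Second):
                fmt.Println("would fetch /a/b/c here")
                return nil
            }
        })

        fmt.Println("EnsureVisited result:", g.Wait()) // reports the first error
    }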
Chris Lu, 4 weeks ago, committed by GitHub (commit 0cd9f34177)
1. weed/filer/leveldb/leveldb_store.go (33 changed lines)
2. weed/mount/inode_to_path.go (4 changed lines)
3. weed/mount/meta_cache/meta_cache.go (22 changed lines)
4. weed/mount/meta_cache/meta_cache_init.go (103 changed lines)
weed/filer/leveldb/leveldb_store.go

@@ -107,6 +107,39 @@ func (store *LevelDBStore) UpdateEntry(ctx context.Context, entry *filer.Entry)
 	return store.InsertEntry(ctx, entry)
 }
 
+// BatchInsertEntries inserts multiple entries in a single LevelDB batch write.
+// This is more efficient than inserting entries one by one as it reduces
+// the number of write operations and syncs to disk.
+func (store *LevelDBStore) BatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
+	if len(entries) == 0 {
+		return nil
+	}
+
+	batch := new(leveldb.Batch)
+	for _, entry := range entries {
+		key := genKey(entry.DirAndName())
+
+		value, err := entry.EncodeAttributesAndChunks()
+		if err != nil {
+			return fmt.Errorf("encoding %s %+v: %w", entry.FullPath, entry.Attr, err)
+		}
+
+		if len(entry.GetChunks()) > filer.CountEntryChunksForGzip {
+			value = weed_util.MaybeGzipData(value)
+		}
+
+		batch.Put(key, value)
+	}
+
+	err := store.db.Write(batch, nil)
+	if err != nil {
+		return fmt.Errorf("batch write: %w", err)
+	}
+
+	return nil
+}
+
 func (store *LevelDBStore) FindEntry(ctx context.Context, fullpath weed_util.FullPath) (entry *filer.Entry, err error) {
 	key := genKey(fullpath.DirAndName())

weed/mount/inode_to_path.go

@@ -160,6 +160,10 @@ func (i *InodeToPath) MarkChildrenCached(fullpath util.FullPath) {
 		return
 	}
 	path, found := i.inode2path[inode]
+	if !found {
+		glog.Warningf("MarkChildrenCached inode %d not found in inode2path for %v", inode, fullpath)
+		return
+	}
 	path.isChildrenCached = true
 	if i.cacheMetaTtlSec > 0 {
 		path.cachedExpiresTime = time.Now().Add(i.cacheMetaTtlSec)

weed/mount/meta_cache/meta_cache.go

@@ -6,6 +6,8 @@ import (
 	"sync"
 	"time"
 
+	"golang.org/x/sync/singleflight"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/filer/leveldb"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
@@ -17,20 +19,24 @@ import (
 // e.g. fill fileId field for chunks
 type MetaCache struct {
-	root       util.FullPath
-	localStore filer.VirtualFilerStore
+	root         util.FullPath
+	localStore   filer.VirtualFilerStore
+	leveldbStore *leveldb.LevelDBStore // direct reference for batch operations
 	sync.RWMutex
 	uidGidMapper *UidGidMapper
 	markCachedFn   func(fullpath util.FullPath)
 	isCachedFn     func(fullpath util.FullPath) bool
 	invalidateFunc func(fullpath util.FullPath, entry *filer_pb.Entry)
+	visitGroup     singleflight.Group // deduplicates concurrent EnsureVisited calls for the same path
 }
 
 func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
 	markCachedFn func(path util.FullPath), isCachedFn func(path util.FullPath) bool, invalidateFunc func(util.FullPath, *filer_pb.Entry)) *MetaCache {
+	leveldbStore, virtualStore := openMetaStore(dbFolder)
 	return &MetaCache{
 		root:         root,
-		localStore:   openMetaStore(dbFolder),
+		localStore:   virtualStore,
+		leveldbStore: leveldbStore,
 		markCachedFn: markCachedFn,
 		isCachedFn:   isCachedFn,
 		uidGidMapper: uidGidMapper,
@@ -40,7 +46,7 @@ func NewMetaCache(dbFolder string, uidGidMapper *UidGidMapper, root util.FullPath,
 	}
 }
 
-func openMetaStore(dbFolder string) filer.VirtualFilerStore {
+func openMetaStore(dbFolder string) (*leveldb.LevelDBStore, filer.VirtualFilerStore) {
 
 	os.RemoveAll(dbFolder)
 	os.MkdirAll(dbFolder, 0755)
@@ -54,7 +60,7 @@ func openMetaStore(dbFolder string) filer.VirtualFilerStore {
 		glog.Fatalf("Failed to initialize metadata cache store for %s: %+v", store.GetName(), err)
 	}
 
-	return filer.NewFilerStoreWrapper(store)
+	return store, filer.NewFilerStoreWrapper(store)
 }
@@ -68,6 +74,12 @@ func (mc *MetaCache) doInsertEntry(ctx context.Context, entry *filer.Entry) error {
 	return mc.localStore.InsertEntry(ctx, entry)
 }
 
+// doBatchInsertEntries inserts multiple entries using LevelDB's batch write.
+// This is more efficient than inserting entries one by one.
+func (mc *MetaCache) doBatchInsertEntries(ctx context.Context, entries []*filer.Entry) error {
+	return mc.leveldbStore.BatchInsertEntries(ctx, entries)
+}
+
 func (mc *MetaCache) AtomicUpdateEntryFromFiler(ctx context.Context, oldPath util.FullPath, newEntry *filer.Entry) error {
 	mc.Lock()
 	defer mc.Unlock()

weed/mount/meta_cache/meta_cache_init.go

@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 
+	"golang.org/x/sync/errgroup"
+
 	"github.com/seaweedfs/seaweedfs/weed/filer"
 	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
@@ -11,22 +13,18 @@ import (
 )
 
 func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
+	// Collect all uncached paths from target directory up to root
+	var uncachedPaths []util.FullPath
 	currentPath := dirPath
 	for {
-		// the directory children are already cached
-		// so no need for this and upper directories
+		// If this path is cached, all ancestors are also cached
 		if mc.isCachedFn(currentPath) {
-			return nil
-		}
-		if err := doEnsureVisited(mc, client, currentPath); err != nil {
-			return err
+			break
 		}
+		uncachedPaths = append(uncachedPaths, currentPath)
 
-		// continue to parent directory
+		// Continue to parent directory
 		if currentPath != mc.root {
 			parent, _ := currentPath.DirAndName()
 			currentPath = util.FullPath(parent)
@@ -35,33 +33,82 @@ func EnsureVisited(mc *MetaCache, client filer_pb.FilerClient, dirPath util.FullPath) error {
 		}
 	}
 
-	return nil
+	if len(uncachedPaths) == 0 {
+		return nil
+	}
+
+	// Fetch all uncached directories in parallel with context for cancellation.
+	// If one fetch fails, cancel the others to avoid unnecessary work.
+	g, ctx := errgroup.WithContext(context.Background())
+	for _, p := range uncachedPaths {
+		path := p // capture for closure
+		g.Go(func() error {
+			return doEnsureVisited(ctx, mc, client, path)
+		})
+	}
+	return g.Wait()
 }
 
-func doEnsureVisited(mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
-
-	glog.V(4).Infof("ReadDirAllEntries %s ...", path)
-
-	err := util.Retry("ReadDirAllEntries", func() error {
-		return filer_pb.ReadDirAllEntries(context.Background(), client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
-			entry := filer.FromPbEntry(string(path), pbEntry)
-			if IsHiddenSystemEntry(string(path), entry.Name()) {
-				return nil
-			}
-			if err := mc.doInsertEntry(context.Background(), entry); err != nil {
-				glog.V(0).Infof("read %s: %v", entry.FullPath, err)
-				return err
-			}
-			return nil
-		})
-	})
-
-	if err != nil {
-		err = fmt.Errorf("list %s: %v", path, err)
-	} else {
-		mc.markCachedFn(path)
-	}
-
-	return err
-}
+// batchInsertSize is the number of entries to accumulate before flushing to LevelDB.
+// 100 provides a balance between memory usage (~100 Entry pointers) and write efficiency
+// (fewer disk syncs). Larger values reduce I/O overhead but increase memory and latency.
+const batchInsertSize = 100
+
+func doEnsureVisited(ctx context.Context, mc *MetaCache, client filer_pb.FilerClient, path util.FullPath) error {
+	// Use singleflight to deduplicate concurrent requests for the same path
+	_, err, _ := mc.visitGroup.Do(string(path), func() (interface{}, error) {
+		// Check for cancellation before starting
+		if ctx.Err() != nil {
+			return nil, ctx.Err()
+		}
+
+		// Double-check if already cached (another goroutine may have completed)
+		if mc.isCachedFn(path) {
+			return nil, nil
+		}
+
+		glog.V(4).Infof("ReadDirAllEntries %s ...", path)
+
+		// Collect entries in batches for efficient LevelDB writes
+		var batch []*filer.Entry
+		fetchErr := util.Retry("ReadDirAllEntries", func() error {
+			batch = nil // Reset batch on retry, allow GC of previous entries
+			return filer_pb.ReadDirAllEntries(ctx, client, path, "", func(pbEntry *filer_pb.Entry, isLast bool) error {
+				entry := filer.FromPbEntry(string(path), pbEntry)
+				if IsHiddenSystemEntry(string(path), entry.Name()) {
+					return nil
+				}
+				batch = append(batch, entry)
+				// Flush batch when it reaches the threshold.
+				// Don't rely on isLast here - hidden entries may cause early return.
+				if len(batch) >= batchInsertSize {
+					// No lock needed - LevelDB Write() is thread-safe
+					if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+						return fmt.Errorf("batch insert for %s: %w", path, err)
+					}
+					// Create new slice to allow GC of flushed entries
+					batch = make([]*filer.Entry, 0, batchInsertSize)
+				}
+				return nil
+			})
+		})
+		if fetchErr != nil {
+			return nil, fmt.Errorf("list %s: %w", path, fetchErr)
+		}
+
+		// Flush any remaining entries in the batch
+		if len(batch) > 0 {
+			if err := mc.doBatchInsertEntries(ctx, batch); err != nil {
+				return nil, fmt.Errorf("batch insert remaining for %s: %w", path, err)
+			}
+		}
+		mc.markCachedFn(path)
+
+		return nil, nil
+	})
	return err
}
