diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 042400c74..ecca13f7a 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -25,6 +25,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/util" "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" "github.com/seaweedfs/seaweedfs/weed/wdclient" + "golang.org/x/sync/singleflight" ) const ( @@ -54,6 +55,7 @@ type Filer struct { Signature int32 FilerConf *FilerConf RemoteStorage *FilerRemoteStorage + lazyFetchGroup singleflight.Group Dlm *lock_manager.DistributedLockManager MaxFilenameLength uint32 deletionQuit chan struct{} @@ -375,6 +377,14 @@ func (f *Filer) FindEntry(ctx context.Context, p util.FullPath) (entry *Entry, e } } + if entry == nil && (err == nil || errors.Is(err, filer_pb.ErrNotFound)) { + if lazy, lazyErr := f.maybeLazyFetchFromRemote(ctx, p); lazyErr != nil { + glog.V(1).InfofCtx(ctx, "FindEntry lazy fetch %s: %v", p, lazyErr) + } else if lazy != nil { + return lazy, nil + } + } + return entry, err } diff --git a/weed/filer/filer_lazy_remote.go b/weed/filer/filer_lazy_remote.go new file mode 100644 index 000000000..76f48a6fd --- /dev/null +++ b/weed/filer/filer_lazy_remote.go @@ -0,0 +1,116 @@ +package filer + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/remote_storage" + "github.com/seaweedfs/seaweedfs/weed/util" +) + +type lazyFetchContextKey struct{} + +// maybeLazyFetchFromRemote is called by FindEntry when the store returns no +// entry for p. If p is under a remote-storage mount, it stats the remote +// object, builds a filer Entry from the result, and persists it via +// CreateEntry with SkipCheckParentDirectory so phantom parent directories +// under the mount are not required. +// +// On a CreateEntry failure after a successful StatFile the in-memory entry is +// still returned (availability over consistency); the singleflight key is +// forgotten so the next lookup retries the filer write. +// +// Returns nil without error when: p is not under a remote mount; the remote +// reports the object does not exist; or any other remote error occurs. +func (f *Filer) maybeLazyFetchFromRemote(ctx context.Context, p util.FullPath) (*Entry, error) { + // Prevent recursive invocation: CreateEntry calls FindEntry, which would + // re-enter this function and deadlock on the singleflight key. + if ctx.Value(lazyFetchContextKey{}) != nil { + return nil, nil + } + + if f.RemoteStorage == nil { + return nil, nil + } + + mountDir, remoteLoc := f.RemoteStorage.FindMountDirectory(p) + if remoteLoc == nil { + return nil, nil + } + + client, _, found := f.RemoteStorage.FindRemoteStorageClient(p) + if !found { + return nil, nil + } + + relPath := strings.TrimPrefix(string(p), string(mountDir)) + if relPath != "" && !strings.HasPrefix(relPath, "/") { + relPath = "/" + relPath + } + base := strings.TrimSuffix(remoteLoc.Path, "/") + remotePath := "/" + strings.TrimLeft(base+relPath, "/") + + objectLoc := &remote_pb.RemoteStorageLocation{ + Name: remoteLoc.Name, + Bucket: remoteLoc.Bucket, + Path: remotePath, + } + + type lazyFetchResult struct { + entry *Entry + } + + key := string(p) + val, err, _ := f.lazyFetchGroup.Do(key, func() (interface{}, error) { + remoteEntry, statErr := client.StatFile(objectLoc) + if statErr != nil { + if errors.Is(statErr, remote_storage.ErrRemoteObjectNotFound) { + glog.V(3).InfofCtx(ctx, "maybeLazyFetchFromRemote: %s not found in remote", p) + } else { + glog.Warningf("maybeLazyFetchFromRemote: stat %s failed: %v", p, statErr) + } + return lazyFetchResult{nil}, nil + } + if remoteEntry == nil { + glog.V(3).InfofCtx(ctx, "maybeLazyFetchFromRemote: %s StatFile returned nil entry", p) + return lazyFetchResult{nil}, nil + } + + mtime := time.Unix(remoteEntry.RemoteMtime, 0) + entry := &Entry{ + FullPath: p, + Attr: Attr{ + Mtime: mtime, + Crtime: mtime, + Mode: 0644, + FileSize: uint64(remoteEntry.RemoteSize), + }, + Remote: remoteEntry, + } + + persistBaseCtx, cancelPersist := context.WithTimeout(context.Background(), 30*time.Second) + defer cancelPersist() + persistCtx := context.WithValue(persistBaseCtx, lazyFetchContextKey{}, true) + saveErr := f.CreateEntry(persistCtx, entry, false, false, nil, true, f.MaxFilenameLength) + if saveErr != nil { + glog.Warningf("maybeLazyFetchFromRemote: failed to persist filer entry for %s: %v", p, saveErr) + f.lazyFetchGroup.Forget(key) + } + + return lazyFetchResult{entry}, nil + }) + if err != nil { + return nil, err + } + + result, ok := val.(lazyFetchResult) + if !ok { + return nil, fmt.Errorf("maybeLazyFetchFromRemote: unexpected singleflight result type %T for %s", val, p) + } + return result.entry, nil +} diff --git a/weed/filer/filer_lazy_remote_test.go b/weed/filer/filer_lazy_remote_test.go new file mode 100644 index 000000000..2c5dcd60f --- /dev/null +++ b/weed/filer/filer_lazy_remote_test.go @@ -0,0 +1,370 @@ +package filer + +import ( + "context" + "errors" + "fmt" + "io" + "sync" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/remote_storage" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/log_buffer" + "github.com/seaweedfs/seaweedfs/weed/wdclient" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// --- minimal FilerStore stub --- + +type stubFilerStore struct { + mu sync.Mutex + entries map[string]*Entry + insertErr error +} + +func newStubFilerStore() *stubFilerStore { + return &stubFilerStore{entries: make(map[string]*Entry)} +} + +func (s *stubFilerStore) GetName() string { return "stub" } +func (s *stubFilerStore) Initialize(util.Configuration, string) error { return nil } +func (s *stubFilerStore) Shutdown() {} +func (s *stubFilerStore) BeginTransaction(ctx context.Context) (context.Context, error) { + return ctx, nil +} +func (s *stubFilerStore) CommitTransaction(context.Context) error { return nil } +func (s *stubFilerStore) RollbackTransaction(context.Context) error { return nil } +func (s *stubFilerStore) KvPut(context.Context, []byte, []byte) error { return nil } +func (s *stubFilerStore) KvGet(context.Context, []byte) ([]byte, error) { + return nil, ErrKvNotFound +} +func (s *stubFilerStore) KvDelete(context.Context, []byte) error { return nil } +func (s *stubFilerStore) DeleteFolderChildren(context.Context, util.FullPath) error { return nil } +func (s *stubFilerStore) ListDirectoryEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, eachEntryFunc ListEachEntryFunc) (string, error) { + return "", nil +} +func (s *stubFilerStore) ListDirectoryPrefixedEntries(ctx context.Context, dirPath util.FullPath, startFileName string, includeStartFile bool, limit int64, prefix string, eachEntryFunc ListEachEntryFunc) (string, error) { + return "", nil +} + +func (s *stubFilerStore) InsertEntry(_ context.Context, entry *Entry) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.insertErr != nil { + return s.insertErr + } + s.entries[string(entry.FullPath)] = entry + return nil +} + +func (s *stubFilerStore) UpdateEntry(_ context.Context, entry *Entry) error { + s.mu.Lock() + defer s.mu.Unlock() + s.entries[string(entry.FullPath)] = entry + return nil +} + +func (s *stubFilerStore) FindEntry(_ context.Context, p util.FullPath) (*Entry, error) { + s.mu.Lock() + defer s.mu.Unlock() + if e, ok := s.entries[string(p)]; ok { + return e, nil + } + return nil, filer_pb.ErrNotFound +} + +func (s *stubFilerStore) DeleteEntry(_ context.Context, p util.FullPath) error { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.entries, string(p)) + return nil +} + +// --- minimal RemoteStorageClient stub --- + +type stubRemoteClient struct { + statResult *filer_pb.RemoteEntry + statErr error +} + +func (c *stubRemoteClient) StatFile(*remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { + return c.statResult, c.statErr +} +func (c *stubRemoteClient) Traverse(*remote_pb.RemoteStorageLocation, remote_storage.VisitFunc) error { + return nil +} +func (c *stubRemoteClient) ReadFile(*remote_pb.RemoteStorageLocation, int64, int64) ([]byte, error) { + return nil, nil +} +func (c *stubRemoteClient) WriteDirectory(*remote_pb.RemoteStorageLocation, *filer_pb.Entry) error { + return nil +} +func (c *stubRemoteClient) RemoveDirectory(*remote_pb.RemoteStorageLocation) error { return nil } +func (c *stubRemoteClient) WriteFile(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, io.Reader) (*filer_pb.RemoteEntry, error) { + return nil, nil +} +func (c *stubRemoteClient) UpdateFileMetadata(*remote_pb.RemoteStorageLocation, *filer_pb.Entry, *filer_pb.Entry) error { + return nil +} +func (c *stubRemoteClient) DeleteFile(*remote_pb.RemoteStorageLocation) error { return nil } +func (c *stubRemoteClient) ListBuckets() ([]*remote_storage.Bucket, error) { return nil, nil } +func (c *stubRemoteClient) CreateBucket(string) error { return nil } +func (c *stubRemoteClient) DeleteBucket(string) error { return nil } + +// --- stub RemoteStorageClientMaker --- + +type stubClientMaker struct { + client remote_storage.RemoteStorageClient +} + +func (m *stubClientMaker) Make(*remote_pb.RemoteConf) (remote_storage.RemoteStorageClient, error) { + return m.client, nil +} +func (m *stubClientMaker) HasBucket() bool { return true } + +// --- test filer factory --- + +func newTestFiler(t *testing.T, store *stubFilerStore, rs *FilerRemoteStorage) *Filer { + t.Helper() + dialOption := grpc.WithTransportCredentials(insecure.NewCredentials()) + mc := wdclient.NewMasterClient( + dialOption, "test", cluster.FilerType, + pb.ServerAddress("localhost:0"), "", "", + *pb.NewServiceDiscoveryFromMap(map[string]pb.ServerAddress{}), + ) + f := &Filer{ + RemoteStorage: rs, + Store: NewFilerStoreWrapper(store), + MaxFilenameLength: 255, + MasterClient: mc, + fileIdDeletionQueue: util.NewUnboundedQueue(), + LocalMetaLogBuffer: log_buffer.NewLogBuffer("test", time.Minute, + func(*log_buffer.LogBuffer, time.Time, time.Time, []byte, int64, int64) {}, nil, func() {}), + } + return f +} + +// registerStubMaker registers a stub RemoteStorageClientMaker for the given +// type string and returns a cleanup function that restores the previous maker. +func registerStubMaker(t *testing.T, storageType string, client remote_storage.RemoteStorageClient) func() { + t.Helper() + prev := remote_storage.RemoteStorageClientMakers[storageType] + remote_storage.RemoteStorageClientMakers[storageType] = &stubClientMaker{client: client} + return func() { + if prev != nil { + remote_storage.RemoteStorageClientMakers[storageType] = prev + } else { + delete(remote_storage.RemoteStorageClientMakers, storageType) + } + } +} + +// --- tests --- + +func TestMaybeLazyFetchFromRemote_HitsRemoteAndPersists(t *testing.T) { + const storageType = "stub_lazy_hit" + stub := &stubRemoteClient{ + statResult: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 1234}, + } + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "mystore", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "mystore", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + f := newTestFiler(t, store, rs) + + entry, err := f.maybeLazyFetchFromRemote(context.Background(), "/buckets/mybucket/file.txt") + require.NoError(t, err) + require.NotNil(t, entry) + assert.Equal(t, util.FullPath("/buckets/mybucket/file.txt"), entry.FullPath) + assert.Equal(t, int64(1234), entry.Remote.RemoteSize) + assert.Equal(t, uint64(1234), entry.FileSize) + + // entry must have been persisted in the store + stored, sErr := store.FindEntry(context.Background(), "/buckets/mybucket/file.txt") + require.NoError(t, sErr) + assert.Equal(t, int64(1234), stored.Remote.RemoteSize) +} + +func TestMaybeLazyFetchFromRemote_NotUnderMount(t *testing.T) { + rs := NewFilerRemoteStorage() + store := newStubFilerStore() + f := newTestFiler(t, store, rs) + + entry, err := f.maybeLazyFetchFromRemote(context.Background(), "/not/a/mounted/path.txt") + require.NoError(t, err) + assert.Nil(t, entry) +} + +func TestMaybeLazyFetchFromRemote_RemoteObjectNotFound(t *testing.T) { + const storageType = "stub_lazy_notfound" + stub := &stubRemoteClient{statErr: remote_storage.ErrRemoteObjectNotFound} + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "storenotfound", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "storenotfound", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + f := newTestFiler(t, store, rs) + + entry, err := f.maybeLazyFetchFromRemote(context.Background(), "/buckets/mybucket/missing.txt") + require.NoError(t, err) + assert.Nil(t, entry) +} + +func TestMaybeLazyFetchFromRemote_CreateEntryFailureReturnsInMemoryEntry(t *testing.T) { + const storageType = "stub_lazy_saveerr" + stub := &stubRemoteClient{ + statResult: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 42}, + } + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "storesaveerr", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "storesaveerr", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + store.insertErr = errors.New("simulated store failure") + f := newTestFiler(t, store, rs) + + // even with a store failure, the in-memory entry should be returned + entry, err := f.maybeLazyFetchFromRemote(context.Background(), "/buckets/mybucket/failfile.txt") + require.NoError(t, err) + require.NotNil(t, entry, "should return in-memory entry even when CreateEntry fails") + assert.Equal(t, int64(42), entry.Remote.RemoteSize) +} + +func TestMaybeLazyFetchFromRemote_LongestPrefixMount(t *testing.T) { + // Register maker for the root mount + const typeRoot = "stub_lp_root" + stubRoot := &stubRemoteClient{statResult: &filer_pb.RemoteEntry{RemoteMtime: 1, RemoteSize: 10}} + defer registerStubMaker(t, typeRoot, stubRoot)() + + // Register maker for the prefix mount + const typePrefix = "stub_lp_prefix" + stubPrefix := &stubRemoteClient{statResult: &filer_pb.RemoteEntry{RemoteMtime: 2, RemoteSize: 20}} + defer registerStubMaker(t, typePrefix, stubPrefix)() + + rs := NewFilerRemoteStorage() + rs.storageNameToConf["rootstore"] = &remote_pb.RemoteConf{Name: "rootstore", Type: typeRoot} + rs.storageNameToConf["prefixstore"] = &remote_pb.RemoteConf{Name: "prefixstore", Type: typePrefix} + + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "rootstore", Bucket: "root-bucket", Path: "/", + }) + rs.mapDirectoryToRemoteStorage("/buckets/mybucket/prefix", &remote_pb.RemoteStorageLocation{ + Name: "prefixstore", Bucket: "prefix-bucket", Path: "/", + }) + + store := newStubFilerStore() + f := newTestFiler(t, store, rs) + + // path under root mount only + entryRoot, err := f.maybeLazyFetchFromRemote(context.Background(), "/buckets/mybucket/file.txt") + require.NoError(t, err) + require.NotNil(t, entryRoot) + assert.Equal(t, int64(10), entryRoot.Remote.RemoteSize, "root mount should be used") + + // path under nested (longer) mount — must prefer the longer prefix + entryPrefix, err := f.maybeLazyFetchFromRemote(context.Background(), "/buckets/mybucket/prefix/file.txt") + require.NoError(t, err) + require.NotNil(t, entryPrefix) + assert.Equal(t, int64(20), entryPrefix.Remote.RemoteSize, "nested mount should win (longest prefix)") +} + +type countingRemoteClient struct { + stubRemoteClient + statCalls int +} + +func (c *countingRemoteClient) StatFile(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) { + c.statCalls++ + return c.stubRemoteClient.StatFile(loc) +} + +func TestMaybeLazyFetchFromRemote_ContextGuardPreventsRecursion(t *testing.T) { + const storageType = "stub_lazy_guard" + countingStub := &countingRemoteClient{ + stubRemoteClient: stubRemoteClient{ + statResult: &filer_pb.RemoteEntry{RemoteMtime: 1, RemoteSize: 1}, + }, + } + defer registerStubMaker(t, storageType, countingStub)() + + conf := &remote_pb.RemoteConf{Name: "guardstore", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "guardstore", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + f := newTestFiler(t, store, rs) + + guardCtx := context.WithValue(context.Background(), lazyFetchContextKey{}, true) + entry, err := f.maybeLazyFetchFromRemote(guardCtx, "/buckets/mybucket/file.txt") + require.NoError(t, err) + assert.Nil(t, entry) + assert.Equal(t, 0, countingStub.statCalls, "guard should prevent StatFile from being called") +} + +func TestFindEntry_LazyFetchOnMiss(t *testing.T) { + const storageType = "stub_lazy_findentry" + stub := &stubRemoteClient{ + statResult: &filer_pb.RemoteEntry{RemoteMtime: 1700000000, RemoteSize: 999}, + } + defer registerStubMaker(t, storageType, stub)() + + conf := &remote_pb.RemoteConf{Name: "findentrystore", Type: storageType} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "findentrystore", + Bucket: "mybucket", + Path: "/", + }) + + store := newStubFilerStore() + f := newTestFiler(t, store, rs) + + // First lookup: store miss → lazy fetch + entry, err := f.FindEntry(context.Background(), "/buckets/mybucket/obj.txt") + require.NoError(t, err, fmt.Sprintf("unexpected err: %v", err)) + require.NotNil(t, entry) + assert.Equal(t, uint64(999), entry.FileSize) + + // Second lookup: now in store, no remote call needed + entry2, err2 := f.FindEntry(context.Background(), "/buckets/mybucket/obj.txt") + require.NoError(t, err2) + require.NotNil(t, entry2) + assert.Equal(t, uint64(999), entry2.FileSize) +} diff --git a/weed/filer/remote_storage.go b/weed/filer/remote_storage.go index e52cc3ff3..73ef4c052 100644 --- a/weed/filer/remote_storage.go +++ b/weed/filer/remote_storage.go @@ -82,6 +82,9 @@ func (rs *FilerRemoteStorage) mapDirectoryToRemoteStorage(dir util.FullPath, loc rs.rules.Put([]byte(dir+"/"), loc) } +// FindMountDirectory returns the mount directory and location for p. When multiple +// mounts match (e.g. /buckets/b and /buckets/b/prefix), ptrie MatchPrefix visits +// shorter prefixes first, so the last match is the longest prefix. func (rs *FilerRemoteStorage) FindMountDirectory(p util.FullPath) (mountDir util.FullPath, remoteLocation *remote_pb.RemoteStorageLocation) { rs.rules.MatchPrefix([]byte(p), func(key []byte, value *remote_pb.RemoteStorageLocation) bool { mountDir = util.FullPath(string(key[:len(key)-1])) diff --git a/weed/filer/remote_storage_test.go b/weed/filer/remote_storage_test.go index 57c08634d..d71527e7e 100644 --- a/weed/filer/remote_storage_test.go +++ b/weed/filer/remote_storage_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "github.com/stretchr/testify/assert" ) @@ -33,3 +34,37 @@ func TestFilerRemoteStorage_FindRemoteStorageClient(t *testing.T) { _, _, found4 := rs.FindRemoteStorageClient("/a/b/cc") assert.Equal(t, false, found4, "should not find storage client") } + +func TestFilerRemoteStorage_FindMountDirectory_LongestPrefixWins(t *testing.T) { + conf := &remote_pb.RemoteConf{Name: "store", Type: "s3"} + rs := NewFilerRemoteStorage() + rs.storageNameToConf[conf.Name] = conf + + rs.mapDirectoryToRemoteStorage("/buckets/mybucket", &remote_pb.RemoteStorageLocation{ + Name: "store", + Bucket: "bucket-root", + Path: "/", + }) + rs.mapDirectoryToRemoteStorage("/buckets/mybucket/prefix", &remote_pb.RemoteStorageLocation{ + Name: "store", + Bucket: "bucket-prefix", + Path: "/", + }) + + tests := []struct { + path string + wantMount string + wantBucket string + }{ + {"/buckets/mybucket/file.txt", "/buckets/mybucket", "bucket-root"}, + {"/buckets/mybucket/prefix/file.txt", "/buckets/mybucket/prefix", "bucket-prefix"}, + {"/buckets/mybucket/prefix/sub/file.txt", "/buckets/mybucket/prefix", "bucket-prefix"}, + } + for _, tt := range tests { + mountDir, loc := rs.FindMountDirectory(util.FullPath(tt.path)) + assert.Equal(t, util.FullPath(tt.wantMount), mountDir, "mount dir for %s", tt.path) + if assert.NotNil(t, loc, "location for %s", tt.path) { + assert.Equal(t, tt.wantBucket, loc.Bucket, "bucket for %s", tt.path) + } + } +}