diff --git a/weed/s3api/s3api_object_handlers_list.go b/weed/s3api/s3api_object_handlers_list.go index 22a671e67..5e060b008 100644 --- a/weed/s3api/s3api_object_handlers_list.go +++ b/weed/s3api/s3api_object_handlers_list.go @@ -120,7 +120,7 @@ func (s3a *S3ApiServer) ListObjectsV1Handler(w http.ResponseWriter, r *http.Requ bucket, _ := s3_constants.GetBucketAndObject(r) originalPrefix, marker, delimiter, encodingTypeUrl, maxKeys, allowUnordered, errCode := getListObjectsV1Args(r.URL.Query()) - glog.V(2).Infof("ListObjectsV1Handler bucket=%s prefix=%s", bucket, originalPrefix) + glog.V(2).Infof("ListObjectsV1Handler bucket=%s prefix=%s delimiter=%s maxKeys=%d", bucket, originalPrefix, delimiter, maxKeys) if errCode != s3err.ErrNone { s3err.WriteErrorResponse(w, r, errCode) @@ -203,7 +203,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m for { empty := true - nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, func(dir string, entry *filer_pb.Entry) { + nextMarker, doErr = s3a.doListFilerEntries(client, reqDir, prefix, cursor, marker, delimiter, false, bucket, func(dir string, entry *filer_pb.Entry) { empty = false dirName, entryName, _ := entryUrlEncode(dir, entry.Name, encodingTypeUrl) if entry.IsDirectory { @@ -307,6 +307,7 @@ func (s3a *S3ApiServer) listFilerEntries(bucket string, originalPrefix string, m } } if !delimiterFound { + glog.V(4).Infof("Adding file to contents: %s", entryName) contents = append(contents, newListEntry(entry, "", dirName, entryName, bucketPrefix, fetchOwner, false, false, s3a.iam)) cursor.maxKeys-- lastEntryWasCommonPrefix = false @@ -439,7 +440,7 @@ func toParentAndDescendants(dirAndName string) (dir, name string) { return } -func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) { +func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, dir, prefix string, cursor *ListingCursor, marker, delimiter string, inclusiveStartFrom bool, bucket string, eachEntryFn func(dir string, entry *filer_pb.Entry)) (nextMarker string, err error) { // invariants // prefix and marker should be under dir, marker may contain "/" // maxKeys should be updated for each recursion @@ -453,7 +454,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d if strings.Contains(marker, "/") { subDir, subMarker := toParentAndDescendants(marker) // println("doListFilerEntries dir", dir+"/"+subDir, "subMarker", subMarker) - subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, eachEntryFn) + subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+subDir, "", cursor, subMarker, delimiter, false, bucket, eachEntryFn) if subErr != nil { err = subErr return @@ -486,10 +487,6 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d return } - // Track .versions directories found in this directory for later processing - // Store the full entry to avoid additional getEntry calls (N+1 query optimization) - var versionsDirs []*filer_pb.Entry - for { resp, recvErr := stream.Recv() if recvErr != nil { @@ -504,7 +501,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d if cursor.maxKeys <= 0 { cursor.isTruncated = true - continue + break } // Set nextMarker only when we have quota to process this entry @@ -524,24 +521,42 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d continue } - // Skip .versions directories in regular list operations but track them for logical object creation - // Store the full entry to avoid additional getEntry calls later + // Process .versions directories immediately to create logical versioned object entries + // These directories are never traversed (we continue here), so each is only encountered once if strings.HasSuffix(entry.Name, s3_constants.VersionsFolder) { - glog.V(4).Infof("Found .versions directory: %s", entry.Name) - versionsDirs = append(versionsDirs, entry) + // Extract object name from .versions directory name + baseObjectName := strings.TrimSuffix(entry.Name, s3_constants.VersionsFolder) + // Construct full object path relative to bucket + bucketFullPath := s3a.option.BucketsPath + "/" + bucket + bucketRelativePath := strings.TrimPrefix(dir, bucketFullPath) + bucketRelativePath = strings.TrimPrefix(bucketRelativePath, "/") + var fullObjectPath string + if bucketRelativePath == "" { + fullObjectPath = baseObjectName + } else { + fullObjectPath = bucketRelativePath + "/" + baseObjectName + } + // Use metadata from the already-fetched .versions directory entry + if latestVersionEntry, err := s3a.getLatestVersionEntryFromDirectoryEntry(bucket, fullObjectPath, entry); err == nil { + eachEntryFn(dir, latestVersionEntry) + } else if !errors.Is(err, ErrDeleteMarker) { + // Log unexpected errors (delete markers are expected) + glog.V(2).Infof("Skipping versioned object %s due to error: %v", fullObjectPath, err) + } continue } if delimiter != "/" || cursor.prefixEndsOnDelimiter { + // When delimiter is empty (recursive mode), recurse into directories but don't add them to results + // Only files and versioned objects should appear in results if cursor.prefixEndsOnDelimiter { cursor.prefixEndsOnDelimiter = false if entry.IsDirectoryKeyObject() { eachEntryFn(dir, entry) } - } else { - eachEntryFn(dir, entry) } - subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, eachEntryFn) + // Recurse into subdirectory - don't add the directory itself to results + subNextMarker, subErr := s3a.doListFilerEntries(client, dir+"/"+entry.Name, "", cursor, "", delimiter, false, bucket, eachEntryFn) if subErr != nil { err = fmt.Errorf("doListFilerEntries2: %w", subErr) return @@ -564,57 +579,7 @@ func (s3a *S3ApiServer) doListFilerEntries(client filer_pb.SeaweedFilerClient, d } } - // After processing all regular entries, handle versioned objects - // Create logical entries for objects that have .versions directories - // OPTIMIZATION: Use the already-fetched .versions directory entry to avoid N+1 queries - for _, versionsDir := range versionsDirs { - if cursor.maxKeys <= 0 { - cursor.isTruncated = true - break - } - - // Update nextMarker to ensure pagination advances past this .versions directory - // This is critical to prevent infinite loops when results are truncated - nextMarker = versionsDir.Name - - // Extract object name from .versions directory name (remove .versions suffix) - baseObjectName := strings.TrimSuffix(versionsDir.Name, s3_constants.VersionsFolder) - - // Construct full object path relative to bucket - // dir is something like "/buckets/sea-test-1/Veeam/Backup/vbr/Config" - // we need to get the path relative to bucket: "Veeam/Backup/vbr/Config/Owner" - bucketPath := strings.TrimPrefix(dir, s3a.option.BucketsPath+"/") - bucketName := strings.Split(bucketPath, "/")[0] - - // Remove bucket name from path to get directory within bucket - bucketRelativePath := strings.Join(strings.Split(bucketPath, "/")[1:], "/") - - var fullObjectPath string - if bucketRelativePath == "" { - // Object is at bucket root - fullObjectPath = baseObjectName - } else { - // Object is in subdirectory - fullObjectPath = bucketRelativePath + "/" + baseObjectName - } - - glog.V(4).Infof("Processing versioned object: baseObjectName=%s, bucketRelativePath=%s, fullObjectPath=%s", - baseObjectName, bucketRelativePath, fullObjectPath) - - // OPTIMIZATION: Use metadata from the already-fetched .versions directory entry - // This avoids additional getEntry calls which cause high "find" usage - if latestVersionEntry, err := s3a.getLatestVersionEntryFromDirectoryEntry(bucketName, fullObjectPath, versionsDir); err == nil { - glog.V(4).Infof("Creating logical entry for versioned object: %s", fullObjectPath) - eachEntryFn(dir, latestVersionEntry) - } else if errors.Is(err, ErrDeleteMarker) { - // Expected: latest version is a delete marker, object should not appear in list - glog.V(4).Infof("Skipping versioned object %s: delete marker", fullObjectPath) - } else { - // Unexpected failure: missing metadata, fetch error, etc. - glog.V(3).Infof("Skipping versioned object %s due to error: %v", fullObjectPath, err) - } - } - + // Versioned directories are processed above (lines 524-546) return } diff --git a/weed/s3api/s3api_object_handlers_list_test.go b/weed/s3api/s3api_object_handlers_list_test.go index b24771e8a..37bafbbac 100644 --- a/weed/s3api/s3api_object_handlers_list_test.go +++ b/weed/s3api/s3api_object_handlers_list_test.go @@ -218,7 +218,7 @@ func TestDoListFilerEntries_BucketRootPrefixSlashDelimiterSlash_ListsDirectories cursor := &ListingCursor{maxKeys: 1000} seen := make([]string, 0) - _, err := s3a.doListFilerEntries(client, "/buckets/test-bucket", "/", cursor, "", "/", false, func(dir string, entry *filer_pb.Entry) { + _, err := s3a.doListFilerEntries(client, "/buckets/test-bucket", "/", cursor, "", "/", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { if entry.IsDirectory { seen = append(seen, entry.Name) } diff --git a/weed/s3api/s3api_object_handlers_list_versioned_test.go b/weed/s3api/s3api_object_handlers_list_versioned_test.go new file mode 100644 index 000000000..8252dc4a9 --- /dev/null +++ b/weed/s3api/s3api_object_handlers_list_versioned_test.go @@ -0,0 +1,433 @@ +package s3api + +import ( + "context" + "encoding/hex" + "fmt" + "strconv" + "strings" + "testing" + "time" + + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" + "github.com/stretchr/testify/assert" + grpc "google.golang.org/grpc" +) + +// TestListObjectsWithVersionedObjects tests that versioned objects are properly listed +// This validates the fix for duplicate versioned objects issue +func TestListObjectsWithVersionedObjects(t *testing.T) { + now := time.Now().Unix() + + // Create test filer client with versioned objects + filerClient := &testFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/buckets/test-bucket": { + // Regular directory + { + Name: "folder1", + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + }, + // .versions directory with metadata for versioned object + { + Name: "file1.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v1-abc123"), + s3_constants.ExtLatestVersionSizeKey: []byte("1234"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-1")))), + }, + }, + // Another .versions directory + { + Name: "file2.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v2-def456"), + s3_constants.ExtLatestVersionSizeKey: []byte("5678"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-2")))), + }, + }, + }, + "/buckets/test-bucket/folder1": { + // Versioned object in subdirectory + { + Name: "nested.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v3-ghi789"), + s3_constants.ExtLatestVersionSizeKey: []byte("9012"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte(fmt.Sprintf("\"%s\"", hex.EncodeToString([]byte("test-etag-3")))), + }, + }, + }, + }, + } + + s3a := &S3ApiServer{ + option: &S3ApiServerOption{ + BucketsPath: "/buckets", + }, + } + + tests := []struct { + name string + bucket string + prefix string + delimiter string + expectedCount int + expectedKeys []string + expectedPrefixes []string + }{ + { + name: "List all objects including versioned (no delimiter)", + bucket: "test-bucket", + prefix: "", + delimiter: "", + expectedCount: 3, // file1.txt, file2.txt, folder1/nested.txt + expectedKeys: []string{ + "file1.txt", + "file2.txt", + "folder1/nested.txt", + }, + expectedPrefixes: []string{}, + }, + { + name: "List bucket root with delimiter", + bucket: "test-bucket", + prefix: "", + delimiter: "/", + expectedCount: 2, // file1.txt, file2.txt (folder1/ becomes common prefix) + expectedKeys: []string{ + "file1.txt", + "file2.txt", + }, + expectedPrefixes: []string{ + "folder1/", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // Directly call doListFilerEntries with a callback to exercise the versioned objects + // listing logic. The callback mirrors the production listFilerEntries behavior for + // path extraction and accumulation so that this test validates the internal listing + // implementation in isolation from the HTTP layer. + cursor := &ListingCursor{maxKeys: uint16(tt.expectedCount + 10)} + contents := []ListEntry{} + commonPrefixes := []PrefixEntry{} + bucketPrefix := fmt.Sprintf("%s/%s/", s3a.option.BucketsPath, tt.bucket) + + _, err := s3a.doListFilerEntries(filerClient, bucketPrefix[:len(bucketPrefix)-1], tt.prefix, cursor, "", tt.delimiter, false, tt.bucket, func(dir string, entry *filer_pb.Entry) { + if cursor.maxKeys <= 0 { + return + } + + if entry.IsDirectory { + if tt.delimiter == "/" { + // Extract relative path from bucket prefix + relDir := strings.TrimPrefix(dir, bucketPrefix[:len(bucketPrefix)-1]) + if relDir != "" && relDir[0] == '/' { + relDir = relDir[1:] + } + prefix := relDir + if prefix != "" { + prefix += "/" + } + prefix += entry.Name + "/" + + commonPrefixes = append(commonPrefixes, PrefixEntry{ + Prefix: prefix, + }) + cursor.maxKeys-- + } + } else { + // Extract key from dir and entry name + relDir := strings.TrimPrefix(dir, bucketPrefix[:len(bucketPrefix)-1]) + if relDir != "" && relDir[0] == '/' { + relDir = relDir[1:] + } + key := entry.Name + if relDir != "" { + key = relDir + "/" + entry.Name + } + + contents = append(contents, ListEntry{ + Key: key, + }) + cursor.maxKeys-- + } + }) + + assert.NoError(t, err, "doListFilerEntries should not return error") + assert.Equal(t, tt.expectedCount, len(contents), "Should return correct number of objects") + assert.Equal(t, len(tt.expectedPrefixes), len(commonPrefixes), "Should return correct number of common prefixes") + + // Verify keys + actualKeys := make([]string, len(contents)) + for i, entry := range contents { + actualKeys[i] = entry.Key + } + assert.ElementsMatch(t, tt.expectedKeys, actualKeys, "Should return expected keys") + + // Verify common prefixes + actualPrefixes := make([]string, len(commonPrefixes)) + for i, prefix := range commonPrefixes { + actualPrefixes[i] = prefix.Prefix + } + assert.ElementsMatch(t, tt.expectedPrefixes, actualPrefixes, "Should return expected prefixes") + + // Verify each versioned object has correct version metadata + for _, entry := range contents { + assert.NotEmpty(t, entry.Key, "Versioned object should have key") + } + }) + } +} + +// TestVersionedObjectsNoDuplication ensures that .versions directories are only processed once +// This is the core test for the bug fix - previously versioned objects were duplicated 250x +func TestVersionedObjectsNoDuplication(t *testing.T) { + now := time.Now().Unix() + + // Create a single .versions directory + filerClient := &testFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/buckets/test-bucket": { + { + Name: "test.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v1-test"), + s3_constants.ExtLatestVersionSizeKey: []byte("100"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte("\"test-etag\""), + }, + }, + }, + }, + } + + s3a := &S3ApiServer{ + option: &S3ApiServerOption{ + BucketsPath: "/buckets", + }, + } + + cursor := &ListingCursor{maxKeys: uint16(1000)} + contents := []ListEntry{} + _, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { + if cursor.maxKeys <= 0 { + return + } + contents = append(contents, ListEntry{Key: entry.Name}) + cursor.maxKeys-- + }) + + assert.NoError(t, err) + assert.Equal(t, 1, len(contents), "Should return exactly 1 object (no duplicates)") + assert.Equal(t, "test.txt", contents[0].Key, "Should return correct key") +} + +// TestVersionedObjectsWithDeleteMarker tests that objects with delete markers are not listed +func TestVersionedObjectsWithDeleteMarker(t *testing.T) { + now := time.Now().Unix() + + filerClient := &testFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/buckets/test-bucket": { + // Active versioned object + { + Name: "active.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v1-active"), + s3_constants.ExtLatestVersionSizeKey: []byte("100"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte("\"etag-active\""), + }, + }, + // Deleted object (has delete marker) + { + Name: "deleted.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v1-deleted"), + s3_constants.ExtLatestVersionIsDeleteMarker: []byte("true"), + }, + }, + }, + }, + } + + s3a := &S3ApiServer{ + option: &S3ApiServerOption{ + BucketsPath: "/buckets", + }, + } + + cursor := &ListingCursor{maxKeys: uint16(1000)} + contents := []ListEntry{} + _, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { + if cursor.maxKeys <= 0 { + return + } + contents = append(contents, ListEntry{Key: entry.Name}) + cursor.maxKeys-- + }) + + assert.NoError(t, err) + assert.Equal(t, 1, len(contents), "Should only return active object, not deleted") + assert.Equal(t, "active.txt", contents[0].Key, "Should return the active object") +} + +// TestVersionedObjectsMaxKeys tests pagination with versioned objects +func TestVersionedObjectsMaxKeys(t *testing.T) { + now := time.Now().Unix() + + // Create 5 versioned objects + entries := make([]*filer_pb.Entry, 5) + for i := 0; i < 5; i++ { + entries[i] = &filer_pb.Entry{ + Name: fmt.Sprintf("file%d.txt"+s3_constants.VersionsFolder, i), + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte(fmt.Sprintf("v%d", i)), + s3_constants.ExtLatestVersionSizeKey: []byte("100"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte(fmt.Sprintf("\"etag-%d\"", i)), + }, + } + } + + filerClient := &testFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/buckets/test-bucket": entries, + }, + } + + s3a := &S3ApiServer{ + option: &S3ApiServerOption{ + BucketsPath: "/buckets", + }, + } + + cursor := &ListingCursor{maxKeys: uint16(3)} + contents := []ListEntry{} + _, err := s3a.doListFilerEntries(filerClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { + if cursor.maxKeys <= 0 { + return + } + contents = append(contents, ListEntry{Key: entry.Name}) + cursor.maxKeys-- + }) + + assert.NoError(t, err) + assert.Equal(t, 3, len(contents), "Should respect maxKeys limit") + assert.True(t, cursor.isTruncated, "Should set IsTruncated when there are more results") + + // Verify truncation is properly set when maxKeys is exceeded + // (The test mock doesn't implement marker-based pagination, but we can verify + // that the cursor state is correct for actual pagination to work) + assert.True(t, cursor.isTruncated, "IsTruncated should be true when maxKeys is exhausted with more entries available") +} + +// TestVersionsDirectoryNotTraversed ensures .versions directories are never traversed +func TestVersionsDirectoryNotTraversed(t *testing.T) { + now := time.Now().Unix() + traversedDirs := make(map[string]bool) + + // Custom filer client that tracks which directories are accessed + customClient := &customTestFilerClient{ + testFilerClient: testFilerClient{ + entriesByDir: map[string][]*filer_pb.Entry{ + "/buckets/test-bucket": { + { + Name: "object.txt" + s3_constants.VersionsFolder, + IsDirectory: true, + Attributes: &filer_pb.FuseAttributes{ + Mtime: now, + }, + Extended: map[string][]byte{ + s3_constants.ExtLatestVersionIdKey: []byte("v1"), + s3_constants.ExtLatestVersionSizeKey: []byte("100"), + s3_constants.ExtLatestVersionMtimeKey: []byte(strconv.FormatInt(now, 10)), + s3_constants.ExtLatestVersionETagKey: []byte("\"etag\""), + }, + }, + }, + // This directory should NEVER be accessed + "/buckets/test-bucket/object.txt.versions": { + { + Name: "should-not-see-this", + IsDirectory: false, + }, + }, + }, + }, + traversedDirs: &traversedDirs, + } + + s3a := &S3ApiServer{ + option: &S3ApiServerOption{ + BucketsPath: "/buckets", + }, + } + + cursor := &ListingCursor{maxKeys: uint16(1000)} + contents := []ListEntry{} + _, err := s3a.doListFilerEntries(customClient, "/buckets/test-bucket", "", cursor, "", "", false, "test-bucket", func(dir string, entry *filer_pb.Entry) { + if cursor.maxKeys <= 0 { + return + } + contents = append(contents, ListEntry{Key: entry.Name}) + cursor.maxKeys-- + }) + + assert.NoError(t, err) + assert.Equal(t, 1, len(contents)) + + // Verify .versions directory was NEVER traversed + _, wasTraversed := traversedDirs["/buckets/test-bucket/object.txt.versions"] + assert.False(t, wasTraversed, ".versions directory should never be traversed") +} + +// customTestFilerClient tracks which directories are accessed +type customTestFilerClient struct { + testFilerClient + traversedDirs *map[string]bool +} + +func (c *customTestFilerClient) ListEntries(ctx context.Context, in *filer_pb.ListEntriesRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[filer_pb.ListEntriesResponse], error) { + (*c.traversedDirs)[in.Directory] = true + return c.testFilerClient.ListEntries(ctx, in, opts...) +}