From 134fd6a1ae23d52ac91e641cf72ec393092cbcf8 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 18 Dec 2025 21:19:44 -0800
Subject: [PATCH] fix: S3 remote storage cold-cache read fails with 'size
 reported but no content available' (#7817)

fix: S3 remote storage cold-cache read fails with 'size reported but no
content available' (#7815)

When a remote-only entry's initial caching attempt times out or fails,
streamFromVolumeServers() now detects this case and retries caching
synchronously before streaming, similar to how the filer server handles
remote-only entries.

Changes:

- Modified streamFromVolumeServers() to check entry.IsInRemoteOnly() before
  treating missing chunks as a data integrity error
- Added doCacheRemoteObject() as the core caching function (calls filer gRPC)
- Added buildRemoteObjectPath() helper to reduce code duplication
- Refactored cacheRemoteObjectWithDedup() and cacheRemoteObjectForStreaming()
  to reuse the shared functions
- Added integration tests for remote storage scenarios

Fixes https://github.com/seaweedfs/seaweedfs/issues/7815
---
 weed/s3api/s3api_object_handlers.go     | 131 ++++++++----
 weed/s3api/s3api_remote_storage_test.go | 273 ++++++++++++++++++++++++
 2 files changed, 364 insertions(+), 40 deletions(-)
 create mode 100644 weed/s3api/s3api_remote_storage_test.go

diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index e923a7237..9b1495128 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -760,7 +760,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
 
 	// Stream directly from volume servers with SSE support
 	tStream := time.Now()
-	err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType)
+	err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType, bucket, object, versionId)
 	streamTime = time.Since(tStream)
 	if err != nil {
 		glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
@@ -784,7 +784,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
 
 // streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy
 // This eliminates the ~19ms filer proxy overhead by reading chunks directly
-func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
+func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string, bucket, object, versionId string) error {
 	// Profiling: Track overall and stage timings
 	t0 := time.Now()
 	var (
@@ -937,17 +937,34 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
 		len(chunks), totalSize, isRangeRequest, offset, size)
 
 	if len(chunks) == 0 {
-		// BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
-		if totalSize > 0 && len(entry.Content) == 0 {
+		// Check if this is a remote-only entry that needs caching
+		// This handles the case where initial caching attempt timed out or failed
+		if entry.IsInRemoteOnly() {
+			glog.V(1).Infof("streamFromVolumeServers: entry is remote-only, attempting to cache before streaming")
+			// Try to cache the remote object synchronously (like filer does)
+			cachedEntry := s3a.cacheRemoteObjectForStreaming(r, entry, bucket, object, versionId)
+			if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 {
+				chunks = cachedEntry.GetChunks()
+				entry = cachedEntry
glog.V(1).Infof("streamFromVolumeServers: successfully cached remote object, got %d chunks", len(chunks)) + } else { + // Caching failed - return error to client + glog.Errorf("streamFromVolumeServers: failed to cache remote object for streaming") + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return newStreamErrorWithResponse(fmt.Errorf("failed to cache remote object for streaming")) + } + } else if totalSize > 0 && len(entry.Content) == 0 { + // Not a remote entry but has size without content - this is a data integrity issue glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) // Write S3-compliant XML error response s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return newStreamErrorWithResponse(fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)) + } else { + // Empty object - set headers and write status + s3a.setResponseHeaders(w, r, entry, totalSize) + w.WriteHeader(http.StatusOK) + return nil } - // Empty object - set headers and write status - s3a.setResponseHeaders(w, r, entry, totalSize) - w.WriteHeader(http.StatusOK) - return nil } // Log chunk details (verbose only - high frequency) @@ -1048,10 +1065,10 @@ func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, strin } // streamFromVolumeServersWithSSE handles streaming with inline SSE decryption -func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error { +func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string, bucket, object, versionId string) error { // If not encrypted, use fast path without decryption if sseType == "" || sseType == "None" { - return s3a.streamFromVolumeServers(w, r, entry, sseType) + return s3a.streamFromVolumeServers(w, r, entry, sseType, bucket, object, versionId) } // Profiling: Track SSE decryption stages @@ -3338,35 +3355,26 @@ func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int) return partsCount, nil } -// cacheRemoteObjectWithDedup caches a remote-only object to the local cluster. -// The filer server handles singleflight deduplication, so all clients (S3, HTTP, Hadoop) benefit. -// On cache error, returns the original entry (streaming from remote will still work). -// Uses a bounded timeout to avoid blocking requests indefinitely. -func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket, object string, entry *filer_pb.Entry) *filer_pb.Entry { - // Use a bounded timeout for caching to avoid blocking requests indefinitely - // 30 seconds should be enough for most objects; large objects may timeout but will still stream - const cacheTimeout = 30 * time.Second - cacheCtx, cancel := context.WithTimeout(ctx, cacheTimeout) - defer cancel() - - // Build the full path for the object - // Normalize object path: remove duplicate slashes and leading slash to avoid double slashes in path - dir := s3a.option.BucketsPath + "/" + bucket - normalizedObject := strings.TrimPrefix(removeDuplicateSlashes(object), "/") - if idx := strings.LastIndex(normalizedObject, "/"); idx > 0 { - dir = dir + "/" + normalizedObject[:idx] - normalizedObject = normalizedObject[idx+1:] +// buildRemoteObjectPath builds the filer directory and object name from S3 bucket/object. +// This is shared by all remote object caching functions. 
+func (s3a *S3ApiServer) buildRemoteObjectPath(bucket, object string) (dir, name string) {
+	dir = s3a.option.BucketsPath + "/" + bucket
+	name = strings.TrimPrefix(removeDuplicateSlashes(object), "/")
+	if idx := strings.LastIndex(name, "/"); idx > 0 {
+		dir = dir + "/" + name[:idx]
+		name = name[idx+1:]
 	}
+	return dir, name
+}
 
-	glog.V(2).Infof("cacheRemoteObjectWithDedup: caching %s/%s (remote size: %d)", bucket, object, entry.RemoteEntry.RemoteSize)
-
-	// Call the filer's CacheRemoteObjectToLocalCluster via gRPC
-	// The filer handles singleflight deduplication internally
+// doCacheRemoteObject calls the filer's CacheRemoteObjectToLocalCluster gRPC endpoint.
+// This is the core caching function used by both cacheRemoteObjectWithDedup and cacheRemoteObjectForStreaming.
+func (s3a *S3ApiServer) doCacheRemoteObject(ctx context.Context, dir, name string) (*filer_pb.Entry, error) {
 	var cachedEntry *filer_pb.Entry
 	err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
-		resp, cacheErr := client.CacheRemoteObjectToLocalCluster(cacheCtx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
+		resp, cacheErr := client.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
 			Directory: dir,
-			Name:      normalizedObject,
+			Name:      name,
 		})
 		if cacheErr != nil {
 			return cacheErr
@@ -3376,24 +3384,67 @@ func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket,
 		}
 		return nil
 	})
+	return cachedEntry, err
+}
+
+// cacheRemoteObjectWithDedup caches a remote-only object to the local cluster.
+// The filer server handles singleflight deduplication, so all clients (S3, HTTP, Hadoop) benefit.
+// On cache error, returns the original entry (will retry in streamFromVolumeServers).
+// Uses a bounded timeout to avoid blocking requests indefinitely.
+func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket, object string, entry *filer_pb.Entry) *filer_pb.Entry {
+	const cacheTimeout = 30 * time.Second
+	cacheCtx, cancel := context.WithTimeout(ctx, cacheTimeout)
+	defer cancel()
+	dir, name := s3a.buildRemoteObjectPath(bucket, object)
+	glog.V(2).Infof("cacheRemoteObjectWithDedup: caching %s/%s (remote size: %d)", bucket, object, entry.RemoteEntry.RemoteSize)
+
+	cachedEntry, err := s3a.doCacheRemoteObject(cacheCtx, dir, name)
 	if err != nil {
-		// Caching failed - log and return original entry
-		// Streaming from remote storage will still work via filer proxy
 		if errors.Is(err, context.DeadlineExceeded) {
-			glog.V(1).Infof("cacheRemoteObjectWithDedup: timeout caching %s/%s after %v (will stream from remote)", bucket, object, cacheTimeout)
+			glog.V(1).Infof("cacheRemoteObjectWithDedup: timeout caching %s/%s after %v (will retry in streaming)", bucket, object, cacheTimeout)
 		} else {
-			glog.Warningf("cacheRemoteObjectWithDedup: failed to cache %s/%s: %v (will stream from remote)", bucket, object, err)
+			glog.Warningf("cacheRemoteObjectWithDedup: failed to cache %s/%s: %v (will retry in streaming)", bucket, object, err)
 		}
 		return entry
 	}
 
-	// If caching succeeded and we got chunks, use the cached entry's chunks
 	if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 {
 		glog.V(1).Infof("cacheRemoteObjectWithDedup: successfully cached %s/%s (%d chunks)", bucket, object, len(cachedEntry.GetChunks()))
-		// Preserve original entry metadata but use new chunks
 		entry.Chunks = cachedEntry.Chunks
 	}
 
 	return entry
 }
+
+// cacheRemoteObjectForStreaming caches a remote-only object to the local cluster for streaming.
+// This is called from streamFromVolumeServers when the initial caching attempt timed out or failed.
+// Uses the request context (no artificial timeout) to allow the caching to complete.
+// For versioned objects, versionId determines the correct path in the .versions/ directory.
+func (s3a *S3ApiServer) cacheRemoteObjectForStreaming(r *http.Request, entry *filer_pb.Entry, bucket, object, versionId string) *filer_pb.Entry {
+	var dir, name string
+	if versionId != "" && versionId != "null" {
+		// This is a specific version - entry is located at /buckets/<bucket>/<object>.versions/v_<versionId>
+		normalizedObject := strings.TrimPrefix(removeDuplicateSlashes(object), "/")
+		dir = s3a.option.BucketsPath + "/" + bucket + "/" + normalizedObject + s3_constants.VersionsFolder
+		name = s3a.getVersionFileName(versionId)
+	} else {
+		// Non-versioned object or "null" version - lives at the main path
+		dir, name = s3a.buildRemoteObjectPath(bucket, object)
+	}
+
+	glog.V(1).Infof("cacheRemoteObjectForStreaming: caching %s/%s (remote size: %d, versionId: %s)", dir, name, entry.RemoteEntry.RemoteSize, versionId)
+
+	cachedEntry, err := s3a.doCacheRemoteObject(r.Context(), dir, name)
+	if err != nil {
+		glog.Errorf("cacheRemoteObjectForStreaming: failed to cache %s/%s: %v", dir, name, err)
+		return nil
+	}
+
+	if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 {
+		glog.V(1).Infof("cacheRemoteObjectForStreaming: successfully cached %s/%s (%d chunks)", dir, name, len(cachedEntry.GetChunks()))
+		return cachedEntry
+	}
+
+	return nil
+}
diff --git a/weed/s3api/s3api_remote_storage_test.go b/weed/s3api/s3api_remote_storage_test.go
new file mode 100644
index 000000000..7d5963f5d
--- /dev/null
+++ b/weed/s3api/s3api_remote_storage_test.go
@@ -0,0 +1,273 @@
+package s3api
+
+import (
+	"strings"
+	"testing"
+
+	"github.com/seaweedfs/seaweedfs/weed/filer"
+	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+	"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+	"github.com/stretchr/testify/assert"
+)
+
+// TestIsInRemoteOnly tests the IsInRemoteOnly method on filer_pb.Entry
+func TestIsInRemoteOnly(t *testing.T) {
+	tests := []struct {
+		name     string
+		entry    *filer_pb.Entry
+		expected bool
+	}{
+		{
+			name: "remote-only entry with no chunks",
+			entry: &filer_pb.Entry{
+				Name:   "remote-file.txt",
+				Chunks: nil,
+				RemoteEntry: &filer_pb.RemoteEntry{
+					RemoteSize: 1024,
+				},
+			},
+			expected: true,
+		},
+		{
+			name: "remote entry with chunks (cached)",
+			entry: &filer_pb.Entry{
+				Name: "cached-file.txt",
+				Chunks: []*filer_pb.FileChunk{
+					{FileId: "1,abc123", Size: 1024, Offset: 0},
+				},
+				RemoteEntry: &filer_pb.RemoteEntry{
+					RemoteSize: 1024,
+				},
+			},
+			expected: false,
+		},
+		{
+			name: "local file with chunks (not remote)",
+			entry: &filer_pb.Entry{
+				Name: "local-file.txt",
+				Chunks: []*filer_pb.FileChunk{
+					{FileId: "1,abc123", Size: 1024, Offset: 0},
+				},
+				RemoteEntry: nil,
+			},
+			expected: false,
+		},
+		{
+			name: "empty remote entry (size 0)",
+			entry: &filer_pb.Entry{
+				Name:   "empty-remote.txt",
+				Chunks: nil,
+				RemoteEntry: &filer_pb.RemoteEntry{
+					RemoteSize: 0,
+				},
+			},
+			expected: false,
+		},
+		{
+			name: "no chunks but nil RemoteEntry",
+			entry: &filer_pb.Entry{
+				Name:        "empty-local.txt",
+				Chunks:      nil,
+				RemoteEntry: nil,
+			},
+			expected: false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			result := tt.entry.IsInRemoteOnly()
+			assert.Equal(t, tt.expected, result,
+				"IsInRemoteOnly() for %s should return %v", tt.name, tt.expected)
+		})
+	}
+}
+
+// TestRemoteOnlyEntryDetection tests that the streamFromVolumeServers logic
+// correctly distinguishes between remote-only entries and data integrity errors
+func TestRemoteOnlyEntryDetection(t *testing.T) {
+	tests := []struct {
+		name              string
+		entry             *filer_pb.Entry
+		shouldBeRemote    bool
+		shouldBeDataError bool
+		shouldBeEmpty     bool
+	}{
+		{
+			name: "remote-only entry (no chunks, has remote entry)",
+			entry: &filer_pb.Entry{
+				Name:   "remote-file.txt",
+				Chunks: nil,
+				Attributes: &filer_pb.FuseAttributes{
+					FileSize: 1024,
+				},
+				RemoteEntry: &filer_pb.RemoteEntry{
+					RemoteSize: 1024,
+				},
+			},
+			shouldBeRemote:    true,
+			shouldBeDataError: false,
+			shouldBeEmpty:     false,
+		},
+		{
+			name: "data integrity error (no chunks, no remote, has size)",
+			entry: &filer_pb.Entry{
+				Name:   "corrupt-file.txt",
+				Chunks: nil,
+				Attributes: &filer_pb.FuseAttributes{
+					FileSize: 1024,
+				},
+				RemoteEntry: nil,
+			},
+			shouldBeRemote:    false,
+			shouldBeDataError: true,
+			shouldBeEmpty:     false,
+		},
+		{
+			name: "empty local file (no chunks, no remote, size 0)",
+			entry: &filer_pb.Entry{
+				Name:   "empty-file.txt",
+				Chunks: nil,
+				Attributes: &filer_pb.FuseAttributes{
+					FileSize: 0,
+				},
+				RemoteEntry: nil,
+			},
+			shouldBeRemote:    false,
+			shouldBeDataError: false,
+			shouldBeEmpty:     true,
+		},
+		{
+			name: "normal file with chunks",
+			entry: &filer_pb.Entry{
+				Name: "normal-file.txt",
+				Chunks: []*filer_pb.FileChunk{
+					{FileId: "1,abc123", Size: 1024, Offset: 0},
+				},
+				Attributes: &filer_pb.FuseAttributes{
+					FileSize: 1024,
+				},
+				RemoteEntry: nil,
+			},
+			shouldBeRemote:    false,
+			shouldBeDataError: false,
+			shouldBeEmpty:     false,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			chunks := tt.entry.GetChunks()
+			totalSize := int64(filer.FileSize(tt.entry))
+
+			if len(chunks) == 0 {
+				// This mirrors the logic in streamFromVolumeServers
+				if tt.entry.IsInRemoteOnly() {
+					assert.True(t, tt.shouldBeRemote,
+						"Entry should be detected as remote-only")
+				} else if totalSize > 0 && len(tt.entry.Content) == 0 {
+					assert.True(t, tt.shouldBeDataError,
+						"Entry should be detected as data integrity error")
+				} else {
+					assert.True(t, tt.shouldBeEmpty,
+						"Entry should be detected as empty")
+				}
+			} else {
+				assert.False(t, tt.shouldBeRemote, "Entry with chunks should not be remote-only")
+				assert.False(t, tt.shouldBeDataError, "Entry with chunks should not be data error")
+				assert.False(t, tt.shouldBeEmpty, "Entry with chunks should not be empty")
+			}
+		})
+	}
+}
+
"/buckets/mybucket/folder/subfolder/file.txt" + s3_constants.VersionsFolder, + expectedName: "v_xyz789", + }, + { + name: "object with leading slash and version", + bucket: "mybucket", + object: "/path/to/file.txt", + versionId: "ver456", + expectedDir: "/buckets/mybucket/path/to/file.txt" + s3_constants.VersionsFolder, + expectedName: "v_ver456", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var dir, name string + + // This mirrors the logic in cacheRemoteObjectForStreaming + if tt.versionId != "" && tt.versionId != "null" { + // Versioned object path + normalizedObject := strings.TrimPrefix(removeDuplicateSlashesTest(tt.object), "/") + dir = bucketsPath + "/" + tt.bucket + "/" + normalizedObject + s3_constants.VersionsFolder + name = "v_" + tt.versionId + } else { + // Non-versioned path (simplified - just for testing) + dir = bucketsPath + "/" + tt.bucket + normalizedObject := strings.TrimPrefix(removeDuplicateSlashesTest(tt.object), "/") + if idx := strings.LastIndex(normalizedObject, "/"); idx > 0 { + dir = dir + "/" + normalizedObject[:idx] + name = normalizedObject[idx+1:] + } else { + name = normalizedObject + } + } + + assert.Equal(t, tt.expectedDir, dir, "Directory path should match") + assert.Equal(t, tt.expectedName, name, "Name should match") + }) + } +} + +// removeDuplicateSlashesTest is a test helper that mirrors production code +func removeDuplicateSlashesTest(s string) string { + for strings.Contains(s, "//") { + s = strings.ReplaceAll(s, "//", "/") + } + return s +}