
fix: S3 remote storage cold-cache read fails with 'size reported but no content available' (#7817)

When a remote-only entry's initial caching attempt times out or fails,
streamFromVolumeServers() now detects this case and retries caching
synchronously before streaming, similar to how the filer server handles
remote-only entries.

Changes:
- Modified streamFromVolumeServers() to check entry.IsInRemoteOnly()
  before treating missing chunks as a data integrity error
- Added doCacheRemoteObject() as the core caching function (calls filer gRPC)
- Added buildRemoteObjectPath() helper to reduce code duplication
- Refactored cacheRemoteObjectWithDedup() and cacheRemoteObjectForStreaming()
  to reuse the shared functions
- Added unit tests covering remote-only entry detection, the streaming
  decision logic, and versioned path building

Fixes https://github.com/seaweedfs/seaweedfs/issues/7815
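
In short, the zero-chunk branch of streamFromVolumeServers() now tries a synchronous caching retry for remote-only entries instead of reporting a data integrity error. A condensed sketch of that decision (not the committed code; resolveChunks and the cacheFn callback are illustrative stand-ins for the real method and for cacheRemoteObjectForStreaming):

package sketch

import (
	"errors"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// resolveChunks mirrors the new branch order: remote-only entries are cached
// synchronously and re-read, entries that report a size but carry neither
// inline content nor chunks are a data integrity error, and anything else is
// served as an empty object (nil chunks, nil error).
func resolveChunks(entry *filer_pb.Entry, totalSize int64, cacheFn func() *filer_pb.Entry) ([]*filer_pb.FileChunk, error) {
	if chunks := entry.GetChunks(); len(chunks) > 0 {
		return chunks, nil // normal case: stream the existing chunks
	}
	switch {
	case entry.IsInRemoteOnly():
		// Cold cache: the earlier bounded caching attempt timed out or failed,
		// so retry caching now before streaming.
		if cached := cacheFn(); cached != nil && len(cached.GetChunks()) > 0 {
			return cached.GetChunks(), nil
		}
		return nil, errors.New("failed to cache remote object for streaming")
	case totalSize > 0 && len(entry.Content) == 0:
		return nil, fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)
	default:
		return nil, nil // empty object: caller writes headers and a 200 with no body
	}
}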
pull/7821/head
Chris Lu authored 2 days ago, committed by GitHub
commit 134fd6a1ae
2 changed files:
  1. weed/s3api/s3api_object_handlers.go (131)
  2. weed/s3api/s3api_remote_storage_test.go (273)

weed/s3api/s3api_object_handlers.go (131)

@@ -760,7 +760,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Stream directly from volume servers with SSE support
tStream := time.Now()
err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType)
err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType, bucket, object, versionId)
streamTime = time.Since(tStream)
if err != nil {
glog.Errorf("GetObjectHandler: failed to stream from volume servers: %v", err)
@@ -784,7 +784,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// streamFromVolumeServers streams object data directly from volume servers, bypassing filer proxy
// This eliminates the ~19ms filer proxy overhead by reading chunks directly
func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string, bucket, object, versionId string) error {
// Profiling: Track overall and stage timings
t0 := time.Now()
var (
@@ -937,17 +937,34 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
len(chunks), totalSize, isRangeRequest, offset, size)
if len(chunks) == 0 {
// BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
if totalSize > 0 && len(entry.Content) == 0 {
// Check if this is a remote-only entry that needs caching
// This handles the case where initial caching attempt timed out or failed
if entry.IsInRemoteOnly() {
glog.V(1).Infof("streamFromVolumeServers: entry is remote-only, attempting to cache before streaming")
// Try to cache the remote object synchronously (like filer does)
cachedEntry := s3a.cacheRemoteObjectForStreaming(r, entry, bucket, object, versionId)
if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 {
chunks = cachedEntry.GetChunks()
entry = cachedEntry
glog.V(1).Infof("streamFromVolumeServers: successfully cached remote object, got %d chunks", len(chunks))
} else {
// Caching failed - return error to client
glog.Errorf("streamFromVolumeServers: failed to cache remote object for streaming")
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("failed to cache remote object for streaming"))
}
} else if totalSize > 0 && len(entry.Content) == 0 {
// Not a remote entry but has size without content - this is a data integrity issue
glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize)
// Write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("data integrity error: size %d reported but no content available", totalSize))
} else {
// Empty object - set headers and write status
s3a.setResponseHeaders(w, r, entry, totalSize)
w.WriteHeader(http.StatusOK)
return nil
}
// Empty object - set headers and write status
s3a.setResponseHeaders(w, r, entry, totalSize)
w.WriteHeader(http.StatusOK)
return nil
}
// Log chunk details (verbose only - high frequency)
@@ -1048,10 +1065,10 @@ func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, strin
}
// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption
func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string) error {
func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r *http.Request, entry *filer_pb.Entry, sseType string, bucket, object, versionId string) error {
// If not encrypted, use fast path without decryption
if sseType == "" || sseType == "None" {
return s3a.streamFromVolumeServers(w, r, entry, sseType)
return s3a.streamFromVolumeServers(w, r, entry, sseType, bucket, object, versionId)
}
// Profiling: Track SSE decryption stages
@@ -3338,35 +3355,26 @@ func (s3a *S3ApiServer) getMultipartInfo(entry *filer_pb.Entry, partNumber int)
return partsCount, nil
}
// cacheRemoteObjectWithDedup caches a remote-only object to the local cluster.
// The filer server handles singleflight deduplication, so all clients (S3, HTTP, Hadoop) benefit.
// On cache error, returns the original entry (streaming from remote will still work).
// Uses a bounded timeout to avoid blocking requests indefinitely.
func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket, object string, entry *filer_pb.Entry) *filer_pb.Entry {
// Use a bounded timeout for caching to avoid blocking requests indefinitely
// 30 seconds should be enough for most objects; large objects may timeout but will still stream
const cacheTimeout = 30 * time.Second
cacheCtx, cancel := context.WithTimeout(ctx, cacheTimeout)
defer cancel()
// Build the full path for the object
// Normalize object path: remove duplicate slashes and leading slash to avoid double slashes in path
dir := s3a.option.BucketsPath + "/" + bucket
normalizedObject := strings.TrimPrefix(removeDuplicateSlashes(object), "/")
if idx := strings.LastIndex(normalizedObject, "/"); idx > 0 {
dir = dir + "/" + normalizedObject[:idx]
normalizedObject = normalizedObject[idx+1:]
// buildRemoteObjectPath builds the filer directory and object name from S3 bucket/object.
// This is shared by all remote object caching functions.
func (s3a *S3ApiServer) buildRemoteObjectPath(bucket, object string) (dir, name string) {
dir = s3a.option.BucketsPath + "/" + bucket
name = strings.TrimPrefix(removeDuplicateSlashes(object), "/")
if idx := strings.LastIndex(name, "/"); idx > 0 {
dir = dir + "/" + name[:idx]
name = name[idx+1:]
}
return dir, name
}
glog.V(2).Infof("cacheRemoteObjectWithDedup: caching %s/%s (remote size: %d)", bucket, object, entry.RemoteEntry.RemoteSize)
// Call the filer's CacheRemoteObjectToLocalCluster via gRPC
// The filer handles singleflight deduplication internally
// doCacheRemoteObject calls the filer's CacheRemoteObjectToLocalCluster gRPC endpoint.
// This is the core caching function used by both cacheRemoteObjectWithDedup and cacheRemoteObjectForStreaming.
func (s3a *S3ApiServer) doCacheRemoteObject(ctx context.Context, dir, name string) (*filer_pb.Entry, error) {
var cachedEntry *filer_pb.Entry
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
resp, cacheErr := client.CacheRemoteObjectToLocalCluster(cacheCtx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
resp, cacheErr := client.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: dir,
Name: normalizedObject,
Name: name,
})
if cacheErr != nil {
return cacheErr
@@ -3376,24 +3384,67 @@ func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket,
}
return nil
})
return cachedEntry, err
}
// cacheRemoteObjectWithDedup caches a remote-only object to the local cluster.
// The filer server handles singleflight deduplication, so all clients (S3, HTTP, Hadoop) benefit.
// On cache error, returns the original entry (will retry in streamFromVolumeServers).
// Uses a bounded timeout to avoid blocking requests indefinitely.
func (s3a *S3ApiServer) cacheRemoteObjectWithDedup(ctx context.Context, bucket, object string, entry *filer_pb.Entry) *filer_pb.Entry {
const cacheTimeout = 30 * time.Second
cacheCtx, cancel := context.WithTimeout(ctx, cacheTimeout)
defer cancel()
dir, name := s3a.buildRemoteObjectPath(bucket, object)
glog.V(2).Infof("cacheRemoteObjectWithDedup: caching %s/%s (remote size: %d)", bucket, object, entry.RemoteEntry.RemoteSize)
cachedEntry, err := s3a.doCacheRemoteObject(cacheCtx, dir, name)
if err != nil {
// Caching failed - log and return original entry
// Streaming from remote storage will still work via filer proxy
if errors.Is(err, context.DeadlineExceeded) {
glog.V(1).Infof("cacheRemoteObjectWithDedup: timeout caching %s/%s after %v (will stream from remote)", bucket, object, cacheTimeout)
glog.V(1).Infof("cacheRemoteObjectWithDedup: timeout caching %s/%s after %v (will retry in streaming)", bucket, object, cacheTimeout)
} else {
glog.Warningf("cacheRemoteObjectWithDedup: failed to cache %s/%s: %v (will stream from remote)", bucket, object, err)
glog.Warningf("cacheRemoteObjectWithDedup: failed to cache %s/%s: %v (will retry in streaming)", bucket, object, err)
}
return entry
}
// If caching succeeded and we got chunks, use the cached entry's chunks
if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 {
glog.V(1).Infof("cacheRemoteObjectWithDedup: successfully cached %s/%s (%d chunks)", bucket, object, len(cachedEntry.GetChunks()))
// Preserve original entry metadata but use new chunks
entry.Chunks = cachedEntry.Chunks
}
return entry
}
// cacheRemoteObjectForStreaming caches a remote-only object to the local cluster for streaming.
// This is called from streamFromVolumeServers when the initial caching attempt timed out or failed.
// Uses the request context (no artificial timeout) to allow the caching to complete.
// For versioned objects, versionId determines the correct path in .versions/ directory.
func (s3a *S3ApiServer) cacheRemoteObjectForStreaming(r *http.Request, entry *filer_pb.Entry, bucket, object, versionId string) *filer_pb.Entry {
var dir, name string
if versionId != "" && versionId != "null" {
// This is a specific version - entry is located at /buckets/<bucket>/<object>.versions/v_<versionId>
normalizedObject := strings.TrimPrefix(removeDuplicateSlashes(object), "/")
dir = s3a.option.BucketsPath + "/" + bucket + "/" + normalizedObject + s3_constants.VersionsFolder
name = s3a.getVersionFileName(versionId)
} else {
// Non-versioned object or "null" version - lives at the main path
dir, name = s3a.buildRemoteObjectPath(bucket, object)
}
glog.V(1).Infof("cacheRemoteObjectForStreaming: caching %s/%s (remote size: %d, versionId: %s)", dir, name, entry.RemoteEntry.RemoteSize, versionId)
cachedEntry, err := s3a.doCacheRemoteObject(r.Context(), dir, name)
if err != nil {
glog.Errorf("cacheRemoteObjectForStreaming: failed to cache %s/%s: %v", dir, name, err)
return nil
}
if cachedEntry != nil && len(cachedEntry.GetChunks()) > 0 {
glog.V(1).Infof("cacheRemoteObjectForStreaming: successfully cached %s/%s (%d chunks)", dir, name, len(cachedEntry.GetChunks()))
return cachedEntry
}
return nil
}
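
Both caching paths ultimately issue the filer's CacheRemoteObjectToLocalCluster gRPC call that doCacheRemoteObject() wraps; the filer handles singleflight deduplication and pulls the remote object into the local cluster. A minimal sketch of that call stripped of the S3ApiServer plumbing (assuming an already-connected filer_pb.SeaweedFilerClient and that the response's entry field carries the refreshed entry):

package sketch

import (
	"context"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// cacheRemoteObject asks the filer to pull a remote-only object into the local
// cluster. dir and name follow the layout buildRemoteObjectPath produces, for
// example dir="/buckets/mybucket/folder", name="file.txt".
func cacheRemoteObject(ctx context.Context, client filer_pb.SeaweedFilerClient, dir, name string) (*filer_pb.Entry, error) {
	resp, err := client.CacheRemoteObjectToLocalCluster(ctx, &filer_pb.CacheRemoteObjectToLocalClusterRequest{
		Directory: dir,
		Name:      name,
	})
	if err != nil {
		return nil, err
	}
	return resp.GetEntry(), nil // assumption: the now-cached entry comes back in the response
}

The two call sites differ mainly in the context they pass: cacheRemoteObjectWithDedup bounds the call to 30 seconds and falls back to the original entry, while cacheRemoteObjectForStreaming uses the request context so a cold-cache read can wait for caching to complete.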

weed/s3api/s3api_remote_storage_test.go (273)

@@ -0,0 +1,273 @@
package s3api
import (
"strings"
"testing"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/stretchr/testify/assert"
)
// TestIsInRemoteOnly tests the IsInRemoteOnly method on filer_pb.Entry
func TestIsInRemoteOnly(t *testing.T) {
tests := []struct {
name string
entry *filer_pb.Entry
expected bool
}{
{
name: "remote-only entry with no chunks",
entry: &filer_pb.Entry{
Name: "remote-file.txt",
Chunks: nil,
RemoteEntry: &filer_pb.RemoteEntry{
RemoteSize: 1024,
},
},
expected: true,
},
{
name: "remote entry with chunks (cached)",
entry: &filer_pb.Entry{
Name: "cached-file.txt",
Chunks: []*filer_pb.FileChunk{
{FileId: "1,abc123", Size: 1024, Offset: 0},
},
RemoteEntry: &filer_pb.RemoteEntry{
RemoteSize: 1024,
},
},
expected: false,
},
{
name: "local file with chunks (not remote)",
entry: &filer_pb.Entry{
Name: "local-file.txt",
Chunks: []*filer_pb.FileChunk{
{FileId: "1,abc123", Size: 1024, Offset: 0},
},
RemoteEntry: nil,
},
expected: false,
},
{
name: "empty remote entry (size 0)",
entry: &filer_pb.Entry{
Name: "empty-remote.txt",
Chunks: nil,
RemoteEntry: &filer_pb.RemoteEntry{
RemoteSize: 0,
},
},
expected: false,
},
{
name: "no chunks but nil RemoteEntry",
entry: &filer_pb.Entry{
Name: "empty-local.txt",
Chunks: nil,
RemoteEntry: nil,
},
expected: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.entry.IsInRemoteOnly()
assert.Equal(t, tt.expected, result,
"IsInRemoteOnly() for %s should return %v", tt.name, tt.expected)
})
}
}
// TestRemoteOnlyEntryDetection tests that the streamFromVolumeServers logic
// correctly distinguishes between remote-only entries and data integrity errors
func TestRemoteOnlyEntryDetection(t *testing.T) {
tests := []struct {
name string
entry *filer_pb.Entry
shouldBeRemote bool
shouldBeDataError bool
shouldBeEmpty bool
}{
{
name: "remote-only entry (no chunks, has remote entry)",
entry: &filer_pb.Entry{
Name: "remote-file.txt",
Chunks: nil,
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
RemoteEntry: &filer_pb.RemoteEntry{
RemoteSize: 1024,
},
},
shouldBeRemote: true,
shouldBeDataError: false,
shouldBeEmpty: false,
},
{
name: "data integrity error (no chunks, no remote, has size)",
entry: &filer_pb.Entry{
Name: "corrupt-file.txt",
Chunks: nil,
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
RemoteEntry: nil,
},
shouldBeRemote: false,
shouldBeDataError: true,
shouldBeEmpty: false,
},
{
name: "empty local file (no chunks, no remote, size 0)",
entry: &filer_pb.Entry{
Name: "empty-file.txt",
Chunks: nil,
Attributes: &filer_pb.FuseAttributes{
FileSize: 0,
},
RemoteEntry: nil,
},
shouldBeRemote: false,
shouldBeDataError: false,
shouldBeEmpty: true,
},
{
name: "normal file with chunks",
entry: &filer_pb.Entry{
Name: "normal-file.txt",
Chunks: []*filer_pb.FileChunk{
{FileId: "1,abc123", Size: 1024, Offset: 0},
},
Attributes: &filer_pb.FuseAttributes{
FileSize: 1024,
},
RemoteEntry: nil,
},
shouldBeRemote: false,
shouldBeDataError: false,
shouldBeEmpty: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
chunks := tt.entry.GetChunks()
totalSize := int64(filer.FileSize(tt.entry))
if len(chunks) == 0 {
// This mirrors the logic in streamFromVolumeServers
if tt.entry.IsInRemoteOnly() {
assert.True(t, tt.shouldBeRemote,
"Entry should be detected as remote-only")
} else if totalSize > 0 && len(tt.entry.Content) == 0 {
assert.True(t, tt.shouldBeDataError,
"Entry should be detected as data integrity error")
} else {
assert.True(t, tt.shouldBeEmpty,
"Entry should be detected as empty")
}
} else {
assert.False(t, tt.shouldBeRemote, "Entry with chunks should not be remote-only")
assert.False(t, tt.shouldBeDataError, "Entry with chunks should not be data error")
assert.False(t, tt.shouldBeEmpty, "Entry with chunks should not be empty")
}
})
}
}
// TestVersionedRemoteObjectPathBuilding tests that the path building logic
// correctly handles versioned objects stored in .versions/ directory
func TestVersionedRemoteObjectPathBuilding(t *testing.T) {
bucketsPath := "/buckets"
tests := []struct {
name string
bucket string
object string
versionId string
expectedDir string
expectedName string
}{
{
name: "non-versioned object (empty versionId)",
bucket: "mybucket",
object: "myobject.txt",
versionId: "",
expectedDir: "/buckets/mybucket",
expectedName: "myobject.txt",
},
{
name: "null version",
bucket: "mybucket",
object: "myobject.txt",
versionId: "null",
expectedDir: "/buckets/mybucket",
expectedName: "myobject.txt",
},
{
name: "specific version",
bucket: "mybucket",
object: "myobject.txt",
versionId: "abc123",
expectedDir: "/buckets/mybucket/myobject.txt" + s3_constants.VersionsFolder,
expectedName: "v_abc123",
},
{
name: "nested object with version",
bucket: "mybucket",
object: "folder/subfolder/file.txt",
versionId: "xyz789",
expectedDir: "/buckets/mybucket/folder/subfolder/file.txt" + s3_constants.VersionsFolder,
expectedName: "v_xyz789",
},
{
name: "object with leading slash and version",
bucket: "mybucket",
object: "/path/to/file.txt",
versionId: "ver456",
expectedDir: "/buckets/mybucket/path/to/file.txt" + s3_constants.VersionsFolder,
expectedName: "v_ver456",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var dir, name string
// This mirrors the logic in cacheRemoteObjectForStreaming
if tt.versionId != "" && tt.versionId != "null" {
// Versioned object path
normalizedObject := strings.TrimPrefix(removeDuplicateSlashesTest(tt.object), "/")
dir = bucketsPath + "/" + tt.bucket + "/" + normalizedObject + s3_constants.VersionsFolder
name = "v_" + tt.versionId
} else {
// Non-versioned path (simplified - just for testing)
dir = bucketsPath + "/" + tt.bucket
normalizedObject := strings.TrimPrefix(removeDuplicateSlashesTest(tt.object), "/")
if idx := strings.LastIndex(normalizedObject, "/"); idx > 0 {
dir = dir + "/" + normalizedObject[:idx]
name = normalizedObject[idx+1:]
} else {
name = normalizedObject
}
}
assert.Equal(t, tt.expectedDir, dir, "Directory path should match")
assert.Equal(t, tt.expectedName, name, "Name should match")
})
}
}
// removeDuplicateSlashesTest is a test helper that mirrors production code
func removeDuplicateSlashesTest(s string) string {
for strings.Contains(s, "//") {
s = strings.ReplaceAll(s, "//", "/")
}
return s
}
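
These are plain table-driven Go tests, so they should run with standard tooling from a repository checkout, for example:

go test ./weed/s3api -run 'TestIsInRemoteOnly|TestRemoteOnlyEntryDetection|TestVersionedRemoteObjectPathBuilding' -v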