From 8edb1e9641fa11be537dbbfeb9358338345948fa Mon Sep 17 00:00:00 2001 From: chrislu Date: Sun, 16 Nov 2025 13:11:52 -0800 Subject: [PATCH] handle retry if not found in .versions folder and should read the normal object --- weed/s3api/s3api_object_handlers.go | 178 +++++++++++++++++++++------- 1 file changed, 137 insertions(+), 41 deletions(-) diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 4a7f827c4..61b0a9fba 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -327,14 +327,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return } - glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) + glog.V(0).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) if versioningConfigured { - // Handle versioned GET - all versions are stored in .versions directory + // Handle versioned GET - check if specific version requested var targetVersionId string if versionId != "" { - // Request for specific version + // Request for specific version - must look in .versions directory glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object) entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId) if err != nil { @@ -350,28 +350,72 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) } targetVersionId = versionId } else { - // Request for latest version - glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return + // Request for latest version - OPTIMIZATION for suspended versioning: + // For suspended versioning, new objects are stored at regular path with version ID "null". + // Check regular path FIRST to avoid 12-second retry delay on .versions directory. + glog.V(0).Infof("GetObject: requesting latest version for %s%s, checking regular path first", bucket, object) + bucketDir := s3a.option.BucketsPath + "/" + bucket + normalizedObject := removeDuplicateSlashes(object) + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) + + if regularErr == nil && regularEntry != nil { + // Found object at regular path - check if it's a null version or pre-versioning object + hasNullVersion := false + if regularEntry.Extended != nil { + if versionIdBytes, exists := regularEntry.Extended[s3_constants.ExtVersionIdKey]; exists { + versionIdStr := string(versionIdBytes) + if versionIdStr == "null" { + hasNullVersion = true + targetVersionId = "null" + } + } + } + + if hasNullVersion || regularEntry.Extended == nil || regularEntry.Extended[s3_constants.ExtVersionIdKey] == nil { + // This is either a null version (suspended) or pre-versioning object + // Use it directly instead of checking .versions + glog.V(0).Infof("GetObject: found null/pre-versioning object at regular path for %s%s", bucket, object) + entry = regularEntry + if targetVersionId == "" { + targetVersionId = "null" + } + } else { + // Has a real version ID, must be in .versions - fall through to getLatestObjectVersion + glog.V(0).Infof("GetObject: regular path object has version ID, checking .versions for %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } + } else { + // No object at regular path, check .versions directory + glog.V(0).Infof("GetObject: no object at regular path, checking .versions for %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } } // Safety check: entry must be valid after successful retrieval if entry == nil { - glog.Errorf("GetObject: getLatestObjectVersion returned nil entry without error for %s%s", bucket, object) + glog.Errorf("GetObject: entry is nil after versioned lookup for %s%s", bucket, object) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - if entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) - } - } - // If no version ID found in entry, this is a pre-versioning object + // Extract version ID if not already set if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } } @@ -631,6 +675,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R // For small files stored inline in entry.Content if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) { + glog.V(0).Infof("streamFromVolumeServers: streaming %d bytes from entry.Content", len(entry.Content)) if isRangeRequest { _, err := w.Write(entry.Content[offset : offset+size]) return err @@ -641,12 +686,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R // Get chunks chunks := entry.GetChunks() + glog.V(0).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, len(entry.Content)=%d", len(chunks), totalSize, len(entry.Content)) if len(chunks) == 0 { + // BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue + if totalSize > 0 && len(entry.Content) == 0 { + glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize) + return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize) + } // Empty object - need to set headers before writing status if !isRangeRequest { // Headers were already set by setResponseHeaders above w.WriteHeader(http.StatusOK) } + glog.V(0).Infof("streamFromVolumeServers: empty object (totalSize=%d), returning", totalSize) return nil } @@ -812,13 +864,13 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r switch sseType { case s3_constants.SSETypeC: customerKey := decryptionKey.(*SSECustomerKey) - + fmt.Printf("[GET DEBUG] SSE-C decryption - KeyMD5=%s, entry has %d chunks\n", customerKey.KeyMD5, len(entry.GetChunks())) for i, chunk := range entry.GetChunks() { - fmt.Printf("[GET DEBUG] Chunk[%d]: offset=%d, size=%d, sseType=%v, hasMetadata=%v\n", + fmt.Printf("[GET DEBUG] Chunk[%d]: offset=%d, size=%d, sseType=%v, hasMetadata=%v\n", i, chunk.Offset, chunk.Size, chunk.SseType, len(chunk.SseMetadata) > 0) } - + // Check if this is a multipart object (multiple chunks with SSE-C metadata) isMultipartSSEC := false ssecChunks := 0 @@ -829,7 +881,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r } isMultipartSSEC = ssecChunks > 1 fmt.Printf("[GET DEBUG] isMultipartSSEC=%v, ssecChunks=%d\n", isMultipartSSEC, ssecChunks) - + if isMultipartSSEC { // Handle multipart SSE-C objects - each chunk needs independent decryption with its own IV decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(encryptedReader, customerKey, entry) @@ -845,7 +897,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r } case s3_constants.SSETypeKMS: sseKMSKey := decryptionKey.(*SSEKMSKey) - + // Check if this is a multipart object (multiple chunks with SSE-KMS metadata) isMultipartSSEKMS := false ssekmsChunks := 0 @@ -856,7 +908,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r } isMultipartSSEKMS = ssekmsChunks > 1 fmt.Printf("[GET DEBUG] SSE-KMS: isMultipart=%v, chunks=%d\n", isMultipartSSEKMS, ssekmsChunks) - + if isMultipartSSEKMS { // Handle multipart SSE-KMS objects - each chunk needs independent decryption decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(encryptedReader, entry) @@ -1226,28 +1278,72 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } targetVersionId = versionId } else { - // Request for latest version - glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object) - entry, err = s3a.getLatestObjectVersion(bucket, object) - if err != nil { - glog.Errorf("Failed to get latest version: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) - return + // Request for latest version - OPTIMIZATION for suspended versioning: + // For suspended versioning, new objects are stored at regular path with version ID "null". + // Check regular path FIRST to avoid 12-second retry delay on .versions directory. + glog.V(0).Infof("HeadObject: requesting latest version for %s%s, checking regular path first", bucket, object) + bucketDir := s3a.option.BucketsPath + "/" + bucket + normalizedObject := removeDuplicateSlashes(object) + regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject) + + if regularErr == nil && regularEntry != nil { + // Found object at regular path - check if it's a null version or pre-versioning object + hasNullVersion := false + if regularEntry.Extended != nil { + if versionIdBytes, exists := regularEntry.Extended[s3_constants.ExtVersionIdKey]; exists { + versionIdStr := string(versionIdBytes) + if versionIdStr == "null" { + hasNullVersion = true + targetVersionId = "null" + } + } + } + + if hasNullVersion || regularEntry.Extended == nil || regularEntry.Extended[s3_constants.ExtVersionIdKey] == nil { + // This is either a null version (suspended) or pre-versioning object + // Use it directly instead of checking .versions + glog.V(0).Infof("HeadObject: found null/pre-versioning object at regular path for %s%s", bucket, object) + entry = regularEntry + if targetVersionId == "" { + targetVersionId = "null" + } + } else { + // Has a real version ID, must be in .versions - fall through to getLatestObjectVersion + glog.V(0).Infof("HeadObject: regular path object has version ID, checking .versions for %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } + } else { + // No object at regular path, check .versions directory + glog.V(0).Infof("HeadObject: no object at regular path, checking .versions for %s%s", bucket, object) + entry, err = s3a.getLatestObjectVersion(bucket, object) + if err != nil { + glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err) + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } } // Safety check: entry must be valid after successful retrieval if entry == nil { - glog.Errorf("HeadObject: getLatestObjectVersion returned nil entry without error for %s%s", bucket, object) + glog.Errorf("HeadObject: entry is nil after versioned lookup for %s%s", bucket, object) s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) return } - if entry.Extended != nil { - if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { - targetVersionId = string(versionIdBytes) - } - } - // If no version ID found in entry, this is a pre-versioning object + // Extract version ID if not already set if targetVersionId == "" { - targetVersionId = "null" + if entry.Extended != nil { + if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists { + targetVersionId = string(versionIdBytes) + } + } + // If no version ID found in entry, this is a pre-versioning object + if targetVersionId == "" { + targetVersionId = "null" + } } } @@ -2219,7 +2315,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err) } - fmt.Printf("[GET DEBUG] Decrypting chunk %s with IV=%x, PartOffset=%d\n", + fmt.Printf("[GET DEBUG] Decrypting chunk %s with IV=%x, PartOffset=%d\n", chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset) // Note: For multipart SSE-C, each part was encrypted with offset=0 @@ -2287,7 +2383,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err) } - fmt.Printf("[GET DEBUG] Decrypting SSE-KMS chunk %s with KeyID=%s\n", + fmt.Printf("[GET DEBUG] Decrypting SSE-KMS chunk %s with KeyID=%s\n", chunk.GetFileIdString(), kmsKey.KeyID) // Create decrypted reader for this chunk