Browse Source

handle retry if not found in .versions folder and should read the normal object

pull/7481/head
chrislu 3 weeks ago
parent
commit
8edb1e9641
  1. 178
      weed/s3api/s3api_object_handlers.go

178
weed/s3api/s3api_object_handlers.go

@ -327,14 +327,14 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
glog.V(0).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
if versioningConfigured {
// Handle versioned GET - all versions are stored in .versions directory
// Handle versioned GET - check if specific version requested
var targetVersionId string
if versionId != "" {
// Request for specific version
// Request for specific version - must look in .versions directory
glog.V(2).Infof("GetObject: requesting specific version %s for %s%s", versionId, bucket, object)
entry, err = s3a.getSpecificObjectVersion(bucket, object, versionId)
if err != nil {
@ -350,28 +350,72 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
}
targetVersionId = versionId
} else {
// Request for latest version
glog.V(1).Infof("GetObject: requesting latest version for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
// Request for latest version - OPTIMIZATION for suspended versioning:
// For suspended versioning, new objects are stored at regular path with version ID "null".
// Check regular path FIRST to avoid 12-second retry delay on .versions directory.
glog.V(0).Infof("GetObject: requesting latest version for %s%s, checking regular path first", bucket, object)
bucketDir := s3a.option.BucketsPath + "/" + bucket
normalizedObject := removeDuplicateSlashes(object)
regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
if regularErr == nil && regularEntry != nil {
// Found object at regular path - check if it's a null version or pre-versioning object
hasNullVersion := false
if regularEntry.Extended != nil {
if versionIdBytes, exists := regularEntry.Extended[s3_constants.ExtVersionIdKey]; exists {
versionIdStr := string(versionIdBytes)
if versionIdStr == "null" {
hasNullVersion = true
targetVersionId = "null"
}
}
}
if hasNullVersion || regularEntry.Extended == nil || regularEntry.Extended[s3_constants.ExtVersionIdKey] == nil {
// This is either a null version (suspended) or pre-versioning object
// Use it directly instead of checking .versions
glog.V(0).Infof("GetObject: found null/pre-versioning object at regular path for %s%s", bucket, object)
entry = regularEntry
if targetVersionId == "" {
targetVersionId = "null"
}
} else {
// Has a real version ID, must be in .versions - fall through to getLatestObjectVersion
glog.V(0).Infof("GetObject: regular path object has version ID, checking .versions for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
} else {
// No object at regular path, check .versions directory
glog.V(0).Infof("GetObject: no object at regular path, checking .versions for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("GetObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
// Safety check: entry must be valid after successful retrieval
if entry == nil {
glog.Errorf("GetObject: getLatestObjectVersion returned nil entry without error for %s%s", bucket, object)
glog.Errorf("GetObject: entry is nil after versioned lookup for %s%s", bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
// Extract version ID if not already set
if targetVersionId == "" {
targetVersionId = "null"
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
if targetVersionId == "" {
targetVersionId = "null"
}
}
}
@ -631,6 +675,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
// For small files stored inline in entry.Content
if len(entry.Content) > 0 && totalSize == int64(len(entry.Content)) {
glog.V(0).Infof("streamFromVolumeServers: streaming %d bytes from entry.Content", len(entry.Content))
if isRangeRequest {
_, err := w.Write(entry.Content[offset : offset+size])
return err
@ -641,12 +686,19 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
// Get chunks
chunks := entry.GetChunks()
glog.V(0).Infof("streamFromVolumeServers: entry has %d chunks, totalSize=%d, len(entry.Content)=%d", len(chunks), totalSize, len(entry.Content))
if len(chunks) == 0 {
// BUG FIX: If totalSize > 0 but no chunks and no content, this is a data integrity issue
if totalSize > 0 && len(entry.Content) == 0 {
glog.Errorf("streamFromVolumeServers: Data integrity error - entry reports size %d but has no content or chunks", totalSize)
return fmt.Errorf("data integrity error: size %d reported but no content available", totalSize)
}
// Empty object - need to set headers before writing status
if !isRangeRequest {
// Headers were already set by setResponseHeaders above
w.WriteHeader(http.StatusOK)
}
glog.V(0).Infof("streamFromVolumeServers: empty object (totalSize=%d), returning", totalSize)
return nil
}
@ -812,13 +864,13 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
switch sseType {
case s3_constants.SSETypeC:
customerKey := decryptionKey.(*SSECustomerKey)
fmt.Printf("[GET DEBUG] SSE-C decryption - KeyMD5=%s, entry has %d chunks\n", customerKey.KeyMD5, len(entry.GetChunks()))
for i, chunk := range entry.GetChunks() {
fmt.Printf("[GET DEBUG] Chunk[%d]: offset=%d, size=%d, sseType=%v, hasMetadata=%v\n",
fmt.Printf("[GET DEBUG] Chunk[%d]: offset=%d, size=%d, sseType=%v, hasMetadata=%v\n",
i, chunk.Offset, chunk.Size, chunk.SseType, len(chunk.SseMetadata) > 0)
}
// Check if this is a multipart object (multiple chunks with SSE-C metadata)
isMultipartSSEC := false
ssecChunks := 0
@ -829,7 +881,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
isMultipartSSEC = ssecChunks > 1
fmt.Printf("[GET DEBUG] isMultipartSSEC=%v, ssecChunks=%d\n", isMultipartSSEC, ssecChunks)
if isMultipartSSEC {
// Handle multipart SSE-C objects - each chunk needs independent decryption with its own IV
decryptedReader, err = s3a.createMultipartSSECDecryptedReaderDirect(encryptedReader, customerKey, entry)
@ -845,7 +897,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
case s3_constants.SSETypeKMS:
sseKMSKey := decryptionKey.(*SSEKMSKey)
// Check if this is a multipart object (multiple chunks with SSE-KMS metadata)
isMultipartSSEKMS := false
ssekmsChunks := 0
@ -856,7 +908,7 @@ func (s3a *S3ApiServer) streamFromVolumeServersWithSSE(w http.ResponseWriter, r
}
isMultipartSSEKMS = ssekmsChunks > 1
fmt.Printf("[GET DEBUG] SSE-KMS: isMultipart=%v, chunks=%d\n", isMultipartSSEKMS, ssekmsChunks)
if isMultipartSSEKMS {
// Handle multipart SSE-KMS objects - each chunk needs independent decryption
decryptedReader, err = s3a.createMultipartSSEKMSDecryptedReaderDirect(encryptedReader, entry)
@ -1226,28 +1278,72 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
}
targetVersionId = versionId
} else {
// Request for latest version
glog.V(2).Infof("HeadObject: requesting latest version for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("Failed to get latest version: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
// Request for latest version - OPTIMIZATION for suspended versioning:
// For suspended versioning, new objects are stored at regular path with version ID "null".
// Check regular path FIRST to avoid 12-second retry delay on .versions directory.
glog.V(0).Infof("HeadObject: requesting latest version for %s%s, checking regular path first", bucket, object)
bucketDir := s3a.option.BucketsPath + "/" + bucket
normalizedObject := removeDuplicateSlashes(object)
regularEntry, regularErr := s3a.getEntry(bucketDir, normalizedObject)
if regularErr == nil && regularEntry != nil {
// Found object at regular path - check if it's a null version or pre-versioning object
hasNullVersion := false
if regularEntry.Extended != nil {
if versionIdBytes, exists := regularEntry.Extended[s3_constants.ExtVersionIdKey]; exists {
versionIdStr := string(versionIdBytes)
if versionIdStr == "null" {
hasNullVersion = true
targetVersionId = "null"
}
}
}
if hasNullVersion || regularEntry.Extended == nil || regularEntry.Extended[s3_constants.ExtVersionIdKey] == nil {
// This is either a null version (suspended) or pre-versioning object
// Use it directly instead of checking .versions
glog.V(0).Infof("HeadObject: found null/pre-versioning object at regular path for %s%s", bucket, object)
entry = regularEntry
if targetVersionId == "" {
targetVersionId = "null"
}
} else {
// Has a real version ID, must be in .versions - fall through to getLatestObjectVersion
glog.V(0).Infof("HeadObject: regular path object has version ID, checking .versions for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
} else {
// No object at regular path, check .versions directory
glog.V(0).Infof("HeadObject: no object at regular path, checking .versions for %s%s", bucket, object)
entry, err = s3a.getLatestObjectVersion(bucket, object)
if err != nil {
glog.Errorf("HeadObject: Failed to get latest version for %s%s: %v", bucket, object, err)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
// Safety check: entry must be valid after successful retrieval
if entry == nil {
glog.Errorf("HeadObject: getLatestObjectVersion returned nil entry without error for %s%s", bucket, object)
glog.Errorf("HeadObject: entry is nil after versioned lookup for %s%s", bucket, object)
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
// Extract version ID if not already set
if targetVersionId == "" {
targetVersionId = "null"
if entry.Extended != nil {
if versionIdBytes, exists := entry.Extended[s3_constants.ExtVersionIdKey]; exists {
targetVersionId = string(versionIdBytes)
}
}
// If no version ID found in entry, this is a pre-versioning object
if targetVersionId == "" {
targetVersionId = "null"
}
}
}
@ -2219,7 +2315,7 @@ func (s3a *S3ApiServer) createMultipartSSECDecryptedReaderDirect(encryptedStream
return nil, fmt.Errorf("failed to decode IV for SSE-C chunk %s: %v", chunk.GetFileIdString(), err)
}
fmt.Printf("[GET DEBUG] Decrypting chunk %s with IV=%x, PartOffset=%d\n",
fmt.Printf("[GET DEBUG] Decrypting chunk %s with IV=%x, PartOffset=%d\n",
chunk.GetFileIdString(), chunkIV[:8], ssecMetadata.PartOffset)
// Note: For multipart SSE-C, each part was encrypted with offset=0
@ -2287,7 +2383,7 @@ func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReaderDirect(encryptedStre
return nil, fmt.Errorf("failed to deserialize SSE-KMS metadata for chunk %s: %v", chunk.GetFileIdString(), err)
}
fmt.Printf("[GET DEBUG] Decrypting SSE-KMS chunk %s with KeyID=%s\n",
fmt.Printf("[GET DEBUG] Decrypting SSE-KMS chunk %s with KeyID=%s\n",
chunk.GetFileIdString(), kmsKey.KeyID)
// Create decrypted reader for this chunk

Loading…
Cancel
Save