
clean up

pull/7481/head
chrislu 2 weeks ago
parent commit d9cbc07b2a
  1. weed/s3api/s3api_object_handlers.go (145 changed lines)
  2. weed/s3api/s3api_object_handlers_put.go (59 changed lines)
  3. weed/s3api/s3api_object_handlers_test.go (72 changed lines)

weed/s3api/s3api_object_handlers.go (145 changed lines)

@@ -25,8 +25,8 @@ import (
     "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
     "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
-    "github.com/seaweedfs/seaweedfs/weed/util/mem"
     util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+    "github.com/seaweedfs/seaweedfs/weed/util/mem"
     "github.com/seaweedfs/seaweedfs/weed/glog"
 )
@@ -46,6 +46,81 @@ var corsHeaders = []string{
 // Package-level to avoid per-call allocations in writeZeroBytes
 var zeroBuf = make([]byte, 32*1024)
+
+// adjustRangeForPart adjusts a client's Range header to absolute offsets within a part.
+// Parameters:
+// - partStartOffset: the absolute start offset of the part in the object
+// - partEndOffset: the absolute end offset of the part in the object
+// - clientRangeHeader: the Range header value from the client (e.g., "bytes=0-99")
+//
+// Returns:
+// - adjustedStart: the adjusted absolute start offset
+// - adjustedEnd: the adjusted absolute end offset
+// - error: nil on success, error if the range is invalid
+func adjustRangeForPart(partStartOffset, partEndOffset int64, clientRangeHeader string) (adjustedStart, adjustedEnd int64, err error) {
+    // If no range header, return the full part
+    if clientRangeHeader == "" || !strings.HasPrefix(clientRangeHeader, "bytes=") {
+        return partStartOffset, partEndOffset, nil
+    }
+    // Parse client's range request (relative to the part)
+    rangeSpec := clientRangeHeader[6:] // Remove "bytes=" prefix
+    parts := strings.Split(rangeSpec, "-")
+    if len(parts) != 2 {
+        return 0, 0, fmt.Errorf("invalid range format")
+    }
+    partSize := partEndOffset - partStartOffset + 1
+    var clientStart, clientEnd int64
+    // Parse start offset
+    if parts[0] != "" {
+        clientStart, err = strconv.ParseInt(parts[0], 10, 64)
+        if err != nil {
+            return 0, 0, fmt.Errorf("invalid range start: %w", err)
+        }
+    }
+    // Parse end offset
+    if parts[1] != "" {
+        clientEnd, err = strconv.ParseInt(parts[1], 10, 64)
+        if err != nil {
+            return 0, 0, fmt.Errorf("invalid range end: %w", err)
+        }
+    } else {
+        // No end specified, read to end of part
+        clientEnd = partSize - 1
+    }
+    // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes)
+    if parts[0] == "" {
+        // suffix-range: clientEnd is actually the suffix length
+        suffixLength := clientEnd
+        if suffixLength > partSize {
+            suffixLength = partSize
+        }
+        clientStart = partSize - suffixLength
+        clientEnd = partSize - 1
+    }
+    // Validate range is within part boundaries
+    if clientStart < 0 || clientStart >= partSize {
+        return 0, 0, fmt.Errorf("range start %d out of bounds for part size %d", clientStart, partSize)
+    }
+    if clientEnd >= partSize {
+        clientEnd = partSize - 1
+    }
+    if clientStart > clientEnd {
+        return 0, 0, fmt.Errorf("range start %d > end %d", clientStart, clientEnd)
+    }
+    // Adjust to absolute offsets in the object
+    adjustedStart = partStartOffset + clientStart
+    adjustedEnd = partStartOffset + clientEnd
+    return adjustedStart, adjustedEnd, nil
+}
+
 // StreamError is returned when streaming functions encounter errors.
 // It tracks whether an HTTP response has already been written to prevent
 // double WriteHeader calls that would create malformed S3 error responses.
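
For orientation, here is a minimal usage sketch of the new helper (not part of the commit). It assumes the code lives in the s3api package next to adjustRangeForPart; the demo function name and the offsets are made up. The example part occupies absolute bytes 1000-1999 of the object, so partSize is 1000.

```go
package s3api

import "fmt"

// demoAdjustRangeForPart is hypothetical demo code showing how part-relative
// client ranges map to absolute object offsets.
func demoAdjustRangeForPart() {
	// "bytes=0-99" relative to the part becomes absolute bytes 1000-1099.
	start, end, err := adjustRangeForPart(1000, 1999, "bytes=0-99")
	fmt.Println(start, end, err) // 1000 1099 <nil>

	// Open-ended "bytes=500-" reads from part offset 500 to the end of the part.
	start, end, _ = adjustRangeForPart(1000, 1999, "bytes=500-")
	fmt.Println(start, end) // 1500 1999

	// Suffix form "bytes=-100" selects the last 100 bytes of the part.
	start, end, _ = adjustRangeForPart(1000, 1999, "bytes=-100")
	fmt.Println(start, end) // 1900 1999

	// A start at or beyond the part size is rejected; GetObjectHandler turns
	// the returned error into s3err.ErrInvalidRange.
	_, _, err = adjustRangeForPart(1000, 1999, "bytes=1500-1600")
	fmt.Println(err != nil) // true
}
```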
@@ -620,73 +695,17 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
     // Check if client supplied a Range header - if so, apply it within the part's boundaries
     // S3 allows both partNumber and Range together, where Range applies within the selected part
     clientRangeHeader := r.Header.Get("Range")
-    if clientRangeHeader != "" && strings.HasPrefix(clientRangeHeader, "bytes=") {
-        // Parse client's range request (relative to the part)
-        rangeSpec := clientRangeHeader[6:] // Remove "bytes=" prefix
-        parts := strings.Split(rangeSpec, "-")
-        if len(parts) == 2 {
-            partSize := endOffset - startOffset + 1
-            var clientStart, clientEnd int64
-            var parseErr error
-            // Parse start offset
-            if parts[0] != "" {
-                clientStart, parseErr = strconv.ParseInt(parts[0], 10, 64)
-                if parseErr != nil {
-                    glog.Warningf("GetObject: Invalid Range start for part %d: %s", partNumber, parts[0])
-                    s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
-                    return
-                }
-            }
-            // Parse end offset
-            if parts[1] != "" {
-                clientEnd, parseErr = strconv.ParseInt(parts[1], 10, 64)
-                if parseErr != nil {
-                    glog.Warningf("GetObject: Invalid Range end for part %d: %s", partNumber, parts[1])
-                    s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
-                    return
-                }
-            } else {
-                // No end specified, read to end of part
-                clientEnd = partSize - 1
-            }
-            // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes)
-            if parts[0] == "" {
-                // suffix-range: clientEnd is actually the suffix length
-                suffixLength := clientEnd
-                if suffixLength > partSize {
-                    suffixLength = partSize
-                }
-                clientStart = partSize - suffixLength
-                clientEnd = partSize - 1
-            }
-            // Validate range is within part boundaries
-            if clientStart < 0 || clientStart >= partSize {
-                glog.Warningf("GetObject: Range start %d out of bounds for part %d (size: %d)", clientStart, partNumber, partSize)
-                s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
-                return
-            }
-            if clientEnd >= partSize {
-                clientEnd = partSize - 1
-            }
-            if clientStart > clientEnd {
-                glog.Warningf("GetObject: Invalid Range: start %d > end %d for part %d", clientStart, clientEnd, partNumber)
+    if clientRangeHeader != "" {
+        adjustedStart, adjustedEnd, rangeErr := adjustRangeForPart(startOffset, endOffset, clientRangeHeader)
+        if rangeErr != nil {
+            glog.Warningf("GetObject: Invalid Range for part %d: %v", partNumber, rangeErr)
             s3err.WriteErrorResponse(w, r, s3err.ErrInvalidRange)
             return
         }
-            // Adjust to absolute offsets in the object
-            partStartOffset := startOffset
-            startOffset = partStartOffset + clientStart
-            endOffset = partStartOffset + clientEnd
+        startOffset = adjustedStart
+        endOffset = adjustedEnd
         glog.V(3).Infof("GetObject: Client Range %s applied to part %d, adjusted to bytes=%d-%d", clientRangeHeader, partNumber, startOffset, endOffset)
     }
-    }
     // Set Range header to read the requested bytes (full part or client-specified range within part)
     rangeHeader := fmt.Sprintf("bytes=%d-%d", startOffset, endOffset)
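
As a client-side illustration (not part of the commit) of the partNumber-plus-Range behavior this hunk handles, the sketch below fetches the first 100 bytes of part 2 of a multipart object. The endpoint, bucket, and key are placeholders, and a real deployment would typically also require SigV4 authentication.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// Placeholder endpoint/bucket/key; 8333 is the usual weed s3 gateway port.
	req, err := http.NewRequest(http.MethodGet,
		"http://localhost:8333/my-bucket/my-object?partNumber=2", nil)
	if err != nil {
		panic(err)
	}
	// The Range is interpreted relative to the selected part; the server
	// translates it to absolute object offsets via adjustRangeForPart.
	req.Header.Set("Range", "bytes=0-99")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, resp.Header.Get("Content-Range"), len(body))
}
```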

weed/s3api/s3api_object_handlers_put.go (59 changed lines)

@@ -2,7 +2,6 @@ package s3api
 import (
     "context"
-    "crypto/md5"
     "encoding/base64"
     "encoding/json"
     "errors"
@@ -680,15 +679,8 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
     // Normalize object path to ensure consistency with toFilerUrl behavior
     normalizedObject := removeDuplicateSlashes(object)
-    // Enable detailed logging for testobjbar
-    isTestObj := (normalizedObject == "testobjbar")
-    glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s, isTestObj=%v",
-        bucket, object, normalizedObject, isTestObj)
-    if isTestObj {
-        glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject START ===")
-    }
+    glog.V(3).Infof("putSuspendedVersioningObject: START bucket=%s, object=%s, normalized=%s",
+        bucket, object, normalizedObject)
     bucketDir := s3a.option.BucketsPath + "/" + bucket
@@ -726,8 +718,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
     uploadUrl := s3a.toFilerUrl(bucket, normalizedObject)
-    hash := md5.New()
-    var body = io.TeeReader(dataReader, hash)
+    body := dataReader
     if objectContentType == "" {
         body = mimeDetect(r, body)
     }
@@ -738,10 +729,6 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
     // Set version ID to "null" for suspended versioning
     r.Header.Set(s3_constants.ExtVersionIdKey, "null")
-    if isTestObj {
-        glog.V(3).Infof("=== TESTOBJBAR: set version header before putToFiler, r.Header[%s]=%s ===",
-            s3_constants.ExtVersionIdKey, r.Header.Get(s3_constants.ExtVersionIdKey))
-    }
     // Extract and set object lock metadata as headers
     // This handles retention mode, retention date, and legal hold
@@ -792,44 +779,11 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
     }
     // Upload the file using putToFiler - this will create the file with version metadata
-    if isTestObj {
-        glog.V(3).Infof("=== TESTOBJBAR: calling putToFiler ===")
-    }
     etag, errCode, sseMetadata = s3a.putToFiler(r, uploadUrl, body, bucket, 1)
     if errCode != s3err.ErrNone {
         glog.Errorf("putSuspendedVersioningObject: failed to upload object: %v", errCode)
         return "", errCode, SSEResponseMetadata{}
     }
-    if isTestObj {
-        glog.V(3).Infof("=== TESTOBJBAR: putToFiler completed, etag=%s ===", etag)
-    }
-    // Verify the metadata was set correctly during file creation
-    if isTestObj {
-        // Read back the entry to verify
-        maxRetries := 3
-        for attempt := 1; attempt <= maxRetries; attempt++ {
-            verifyEntry, verifyErr := s3a.getEntry(bucketDir, normalizedObject)
-            if verifyErr == nil {
-                glog.V(3).Infof("=== TESTOBJBAR: verify attempt %d, entry.Extended=%v ===", attempt, verifyEntry.Extended)
-                if verifyEntry.Extended != nil {
-                    if versionIdBytes, ok := verifyEntry.Extended[s3_constants.ExtVersionIdKey]; ok {
-                        glog.V(3).Infof("=== TESTOBJBAR: verification SUCCESSFUL, version=%s ===", string(versionIdBytes))
-                    } else {
-                        glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, ExtVersionIdKey not found ===")
-                    }
-                } else {
-                    glog.V(3).Infof("=== TESTOBJBAR: verification FAILED, Extended is nil ===")
-                }
-                break
-            } else {
-                glog.V(3).Infof("=== TESTOBJBAR: getEntry failed on attempt %d: %v ===", attempt, verifyErr)
-            }
-            if attempt < maxRetries {
-                time.Sleep(time.Millisecond * 10)
-            }
-        }
-    }
     // Update all existing versions/delete markers to set IsLatest=false since "null" is now latest
     err = s3a.updateIsLatestFlagsForSuspendedVersioning(bucket, normalizedObject)
@@ -839,9 +793,7 @@ func (s3a *S3ApiServer) putSuspendedVersioningObject(r *http.Request, bucket, ob
     }
     glog.V(2).Infof("putSuspendedVersioningObject: successfully created null version for %s/%s", bucket, object)
-    if isTestObj {
-        glog.V(3).Infof("=== TESTOBJBAR: putSuspendedVersioningObject COMPLETED ===")
-    }
     return etag, s3err.ErrNone, sseMetadata
 }
@@ -942,8 +894,7 @@ func (s3a *S3ApiServer) putVersionedObject(r *http.Request, bucket, object strin
         return "", "", s3err.ErrInternalError, SSEResponseMetadata{}
     }
-    hash := md5.New()
-    var body = io.TeeReader(dataReader, hash)
+    body := dataReader
     if objectContentType == "" {
         body = mimeDetect(r, body)
     }

weed/s3api/s3api_object_handlers_test.go (72 changed lines)

@@ -1,8 +1,6 @@
 package s3api
 import (
-    "strconv"
-    "strings"
     "testing"
     "time"
@@ -245,75 +243,13 @@ func TestPartNumberWithRangeHeader(t *testing.T) {
     for _, tt := range tests {
         t.Run(tt.name, func(t *testing.T) {
-            // Simulate the range adjustment logic from GetObjectHandler
-            startOffset := tt.partStartOffset
-            endOffset := tt.partEndOffset
-            hasError := false
-            if tt.clientRangeHeader != "" && strings.HasPrefix(tt.clientRangeHeader, "bytes=") {
-                rangeSpec := tt.clientRangeHeader[6:] // Remove "bytes=" prefix
-                parts := strings.Split(rangeSpec, "-")
-                if len(parts) == 2 {
-                    partSize := endOffset - startOffset + 1
-                    var clientStart, clientEnd int64
-                    var parseErr error
-                    // Parse start offset
-                    if parts[0] != "" {
-                        clientStart, parseErr = strconv.ParseInt(parts[0], 10, 64)
-                        if parseErr != nil {
-                            hasError = true
-                        }
-                    }
-                    // Parse end offset
-                    if parts[1] != "" {
-                        clientEnd, parseErr = strconv.ParseInt(parts[1], 10, 64)
-                        if parseErr != nil {
-                            hasError = true
-                        }
-                    } else {
-                        // No end specified, read to end of part
-                        clientEnd = partSize - 1
-                    }
-                    // Handle suffix-range (e.g., "bytes=-100" means last 100 bytes)
-                    if parts[0] == "" && !hasError {
-                        // suffix-range: clientEnd is actually the suffix length
-                        suffixLength := clientEnd
-                        if suffixLength > partSize {
-                            suffixLength = partSize
-                        }
-                        clientStart = partSize - suffixLength
-                        clientEnd = partSize - 1
-                    }
-                    // Validate range is within part boundaries
-                    if !hasError {
-                        if clientStart < 0 || clientStart >= partSize {
-                            hasError = true
-                        } else if clientEnd >= partSize {
-                            clientEnd = partSize - 1
-                        }
-                        if clientStart > clientEnd {
-                            hasError = true
-                        }
-                        if !hasError {
-                            // Adjust to absolute offsets in the object
-                            partStartOffset := startOffset
-                            startOffset = partStartOffset + clientStart
-                            endOffset = partStartOffset + clientEnd
-                        }
-                    }
-                }
-            }
+            // Test the actual range adjustment logic from GetObjectHandler
+            startOffset, endOffset, err := adjustRangeForPart(tt.partStartOffset, tt.partEndOffset, tt.clientRangeHeader)
             if tt.expectError {
-                assert.True(t, hasError, "Expected error for range %s", tt.clientRangeHeader)
+                assert.Error(t, err, "Expected error for range %s", tt.clientRangeHeader)
             } else {
-                assert.False(t, hasError, "Unexpected error for range %s", tt.clientRangeHeader)
+                assert.NoError(t, err, "Unexpected error for range %s: %v", tt.clientRangeHeader, err)
                 assert.Equal(t, tt.expectedStart, startOffset, "Start offset mismatch")
                 assert.Equal(t, tt.expectedEnd, endOffset, "End offset mismatch")
             }
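
The test table itself sits above this hunk and is unchanged by the commit. For orientation, a representative entry compatible with the refactored loop might look like the sketch below; only the field names (name, partStartOffset, partEndOffset, clientRangeHeader, expectError, expectedStart, expectedEnd) come from the code above, the concrete values are illustrative.

```go
// Hypothetical table entry (not from the commit): a part occupying absolute
// bytes 1000-1999, with a client Range selecting the first 100 bytes of it.
{
	name:              "client range within selected part",
	partStartOffset:   1000,
	partEndOffset:     1999,
	clientRangeHeader: "bytes=0-99",
	expectError:       false,
	expectedStart:     1000, // 1000 + 0
	expectedEnd:       1099, // 1000 + 99
},
```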
