Browse Source

revert

Reverted the conditional versioning check to always check versioning status
Reverted the conditional SSE entry fetch to always fetch entry metadata
Reverted the conditional versioning check to always check versioning status
Reverted the conditional SSE entry fetch to always fetch entry metadata
pull/7480/head
chrislu 1 month ago
parent
commit
ec2986b9ee
  1. 119
      weed/s3api/s3api_object_handlers.go

119
weed/s3api/s3api_object_handlers.go

@ -268,23 +268,20 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
var versioningConfigured bool var versioningConfigured bool
var err error var err error
// Optimization: Only check versioning config if we need to handle versioning logic
// Most buckets don't have versioning, so we can skip this check and handle as non-versioned
// We'll detect if it's actually versioned when we try to access the object
needVersioningCheck := versionId != ""
if needVersioningCheck {
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
// Check if versioning is configured for the bucket (Enabled or Suspended)
// Note: We need to check this even if versionId is empty, because versioned buckets
// handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return return
} }
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} }
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
if versioningConfigured { if versioningConfigured {
// Handle versioned GET - all versions are stored in .versions directory // Handle versioned GET - all versions are stored in .versions directory
@ -356,29 +353,30 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
originalRangeHeader := r.Header.Get("Range") originalRangeHeader := r.Header.Get("Range")
var sseObject = false var sseObject = false
// Optimization: Only fetch entry metadata if we need it for SSE or have it from conditional checks
// Most requests don't use SSE, so we can skip this metadata fetch entirely
// Optimization: Reuse already-fetched entry to avoid redundant metadata fetches
if versioningConfigured { if versioningConfigured {
// For versioned objects, reuse the already-fetched entry // For versioned objects, reuse the already-fetched entry
objectEntryForSSE = entry objectEntryForSSE = entry
} else if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else if originalRangeHeader != "" {
// Only fetch entry if we have a Range request (to check for SSE)
// Non-range requests can skip this metadata fetch entirely for better performance
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if objectEntryForSSE == nil {
// Not found, return error early to avoid another lookup in proxyToFiler
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
} else {
// For non-versioned objects, try to reuse entry from conditional header check
if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else {
// No conditional headers were checked, fetch entry for SSE processing
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if objectEntryForSSE == nil {
// Not found, return error early to avoid another lookup in proxyToFiler
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
} }
} }
@ -441,20 +439,18 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
var versioningConfigured bool var versioningConfigured bool
var err error var err error
// Optimization: Only check versioning config if we need to handle versioning logic
// Most buckets don't have versioning, so we can skip this check and handle as non-versioned
needVersioningCheck := versionId != ""
if needVersioningCheck {
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
// Check if versioning is configured for the bucket (Enabled or Suspended)
// Note: We need to check this even if versionId is empty, because versioned buckets
// handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return return
} }
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} }
if versioningConfigured { if versioningConfigured {
@ -521,18 +517,33 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
destUrl = s3a.toFilerUrl(bucket, object) destUrl = s3a.toFilerUrl(bucket, object)
} }
// Optimization: Only fetch entry metadata if we need it for SSE or have it from conditional checks
// For HEAD requests, we only need entry metadata if we're dealing with SSE objects
// Fetch the correct entry for SSE processing (respects versionId)
// For versioned objects, reuse already-fetched entry; for non-versioned, try to reuse from conditional check
var objectEntryForSSE *filer_pb.Entry var objectEntryForSSE *filer_pb.Entry
if versioningConfigured { if versioningConfigured {
objectEntryForSSE = entry objectEntryForSSE = entry
} else if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else {
// For non-versioned objects, try to reuse entry from conditional header check
if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else {
// No conditional headers were checked, fetch entry for SSE processing
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if objectEntryForSSE == nil {
// Not found, return error early to avoid another lookup in proxyToFiler
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
} }
// Note: For HEAD requests without versioning or conditional headers, we can skip the entry fetch entirely
// The proxyToFiler will handle the request directly, which is more efficient
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests // Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests

Loading…
Cancel
Save