Browse Source

s3 object lookup optimization

* Only check versioning configuration if client requests
* Consolidate SSE Entry Lookups
pull/7463/head
chrislu 3 weeks ago
parent
commit
b6678990fc
  1. 126
      weed/s3api/s3api_object_handlers.go

126
weed/s3api/s3api_object_handlers.go

@ -263,16 +263,28 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Check for specific version ID in query parameters // Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId") versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
// Only check versioning configuration if client requests it or if we might need it
// This avoids unnecessary bucket config lookups for common non-versioned read requests
var versioningConfigured bool
var err error
// Fast path: skip versioning check if no versionId parameter (most common case)
if versionId != "" {
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if !versioningConfigured {
// Client requested a version but versioning not enabled - return NoSuchKey per AWS S3 behavior
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return return
} }
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} }
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId) glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
@ -344,33 +356,40 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
destUrl = s3a.toFilerUrl(bucket, object) destUrl = s3a.toFilerUrl(bucket, object)
} }
// Check if this is a range request to an SSE object and modify the approach
// Fetch the correct entry for SSE processing (respects versionId)
// This consolidates entry lookups to avoid multiple filer calls
var objectEntryForSSE *filer_pb.Entry
originalRangeHeader := r.Header.Get("Range") originalRangeHeader := r.Header.Get("Range")
var sseObject = false var sseObject = false
// Pre-check if this object is SSE encrypted to avoid filer range conflicts
if originalRangeHeader != "" {
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
if objectEntry, err := s3a.getEntry("", objectPath); err == nil {
primarySSEType := s3a.detectPrimarySSEType(objectEntry)
if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
sseObject = true
// Temporarily remove Range header to get full encrypted data from filer
r.Header.Del("Range")
if versioningConfigured {
// For versioned objects, reuse the already-fetched entry
objectEntryForSSE = entry
} else {
// For non-versioned objects, fetch entry once and use for both SSE and Range checks
if originalRangeHeader != "" || true { // Always fetch for SSE processing
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
if fetchErr == nil {
objectEntryForSSE = fetchedEntry
// Check if this is an SSE object for Range request handling
if originalRangeHeader != "" {
primarySSEType := s3a.detectPrimarySSEType(fetchedEntry)
if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS {
sseObject = true
// Temporarily remove Range header to get full encrypted data from filer
r.Header.Del("Range")
}
}
} else if !errors.Is(fetchErr, filer_pb.ErrNotFound) {
glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} }
} }
} }
// Fetch the correct entry for SSE processing (respects versionId)
objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
if err != nil {
glog.Errorf("GetObjectHandler: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Restore the original Range header for SSE processing // Restore the original Range header for SSE processing
if sseObject && originalRangeHeader != "" { if sseObject && originalRangeHeader != "" {
@ -414,16 +433,28 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
// Check for specific version ID in query parameters // Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId") versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
// Only check versioning configuration if client requests it or if we might need it
// This avoids unnecessary bucket config lookups for common non-versioned read requests
var versioningConfigured bool
var err error
// Fast path: skip versioning check if no versionId parameter (most common case)
if versionId != "" {
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if !versioningConfigured {
// Client requested a version but versioning not enabled - return NoSuchKey per AWS S3 behavior
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return return
} }
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
} }
var destUrl string var destUrl string
@ -494,11 +525,22 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
} }
// Fetch the correct entry for SSE processing (respects versionId) // Fetch the correct entry for SSE processing (respects versionId)
objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry)
if err != nil {
glog.Errorf("HeadObjectHandler: %v", err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
// For versioned objects, reuse already-fetched entry; for non-versioned, fetch once
var objectEntryForSSE *filer_pb.Entry
if versioningConfigured {
objectEntryForSSE = entry
} else {
// Fetch entry for non-versioned objects (needed for SSE metadata)
bucket, object := s3_constants.GetBucketAndObject(r)
objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object)
fetchedEntry, fetchErr := s3a.getEntry("", objectPath)
if fetchErr == nil {
objectEntryForSSE = fetchedEntry
} else if !errors.Is(fetchErr, filer_pb.ErrNotFound) {
glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
} }
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {

Loading…
Cancel
Save