Browse Source

Lazy Versioning Check, Conditional SSE Entry Fetch, HEAD Request Optimization

pull/7480/head
chrislu 3 weeks ago
parent
commit
e2d14befc0
  1. 135
      weed/s3api/s3api_object_handlers.go

135
weed/s3api/s3api_object_handlers.go

@ -263,25 +263,29 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
// Note: We need to check this even if versionId is empty, because versioned buckets
// handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
var destUrl string
var entry *filer_pb.Entry // Declare entry at function scope for SSE processing
var versioningConfigured bool
var err error
// Optimization: Only check versioning config if we need to handle versioning logic
// Most buckets don't have versioning, so we can skip this check and handle as non-versioned
// We'll detect if it's actually versioned when we try to access the object
needVersioningCheck := versionId != ""
if needVersioningCheck {
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
}
glog.V(1).Infof("GetObject: bucket %s, object %s, versioningConfigured=%v, versionId=%s", bucket, object, versioningConfigured, versionId)
var destUrl string
var entry *filer_pb.Entry // Declare entry at function scope for SSE processing
if versioningConfigured {
// Handle versioned GET - all versions are stored in .versions directory
var targetVersionId string
@ -352,29 +356,29 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
originalRangeHeader := r.Header.Get("Range")
var sseObject = false
// Optimization: Only fetch entry metadata if we need it for SSE or have it from conditional checks
// Most requests don't use SSE, so we can skip this metadata fetch entirely
if versioningConfigured {
// For versioned objects, reuse the already-fetched entry
objectEntryForSSE = entry
} else {
// For non-versioned objects, try to reuse entry from conditional header check
if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else {
// No conditional headers were checked, fetch entry for SSE processing
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if objectEntryForSSE == nil {
// Not found, return error early to avoid another lookup in proxyToFiler
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
} else if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else if originalRangeHeader != "" {
// Only fetch entry if we have a Range request (to check for SSE)
// Non-range requests can skip this metadata fetch entirely for better performance
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if objectEntryForSSE == nil {
// Not found, return error early to avoid another lookup in proxyToFiler
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
@ -432,23 +436,27 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
// Check for specific version ID in query parameters
versionId := r.URL.Query().Get("versionId")
// Check if versioning is configured for the bucket (Enabled or Suspended)
// Note: We need to check this even if versionId is empty, because versioned buckets
// handle even "get latest version" requests differently (through .versions directory)
versioningConfigured, err := s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
var destUrl string
var entry *filer_pb.Entry // Declare entry at function scope for SSE processing
var versioningConfigured bool
var err error
// Optimization: Only check versioning config if we need to handle versioning logic
// Most buckets don't have versioning, so we can skip this check and handle as non-versioned
needVersioningCheck := versionId != ""
if needVersioningCheck {
versioningConfigured, err = s3a.isVersioningConfigured(bucket)
if err != nil {
if err == filer_pb.ErrNotFound {
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchBucket)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
glog.Errorf("Error checking versioning status for bucket %s: %v", bucket, err)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
var destUrl string
var entry *filer_pb.Entry // Declare entry at function scope for SSE processing
if versioningConfigured {
// Handle versioned HEAD - all versions are stored in .versions directory
var targetVersionId string
@ -513,33 +521,18 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request
destUrl = s3a.toFilerUrl(bucket, object)
}
// Fetch the correct entry for SSE processing (respects versionId)
// For versioned objects, reuse already-fetched entry; for non-versioned, try to reuse from conditional check
// Optimization: Only fetch entry metadata if we need it for SSE or have it from conditional checks
// For HEAD requests, we only need entry metadata if we're dealing with SSE objects
var objectEntryForSSE *filer_pb.Entry
if versioningConfigured {
objectEntryForSSE = entry
} else {
// For non-versioned objects, try to reuse entry from conditional header check
if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
} else {
// No conditional headers were checked, fetch entry for SSE processing
var fetchErr error
objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object)
if fetchErr != nil {
glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr)
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return
}
if objectEntryForSSE == nil {
// Not found, return error early to avoid another lookup in proxyToFiler
s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey)
return
}
}
} else if result.Entry != nil {
// Reuse entry fetched during conditional header check (optimization)
objectEntryForSSE = result.Entry
glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object)
}
// Note: For HEAD requests without versioning or conditional headers, we can skip the entry fetch entirely
// The proxyToFiler will handle the request directly, which is more efficient
s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) {
// Handle SSE validation (both SSE-C and SSE-KMS) for HEAD requests

Loading…
Cancel
Save