From bf8e4f40e60e74ce03c2f497c6245e5d1460f1d3 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 10 Nov 2025 20:30:21 -0800 Subject: [PATCH] S3: Perf related (#7463) * reduce checks * s3 object lookup optimization * Only check versioning configuration if client requests * Consolidate SSE Entry Lookups * optimize * revert optimization for versioned objects * Removed: getObjectEntryForSSE() function * refactor * Refactoring: Added fetchObjectEntryRequired * avoid refetching * return early if not found * reuse objects from conditional check * clear cache when creating bucket --- weed/s3api/custom_types.go | 8 +- weed/s3api/s3api_bucket_handlers.go | 17 ++- weed/s3api/s3api_object_handlers.go | 144 ++++++++++++++---------- weed/s3api/s3api_object_handlers_acl.go | 6 +- weed/s3api/s3api_object_handlers_put.go | 20 ++-- weed/s3api/s3api_object_retention.go | 9 +- weed/storage/volume_read.go | 12 +- 7 files changed, 123 insertions(+), 93 deletions(-) diff --git a/weed/s3api/custom_types.go b/weed/s3api/custom_types.go index cc170d0ad..ea769ac4f 100644 --- a/weed/s3api/custom_types.go +++ b/weed/s3api/custom_types.go @@ -1,11 +1,15 @@ package s3api -import "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" +) const s3TimeFormat = "2006-01-02T15:04:05.999Z07:00" // ConditionalHeaderResult holds the result of conditional header checking type ConditionalHeaderResult struct { ErrorCode s3err.ErrorCode - ETag string // ETag of the object (for 304 responses) + ETag string // ETag of the object (for 304 responses) + Entry *filer_pb.Entry // Entry fetched during conditional check (nil if not fetched or object doesn't exist) } diff --git a/weed/s3api/s3api_bucket_handlers.go b/weed/s3api/s3api_bucket_handlers.go index 9509219d9..80d29547b 100644 --- a/weed/s3api/s3api_bucket_handlers.go +++ b/weed/s3api/s3api_bucket_handlers.go @@ -7,7 +7,6 @@ import ( "encoding/xml" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/util" "math" "net/http" "path" @@ -16,6 +15,8 @@ import ( "strings" "time" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil" "github.com/seaweedfs/seaweedfs/weed/s3api/s3bucket" @@ -210,6 +211,11 @@ func (s3a *S3ApiServer) PutBucketHandler(w http.ResponseWriter, r *http.Request) return } + // Remove bucket from negative cache after successful creation + if s3a.bucketConfigCache != nil { + s3a.bucketConfigCache.RemoveNegativeCache(bucket) + } + // Check for x-amz-bucket-object-lock-enabled header (S3 standard compliance) if objectLockHeaderValue := r.Header.Get(s3_constants.AmzBucketObjectLockEnabled); strings.EqualFold(objectLockHeaderValue, "true") { glog.V(3).Infof("PutBucketHandler: enabling Object Lock and Versioning for bucket %s due to x-amz-bucket-object-lock-enabled header", bucket) @@ -493,16 +499,17 @@ func (s3a *S3ApiServer) HeadBucketHandler(w http.ResponseWriter, r *http.Request } func (s3a *S3ApiServer) checkBucket(r *http.Request, bucket string) s3err.ErrorCode { - entry, err := s3a.getEntry(s3a.option.BucketsPath, bucket) - if entry == nil || errors.Is(err, filer_pb.ErrNotFound) { - return s3err.ErrNoSuchBucket + // Use cached bucket config instead of direct getEntry call (optimization) + config, errCode := s3a.getBucketConfig(bucket) + if errCode != s3err.ErrNone { + return errCode } //if iam is enabled, the access was already checked before if s3a.iam.isEnabled() { return s3err.ErrNone } - if !s3a.hasAccess(r, entry) { + if !s3a.hasAccess(r, config.Entry) { return s3err.ErrAccessDenied } return s3err.ErrNone diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 8917393be..9d3b3dfc5 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -264,6 +264,8 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) versionId := r.URL.Query().Get("versionId") // Check if versioning is configured for the bucket (Enabled or Suspended) + // Note: We need to check this even if versionId is empty, because versioned buckets + // handle even "get latest version" requests differently (through .versions directory) versioningConfigured, err := s3a.isVersioningConfigured(bucket) if err != nil { if err == filer_pb.ErrNotFound { @@ -344,31 +346,47 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) destUrl = s3a.toFilerUrl(bucket, object) } - // Check if this is a range request to an SSE object and modify the approach + // Fetch the correct entry for SSE processing (respects versionId) + // This consolidates entry lookups to avoid multiple filer calls + var objectEntryForSSE *filer_pb.Entry originalRangeHeader := r.Header.Get("Range") var sseObject = false - // Pre-check if this object is SSE encrypted to avoid filer range conflicts - if originalRangeHeader != "" { - bucket, object := s3_constants.GetBucketAndObject(r) - objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) - if objectEntry, err := s3a.getEntry("", objectPath); err == nil { - primarySSEType := s3a.detectPrimarySSEType(objectEntry) - if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS { - sseObject = true - // Temporarily remove Range header to get full encrypted data from filer - r.Header.Del("Range") - + if versioningConfigured { + // For versioned objects, reuse the already-fetched entry + objectEntryForSSE = entry + } else { + // For non-versioned objects, try to reuse entry from conditional header check + if result.Entry != nil { + // Reuse entry fetched during conditional header check (optimization) + objectEntryForSSE = result.Entry + glog.V(3).Infof("GetObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object) + } else { + // No conditional headers were checked, fetch entry for SSE processing + var fetchErr error + objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) + if fetchErr != nil { + glog.Errorf("GetObjectHandler: failed to get entry for SSE check: %v", fetchErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + if objectEntryForSSE == nil { + // Not found, return error early to avoid another lookup in proxyToFiler + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return } } } - // Fetch the correct entry for SSE processing (respects versionId) - objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry) - if err != nil { - glog.Errorf("GetObjectHandler: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return + // Check if this is an SSE object for Range request handling + // This applies to both versioned and non-versioned objects + if originalRangeHeader != "" && objectEntryForSSE != nil { + primarySSEType := s3a.detectPrimarySSEType(objectEntryForSSE) + if primarySSEType == s3_constants.SSETypeC || primarySSEType == s3_constants.SSETypeKMS { + sseObject = true + // Temporarily remove Range header to get full encrypted data from filer + r.Header.Del("Range") + } } s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { @@ -415,6 +433,8 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request versionId := r.URL.Query().Get("versionId") // Check if versioning is configured for the bucket (Enabled or Suspended) + // Note: We need to check this even if versionId is empty, because versioned buckets + // handle even "get latest version" requests differently (through .versions directory) versioningConfigured, err := s3a.isVersioningConfigured(bucket) if err != nil { if err == filer_pb.ErrNotFound { @@ -494,11 +514,31 @@ func (s3a *S3ApiServer) HeadObjectHandler(w http.ResponseWriter, r *http.Request } // Fetch the correct entry for SSE processing (respects versionId) - objectEntryForSSE, err := s3a.getObjectEntryForSSE(r, versioningConfigured, entry) - if err != nil { - glog.Errorf("HeadObjectHandler: %v", err) - s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) - return + // For versioned objects, reuse already-fetched entry; for non-versioned, try to reuse from conditional check + var objectEntryForSSE *filer_pb.Entry + if versioningConfigured { + objectEntryForSSE = entry + } else { + // For non-versioned objects, try to reuse entry from conditional header check + if result.Entry != nil { + // Reuse entry fetched during conditional header check (optimization) + objectEntryForSSE = result.Entry + glog.V(3).Infof("HeadObjectHandler: Reusing entry from conditional header check for %s/%s", bucket, object) + } else { + // No conditional headers were checked, fetch entry for SSE processing + var fetchErr error + objectEntryForSSE, fetchErr = s3a.fetchObjectEntry(bucket, object) + if fetchErr != nil { + glog.Errorf("HeadObjectHandler: failed to get entry for SSE check: %v", fetchErr) + s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) + return + } + if objectEntryForSSE == nil { + // Not found, return error early to avoid another lookup in proxyToFiler + s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) + return + } + } } s3a.proxyToFiler(w, r, destUrl, false, func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int, bytesTransferred int64) { @@ -658,21 +698,27 @@ func writeFinalResponse(w http.ResponseWriter, proxyResponse *http.Response, bod return statusCode, bytesTransferred } -// getObjectEntryForSSE fetches the correct filer entry for SSE processing -// For versioned objects, it reuses the already-fetched entry -// For non-versioned objects, it fetches the entry from the filer -func (s3a *S3ApiServer) getObjectEntryForSSE(r *http.Request, versioningConfigured bool, versionedEntry *filer_pb.Entry) (*filer_pb.Entry, error) { - if versioningConfigured { - // For versioned objects, we already have the correct entry - return versionedEntry, nil +// fetchObjectEntry fetches the filer entry for an object +// Returns nil if not found (not an error), or propagates other errors +func (s3a *S3ApiServer) fetchObjectEntry(bucket, object string) (*filer_pb.Entry, error) { + objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) + fetchedEntry, fetchErr := s3a.getEntry("", objectPath) + if fetchErr != nil { + if errors.Is(fetchErr, filer_pb.ErrNotFound) { + return nil, nil // Not found is not an error for SSE check + } + return nil, fetchErr // Propagate other errors } + return fetchedEntry, nil +} - // For non-versioned objects, fetch the entry - bucket, object := s3_constants.GetBucketAndObject(r) +// fetchObjectEntryRequired fetches the filer entry for an object +// Returns an error if the object is not found or any other error occurs +func (s3a *S3ApiServer) fetchObjectEntryRequired(bucket, object string) (*filer_pb.Entry, error) { objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) - fetchedEntry, err := s3a.getEntry("", objectPath) - if err != nil && !errors.Is(err, filer_pb.ErrNotFound) { - return nil, fmt.Errorf("failed to get entry for SSE check %s: %w", objectPath, err) + fetchedEntry, fetchErr := s3a.getEntry("", objectPath) + if fetchErr != nil { + return nil, fetchErr // Return error for both not-found and other errors } return fetchedEntry, nil } @@ -750,7 +796,7 @@ func (s3a *S3ApiServer) handleSSECResponse(r *http.Request, proxyResponse *http. if sseCChunks >= 1 { // Handle chunked SSE-C objects - each chunk needs independent decryption - multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse) + multipartReader, decErr := s3a.createMultipartSSECDecryptedReader(r, proxyResponse, entry) if decErr != nil { glog.Errorf("Failed to create multipart SSE-C decrypted reader: %v", decErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -966,7 +1012,7 @@ func (s3a *S3ApiServer) handleSSEKMSResponse(r *http.Request, proxyResponse *htt var decryptedReader io.Reader if isMultipartSSEKMS { // Handle multipart SSE-KMS objects - each chunk needs independent decryption - multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse) + multipartReader, decErr := s3a.createMultipartSSEKMSDecryptedReader(r, proxyResponse, entry) if decErr != nil { glog.Errorf("Failed to create multipart SSE-KMS decrypted reader: %v", decErr) s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) @@ -1271,16 +1317,8 @@ func (s3a *S3ApiServer) detectPrimarySSEType(entry *filer_pb.Entry) string { } // createMultipartSSEKMSDecryptedReader creates a reader that decrypts each chunk independently for multipart SSE-KMS objects -func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) { - // Get the object path from the request - bucket, object := s3_constants.GetBucketAndObject(r) - objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) - - // Get the object entry from filer to access chunk information - entry, err := s3a.getEntry("", objectPath) - if err != nil { - return nil, fmt.Errorf("failed to get object entry for multipart SSE-KMS decryption: %v", err) - } +func (s3a *S3ApiServer) createMultipartSSEKMSDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { + // Entry is passed from caller to avoid redundant filer lookup // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() @@ -1531,22 +1569,14 @@ func (r *SSERangeReader) Read(p []byte) (n int, err error) { // createMultipartSSECDecryptedReader creates a decrypted reader for multipart SSE-C objects // Each chunk has its own IV and encryption key from the original multipart parts -func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response) (io.Reader, error) { +func (s3a *S3ApiServer) createMultipartSSECDecryptedReader(r *http.Request, proxyResponse *http.Response, entry *filer_pb.Entry) (io.Reader, error) { // Parse SSE-C headers from the request for decryption key customerKey, err := ParseSSECHeaders(r) if err != nil { return nil, fmt.Errorf("invalid SSE-C headers for multipart decryption: %v", err) } - // Get the object path from the request - bucket, object := s3_constants.GetBucketAndObject(r) - objectPath := fmt.Sprintf("%s/%s%s", s3a.option.BucketsPath, bucket, object) - - // Get the object entry from filer to access chunk information - entry, err := s3a.getEntry("", objectPath) - if err != nil { - return nil, fmt.Errorf("failed to get object entry for multipart SSE-C decryption: %v", err) - } + // Entry is passed from caller to avoid redundant filer lookup // Sort chunks by offset to ensure correct order chunks := entry.GetChunks() diff --git a/weed/s3api/s3api_object_handlers_acl.go b/weed/s3api/s3api_object_handlers_acl.go index 1b6f28916..e90d84603 100644 --- a/weed/s3api/s3api_object_handlers_acl.go +++ b/weed/s3api/s3api_object_handlers_acl.go @@ -68,8 +68,7 @@ func (s3a *S3ApiServer) GetObjectAclHandler(w http.ResponseWriter, r *http.Reque } } else { // Handle regular (non-versioned) object ACL retrieval - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err = s3a.getEntry(bucketDir, object) + entry, err = s3a.fetchObjectEntryRequired(bucket, object) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) @@ -212,8 +211,7 @@ func (s3a *S3ApiServer) PutObjectAclHandler(w http.ResponseWriter, r *http.Reque } } else { // Handle regular (non-versioned) object ACL modification - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err = s3a.getEntry(bucketDir, object) + entry, err = s3a.fetchObjectEntryRequired(bucket, object) if err != nil { if errors.Is(err, filer_pb.ErrNotFound) { s3err.WriteErrorResponse(w, r, s3err.ErrNoSuchKey) diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go index 0d07c548e..0f6d88f42 100644 --- a/weed/s3api/s3api_object_handlers_put.go +++ b/weed/s3api/s3api_object_handlers_put.go @@ -1396,14 +1396,15 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe if !objectExists { if headers.ifMatch != "" { glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed - object %s/%s does not exist", bucket, object) - return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed} + return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: nil} } if !headers.ifUnmodifiedSince.IsZero() { glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object %s/%s does not exist", bucket, object) - return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed} + return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: nil} } // If-None-Match and If-Modified-Since succeed when object doesn't exist - return ConditionalHeaderResult{ErrorCode: s3err.ErrNone} + // No entry to return since object doesn't exist + return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: nil} } // Object exists - check all conditions @@ -1419,7 +1420,7 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe // Use production etagMatches method if !s3a.etagMatches(headers.ifMatch, objectETag) { glog.V(3).Infof("checkConditionalHeadersForReads: If-Match failed for object %s/%s - expected ETag %s, got %s", bucket, object, headers.ifMatch, objectETag) - return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed} + return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: entry} } } glog.V(3).Infof("checkConditionalHeadersForReads: If-Match passed for object %s/%s", bucket, object) @@ -1430,7 +1431,7 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe objectModTime := time.Unix(entry.Attributes.Mtime, 0) if objectModTime.After(headers.ifUnmodifiedSince) { glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since failed - object modified after %s", r.Header.Get(s3_constants.IfUnmodifiedSince)) - return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed} + return ConditionalHeaderResult{ErrorCode: s3err.ErrPreconditionFailed, Entry: entry} } glog.V(3).Infof("checkConditionalHeadersForReads: If-Unmodified-Since passed - object not modified since %s", r.Header.Get(s3_constants.IfUnmodifiedSince)) } @@ -1442,12 +1443,12 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe if headers.ifNoneMatch == "*" { glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match=* failed - object %s/%s exists", bucket, object) - return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag} + return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag, Entry: entry} } // Use production etagMatches method if s3a.etagMatches(headers.ifNoneMatch, objectETag) { glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match failed - ETag matches %s", objectETag) - return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag} + return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag, Entry: entry} } glog.V(3).Infof("checkConditionalHeadersForReads: If-None-Match passed - ETag %s doesn't match %s", objectETag, headers.ifNoneMatch) } @@ -1459,12 +1460,13 @@ func (s3a *S3ApiServer) checkConditionalHeadersForReadsWithGetter(getter EntryGe // Use production getObjectETag method objectETag := s3a.getObjectETag(entry) glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since failed - object not modified since %s", r.Header.Get(s3_constants.IfModifiedSince)) - return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag} + return ConditionalHeaderResult{ErrorCode: s3err.ErrNotModified, ETag: objectETag, Entry: entry} } glog.V(3).Infof("checkConditionalHeadersForReads: If-Modified-Since passed - object modified after %s", r.Header.Get(s3_constants.IfModifiedSince)) } - return ConditionalHeaderResult{ErrorCode: s3err.ErrNone} + // Return success with the fetched entry for reuse + return ConditionalHeaderResult{ErrorCode: s3err.ErrNone, Entry: entry} } // checkConditionalHeadersForReads is the production method that uses the S3ApiServer as EntryGetter diff --git a/weed/s3api/s3api_object_retention.go b/weed/s3api/s3api_object_retention.go index 93e04e7da..5bb2faf54 100644 --- a/weed/s3api/s3api_object_retention.go +++ b/weed/s3api/s3api_object_retention.go @@ -200,8 +200,7 @@ func (s3a *S3ApiServer) getObjectEntry(bucket, object, versionId string) (*filer if versioningEnabled { entry, err = s3a.getLatestObjectVersion(bucket, object) } else { - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err = s3a.getEntry(bucketDir, object) + entry, err = s3a.fetchObjectEntryRequired(bucket, object) } } @@ -284,8 +283,7 @@ func (s3a *S3ApiServer) setObjectRetention(bucket, object, versionId string, ret } } } else { - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err = s3a.getEntry(bucketDir, object) + entry, err = s3a.fetchObjectEntryRequired(bucket, object) if err != nil { return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound) } @@ -426,8 +424,7 @@ func (s3a *S3ApiServer) setObjectLegalHold(bucket, object, versionId string, leg } } } else { - bucketDir := s3a.option.BucketsPath + "/" + bucket - entry, err = s3a.getEntry(bucketDir, object) + entry, err = s3a.fetchObjectEntryRequired(bucket, object) if err != nil { return fmt.Errorf("failed to get object %s/%s: %w", bucket, object, ErrObjectNotFound) } diff --git a/weed/storage/volume_read.go b/weed/storage/volume_read.go index 626704fe1..9a209ced5 100644 --- a/weed/storage/volume_read.go +++ b/weed/storage/volume_read.go @@ -165,16 +165,8 @@ func (v *Volume) readNeedleDataInto(n *needle.Needle, readOption *ReadOption, wr toWrite := min(count, int(offset+size-x)) if toWrite > 0 { crc = crc.Update(buf[0:toWrite]) - // the crc.Value() function is to be deprecated. this double checking is for backward compatibility - // with seaweed version using crc.Value() instead of uint32(crc), which appears in commit 056c480eb - // and switch appeared in version 3.09. - if offset == 0 && size == int64(n.DataSize) && int64(count) == size && (n.Checksum != crc && uint32(n.Checksum) != crc.Value()) { - // This check works only if the buffer is big enough to hold the whole needle data - // and we ask for all needle data. - // Otherwise we cannot check the validity of partially aquired data. - stats.VolumeServerHandlerCounter.WithLabelValues(stats.ErrorCRC).Inc() - return fmt.Errorf("ReadNeedleData checksum %v expected %v for Needle: %v,%v", crc, n.Checksum, v.Id, n) - } + // Note: CRC validation happens after the loop completes (see below) + // to avoid performance overhead in the hot read path if _, err = writer.Write(buf[0:toWrite]); err != nil { return fmt.Errorf("ReadNeedleData write: %w", err) }