diff --git a/weed/util/http/http_global_client_util.go b/weed/util/http/http_global_client_util.go
index 3a969fdc8..c48388ac9 100644
--- a/weed/util/http/http_global_client_util.go
+++ b/weed/util/http/http_global_client_util.go
@@ -487,6 +487,12 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 		)
 	}
 
+	// For unencrypted, non-gzipped full chunks, use a direct buffer read.
+	// This avoids the 64KB intermediate buffer and callback overhead.
+	if cipherKey == nil && !isGzipped && isFullChunk {
+		return retriedFetchChunkDataDirect(ctx, buffer, urlStrings, string(jwt))
+	}
+
 	var shouldRetry bool
 
 	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
@@ -551,3 +557,103 @@ func RetriedFetchChunkData(ctx context.Context, buffer []byte, urlStrings []stri
 
 	return n, err
 }
+
+// retriedFetchChunkDataDirect reads chunk data directly into the buffer without
+// intermediate buffering. This reduces memory copies and improves throughput
+// for large chunk reads.
+func retriedFetchChunkDataDirect(ctx context.Context, buffer []byte, urlStrings []string, jwt string) (n int, err error) {
+	var shouldRetry bool
+
+	for waitTime := time.Second; waitTime < util.RetryWaitTime; waitTime += waitTime / 2 {
+		select {
+		case <-ctx.Done():
+			return 0, ctx.Err()
+		default:
+		}
+
+		for _, urlString := range urlStrings {
+			select {
+			case <-ctx.Done():
+				return 0, ctx.Err()
+			default:
+			}
+
+			if strings.Contains(urlString, "%") {
+				urlString = url.PathEscape(urlString)
+			}
+
+			n, shouldRetry, err = readUrlDirectToBuffer(ctx, urlString+"?readDeleted=true", jwt, buffer)
+			if err == nil {
+				return n, nil
+			}
+			if !shouldRetry {
+				break
+			}
+			glog.V(0).InfofCtx(ctx, "read %s failed, err: %v", urlString, err)
+		}
+
+		if err != nil && shouldRetry {
+			glog.V(0).InfofCtx(ctx, "retry reading in %v", waitTime)
+			timer := time.NewTimer(waitTime)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return 0, ctx.Err()
+			case <-timer.C:
+			}
+		} else {
+			break
+		}
+	}
+
+	return n, err
+}
+
+// readUrlDirectToBuffer reads the HTTP response directly into the provided buffer,
+// avoiding intermediate buffer allocations and copies.
+func readUrlDirectToBuffer(ctx context.Context, fileUrl, jwt string, buffer []byte) (n int, retryable bool, err error) {
+	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fileUrl, nil)
+	if err != nil {
+		return 0, false, err
+	}
+	maybeAddAuth(req, jwt)
+
+	r, err := GetGlobalHttpClient().Do(req)
+	if err != nil {
+		return 0, true, err
+	}
+	defer CloseResponse(r)
+
+	if r.StatusCode >= 400 {
+		if r.StatusCode == http.StatusNotFound {
+			return 0, true, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrNotFound)
+		}
+		if r.StatusCode == http.StatusTooManyRequests {
+			return 0, false, fmt.Errorf("%s: %s: %w", fileUrl, r.Status, ErrTooManyRequests)
+		}
+		retryable = r.StatusCode >= 499
+		return 0, retryable, fmt.Errorf("%s: %s", fileUrl, r.Status)
+	}
+
+	// Read directly into the buffer without intermediate copying.
+	// This is significantly faster for large chunks (16MB+).
+	var totalRead int
+	for totalRead < len(buffer) {
+		select {
+		case <-ctx.Done():
+			return totalRead, false, ctx.Err()
+		default:
+		}
+
+		m, readErr := r.Body.Read(buffer[totalRead:])
+		totalRead += m
+		if readErr != nil {
+			if readErr == io.EOF {
+				return totalRead, false, nil
+			}
+			return totalRead, true, readErr
+		}
+	}
+
+	return totalRead, false, nil
+}
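
A note on the retry cadence: both retry loops grow the wait geometrically, starting at one second and multiplying by 1.5 each round until util.RetryWaitTime is reached. The following standalone sketch prints that schedule; retryWaitTime is an assumed 30-second stand-in, since the real util.RetryWaitTime constant is defined elsewhere in the repo and may differ.

package main

import (
	"fmt"
	"time"
)

// retryWaitTime is a hypothetical stand-in for util.RetryWaitTime,
// assumed here to be 30 seconds for illustration only.
const retryWaitTime = 30 * time.Second

func main() {
	// Same loop shape as the patch: 1s, 1.5s, 2.25s, ... until the cap.
	for waitTime := time.Second; waitTime < retryWaitTime; waitTime += waitTime / 2 {
		fmt.Println(waitTime)
	}
}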
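
A note on the final read loop in readUrlDirectToBuffer: io.EOF before the buffer is full is treated as a successful short read rather than an error, which is why the stdlib's io.ReadFull is not a drop-in replacement (it would report io.ErrUnexpectedEOF on a short body). Below is a minimal, self-contained sketch of that semantic; the fillBuffer helper is a hypothetical name standing in for the loop (minus the ctx cancellation checks), not part of the patch.

package main

import (
	"bytes"
	"fmt"
	"io"
)

// fillBuffer mirrors the patch's read loop: keep reading until the buffer
// is full or the reader is drained, and report EOF-before-full as a
// successful short read.
func fillBuffer(r io.Reader, buffer []byte) (int, error) {
	total := 0
	for total < len(buffer) {
		n, err := r.Read(buffer[total:])
		total += n
		if err != nil {
			if err == io.EOF {
				return total, nil // short read: the body ended early
			}
			return total, err
		}
	}
	return total, nil
}

func main() {
	buf := make([]byte, 8)
	n, err := fillBuffer(bytes.NewReader([]byte("hello")), buf)
	fmt.Println(n, err) // prints "5 <nil>": a short read is not an error here
}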