From 4c13a9ce6582bfdc33f8fa95b8ca146ee21adbb2 Mon Sep 17 00:00:00 2001 From: msementsov <47177265+m-sementsov@users.noreply.github.com> Date: Mon, 30 Mar 2026 22:11:30 +0300 Subject: [PATCH] Client disconnects create context cancelled errors, 500x errors and Filer lookup failures (#8845) * Update stream.go Client disconnects create context cancelled errors and Filer lookup failures * s3api: handle canceled stream requests cleanly * s3api: address canceled streaming review feedback --------- Co-authored-by: Chris Lu --- weed/filer/stream.go | 20 ++++- weed/filer/stream_failover_test.go | 106 ++++++++++++++++++++++++++ weed/s3api/s3api_object_handlers.go | 44 +++++++++-- weed/s3api/s3api_stream_error_test.go | 61 +++++++++++++++ 4 files changed, 225 insertions(+), 6 deletions(-) create mode 100644 weed/s3api/s3api_stream_error_test.go diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 511deb696..c60d147e5 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -141,12 +141,26 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien var urlStrings []string var err error for _, backoff := range getLookupFileIdBackoffSchedule { + if err := ctx.Err(); err != nil { + return nil, err + } urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId) if err == nil && len(urlStrings) > 0 { break } + if err := ctx.Err(); err != nil { + return nil, err + } glog.V(4).InfofCtx(ctx, "waiting for chunk: %s", chunkView.FileId) - time.Sleep(backoff) + timer := time.NewTimer(backoff) + select { + case <-ctx.Done(): + if !timer.Stop() { + <-timer.C + } + return nil, ctx.Err() + case <-timer.C: + } } if err != nil { glog.V(1).InfofCtx(ctx, "operation LookupFileId %s failed, err: %v", chunkView.FileId, err) @@ -179,6 +193,10 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien jwt := jwtFunc(chunkView.FileId) written, err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize)) + if err != nil && ctx.Err() != nil { + return ctx.Err() + } + // If read failed, try to invalidate cache and re-lookup if err != nil && written == 0 { if invalidator, ok := masterClient.(CacheInvalidator); ok { diff --git a/weed/filer/stream_failover_test.go b/weed/filer/stream_failover_test.go index fcb4917d8..aaa59c523 100644 --- a/weed/filer/stream_failover_test.go +++ b/weed/filer/stream_failover_test.go @@ -1,8 +1,11 @@ package filer import ( + "bytes" "context" + "errors" "testing" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/wdclient" @@ -173,3 +176,106 @@ func TestRetryLogicSkipsSameUrls(t *testing.T) { t.Error("Expected different URLs to not be equal") } } + +func TestCanceledStreamSkipsCacheInvalidation(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + fileId := "3,canceled" + + mock := &mockMasterClient{ + lookupFunc: func(ctx context.Context, fid string) ([]string, error) { + return []string{"http://server:8080"}, nil + }, + } + + chunks := []*filer_pb.FileChunk{ + { + FileId: fileId, + Offset: 0, + Size: 10, + }, + } + + streamFn, err := PrepareStreamContentWithThrottler(ctx, mock, noJwtFunc, chunks, 0, 10, 0) + if err != nil { + t.Fatalf("PrepareStreamContentWithThrottler failed: %v", err) + } + + cancel() + + err = streamFn(&bytes.Buffer{}) + if err != context.Canceled { + t.Fatalf("expected context.Canceled, got %v", err) + } + if len(mock.invalidatedFileIds) != 0 { + t.Fatalf("expected no cache invalidation on cancellation, got %v", mock.invalidatedFileIds) + } +} + +func TestPrepareStreamContentSkipsLookupWhenContextAlreadyCanceled(t *testing.T) { + oldSchedule := getLookupFileIdBackoffSchedule + getLookupFileIdBackoffSchedule = []time.Duration{time.Millisecond} + t.Cleanup(func() { + getLookupFileIdBackoffSchedule = oldSchedule + }) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + lookupCalls := 0 + mock := &mockMasterClient{ + lookupFunc: func(ctx context.Context, fileId string) ([]string, error) { + lookupCalls++ + return nil, errors.New("lookup should not run") + }, + } + + chunks := []*filer_pb.FileChunk{ + { + FileId: "3,precanceled", + Offset: 0, + Size: 10, + }, + } + + _, err := PrepareStreamContentWithThrottler(ctx, mock, noJwtFunc, chunks, 0, 10, 0) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } + if lookupCalls != 0 { + t.Fatalf("expected no lookup calls after cancellation, got %d", lookupCalls) + } +} + +func TestPrepareStreamContentStopsLookupRetriesAfterContextCancellation(t *testing.T) { + oldSchedule := getLookupFileIdBackoffSchedule + getLookupFileIdBackoffSchedule = []time.Duration{time.Millisecond, time.Millisecond, time.Millisecond} + t.Cleanup(func() { + getLookupFileIdBackoffSchedule = oldSchedule + }) + + ctx, cancel := context.WithCancel(context.Background()) + lookupCalls := 0 + mock := &mockMasterClient{ + lookupFunc: func(ctx context.Context, fileId string) ([]string, error) { + lookupCalls++ + cancel() + return nil, context.Canceled + }, + } + + chunks := []*filer_pb.FileChunk{ + { + FileId: "3,cancel-during-lookup", + Offset: 0, + Size: 10, + }, + } + + _, err := PrepareStreamContentWithThrottler(ctx, mock, noJwtFunc, chunks, 0, 10, 0) + if !errors.Is(err, context.Canceled) { + t.Fatalf("expected context.Canceled, got %v", err) + } + if lookupCalls != 1 { + t.Fatalf("expected lookup retries to stop after cancellation, got %d calls", lookupCalls) + } +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 3d08b39da..7a7538214 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -26,6 +26,8 @@ import ( util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/glog" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // corsHeaders defines the CORS headers that need to be preserved @@ -248,6 +250,14 @@ func newStreamErrorWithResponse(err error) *StreamError { return &StreamError{Err: err, ResponseWritten: true} } +func isCanceledStreamingError(err error) bool { + return errors.Is(err, context.Canceled) || status.Code(err) == codes.Canceled +} + +func shouldWriteStreamingErrorResponse(err error) bool { + return err != nil && !isCanceledStreamingError(err) +} + func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser { mimeBuffer := make([]byte, 512) size, _ := dataReader.Read(mimeBuffer) @@ -879,7 +889,15 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType, bucket, object, versionId) streamTime = time.Since(tStream) if err != nil { - glog.Errorf("GetObjectHandler: failed to stream %s/%s from volume servers: %v", bucket, object, err) + switch { + case isCanceledStreamingError(err): + glog.V(3).Infof("GetObjectHandler: client disconnected while streaming %s/%s: %v", bucket, object, err) + return + case errors.Is(err, context.DeadlineExceeded): + glog.Warningf("GetObjectHandler: deadline exceeded while streaming %s/%s: %v", bucket, object, err) + default: + glog.Errorf("GetObjectHandler: failed to stream %s/%s from volume servers: %v", bucket, object, err) + } // Check if the streaming function already wrote an HTTP response var streamErr *StreamError if errors.As(err, &streamErr) && streamErr.ResponseWritten { @@ -891,7 +909,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request) // Check if error is due to volume server rate limiting (HTTP 429) if errors.Is(err, util_http.ErrTooManyRequests) { s3err.WriteErrorResponse(w, r, s3err.ErrRequestBytesExceed) - } else { + } else if shouldWriteStreamingErrorResponse(err) { s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) } return @@ -1027,7 +1045,15 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size) chunkResolveTime = time.Since(tChunkResolve) if err != nil { - glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) + if isCanceledStreamingError(err) { + glog.V(3).Infof("streamFromVolumeServers: request canceled while resolving chunks: %v", err) + return err + } + if errors.Is(err, context.DeadlineExceeded) { + glog.Warningf("streamFromVolumeServers: request deadline exceeded while resolving chunks: %v", err) + } else { + glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err) + } // Write S3-compliant XML error response s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return newStreamErrorWithResponse(fmt.Errorf("failed to resolve chunks: %v", err)) @@ -1047,7 +1073,15 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R ) streamPrepTime = time.Since(tStreamPrep) if err != nil { - glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) + if isCanceledStreamingError(err) { + glog.V(3).Infof("streamFromVolumeServers: request canceled while preparing stream: %v", err) + return err + } + if errors.Is(err, context.DeadlineExceeded) { + glog.Warningf("streamFromVolumeServers: request deadline exceeded while preparing stream: %v", err) + } else { + glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err) + } // Write S3-compliant XML error response s3err.WriteErrorResponse(w, r, s3err.ErrInternalError) return newStreamErrorWithResponse(fmt.Errorf("failed to prepare stream: %v", err)) @@ -1088,7 +1122,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R } if err != nil { switch { - case errors.Is(err, context.Canceled): + case isCanceledStreamingError(err): // Client disconnected mid-stream (e.g. Nginx upstream timeout, browser cancel) - expected glog.V(3).Infof("streamFromVolumeServers: client disconnected after writing %d bytes: %v", cw.written, err) case errors.Is(err, context.DeadlineExceeded): diff --git a/weed/s3api/s3api_stream_error_test.go b/weed/s3api/s3api_stream_error_test.go new file mode 100644 index 000000000..35bcbf685 --- /dev/null +++ b/weed/s3api/s3api_stream_error_test.go @@ -0,0 +1,61 @@ +package s3api + +import ( + "context" + "testing" + + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +func TestShouldWriteStreamingErrorResponse(t *testing.T) { + tests := []struct { + name string + err error + expected bool + }{ + { + name: "nil error", + err: nil, + expected: false, + }, + { + name: "context canceled", + err: context.Canceled, + expected: false, + }, + { + name: "wrapped context canceled", + err: &StreamError{Err: context.Canceled}, + expected: false, + }, + { + name: "grpc canceled", + err: status.Error(codes.Canceled, "client connection is closing"), + expected: false, + }, + { + name: "wrapped grpc canceled", + err: &StreamError{Err: status.Error(codes.Canceled, "client connection is closing")}, + expected: false, + }, + { + name: "deadline exceeded", + err: context.DeadlineExceeded, + expected: true, + }, + { + name: "wrapped deadline exceeded", + err: &StreamError{Err: context.DeadlineExceeded}, + expected: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := shouldWriteStreamingErrorResponse(tt.err); got != tt.expected { + t.Fatalf("shouldWriteStreamingErrorResponse(%v) = %v, want %v", tt.err, got, tt.expected) + } + }) + } +}