Browse Source

Client disconnects create context cancelled errors, 5xx errors and Filer lookup failures (#8845)

* Update stream.go

Client disconnects create context cancelled errors and Filer lookup failures

* s3api: handle canceled stream requests cleanly

* s3api: address canceled streaming review feedback

---------

Co-authored-by: Chris Lu <chris.lu@gmail.com>
pull/5637/merge
msementsov 2 days ago
committed by GitHub
parent
commit
4c13a9ce65
No known key found for this signature in database GPG Key ID: B5690EEEBB952194
  1. 20
      weed/filer/stream.go
  2. 106
      weed/filer/stream_failover_test.go
  3. 44
      weed/s3api/s3api_object_handlers.go
  4. 61
      weed/s3api/s3api_stream_error_test.go

20
weed/filer/stream.go

@ -141,12 +141,26 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien
var urlStrings []string
var err error
for _, backoff := range getLookupFileIdBackoffSchedule {
if err := ctx.Err(); err != nil {
return nil, err
}
urlStrings, err = masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId)
if err == nil && len(urlStrings) > 0 {
break
}
if err := ctx.Err(); err != nil {
return nil, err
}
glog.V(4).InfofCtx(ctx, "waiting for chunk: %s", chunkView.FileId)
time.Sleep(backoff)
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
if !timer.Stop() {
<-timer.C
}
return nil, ctx.Err()
case <-timer.C:
}
}
if err != nil {
glog.V(1).InfofCtx(ctx, "operation LookupFileId %s failed, err: %v", chunkView.FileId, err)
@ -179,6 +193,10 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien
jwt := jwtFunc(chunkView.FileId)
written, err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
if err != nil && ctx.Err() != nil {
return ctx.Err()
}
// If read failed, try to invalidate cache and re-lookup
if err != nil && written == 0 {
if invalidator, ok := masterClient.(CacheInvalidator); ok {

106
weed/filer/stream_failover_test.go

@ -1,8 +1,11 @@
package filer
import (
"bytes"
"context"
"errors"
"testing"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
@ -173,3 +176,106 @@ func TestRetryLogicSkipsSameUrls(t *testing.T) {
t.Error("Expected different URLs to not be equal")
}
}
// TestCanceledStreamSkipsCacheInvalidation verifies that when the request
// context is canceled after stream preparation but before streaming, the
// stream function returns context.Canceled and does not invalidate the
// master client's lookup cache for the chunk's file id.
func TestCanceledStreamSkipsCacheInvalidation(t *testing.T) {
	ctx, cancel := context.WithCancel(context.Background())
	fileId := "3,canceled"
	mock := &mockMasterClient{
		lookupFunc: func(ctx context.Context, fid string) ([]string, error) {
			return []string{"http://server:8080"}, nil
		},
	}
	chunks := []*filer_pb.FileChunk{
		{
			FileId: fileId,
			Offset: 0,
			Size:   10,
		},
	}
	streamFn, err := PrepareStreamContentWithThrottler(ctx, mock, noJwtFunc, chunks, 0, 10, 0)
	if err != nil {
		t.Fatalf("PrepareStreamContentWithThrottler failed: %v", err)
	}
	cancel()
	err = streamFn(&bytes.Buffer{})
	// Use errors.Is (not ==) so wrapped cancellation errors are accepted,
	// consistent with the sibling cancellation tests in this file.
	if !errors.Is(err, context.Canceled) {
		t.Fatalf("expected context.Canceled, got %v", err)
	}
	if len(mock.invalidatedFileIds) != 0 {
		t.Fatalf("expected no cache invalidation on cancellation, got %v", mock.invalidatedFileIds)
	}
}
// TestPrepareStreamContentSkipsLookupWhenContextAlreadyCanceled ensures that
// a context canceled before preparation begins short-circuits immediately,
// without ever invoking the file-id lookup function.
func TestPrepareStreamContentSkipsLookupWhenContextAlreadyCanceled(t *testing.T) {
	savedSchedule := getLookupFileIdBackoffSchedule
	getLookupFileIdBackoffSchedule = []time.Duration{time.Millisecond}
	t.Cleanup(func() {
		getLookupFileIdBackoffSchedule = savedSchedule
	})

	ctx, cancel := context.WithCancel(context.Background())
	cancel()

	var calls int
	client := &mockMasterClient{
		lookupFunc: func(ctx context.Context, fileId string) ([]string, error) {
			calls++
			return nil, errors.New("lookup should not run")
		},
	}
	chunks := []*filer_pb.FileChunk{{
		FileId: "3,precanceled",
		Offset: 0,
		Size:   10,
	}}

	_, err := PrepareStreamContentWithThrottler(ctx, client, noJwtFunc, chunks, 0, 10, 0)
	if !errors.Is(err, context.Canceled) {
		t.Fatalf("expected context.Canceled, got %v", err)
	}
	if calls != 0 {
		t.Fatalf("expected no lookup calls after cancellation, got %d", calls)
	}
}
// TestPrepareStreamContentStopsLookupRetriesAfterContextCancellation ensures
// that once the context is canceled during a lookup attempt, the backoff
// retry loop exits instead of performing further lookups.
func TestPrepareStreamContentStopsLookupRetriesAfterContextCancellation(t *testing.T) {
	savedSchedule := getLookupFileIdBackoffSchedule
	getLookupFileIdBackoffSchedule = []time.Duration{time.Millisecond, time.Millisecond, time.Millisecond}
	t.Cleanup(func() {
		getLookupFileIdBackoffSchedule = savedSchedule
	})

	ctx, cancel := context.WithCancel(context.Background())

	var calls int
	client := &mockMasterClient{
		lookupFunc: func(ctx context.Context, fileId string) ([]string, error) {
			calls++
			// Cancel mid-lookup so the retry loop must notice on its next check.
			cancel()
			return nil, context.Canceled
		},
	}
	chunks := []*filer_pb.FileChunk{{
		FileId: "3,cancel-during-lookup",
		Offset: 0,
		Size:   10,
	}}

	_, err := PrepareStreamContentWithThrottler(ctx, client, noJwtFunc, chunks, 0, 10, 0)
	if !errors.Is(err, context.Canceled) {
		t.Fatalf("expected context.Canceled, got %v", err)
	}
	if calls != 1 {
		t.Fatalf("expected lookup retries to stop after cancellation, got %d calls", calls)
	}
}

44
weed/s3api/s3api_object_handlers.go

@ -26,6 +26,8 @@ import (
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/glog"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// corsHeaders defines the CORS headers that need to be preserved
@ -248,6 +250,14 @@ func newStreamErrorWithResponse(err error) *StreamError {
return &StreamError{Err: err, ResponseWritten: true}
}
// isCanceledStreamingError reports whether err represents a client-initiated
// cancellation: either a (possibly wrapped) context.Canceled, or a gRPC
// status with code Canceled. Returns false for nil errors, since
// status.Code(nil) is codes.OK.
func isCanceledStreamingError(err error) bool {
	if errors.Is(err, context.Canceled) {
		return true
	}
	return status.Code(err) == codes.Canceled
}
// shouldWriteStreamingErrorResponse reports whether an S3 error response
// should be written to the client for err. No response is written when there
// is no error, or when the error stems from the client canceling the request
// (the peer has gone away, so writing a response is pointless).
func shouldWriteStreamingErrorResponse(err error) bool {
	if err == nil {
		return false
	}
	return !isCanceledStreamingError(err)
}
func mimeDetect(r *http.Request, dataReader io.Reader) io.ReadCloser {
mimeBuffer := make([]byte, 512)
size, _ := dataReader.Read(mimeBuffer)
@ -879,7 +889,15 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
err = s3a.streamFromVolumeServersWithSSE(w, r, objectEntryForSSE, primarySSEType, bucket, object, versionId)
streamTime = time.Since(tStream)
if err != nil {
glog.Errorf("GetObjectHandler: failed to stream %s/%s from volume servers: %v", bucket, object, err)
switch {
case isCanceledStreamingError(err):
glog.V(3).Infof("GetObjectHandler: client disconnected while streaming %s/%s: %v", bucket, object, err)
return
case errors.Is(err, context.DeadlineExceeded):
glog.Warningf("GetObjectHandler: deadline exceeded while streaming %s/%s: %v", bucket, object, err)
default:
glog.Errorf("GetObjectHandler: failed to stream %s/%s from volume servers: %v", bucket, object, err)
}
// Check if the streaming function already wrote an HTTP response
var streamErr *StreamError
if errors.As(err, &streamErr) && streamErr.ResponseWritten {
@ -891,7 +909,7 @@ func (s3a *S3ApiServer) GetObjectHandler(w http.ResponseWriter, r *http.Request)
// Check if error is due to volume server rate limiting (HTTP 429)
if errors.Is(err, util_http.ErrTooManyRequests) {
s3err.WriteErrorResponse(w, r, s3err.ErrRequestBytesExceed)
} else {
} else if shouldWriteStreamingErrorResponse(err) {
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
}
return
@ -1027,7 +1045,15 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
resolvedChunks, _, err := filer.ResolveChunkManifest(ctx, lookupFileIdFn, chunks, offset, offset+size)
chunkResolveTime = time.Since(tChunkResolve)
if err != nil {
glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err)
if isCanceledStreamingError(err) {
glog.V(3).Infof("streamFromVolumeServers: request canceled while resolving chunks: %v", err)
return err
}
if errors.Is(err, context.DeadlineExceeded) {
glog.Warningf("streamFromVolumeServers: request deadline exceeded while resolving chunks: %v", err)
} else {
glog.Errorf("streamFromVolumeServers: failed to resolve chunks: %v", err)
}
// Write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("failed to resolve chunks: %v", err))
@ -1047,7 +1073,15 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
)
streamPrepTime = time.Since(tStreamPrep)
if err != nil {
glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err)
if isCanceledStreamingError(err) {
glog.V(3).Infof("streamFromVolumeServers: request canceled while preparing stream: %v", err)
return err
}
if errors.Is(err, context.DeadlineExceeded) {
glog.Warningf("streamFromVolumeServers: request deadline exceeded while preparing stream: %v", err)
} else {
glog.Errorf("streamFromVolumeServers: failed to prepare stream: %v", err)
}
// Write S3-compliant XML error response
s3err.WriteErrorResponse(w, r, s3err.ErrInternalError)
return newStreamErrorWithResponse(fmt.Errorf("failed to prepare stream: %v", err))
@ -1088,7 +1122,7 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
}
if err != nil {
switch {
case errors.Is(err, context.Canceled):
case isCanceledStreamingError(err):
// Client disconnected mid-stream (e.g. Nginx upstream timeout, browser cancel) - expected
glog.V(3).Infof("streamFromVolumeServers: client disconnected after writing %d bytes: %v", cw.written, err)
case errors.Is(err, context.DeadlineExceeded):

61
weed/s3api/s3api_stream_error_test.go

@ -0,0 +1,61 @@
package s3api
import (
"context"
"testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// TestShouldWriteStreamingErrorResponse checks that an error response is
// written for genuine failures (e.g. deadline exceeded) but suppressed for
// nil errors and for client cancellations, including wrapped and
// gRPC-status variants.
func TestShouldWriteStreamingErrorResponse(t *testing.T) {
	cases := []struct {
		name     string
		err      error
		expected bool
	}{
		{name: "nil error", err: nil, expected: false},
		{name: "context canceled", err: context.Canceled, expected: false},
		{name: "wrapped context canceled", err: &StreamError{Err: context.Canceled}, expected: false},
		{name: "grpc canceled", err: status.Error(codes.Canceled, "client connection is closing"), expected: false},
		{name: "wrapped grpc canceled", err: &StreamError{Err: status.Error(codes.Canceled, "client connection is closing")}, expected: false},
		{name: "deadline exceeded", err: context.DeadlineExceeded, expected: true},
		{name: "wrapped deadline exceeded", err: &StreamError{Err: context.DeadlineExceeded}, expected: true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := shouldWriteStreamingErrorResponse(tc.err)
			if got != tc.expected {
				t.Fatalf("shouldWriteStreamingErrorResponse(%v) = %v, want %v", tc.err, got, tc.expected)
			}
		})
	}
}
Loading…
Cancel
Save