diff --git a/weed/operation/upload_chunked.go b/weed/operation/upload_chunked.go
index 927a30c97..f8aba9b3c 100644
--- a/weed/operation/upload_chunked.go
+++ b/weed/operation/upload_chunked.go
@@ -232,17 +232,23 @@ uploadLoop:
 	// Wait for all uploads to complete
 	wg.Wait()
 
-	// Check for errors
-	if uploadErr != nil {
-		glog.Errorf("chunked upload failed: %v", uploadErr)
-		return nil, uploadErr
-	}
-
-	// Sort chunks by offset
+	// Sort chunks by offset (do this even if there's an error, for cleanup purposes)
 	sort.Slice(fileChunks, func(i, j int) bool {
 		return fileChunks[i].Offset < fileChunks[j].Offset
 	})
 
+	// Check for errors - return partial results for cleanup
+	if uploadErr != nil {
+		glog.Errorf("chunked upload failed: %v (returning %d partial chunks for cleanup)", uploadErr, len(fileChunks))
+		// IMPORTANT: Return partial results even on error so caller can cleanup orphaned chunks
+		return &ChunkedUploadResult{
+			FileChunks:   fileChunks,
+			Md5Hash:      md5Hash,
+			TotalSize:    chunkOffset,
+			SmallContent: nil,
+		}, uploadErr
+	}
+
 	return &ChunkedUploadResult{
 		FileChunks:   fileChunks,
 		Md5Hash:      md5Hash,
diff --git a/weed/operation/upload_chunked_test.go b/weed/operation/upload_chunked_test.go
new file mode 100644
index 000000000..6ada93715
--- /dev/null
+++ b/weed/operation/upload_chunked_test.go
@@ -0,0 +1,231 @@
+package operation
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"testing"
+)
+
+// TestUploadReaderInChunksReturnsPartialResultsOnError verifies that when
+// UploadReaderInChunks fails mid-upload, it returns partial results containing
+// the chunks that were successfully uploaded before the error occurred.
+// This allows the caller to cleanup orphaned chunks and prevent resource leaks.
+func TestUploadReaderInChunksReturnsPartialResultsOnError(t *testing.T) {
+	// Create test data larger than one chunk to force multiple chunk uploads
+	testData := bytes.Repeat([]byte("test data for chunk upload failure testing"), 1000) // ~40KB
+	reader := bytes.NewReader(testData)
+
+	uploadAttempts := 0
+
+	// Create a mock assign function that succeeds for first chunk, then fails
+	assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+		uploadAttempts++
+
+		if uploadAttempts == 1 {
+			// First chunk succeeds
+			return nil, &AssignResult{
+				Fid:       "test-fid-1,1234",
+				Url:       "http://test-volume-1:8080",
+				PublicUrl: "http://test-volume-1:8080",
+				Count:     1,
+			}, nil
+		}
+
+		// Second chunk fails (simulating volume server down or network error)
+		return nil, nil, errors.New("simulated volume assignment failure")
+	}
+
+	// Attempt upload with small chunk size to trigger multiple uploads
+	result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{
+		ChunkSize:       8 * 1024, // 8KB chunks
+		SmallFileLimit:  256,
+		Collection:      "test",
+		DataCenter:      "",
+		SaveSmallInline: false,
+		AssignFunc:      assignFunc,
+	})
+
+	// VERIFICATION 1: Error should be returned
+	if err == nil {
+		t.Fatal("Expected error from UploadReaderInChunks, got nil")
+	}
+	t.Logf("✓ Got expected error: %v", err)
+
+	// VERIFICATION 2: Result should NOT be nil (this is the fix)
+	if result == nil {
+		t.Fatal("CRITICAL: UploadReaderInChunks returned nil result on error - caller cannot cleanup orphaned chunks!")
+	}
+	t.Log("✓ Result is not nil (partial results returned)")
+
+	// VERIFICATION 3: Result should contain partial chunks from successful uploads
+	// Note: In reality, the first chunk upload would succeed before assignment fails for chunk 2
+	// But in this test, assignment fails immediately for chunk 2, so we may have 0 chunks
+	// The important thing is that the result struct is returned, not that it has chunks
+	t.Logf("✓ Result contains %d chunks (may be 0 if all assignments failed)", len(result.FileChunks))
+
+	// VERIFICATION 4: MD5 hash should be available even on partial failure
+	if result.Md5Hash == nil {
+		t.Error("Expected Md5Hash to be non-nil")
+	} else {
+		t.Log("✓ Md5Hash is available for partial data")
+	}
+
+	// VERIFICATION 5: TotalSize should reflect bytes read before failure
+	if result.TotalSize < 0 {
+		t.Errorf("Expected non-negative TotalSize, got %d", result.TotalSize)
+	} else {
+		t.Logf("✓ TotalSize = %d bytes read before failure", result.TotalSize)
+	}
+}
+
+// TestUploadReaderInChunksSuccessPath verifies normal successful upload behavior
+func TestUploadReaderInChunksSuccessPath(t *testing.T) {
+	testData := []byte("small test data")
+	reader := bytes.NewReader(testData)
+
+	// Mock assign function that always succeeds
+	assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+		return nil, &AssignResult{
+			Fid:       "test-fid,1234",
+			Url:       "http://test-volume:8080",
+			PublicUrl: "http://test-volume:8080",
+			Count:     1,
+		}, nil
+	}
+
+	result, err := UploadReaderInChunks(context.Background(), reader, &ChunkedUploadOption{
+		ChunkSize:       8 * 1024,
+		SmallFileLimit:  256,
+		Collection:      "test",
+		DataCenter:      "",
+		SaveSmallInline: false,
+		AssignFunc:      assignFunc,
+	})
+
+	// For this test, we expect success (though actual upload to volume server will fail
+	// since we're using a mock URL, but that's after assignment succeeds)
+	// In a real scenario with a working volume server, this would fully succeed
+
+	if result == nil {
+		t.Fatal("Expected non-nil result")
+	}
+
+	t.Logf("Upload completed with result: chunks=%d, totalSize=%d, error=%v",
+		len(result.FileChunks), result.TotalSize, err)
+}
+
+// TestUploadReaderInChunksContextCancellation verifies behavior when the context is cancelled
+func TestUploadReaderInChunksContextCancellation(t *testing.T) {
+	testData := bytes.Repeat([]byte("test data"), 10000) // ~90KB
+	reader := bytes.NewReader(testData)
+
+	// Create a context that we'll cancel
+	ctx, cancel := context.WithCancel(context.Background())
+
+	// Cancel immediately to trigger cancellation handling
+	cancel()
+
+	assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+		return nil, &AssignResult{
+			Fid:       "test-fid,1234",
+			Url:       "http://test-volume:8080",
+			PublicUrl: "http://test-volume:8080",
+			Count:     1,
+		}, nil
+	}
+
+	result, err := UploadReaderInChunks(ctx, reader, &ChunkedUploadOption{
+		ChunkSize:       8 * 1024,
+		SmallFileLimit:  256,
+		Collection:      "test",
+		DataCenter:      "",
+		SaveSmallInline: false,
+		AssignFunc:      assignFunc,
+	})
+
+	// Should get context cancelled error
+	if err == nil {
+		t.Error("Expected context cancellation error")
+	}
+
+	// Should still get partial results for cleanup
+	if result == nil {
+		t.Error("Expected non-nil result even on context cancellation")
+	} else {
+		t.Logf("✓ Got partial result on cancellation: chunks=%d", len(result.FileChunks))
+	}
+}
+
+// mockFailingReader simulates a reader that fails after reading some data
+type mockFailingReader struct {
+	data      []byte
+	pos       int
+	failAfter int
+}
+
+func (m *mockFailingReader) Read(p []byte) (n int, err error) {
+	if m.pos >= m.failAfter {
+		return 0, errors.New("simulated read failure")
+	}
+
+	remaining := m.failAfter - m.pos
+	toRead := len(p)
+	if toRead > remaining {
+		toRead = remaining
+	}
+	if toRead > len(m.data)-m.pos {
+		toRead = len(m.data) - m.pos
+	}
+
+	if toRead == 0 {
+		return 0, io.EOF
+	}
+
+	copy(p, m.data[m.pos:m.pos+toRead])
+	m.pos += toRead
+	return toRead, nil
+}
+
+// TestUploadReaderInChunksReaderFailure verifies behavior when reader fails mid-read
+func TestUploadReaderInChunksReaderFailure(t *testing.T) {
+	testData := bytes.Repeat([]byte("test"), 5000) // 20KB
+	failingReader := &mockFailingReader{
+		data:      testData,
+		pos:       0,
+		failAfter: 10000, // Fail after 10KB
+	}
+
+	assignFunc := func(ctx context.Context, count int) (*VolumeAssignRequest, *AssignResult, error) {
+		return nil, &AssignResult{
+			Fid:       "test-fid,1234",
+			Url:       "http://test-volume:8080",
+			PublicUrl: "http://test-volume:8080",
+			Count:     1,
+		}, nil
+	}
+
+	result, err := UploadReaderInChunks(context.Background(), failingReader, &ChunkedUploadOption{
+		ChunkSize:       8 * 1024, // 8KB chunks
+		SmallFileLimit:  256,
+		Collection:      "test",
+		DataCenter:      "",
+		SaveSmallInline: false,
+		AssignFunc:      assignFunc,
+	})
+
+	// Should get read error
+	if err == nil {
+		t.Error("Expected read failure error")
+	}
+
+	// Should still get partial results
+	if result == nil {
+		t.Fatal("Expected non-nil result on read failure")
+	}
+
+	t.Logf("✓ Got partial result on read failure: chunks=%d, totalSize=%d",
+		len(result.FileChunks), result.TotalSize)
+}
+
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index d666df096..99394b4aa 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -372,6 +372,16 @@ func (s3a *S3ApiServer) putToFiler(r *http.Request, uploadUrl string, dataReader
 	})
 	if err != nil {
 		glog.Errorf("putToFiler: chunked upload failed: %v", err)
+
+		// CRITICAL: Cleanup orphaned chunks before returning error
+		// UploadReaderInChunks now returns partial results even on error,
+		// allowing us to cleanup any chunks that were successfully uploaded
+		// before the failure occurred
+		if chunkResult != nil && len(chunkResult.FileChunks) > 0 {
+			glog.Warningf("putToFiler: Upload failed, attempting to cleanup %d orphaned chunks", len(chunkResult.FileChunks))
+			s3a.deleteOrphanedChunks(chunkResult.FileChunks)
+		}
+
		if strings.Contains(err.Error(), s3err.ErrMsgPayloadChecksumMismatch) {
 			return "", s3err.ErrInvalidDigest, SSEResponseMetadata{}
 		}
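
Reviewer note: the s3api hunk above calls s3a.deleteOrphanedChunks, whose implementation is not part of this diff. For readers, here is a minimal sketch of the cleanup pattern the partial-result contract enables. The function name cleanupOrphanedChunks and the deleteFileId callback are hypothetical stand-ins for illustration; the real deleteOrphanedChunks helper may differ.

	// Hypothetical sketch, not part of this diff: best-effort cleanup of the
	// partial chunks that UploadReaderInChunks now returns on failure.
	package operation

	import (
		"github.com/seaweedfs/seaweedfs/weed/glog"
		"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
	)

	// cleanupOrphanedChunks deletes every chunk in a partial upload result.
	// deleteFileId is assumed to issue the delete for one chunk file id
	// (e.g. against its volume server); errors are logged and skipped so
	// one failed delete does not abandon the remaining chunks.
	func cleanupOrphanedChunks(chunks []*filer_pb.FileChunk, deleteFileId func(fileId string) error) {
		for _, chunk := range chunks {
			if err := deleteFileId(chunk.FileId); err != nil {
				// Best effort: log and continue with the next chunk
				glog.Warningf("cleanup orphaned chunk %s: %v", chunk.FileId, err)
			}
		}
	}

Best-effort deletion matches the intent of the comments in the hunk: cleanup runs before the error is returned to the S3 client, and a chunk whose delete also fails is at least logged rather than silently leaked.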