diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index b04244669..d4c423c9e 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -110,7 +110,7 @@ func fetchWholeChunk(ctx context.Context, bytesBuffer *bytes.Buffer, lookupFileI
         return err
     }
     jwt := JwtForVolumeServer(fileId)
-    err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, jwt, cipherKey, isGzipped, true, 0, 0)
+    _, err = retriedStreamFetchChunkData(ctx, bytesBuffer, urlStrings, jwt, cipherKey, isGzipped, true, 0, 0)
     if err != nil {
         return err
     }
@@ -126,7 +126,7 @@ func fetchChunkRange(ctx context.Context, buffer []byte, lookupFileIdFn wdclient
     return util_http.RetriedFetchChunkData(ctx, buffer, urlStrings, cipherKey, isGzipped, false, offset, fileId)
 }
 
-func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (err error) {
+func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrings []string, jwt string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) (written int64, err error) {
 
     var shouldRetry bool
     var totalWritten int
@@ -135,7 +135,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
     // Check for context cancellation before starting retry loop
     select {
     case <-ctx.Done():
-        return ctx.Err()
+        return int64(totalWritten), ctx.Err()
     default:
     }
 
@@ -144,7 +144,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
         // Check for context cancellation before each volume server request
         select {
         case <-ctx.Done():
-            return ctx.Err()
+            return int64(totalWritten), ctx.Err()
         default:
         }
 
@@ -198,7 +198,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
             select {
             case <-ctx.Done():
                 timer.Stop()
-                return ctx.Err()
+                return int64(totalWritten), ctx.Err()
             case <-timer.C:
                 // Continue with retry
             }
@@ -207,7 +207,7 @@ func retriedStreamFetchChunkData(ctx context.Context, writer io.Writer, urlStrin
         }
     }
 
-    return err
+    return int64(totalWritten), err
 }
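Note for reviewers: the signature change above lets a caller tell a fetch that failed before producing any output (safe to retry against other replicas) apart from one that failed mid-stream, after bytes already reached the writer. The following is a minimal, self-contained sketch of that decision; fetchChunk is a hypothetical stand-in, not the real retriedStreamFetchChunkData.

package main

import (
    "bytes"
    "errors"
    "fmt"
)

// fetchChunk is a hypothetical stand-in for retriedStreamFetchChunkData:
// it reports how many bytes it wrote to w before failing.
func fetchChunk(w *bytes.Buffer, urls []string) (written int64, err error) {
    return 0, errors.New("connection refused") // simulate a failure before any byte is written
}

func main() {
    var buf bytes.Buffer
    written, err := fetchChunk(&buf, []string{"http://stale-replica:8080"})
    if err != nil && written == 0 {
        // Nothing reached the writer yet, so retrying with fresh
        // locations cannot corrupt the output stream.
        fmt.Println("safe to re-lookup and retry")
    } else if err != nil {
        // Partial data was already written; a blind retry would duplicate bytes.
        fmt.Println("cannot retry transparently")
    }
}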
diff --git a/weed/filer/stream.go b/weed/filer/stream.go
index 00539ca20..6cde0d776 100644
--- a/weed/filer/stream.go
+++ b/weed/filer/stream.go
@@ -106,6 +106,30 @@ func noJwtFunc(string) string { return "" }
 
+type CacheInvalidator interface {
+    InvalidateCache(fileId string)
+}
+
+// urlSlicesEqual checks if two URL slices contain the same URLs (order-independent)
+func urlSlicesEqual(a, b []string) bool {
+    if len(a) != len(b) {
+        return false
+    }
+    // Create a map to count occurrences in first slice
+    counts := make(map[string]int)
+    for _, url := range a {
+        counts[url]++
+    }
+    // Verify all URLs in second slice match
+    for _, url := range b {
+        if counts[url] == 0 {
+            return false
+        }
+        counts[url]--
+    }
+    return true
+}
+
 func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclient.HasLookupFileIdFunction, jwtFunc VolumeServerJwtFunction, chunks []*filer_pb.FileChunk, offset int64, size int64, downloadMaxBytesPs int64) (DoStreamContent, error) {
     glog.V(4).InfofCtx(ctx, "prepare to stream content for chunks: %d", len(chunks))
     chunkViews := ViewFromChunks(ctx, masterClient.GetLookupFileIdFunction(), chunks, offset, size)
@@ -153,7 +177,38 @@ func PrepareStreamContentWithThrottler(ctx context.Context, masterClient wdclien
             urlStrings := fileId2Url[chunkView.FileId]
             start := time.Now()
             jwt := jwtFunc(chunkView.FileId)
-            err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+            written, err := retriedStreamFetchChunkData(ctx, writer, urlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+
+            // If read failed, try to invalidate cache and re-lookup
+            if err != nil && written == 0 {
+                if invalidator, ok := masterClient.(CacheInvalidator); ok {
+                    glog.V(0).InfofCtx(ctx, "read chunk %s failed, invalidating cache and retrying", chunkView.FileId)
+                    invalidator.InvalidateCache(chunkView.FileId)
+
+                    // Re-lookup
+                    newUrlStrings, lookupErr := masterClient.GetLookupFileIdFunction()(ctx, chunkView.FileId)
+                    if lookupErr == nil && len(newUrlStrings) > 0 {
+                        // Check if new URLs are different from old ones to avoid infinite retry
+                        if !urlSlicesEqual(urlStrings, newUrlStrings) {
+                            glog.V(0).InfofCtx(ctx, "retrying read chunk %s with new locations: %v", chunkView.FileId, newUrlStrings)
+                            _, err = retriedStreamFetchChunkData(ctx, writer, newUrlStrings, jwt, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.OffsetInChunk, int(chunkView.ViewSize))
+                            // Update the map so subsequent references use fresh URLs
+                            if err == nil {
+                                fileId2Url[chunkView.FileId] = newUrlStrings
+                            }
+                        } else {
+                            glog.V(0).InfofCtx(ctx, "re-lookup returned same locations for chunk %s, skipping retry", chunkView.FileId)
+                        }
+                    } else {
+                        if lookupErr != nil {
+                            glog.WarningfCtx(ctx, "failed to re-lookup chunk %s after cache invalidation: %v", chunkView.FileId, lookupErr)
+                        } else {
+                            glog.WarningfCtx(ctx, "re-lookup for chunk %s returned no locations, skipping retry", chunkView.FileId)
+                        }
+                    }
+                }
+            }
+
             offset += int64(chunkView.ViewSize)
             remaining -= int64(chunkView.ViewSize)
             stats.FilerRequestHistogram.WithLabelValues("chunkDownload").Observe(time.Since(start).Seconds())
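Note for reviewers: the retry path above only activates when the master client also implements CacheInvalidator; a thin lookup-function wrapper does not, which is why the s3api change later in this patch passes the filer client directly instead of simpleMasterClient. A small sketch of how that type assertion behaves, using local stand-in types (these mirror, but are not, the wdclient/filer interfaces):

package main

import (
    "context"
    "fmt"
)

type LookupFileIdFunctionType func(ctx context.Context, fileId string) ([]string, error)

type HasLookupFileIdFunction interface {
    GetLookupFileIdFunction() LookupFileIdFunctionType
}

type CacheInvalidator interface {
    InvalidateCache(fileId string)
}

// thinWrapper mirrors the old simpleMasterClient: lookup only, nothing to invalidate.
type thinWrapper struct{ lookupFn LookupFileIdFunctionType }

func (w *thinWrapper) GetLookupFileIdFunction() LookupFileIdFunctionType { return w.lookupFn }

// cachingClient mirrors a vidMap-backed client that can drop stale locations.
type cachingClient struct{ thinWrapper }

func (c *cachingClient) InvalidateCache(fileId string) { fmt.Println("invalidated", fileId) }

func main() {
    clients := []HasLookupFileIdFunction{&thinWrapper{}, &cachingClient{}}
    for _, mc := range clients {
        if inv, ok := mc.(CacheInvalidator); ok {
            inv.InvalidateCache("3,0123cafe") // only the caching client reaches here
        } else {
            fmt.Println("no cache invalidation available; stale URLs cannot be refreshed")
        }
    }
}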
diff --git a/weed/filer/stream_failover_test.go b/weed/filer/stream_failover_test.go
new file mode 100644
index 000000000..fcb4917d8
--- /dev/null
+++ b/weed/filer/stream_failover_test.go
@@ -0,0 +1,175 @@
+package filer
+
+import (
+    "context"
+    "testing"
+
+    "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+    "github.com/seaweedfs/seaweedfs/weed/wdclient"
+)
+
+// mockMasterClient implements HasLookupFileIdFunction and CacheInvalidator
+type mockMasterClient struct {
+    lookupFunc         func(ctx context.Context, fileId string) ([]string, error)
+    invalidatedFileIds []string
+}
+
+func (m *mockMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
+    return m.lookupFunc
+}
+
+func (m *mockMasterClient) InvalidateCache(fileId string) {
+    m.invalidatedFileIds = append(m.invalidatedFileIds, fileId)
+}
+
+// Test urlSlicesEqual helper function
+func TestUrlSlicesEqual(t *testing.T) {
+    tests := []struct {
+        name     string
+        a        []string
+        b        []string
+        expected bool
+    }{
+        {
+            name:     "identical slices",
+            a:        []string{"http://server1", "http://server2"},
+            b:        []string{"http://server1", "http://server2"},
+            expected: true,
+        },
+        {
+            name:     "same URLs different order",
+            a:        []string{"http://server1", "http://server2"},
+            b:        []string{"http://server2", "http://server1"},
+            expected: true,
+        },
+        {
+            name:     "different URLs",
+            a:        []string{"http://server1", "http://server2"},
+            b:        []string{"http://server1", "http://server3"},
+            expected: false,
+        },
+        {
+            name:     "different lengths",
+            a:        []string{"http://server1"},
+            b:        []string{"http://server1", "http://server2"},
+            expected: false,
+        },
+        {
+            name:     "empty slices",
+            a:        []string{},
+            b:        []string{},
+            expected: true,
+        },
+        {
+            name:     "duplicates in both",
+            a:        []string{"http://server1", "http://server1"},
+            b:        []string{"http://server1", "http://server1"},
+            expected: true,
+        },
+        {
+            name:     "different duplicate counts",
+            a:        []string{"http://server1", "http://server1"},
+            b:        []string{"http://server1", "http://server2"},
+            expected: false,
+        },
+    }
+
+    for _, tt := range tests {
+        t.Run(tt.name, func(t *testing.T) {
+            result := urlSlicesEqual(tt.a, tt.b)
+            if result != tt.expected {
+                t.Errorf("urlSlicesEqual(%v, %v) = %v; want %v", tt.a, tt.b, result, tt.expected)
+            }
+        })
+    }
+}
+
+// Test cache invalidation when read fails
+func TestStreamContentWithCacheInvalidation(t *testing.T) {
+    ctx := context.Background()
+    fileId := "3,01234567890"
+
+    callCount := 0
+    oldUrls := []string{"http://failed-server:8080"}
+    newUrls := []string{"http://working-server:8080"}
+
+    mock := &mockMasterClient{
+        lookupFunc: func(ctx context.Context, fid string) ([]string, error) {
+            callCount++
+            if callCount == 1 {
+                // First call returns failing server
+                return oldUrls, nil
+            }
+            // After invalidation, return working server
+            return newUrls, nil
+        },
+    }
+
+    // Create a simple chunk
+    chunks := []*filer_pb.FileChunk{
+        {
+            FileId: fileId,
+            Offset: 0,
+            Size:   10,
+        },
+    }
+
+    streamFn, err := PrepareStreamContentWithThrottler(ctx, mock, noJwtFunc, chunks, 0, 10, 0)
+    if err != nil {
+        t.Fatalf("PrepareStreamContentWithThrottler failed: %v", err)
+    }
+
+    // Note: This test can't fully execute streamFn because it would require actual HTTP servers
+    // However, we can verify the setup was created correctly
+    if streamFn == nil {
+        t.Fatal("Expected non-nil stream function")
+    }
+
+    // Verify the lookup was called
+    if callCount != 1 {
+        t.Errorf("Expected 1 lookup call, got %d", callCount)
+    }
+}
+
+// Test that InvalidateCache is called on read failure
+func TestCacheInvalidationInterface(t *testing.T) {
+    mock := &mockMasterClient{
+        lookupFunc: func(ctx context.Context, fileId string) ([]string, error) {
+            return []string{"http://server:8080"}, nil
+        },
+    }
+
+    fileId := "3,test123"
+
+    // Simulate invalidation
+    if invalidator, ok := interface{}(mock).(CacheInvalidator); ok {
+        invalidator.InvalidateCache(fileId)
+    } else {
+        t.Fatal("mockMasterClient should implement CacheInvalidator")
+    }
+
+    // Check that the file ID was recorded as invalidated
+    if len(mock.invalidatedFileIds) != 1 {
+        t.Fatalf("Expected 1 invalidated file ID, got %d", len(mock.invalidatedFileIds))
+    }
+    if mock.invalidatedFileIds[0] != fileId {
+        t.Errorf("Expected invalidated file ID %s, got %s", fileId, mock.invalidatedFileIds[0])
+    }
+}
+
+// Test retry logic doesn't retry with same URLs
+func TestRetryLogicSkipsSameUrls(t *testing.T) {
+    // This test verifies that the urlSlicesEqual check prevents infinite retries
+    sameUrls := []string{"http://server1:8080", "http://server2:8080"}
+    differentUrls := []string{"http://server3:8080", "http://server4:8080"}
+
+    // Same URLs should return true (and thus skip retry)
+    if !urlSlicesEqual(sameUrls, sameUrls) {
+        t.Error("Expected same URLs to be equal")
+    }
+
+    // Different URLs should return false (and thus allow retry)
+    if urlSlicesEqual(sameUrls, differentUrls) {
+        t.Error("Expected different URLs to not be equal")
+    }
+}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 64622023b..74951e841 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -20,7 +20,6 @@ import (
 
     "github.com/seaweedfs/seaweedfs/weed/filer"
     "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
-    "github.com/seaweedfs/seaweedfs/weed/wdclient"
 
     "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
     "github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
@@ -1038,10 +1037,10 @@ func (s3a *S3ApiServer) streamFromVolumeServers(w http.ResponseWriter, r *http.R
 
     // Prepare streaming function with simple master client wrapper
     tStreamPrep := time.Now()
-    masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
+    // Use filerClient directly (not wrapped) so it can support cache invalidation
     streamFn, err := filer.PrepareStreamContentWithThrottler(
         ctx,
-        masterClient,
+        s3a.filerClient,
         filer.JwtForVolumeServer, // Use filer's JWT function (loads config once, generates JWT locally)
         resolvedChunks,
         offset,
@@ -1892,11 +1891,10 @@ func (s3a *S3ApiServer) getEncryptedStreamFromVolumes(ctx context.Context, entry
         return nil, err
     }
 
-    // Create streaming reader
-    masterClient := &simpleMasterClient{lookupFn: lookupFileIdFn}
+    // Create streaming reader - use filerClient directly for cache invalidation support
     streamFn, err := filer.PrepareStreamContentWithThrottler(
         ctx,
-        masterClient,
+        s3a.filerClient,
         filer.JwtForVolumeServer, // Use filer's JWT function (loads config once, generates JWT locally)
         resolvedChunks,
         0,
@@ -2047,15 +2045,6 @@ func (s3a *S3ApiServer) setResponseHeaders(w http.ResponseWriter, r *http.Reques
     }
 }
 
-// simpleMasterClient implements the minimal interface for streaming
-type simpleMasterClient struct {
-    lookupFn func(ctx context.Context, fileId string) ([]string, error)
-}
-
-func (s *simpleMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType {
-    return s.lookupFn
-}
-
 // HeadObjectHandler handles S3 HEAD object requests
 //
 // Special behavior for implicit directories:
diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go
index 179381b0c..25bac2cff 100644
--- a/weed/wdclient/vid_map.go
+++ b/weed/wdclient/vid_map.go
@@ -257,3 +257,15 @@ func (vc *vidMap) deleteEcLocation(vid uint32, location Location) {
         }
     }
 }
+
+func (vc *vidMap) deleteVid(vid uint32) {
+    if cachedMap := vc.cache.Load(); cachedMap != nil {
+        cachedMap.deleteVid(vid)
+    }
+
+    vc.Lock()
+    defer vc.Unlock()
+
+    delete(vc.vid2Locations, vid)
+    delete(vc.ecVid2Locations, vid)
+}
diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go
index 402eaf8c4..579291bfc 100644
--- a/weed/wdclient/vidmap_client.go
+++ b/weed/wdclient/vidmap_client.go
@@ -71,6 +71,9 @@ func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId str
     }
 
     // Cache miss - extract volume ID from file ID (format: "volumeId,needle_id_cookie")
+    if fileId == "" {
+        return nil, fmt.Errorf("empty fileId")
+    }
     parts := strings.Split(fileId, ",")
     if len(parts) != 2 {
         return nil, fmt.Errorf("invalid fileId %s", fileId)
@@ -345,3 +348,16 @@ func (vc *vidMapClient) resetVidMap() {
     // node is guaranteed to be non-nil after the loop
     node.cache.Store(nil)
 }
+
+// InvalidateCache removes all cached locations for a volume ID
+func (vc *vidMapClient) InvalidateCache(fileId string) {
+    parts := strings.Split(fileId, ",")
+    vidString := parts[0]
+    vid, err := strconv.ParseUint(vidString, 10, 32)
+    if err != nil {
+        return
+    }
+    vc.withCurrentVidMap(func(vm *vidMap) {
+        vm.deleteVid(uint32(vid))
+    })
+}
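Note for reviewers: the invalidation above keys off the volume id embedded in a SeaweedFS file id ("volumeId,needle_id_cookie") and drops that volume's cached locations, so the next lookup has to fall back to the master/filer. The following is a self-contained sketch of that shape using a hypothetical map-based cache; it is not the repo's vidMap type.

package main

import (
    "fmt"
    "strconv"
    "strings"
)

// locationCache is a hypothetical stand-in for the vidMap chain: volume id -> replica URLs.
type locationCache struct {
    vid2Locations map[uint32][]string
}

// invalidate mirrors the shape of vidMapClient.InvalidateCache: extract the volume id
// from "volumeId,needleIdCookie" and drop every cached location for that volume.
func (c *locationCache) invalidate(fileId string) {
    vidString := strings.Split(fileId, ",")[0]
    vid, err := strconv.ParseUint(vidString, 10, 32)
    if err != nil {
        return // malformed file id: leave the cache untouched
    }
    delete(c.vid2Locations, uint32(vid))
}

func main() {
    cache := &locationCache{vid2Locations: map[uint32][]string{
        3: {"http://volume-a:8080", "http://volume-b:8080"},
    }}
    cache.invalidate("3,01637037d6") // stale replica set for volume 3 is dropped
    fmt.Println(cache.vid2Locations) // map[] -- the next lookup must go back to the master/filer
}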
diff --git a/weed/wdclient/vidmap_invalidation_test.go b/weed/wdclient/vidmap_invalidation_test.go
new file mode 100644
index 000000000..e2a544d41
--- /dev/null
+++ b/weed/wdclient/vidmap_invalidation_test.go
@@ -0,0 +1,207 @@
+package wdclient
+
+import (
+    "testing"
+)
+
+// TestInvalidateCacheValidFileId tests cache invalidation with a valid file ID
+func TestInvalidateCacheValidFileId(t *testing.T) {
+    // Create a simple vidMapClient (can use nil provider for this test)
+    vc := &vidMapClient{
+        vidMap:          newVidMap(""),
+        vidMapCacheSize: 5,
+    }
+
+    // Add some locations to the cache
+    vid := uint32(456)
+    vc.vidMap.Lock()
+    vc.vidMap.vid2Locations[vid] = []Location{{Url: "http://server1:8080"}}
+    vc.vidMap.Unlock()
+
+    // Verify location exists
+    vc.vidMap.RLock()
+    _, found := vc.vidMap.vid2Locations[vid]
+    vc.vidMap.RUnlock()
+
+    if !found {
+        t.Fatal("Location should exist before invalidation")
+    }
+
+    // Call InvalidateCache with a properly formatted file ID
+    fileId := "456,abcdef123456"
+    vc.InvalidateCache(fileId)
+
+    // Verify the locations were removed
+    vc.vidMap.RLock()
+    _, foundAfter := vc.vidMap.vid2Locations[vid]
+    vc.vidMap.RUnlock()
+
+    if foundAfter {
+        t.Errorf("Expected locations for vid %d to be removed after InvalidateCache", vid)
+    }
+}
+
+// TestInvalidateCacheInvalidFileId tests cache invalidation with invalid file IDs
+func TestInvalidateCacheInvalidFileId(t *testing.T) {
+    testCases := []struct {
+        name   string
+        fileId string
+    }{
+        {"empty file ID", ""},
+        {"no comma separator", "12345"},
+        {"non-numeric vid", "abc,defg"},
+        {"negative vid", "-1,abcd"},
+        {"oversized vid", "999999999999999999999,abcd"},
+    }
+
+    for _, tc := range testCases {
+        t.Run(tc.name, func(t *testing.T) {
+            vc := &vidMapClient{
+                vidMap:          newVidMap(""),
+                vidMapCacheSize: 5,
+            }
+
+            // Add a location to ensure the cache isn't empty
+            vc.vidMap.Lock()
+            vc.vidMap.vid2Locations[1] = []Location{{Url: "http://server:8080"}}
+            vc.vidMap.Unlock()
+
+            // This should not panic or cause errors
+            vc.InvalidateCache(tc.fileId)
+
+            // Verify the existing location is still there (not affected)
+            vc.vidMap.RLock()
+            _, found := vc.vidMap.vid2Locations[1]
+            vc.vidMap.RUnlock()
+
+            if !found {
+                t.Errorf("InvalidateCache with invalid fileId '%s' should not affect other entries", tc.fileId)
+            }
+        })
+    }
+}
+
+// TestInvalidateCacheWithHistory tests that invalidation propagates through cache history
+func TestInvalidateCacheWithHistory(t *testing.T) {
+    vid := uint32(789)
+
+    // Create first vidMap with the volume
+    vm1 := newVidMap("")
+    vm1.Lock()
+    vm1.vid2Locations[vid] = []Location{{Url: "http://server1:8080"}}
+    vm1.Unlock()
+
+    // Create second vidMap with the cached first one
+    vm2 := newVidMap("")
+    vm2.cache.Store(vm1) // vm1 becomes the cache/history
+    vm2.Lock()
+    vm2.vid2Locations[vid] = []Location{{Url: "http://server2:8080"}}
+    vm2.Unlock()
+
+    // Create vidMapClient with vm2 as current
+    vc := &vidMapClient{
+        vidMap:          vm2,
+        vidMapCacheSize: 5,
+    }
+
+    // Verify both have the vid before invalidation
+    vm2.RLock()
+    _, foundInCurrent := vm2.vid2Locations[vid]
+    vm2.RUnlock()
+
+    vm1.RLock()
+    _, foundInCache := vm1.vid2Locations[vid]
+    vm1.RUnlock()
+
+    if !foundInCurrent || !foundInCache {
+        t.Fatal("Both maps should have the vid before invalidation")
+    }
+
+    // Invalidate the cache
+    fileId := "789,xyz123"
+    vc.InvalidateCache(fileId)
+
+    // Check that current map doesn't have the vid
+    vm2.RLock()
+    _, foundInCurrentAfter := vm2.vid2Locations[vid]
+    vm2.RUnlock()
+
+    if foundInCurrentAfter {
+        t.Error("Expected vid to be removed from current vidMap after InvalidateCache")
+    }
+
+    // Check that cache doesn't have the vid either (recursive deletion)
+    vm1.RLock()
+    _, foundInCacheAfter := vm1.vid2Locations[vid]
+    vm1.RUnlock()
+
+    if foundInCacheAfter {
+        t.Error("Expected vid to be removed from cached vidMap as well (recursive deletion)")
+    }
+}
+
+// TestDeleteVidRecursion tests the deleteVid method removes from history chain
+func TestDeleteVidRecursion(t *testing.T) {
+    vid := uint32(999)
+
+    // Create a chain: vm3 -> vm2 -> vm1
+    vm1 := newVidMap("")
+    vm1.Lock()
+    vm1.vid2Locations[vid] = []Location{{Url: "http://server1:8080"}}
+    vm1.Unlock()
+
+    vm2 := newVidMap("")
+    vm2.cache.Store(vm1)
+    vm2.Lock()
+    vm2.vid2Locations[vid] = []Location{{Url: "http://server2:8080"}}
+    vm2.Unlock()
+
+    vm3 := newVidMap("")
+    vm3.cache.Store(vm2)
+    vm3.Lock()
+    vm3.vid2Locations[vid] = []Location{{Url: "http://server3:8080"}}
+    vm3.Unlock()
+
+    // Verify all have the vid
+    vm3.RLock()
+    _, found3 := vm3.vid2Locations[vid]
+    vm3.RUnlock()
+
+    vm2.RLock()
+    _, found2 := vm2.vid2Locations[vid]
+    vm2.RUnlock()
+
+    vm1.RLock()
+    _, found1 := vm1.vid2Locations[vid]
+    vm1.RUnlock()
+
+    if !found1 || !found2 || !found3 {
+        t.Fatal("All maps should have the vid before deletion")
+    }
+
+    // Delete from vm3 (should cascade)
+    vm3.deleteVid(vid)
+
+    // Verify it's gone from all
+    vm3.RLock()
+    _, found3After := vm3.vid2Locations[vid]
+    vm3.RUnlock()
+
+    vm2.RLock()
+    _, found2After := vm2.vid2Locations[vid]
+    vm2.RUnlock()
+
+    vm1.RLock()
+    _, found1After := vm1.vid2Locations[vid]
+    vm1.RUnlock()
+
+    if found3After {
+        t.Error("Expected vid to be removed from vm3")
+    }
+    if found2After {
+        t.Error("Expected vid to be removed from vm2 (cascaded)")
+    }
+    if found1After {
+        t.Error("Expected vid to be removed from vm1 (cascaded)")
+    }
+}