Browse Source
Fix S3 Gateway Read Failover #8076 (#8087)
Fix S3 Gateway Read Failover #8076 (#8087)
* fix s3 read failover #8076 - Implement cache invalidation in vidMapClient - Add retry logic in shared PrepareStreamContentWithThrottler - Update S3 Gateway to use FilerClient directly for invalidation support - Remove obsolete simpleMasterClient struct * improve observability for chunk re-lookup failures Added a warning log when volume location re-lookup fails after cache invalidation in PrepareStreamContentWithThrottler. * address code review feedback - Prevent infinite retry loops by comparing old/new URLs before retry - Update fileId2Url map after successful re-lookup for subsequent references - Add comprehensive test coverage for failover logic - Add tests for InvalidateCache method * Fix: prevent data duplication in stream retry and improve VidMap robustness * Cleanup: remove redundant check in InvalidateCachepull/8016/merge
committed by
GitHub
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 476 additions and 22 deletions
-
12weed/filer/filechunk_manifest.go
-
57weed/filer/stream.go
-
175weed/filer/stream_failover_test.go
-
19weed/s3api/s3api_object_handlers.go
-
12weed/wdclient/vid_map.go
-
16weed/wdclient/vidmap_client.go
-
207weed/wdclient/vidmap_invalidation_test.go
@ -0,0 +1,175 @@ |
|||
package filer |
|||
|
|||
import ( |
|||
"context" |
|||
"testing" |
|||
|
|||
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" |
|||
"github.com/seaweedfs/seaweedfs/weed/wdclient" |
|||
) |
|||
|
|||
// mockMasterClient implements HasLookupFileIdFunction and CacheInvalidator
|
|||
type mockMasterClient struct { |
|||
lookupFunc func(ctx context.Context, fileId string) ([]string, error) |
|||
invalidatedFileIds []string |
|||
} |
|||
|
|||
func (m *mockMasterClient) GetLookupFileIdFunction() wdclient.LookupFileIdFunctionType { |
|||
return m.lookupFunc |
|||
} |
|||
|
|||
func (m *mockMasterClient) InvalidateCache(fileId string) { |
|||
m.invalidatedFileIds = append(m.invalidatedFileIds, fileId) |
|||
} |
|||
|
|||
// Test urlSlicesEqual helper function
|
|||
func TestUrlSlicesEqual(t *testing.T) { |
|||
tests := []struct { |
|||
name string |
|||
a []string |
|||
b []string |
|||
expected bool |
|||
}{ |
|||
{ |
|||
name: "identical slices", |
|||
a: []string{"http://server1", "http://server2"}, |
|||
b: []string{"http://server1", "http://server2"}, |
|||
expected: true, |
|||
}, |
|||
{ |
|||
name: "same URLs different order", |
|||
a: []string{"http://server1", "http://server2"}, |
|||
b: []string{"http://server2", "http://server1"}, |
|||
expected: true, |
|||
}, |
|||
{ |
|||
name: "different URLs", |
|||
a: []string{"http://server1", "http://server2"}, |
|||
b: []string{"http://server1", "http://server3"}, |
|||
expected: false, |
|||
}, |
|||
{ |
|||
name: "different lengths", |
|||
a: []string{"http://server1"}, |
|||
b: []string{"http://server1", "http://server2"}, |
|||
expected: false, |
|||
}, |
|||
{ |
|||
name: "empty slices", |
|||
a: []string{}, |
|||
b: []string{}, |
|||
expected: true, |
|||
}, |
|||
{ |
|||
name: "duplicates in both", |
|||
a: []string{"http://server1", "http://server1"}, |
|||
b: []string{"http://server1", "http://server1"}, |
|||
expected: true, |
|||
}, |
|||
{ |
|||
name: "different duplicate counts", |
|||
a: []string{"http://server1", "http://server1"}, |
|||
b: []string{"http://server1", "http://server2"}, |
|||
expected: false, |
|||
}, |
|||
} |
|||
|
|||
for _, tt := range tests { |
|||
t.Run(tt.name, func(t *testing.T) { |
|||
result := urlSlicesEqual(tt.a, tt.b) |
|||
if result != tt.expected { |
|||
t.Errorf("urlSlicesEqual(%v, %v) = %v; want %v", tt.a, tt.b, result, tt.expected) |
|||
} |
|||
}) |
|||
} |
|||
} |
|||
|
|||
// Test cache invalidation when read fails
|
|||
func TestStreamContentWithCacheInvalidation(t *testing.T) { |
|||
ctx := context.Background() |
|||
fileId := "3,01234567890" |
|||
|
|||
callCount := 0 |
|||
oldUrls := []string{"http://failed-server:8080"} |
|||
newUrls := []string{"http://working-server:8080"} |
|||
|
|||
mock := &mockMasterClient{ |
|||
lookupFunc: func(ctx context.Context, fid string) ([]string, error) { |
|||
callCount++ |
|||
if callCount == 1 { |
|||
// First call returns failing server
|
|||
return oldUrls, nil |
|||
} |
|||
// After invalidation, return working server
|
|||
return newUrls, nil |
|||
}, |
|||
} |
|||
|
|||
// Create a simple chunk
|
|||
chunks := []*filer_pb.FileChunk{ |
|||
{ |
|||
FileId: fileId, |
|||
Offset: 0, |
|||
Size: 10, |
|||
}, |
|||
} |
|||
|
|||
streamFn, err := PrepareStreamContentWithThrottler(ctx, mock, noJwtFunc, chunks, 0, 10, 0) |
|||
if err != nil { |
|||
t.Fatalf("PrepareStreamContentWithThrottler failed: %v", err) |
|||
} |
|||
|
|||
// Note: This test can't fully execute streamFn because it would require actual HTTP servers
|
|||
// However, we can verify the setup was created correctly
|
|||
if streamFn == nil { |
|||
t.Fatal("Expected non-nil stream function") |
|||
} |
|||
|
|||
// Verify the lookup was called
|
|||
if callCount != 1 { |
|||
t.Errorf("Expected 1 lookup call, got %d", callCount) |
|||
} |
|||
} |
|||
|
|||
// Test that InvalidateCache is called on read failure
|
|||
func TestCacheInvalidationInterface(t *testing.T) { |
|||
mock := &mockMasterClient{ |
|||
lookupFunc: func(ctx context.Context, fileId string) ([]string, error) { |
|||
return []string{"http://server:8080"}, nil |
|||
}, |
|||
} |
|||
|
|||
fileId := "3,test123" |
|||
|
|||
// Simulate invalidation
|
|||
if invalidator, ok := interface{}(mock).(CacheInvalidator); ok { |
|||
invalidator.InvalidateCache(fileId) |
|||
} else { |
|||
t.Fatal("mockMasterClient should implement CacheInvalidator") |
|||
} |
|||
|
|||
// Check that the file ID was recorded as invalidated
|
|||
if len(mock.invalidatedFileIds) != 1 { |
|||
t.Fatalf("Expected 1 invalidated file ID, got %d", len(mock.invalidatedFileIds)) |
|||
} |
|||
if mock.invalidatedFileIds[0] != fileId { |
|||
t.Errorf("Expected invalidated file ID %s, got %s", fileId, mock.invalidatedFileIds[0]) |
|||
} |
|||
} |
|||
|
|||
// Test retry logic doesn't retry with same URLs
|
|||
func TestRetryLogicSkipsSameUrls(t *testing.T) { |
|||
// This test verifies that the urlSlicesEqual check prevents infinite retries
|
|||
sameUrls := []string{"http://server1:8080", "http://server2:8080"} |
|||
differentUrls := []string{"http://server3:8080", "http://server4:8080"} |
|||
|
|||
// Same URLs should return true (and thus skip retry)
|
|||
if !urlSlicesEqual(sameUrls, sameUrls) { |
|||
t.Error("Expected same URLs to be equal") |
|||
} |
|||
|
|||
// Different URLs should return false (and thus allow retry)
|
|||
if urlSlicesEqual(sameUrls, differentUrls) { |
|||
t.Error("Expected different URLs to not be equal") |
|||
} |
|||
} |
|||
@ -0,0 +1,207 @@ |
|||
package wdclient |
|||
|
|||
import ( |
|||
"testing" |
|||
) |
|||
|
|||
// TestInvalidateCacheValidFileId tests cache invalidation with a valid file ID
|
|||
func TestInvalidateCacheValidFileId(t *testing.T) { |
|||
// Create a simple vidMapClient (can use nil provider for this test)
|
|||
vc := &vidMapClient{ |
|||
vidMap: newVidMap(""), |
|||
vidMapCacheSize: 5, |
|||
} |
|||
|
|||
// Add some locations to the cache
|
|||
vid := uint32(456) |
|||
vc.vidMap.Lock() |
|||
vc.vidMap.vid2Locations[vid] = []Location{{Url: "http://server1:8080"}} |
|||
vc.vidMap.Unlock() |
|||
|
|||
// Verify location exists
|
|||
vc.vidMap.RLock() |
|||
_, found := vc.vidMap.vid2Locations[vid] |
|||
vc.vidMap.RUnlock() |
|||
|
|||
if !found { |
|||
t.Fatal("Location should exist before invalidation") |
|||
} |
|||
|
|||
// Call InvalidateCache with a properly formatted file ID
|
|||
fileId := "456,abcdef123456" |
|||
vc.InvalidateCache(fileId) |
|||
|
|||
// Verify the locations were removed
|
|||
vc.vidMap.RLock() |
|||
_, foundAfter := vc.vidMap.vid2Locations[vid] |
|||
vc.vidMap.RUnlock() |
|||
|
|||
if foundAfter { |
|||
t.Errorf("Expected locations for vid %d to be removed after InvalidateCache", vid) |
|||
} |
|||
} |
|||
|
|||
// TestInvalidateCacheInvalidFileId tests cache invalidation with invalid file IDs
|
|||
func TestInvalidateCacheInvalidFileId(t *testing.T) { |
|||
testCases := []struct { |
|||
name string |
|||
fileId string |
|||
}{ |
|||
{"empty file ID", ""}, |
|||
{"no comma separator", "12345"}, |
|||
{"non-numeric vid", "abc,defg"}, |
|||
{"negative vid", "-1,abcd"}, |
|||
{"oversized vid", "999999999999999999999,abcd"}, |
|||
} |
|||
|
|||
for _, tc := range testCases { |
|||
t.Run(tc.name, func(t *testing.T) { |
|||
vc := &vidMapClient{ |
|||
vidMap: newVidMap(""), |
|||
vidMapCacheSize: 5, |
|||
} |
|||
|
|||
// Add a location to ensure the cache isn't empty
|
|||
vc.vidMap.Lock() |
|||
vc.vidMap.vid2Locations[1] = []Location{{Url: "http://server:8080"}} |
|||
vc.vidMap.Unlock() |
|||
|
|||
// This should not panic or cause errors
|
|||
vc.InvalidateCache(tc.fileId) |
|||
|
|||
// Verify the existing location is still there (not affected)
|
|||
vc.vidMap.RLock() |
|||
_, found := vc.vidMap.vid2Locations[1] |
|||
vc.vidMap.RUnlock() |
|||
|
|||
if !found { |
|||
t.Errorf("InvalidateCache with invalid fileId '%s' should not affect other entries", tc.fileId) |
|||
} |
|||
}) |
|||
} |
|||
} |
|||
|
|||
// TestInvalidateCacheWithHistory tests that invalidation propagates through cache history
|
|||
func TestInvalidateCacheWithHistory(t *testing.T) { |
|||
vid := uint32(789) |
|||
|
|||
// Create first vidMap with the volume
|
|||
vm1 := newVidMap("") |
|||
vm1.Lock() |
|||
vm1.vid2Locations[vid] = []Location{{Url: "http://server1:8080"}} |
|||
vm1.Unlock() |
|||
|
|||
// Create second vidMap with the cached first one
|
|||
vm2 := newVidMap("") |
|||
vm2.cache.Store(vm1) // vm1 becomes the cache/history
|
|||
vm2.Lock() |
|||
vm2.vid2Locations[vid] = []Location{{Url: "http://server2:8080"}} |
|||
vm2.Unlock() |
|||
|
|||
// Create vidMapClient with vm2 as current
|
|||
vc := &vidMapClient{ |
|||
vidMap: vm2, |
|||
vidMapCacheSize: 5, |
|||
} |
|||
|
|||
// Verify both have the vid before invalidation
|
|||
vm2.RLock() |
|||
_, foundInCurrent := vm2.vid2Locations[vid] |
|||
vm2.RUnlock() |
|||
|
|||
vm1.RLock() |
|||
_, foundInCache := vm1.vid2Locations[vid] |
|||
vm1.RUnlock() |
|||
|
|||
if !foundInCurrent || !foundInCache { |
|||
t.Fatal("Both maps should have the vid before invalidation") |
|||
} |
|||
|
|||
// Invalidate the cache
|
|||
fileId := "789,xyz123" |
|||
vc.InvalidateCache(fileId) |
|||
|
|||
// Check that current map doesn't have the vid
|
|||
vm2.RLock() |
|||
_, foundInCurrentAfter := vm2.vid2Locations[vid] |
|||
vm2.RUnlock() |
|||
|
|||
if foundInCurrentAfter { |
|||
t.Error("Expected vid to be removed from current vidMap after InvalidateCache") |
|||
} |
|||
|
|||
// Check that cache doesn't have the vid either (recursive deletion)
|
|||
vm1.RLock() |
|||
_, foundInCacheAfter := vm1.vid2Locations[vid] |
|||
vm1.RUnlock() |
|||
|
|||
if foundInCacheAfter { |
|||
t.Error("Expected vid to be removed from cached vidMap as well (recursive deletion)") |
|||
} |
|||
} |
|||
|
|||
// TestDeleteVidRecursion tests the deleteVid method removes from history chain
|
|||
func TestDeleteVidRecursion(t *testing.T) { |
|||
vid := uint32(999) |
|||
|
|||
// Create a chain: vm3 -> vm2 -> vm1
|
|||
vm1 := newVidMap("") |
|||
vm1.Lock() |
|||
vm1.vid2Locations[vid] = []Location{{Url: "http://server1:8080"}} |
|||
vm1.Unlock() |
|||
|
|||
vm2 := newVidMap("") |
|||
vm2.cache.Store(vm1) |
|||
vm2.Lock() |
|||
vm2.vid2Locations[vid] = []Location{{Url: "http://server2:8080"}} |
|||
vm2.Unlock() |
|||
|
|||
vm3 := newVidMap("") |
|||
vm3.cache.Store(vm2) |
|||
vm3.Lock() |
|||
vm3.vid2Locations[vid] = []Location{{Url: "http://server3:8080"}} |
|||
vm3.Unlock() |
|||
|
|||
// Verify all have the vid
|
|||
vm3.RLock() |
|||
_, found3 := vm3.vid2Locations[vid] |
|||
vm3.RUnlock() |
|||
|
|||
vm2.RLock() |
|||
_, found2 := vm2.vid2Locations[vid] |
|||
vm2.RUnlock() |
|||
|
|||
vm1.RLock() |
|||
_, found1 := vm1.vid2Locations[vid] |
|||
vm1.RUnlock() |
|||
|
|||
if !found1 || !found2 || !found3 { |
|||
t.Fatal("All maps should have the vid before deletion") |
|||
} |
|||
|
|||
// Delete from vm3 (should cascade)
|
|||
vm3.deleteVid(vid) |
|||
|
|||
// Verify it's gone from all
|
|||
vm3.RLock() |
|||
_, found3After := vm3.vid2Locations[vid] |
|||
vm3.RUnlock() |
|||
|
|||
vm2.RLock() |
|||
_, found2After := vm2.vid2Locations[vid] |
|||
vm2.RUnlock() |
|||
|
|||
vm1.RLock() |
|||
_, found1After := vm1.vid2Locations[vid] |
|||
vm1.RUnlock() |
|||
|
|||
if found3After { |
|||
t.Error("Expected vid to be removed from vm3") |
|||
} |
|||
if found2After { |
|||
t.Error("Expected vid to be removed from vm2 (cascaded)") |
|||
} |
|||
if found1After { |
|||
t.Error("Expected vid to be removed from vm1 (cascaded)") |
|||
} |
|||
} |
|||
Write
Preview
Loading…
Cancel
Save
Reference in new issue