diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 6d88e56e9..b4888f448 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -27,7 +27,7 @@ type MasterClient struct { masters map[string]pb.ServerAddress grpcDialOption grpc.DialOption - vidMap + *vidMap vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdateLock sync.RWMutex @@ -303,9 +303,12 @@ func (mc *MasterClient) resetVidMap() { DataCenter: mc.DataCenter, cache: mc.cache, } - mc.vidMap = newVidMap(mc.DataCenter) - mc.vidMap.cache = tail + nvm := newVidMap(mc.DataCenter) + nvm.cache = tail + mc.vidMap = nvm + + //trim for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ { if i == mc.vidMapCacheSize-1 { tail.cache = nil diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 89542b25b..5c3d167db 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -43,8 +43,8 @@ type vidMap struct { cache *vidMap } -func newVidMap(dataCenter string) vidMap { - return vidMap{ +func newVidMap(dataCenter string) *vidMap { + return &vidMap{ vid2Locations: make(map[uint32][]Location), ecVid2Locations: make(map[uint32][]Location), DataCenter: dataCenter, diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index eae456c9f..980e5bd8c 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -1,15 +1,17 @@ package wdclient import ( + "context" "fmt" "google.golang.org/grpc" "strconv" "sync" "testing" + "time" ) func TestLocationIndex(t *testing.T) { - vm := vidMap{} + vm := &vidMap{} // test must be failed mustFailed := func(length int) { _, err := vm.getLocationIndex(length) @@ -132,6 +134,43 @@ func TestLookupFileId(t *testing.T) { wg.Wait() } +func TestConcurrentGetLocations(t *testing.T) { + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil) + location := Location{Url: "TestDataRacing"} + mc.addLocation(1, location) + + ctx, cancel := context.WithCancel(context.Background()) + wg := sync.WaitGroup{} + for i := 0; i < 50; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + return + default: + _, found := mc.GetLocations(1) + if !found { + cancel() + t.Error("vid map invalid due to data racing. ") + return + } + } + } + }() + } + + //Simulate vidmap reset with cache when leader changes + for i := 0; i < 100; i++ { + mc.resetVidMap() + mc.addLocation(1, location) + time.Sleep(1 * time.Microsecond) + } + cancel() + wg.Wait() +} + func BenchmarkLocationIndex(b *testing.B) { b.SetParallelism(8) vm := vidMap{