diff --git a/weed/shell/commands.go b/weed/shell/commands.go index 62dcfd7f8..55a09e392 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -116,7 +116,7 @@ func (ce *CommandEnv) AdjustedUrl(location *filer_pb.Location) string { } func (ce *CommandEnv) GetDataCenter() string { - return ce.MasterClient.DataCenter + return ce.MasterClient.GetDataCenter() } func parseFilerUrl(entryPath string) (filerServer string, filerPort int64, path string, err error) { diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index f3950bc37..320156294 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -35,10 +35,10 @@ type MasterClient struct { masters pb.ServerDiscovery grpcDialOption grpc.DialOption - // TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently - // This embedded *vidMap should be changed to a private field protected by sync.RWMutex - // See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER] - *vidMap + // vidMap stores volume location mappings + // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap + vidMap *vidMap + vidMapLock sync.RWMutex vidMapCacheSize int OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) OnPeerUpdateLock sync.RWMutex @@ -71,8 +71,13 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { } func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { - // Try cache first using the fast path - fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId) + // Try cache first using the fast path - grab both vidMap and dataCenter in one lock + mc.vidMapLock.RLock() + vm := mc.vidMap + dataCenter := vm.DataCenter + mc.vidMapLock.RUnlock() + + fullUrls, err = vm.LookupFileId(ctx, fileId) if err == nil && len(fullUrls) > 0 { return } @@ -99,7 +104,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str var sameDcUrls, otherDcUrls []string for _, loc := range locations { httpUrl := "http://" + loc.Url + "/" + fileId - if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter { + if dataCenter != "" && dataCenter == loc.DataCenter { sameDcUrls = append(sameDcUrls, httpUrl) } else { otherDcUrls = append(otherDcUrls, httpUrl) @@ -120,6 +125,10 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI // Check cache first and parse volume IDs once vidStringToUint := make(map[string]uint32, len(volumeIds)) + + // Get stable pointer to vidMap with minimal lock hold time + vm := mc.getStableVidMap() + for _, vidString := range volumeIds { vid, err := strconv.ParseUint(vidString, 10, 32) if err != nil { @@ -127,7 +136,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI } vidStringToUint[vidString] = uint32(vid) - locations, found := mc.GetLocations(uint32(vid)) + locations, found := vm.GetLocations(uint32(vid)) if found && len(locations) > 0 { result[vidString] = locations } else { @@ -149,9 +158,12 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI stillNeedLookup := make([]string, 0, len(needsLookup)) batchResult := make(map[string][]Location) + // Get stable pointer with minimal lock hold time + vm := mc.getStableVidMap() + for _, vidString := range needsLookup { vid := vidStringToUint[vidString] // Use pre-parsed value - if locations, found := mc.GetLocations(vid); found && len(locations) > 0 { + if locations, found := vm.GetLocations(vid); found && len(locations) > 0 { batchResult[vidString] = locations } else { stillNeedLookup = append(stillNeedLookup, vidString) @@ -196,7 +208,7 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI GrpcPort: int(masterLoc.GrpcPort), DataCenter: masterLoc.DataCenter, } - mc.vidMap.addLocation(uint32(vid), loc) + mc.addLocation(uint32(vid), loc) locations = append(locations, loc) } @@ -351,7 +363,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server if err = stream.Send(&master_pb.KeepConnectedRequest{ FilerGroup: mc.FilerGroup, - DataCenter: mc.DataCenter, + DataCenter: mc.GetDataCenter(), Rack: mc.rack, ClientType: mc.clientType, ClientAddress: string(mc.clientHost), @@ -482,24 +494,110 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd }) } +// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately. +// This is safe for read operations as the returned pointer is a stable snapshot, +// and the underlying vidMap methods have their own internal locking. +func (mc *MasterClient) getStableVidMap() *vidMap { + mc.vidMapLock.RLock() + vm := mc.vidMap + mc.vidMapLock.RUnlock() + return vm +} + +// withCurrentVidMap executes a function with the current vidMap under a read lock. +// This is for methods that modify vidMap's internal state, ensuring the pointer +// is not swapped by resetVidMap during the operation. The actual map mutations +// are protected by vidMap's internal mutex. +func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) { + mc.vidMapLock.RLock() + defer mc.vidMapLock.RUnlock() + f(mc.vidMap) +} + +// Public methods for external packages to access vidMap safely + +// GetLocations safely retrieves volume locations +func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) { + return mc.getStableVidMap().GetLocations(vid) +} + +// GetLocationsClone safely retrieves a clone of volume locations +func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) { + return mc.getStableVidMap().GetLocationsClone(vid) +} + +// GetVidLocations safely retrieves volume locations by string ID +func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) { + return mc.getStableVidMap().GetVidLocations(vid) +} + +// LookupFileId safely looks up URLs for a file ID +func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) { + return mc.getStableVidMap().LookupFileId(ctx, fileId) +} + +// LookupVolumeServerUrl safely looks up volume server URLs +func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) { + return mc.getStableVidMap().LookupVolumeServerUrl(vid) +} + +// GetDataCenter safely retrieves the data center +func (mc *MasterClient) GetDataCenter() string { + return mc.getStableVidMap().DataCenter +} + +// Thread-safe helpers for vidMap operations + +// addLocation adds a volume location +func (mc *MasterClient) addLocation(vid uint32, location Location) { + mc.withCurrentVidMap(func(vm *vidMap) { + vm.addLocation(vid, location) + }) +} + +// deleteLocation removes a volume location +func (mc *MasterClient) deleteLocation(vid uint32, location Location) { + mc.withCurrentVidMap(func(vm *vidMap) { + vm.deleteLocation(vid, location) + }) +} + +// addEcLocation adds an EC volume location +func (mc *MasterClient) addEcLocation(vid uint32, location Location) { + mc.withCurrentVidMap(func(vm *vidMap) { + vm.addEcLocation(vid, location) + }) +} + +// deleteEcLocation removes an EC volume location +func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) { + mc.withCurrentVidMap(func(vm *vidMap) { + vm.deleteEcLocation(vid, location) + }) +} + func (mc *MasterClient) resetVidMap() { - tail := &vidMap{ - vid2Locations: mc.vid2Locations, - ecVid2Locations: mc.ecVid2Locations, - DataCenter: mc.DataCenter, - cache: mc.cache, - } + mc.vidMapLock.Lock() + defer mc.vidMapLock.Unlock() - nvm := newVidMap(mc.DataCenter) - nvm.cache = tail + // Preserve the existing vidMap in the cache chain + // No need to clone - the existing vidMap has its own mutex for thread safety + tail := mc.vidMap + + nvm := newVidMap(tail.DataCenter) + nvm.cache.Store(tail) mc.vidMap = nvm - //trim - for i := 0; i < mc.vidMapCacheSize && tail.cache != nil; i++ { - if i == mc.vidMapCacheSize-1 { - tail.cache = nil - } else { - tail = tail.cache + // Trim cache chain to vidMapCacheSize by traversing to the last node + // that should remain and cutting the chain after it + node := tail + for i := 0; i < mc.vidMapCacheSize-1; i++ { + if node.cache.Load() == nil { + return } + node = node.cache.Load() + } + if node != nil { + node.cache.Store(nil) } } diff --git a/weed/wdclient/vid_map.go b/weed/wdclient/vid_map.go index 9d5e5d378..179381b0c 100644 --- a/weed/wdclient/vid_map.go +++ b/weed/wdclient/vid_map.go @@ -4,13 +4,14 @@ import ( "context" "errors" "fmt" - "github.com/seaweedfs/seaweedfs/weed/pb" "math/rand" "strconv" "strings" "sync" "sync/atomic" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/glog" ) @@ -41,7 +42,7 @@ type vidMap struct { ecVid2Locations map[uint32][]Location DataCenter string cursor int32 - cache *vidMap + cache atomic.Pointer[vidMap] } func newVidMap(dataCenter string) *vidMap { @@ -135,8 +136,8 @@ func (vc *vidMap) GetLocations(vid uint32) (locations []Location, found bool) { return locations, found } - if vc.cache != nil { - return vc.cache.GetLocations(vid) + if cachedMap := vc.cache.Load(); cachedMap != nil { + return cachedMap.GetLocations(vid) } return nil, false @@ -212,8 +213,8 @@ func (vc *vidMap) addEcLocation(vid uint32, location Location) { } func (vc *vidMap) deleteLocation(vid uint32, location Location) { - if vc.cache != nil { - vc.cache.deleteLocation(vid, location) + if cachedMap := vc.cache.Load(); cachedMap != nil { + cachedMap.deleteLocation(vid, location) } vc.Lock() @@ -235,8 +236,8 @@ func (vc *vidMap) deleteLocation(vid uint32, location Location) { } func (vc *vidMap) deleteEcLocation(vid uint32, location Location) { - if vc.cache != nil { - vc.cache.deleteLocation(vid, location) + if cachedMap := vc.cache.Load(); cachedMap != nil { + cachedMap.deleteEcLocation(vid, location) } vc.Lock()