Browse Source

avoid dup lookup

pull/7411/head
chrislu 2 months ago
parent
commit
0667c4964e
  1. 133
      weed/wdclient/masterclient.go

133
weed/wdclient/masterclient.go

@ -9,6 +9,8 @@ import (
"sync"
"time"
"golang.org/x/sync/singleflight"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/stats"
@ -35,18 +37,23 @@ type MasterClient struct {
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
// Per-volume-ID in-flight tracking to prevent duplicate lookups
vidLookupLock sync.Mutex
vidLookupInFlight map[string]*singleflight.Group // volumeId -> singleflight group
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
return &MasterClient{
FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
rack: rack,
masters: masters,
grpcDialOption: grpcDialOption,
vidMap: newVidMap(clientDataCenter),
vidMapCacheSize: 5,
FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
rack: rack,
masters: masters,
grpcDialOption: grpcDialOption,
vidMap: newVidMap(clientDataCenter),
vidMapCacheSize: 5,
vidLookupInFlight: make(map[string]*singleflight.Group),
}
}
@ -97,11 +104,12 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str
}
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
// Uses per-volume-ID singleflight to prevent duplicate lookups of the same volume
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
var missingVids []string
var needsLookup []string
// Check cache first
// Check cache first and separate volumes that need lookup
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
@ -111,58 +119,91 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI
if found && len(locations) > 0 {
result[vidString] = locations
} else {
missingVids = append(missingVids, vidString)
needsLookup = append(needsLookup, vidString)
}
}
// Query master for missing volumes
if len(missingVids) > 0 {
glog.V(2).Infof("Looking up %d volumes from master: %v", len(missingVids), missingVids)
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: missingVids,
})
if err != nil {
return fmt.Errorf("master lookup failed: %v", err)
if len(needsLookup) == 0 {
return result, nil
}
// For each volume that needs lookup, use per-volume singleflight
// to prevent duplicate master queries for the same volume ID
for _, vidString := range needsLookup {
// Get or create singleflight group for this volume ID
mc.vidLookupLock.Lock()
group, exists := mc.vidLookupInFlight[vidString]
if !exists {
group = &singleflight.Group{}
mc.vidLookupInFlight[vidString] = group
}
mc.vidLookupLock.Unlock()
// Use singleflight to ensure only one lookup per volume ID
sfResult, err, _ := group.Do(vidString, func() (interface{}, error) {
// Double-check cache in case it was populated while we were waiting
vid, _ := strconv.ParseUint(vidString, 10, 32)
if locations, found := mc.GetLocations(uint32(vid)); found && len(locations) > 0 {
return locations, nil
}
for _, vidLoc := range resp.VolumeIdLocations {
if vidLoc.Error != "" {
glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
// Query master for this volume
glog.V(2).Infof("Looking up volume %s from master", vidString)
var locations []Location
vidString := vidLoc.VolumeOrFileId
// Parse volume ID from response (could be "123" or "123,abc")
parts := strings.Split(vidString, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{vidString},
})
if err != nil {
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidString, err)
continue
return fmt.Errorf("master lookup failed: %v", err)
}
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
for _, vidLoc := range resp.VolumeIdLocations {
if vidLoc.Error != "" {
return fmt.Errorf("volume %s lookup error: %s", vidString, vidLoc.Error)
}
// Parse volume ID from response
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
return fmt.Errorf("failed to parse volume id '%s': %v", vidOnly, err)
}
mc.vidMap.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
if len(locations) > 0 {
result[vidOnly] = locations
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
mc.vidMap.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
}
return nil
})
if err != nil {
return nil, err
}
return nil
return locations, nil
})
// Clean up the singleflight group for this volume
mc.vidLookupLock.Lock()
delete(mc.vidLookupInFlight, vidString)
mc.vidLookupLock.Unlock()
if err != nil {
return result, err
glog.Warningf("Failed to lookup volume %s: %v", vidString, err)
continue // Continue with other volumes
}
if locations, ok := sfResult.([]Location); ok && len(locations) > 0 {
result[vidString] = locations
}
}

Loading…
Cancel
Save