|
|
|
@ -2,6 +2,7 @@ package wdclient |
|
|
|
|
|
|
|
import ( |
|
|
|
"context" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"math/rand" |
|
|
|
"sort" |
|
|
|
@ -39,7 +40,7 @@ type MasterClient struct { |
|
|
|
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) |
|
|
|
OnPeerUpdateLock sync.RWMutex |
|
|
|
|
|
|
|
// Per-volume-ID in-flight tracking to prevent duplicate lookups
|
|
|
|
// Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
|
|
|
|
vidLookupGroup singleflight.Group |
|
|
|
} |
|
|
|
|
|
|
|
@ -108,7 +109,7 @@ func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId str |
|
|
|
} |
|
|
|
|
|
|
|
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
|
|
|
|
// Uses per-volume-ID singleflight to prevent duplicate lookups, with batched master queries
|
|
|
|
// Uses singleflight to coalesce concurrent requests for the same batch of volumes
|
|
|
|
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { |
|
|
|
result := make(map[string][]Location) |
|
|
|
var needsLookup []string |
|
|
|
@ -227,16 +228,8 @@ func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeI |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
// Return aggregated errors with clear formatting
|
|
|
|
if len(lookupErrors) > 0 { |
|
|
|
errorMessages := make([]string, 0, len(lookupErrors)) |
|
|
|
for _, err := range lookupErrors { |
|
|
|
errorMessages = append(errorMessages, err.Error()) |
|
|
|
} |
|
|
|
return result, fmt.Errorf("lookup errors: %s", strings.Join(errorMessages, "; ")) |
|
|
|
} |
|
|
|
|
|
|
|
return result, nil |
|
|
|
// Return aggregated errors using errors.Join to preserve error types
|
|
|
|
return result, errors.Join(lookupErrors...) |
|
|
|
} |
|
|
|
|
|
|
|
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress { |
|
|
|
|