From ba07b3e4c68f34219f3ca10f8f3ddcc059ffb972 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 30 Oct 2025 16:43:29 -0700
Subject: [PATCH 1/2] network: Adaptive timeout (#7410)

* server can start when no network for local dev
* fixed "superfluous response.WriteHeader call" warning
* adaptive based on last write time
* more doc
* refactoring
---
 weed/util/net_timeout.go | 71 +++++++++++++++++++++++++++++++++++++---
 1 file changed, 67 insertions(+), 4 deletions(-)

diff --git a/weed/util/net_timeout.go b/weed/util/net_timeout.go
index f235a77b3..313d7f849 100644
--- a/weed/util/net_timeout.go
+++ b/weed/util/net_timeout.go
@@ -1,13 +1,24 @@
 package util
 
 import (
-	"github.com/seaweedfs/seaweedfs/weed/glog"
 	"net"
 	"time"
+
+	"github.com/seaweedfs/seaweedfs/weed/glog"
+	"github.com/seaweedfs/seaweedfs/weed/stats"
 )
 
+const (
+	// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
+	// Used to calculate timeout scaling based on data transferred
+	minThroughputBytesPerSecond = 4000
+
+	// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
+	// This prevents indefinite connections while allowing time for server-side chunk fetches
+	graceTimeCapMultiplier = 3
+)
+
 // Listener wraps a net.Listener, and gives a place to store the timeout
 // parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
 type Listener struct {
@@ -39,11 +50,28 @@ type Conn struct {
 	isClosed     bool
 	bytesRead    int64
 	bytesWritten int64
+	lastWrite    time.Time
+}
+
+// calculateBytesPerTimeout calculates the expected number of bytes that should
+// be transferred during one timeout period, based on the minimum throughput.
+// Returns at least 1 to prevent division by zero.
+func calculateBytesPerTimeout(timeout time.Duration) int64 {
+	bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds())
+	if bytesPerTimeout <= 0 {
+		return 1 // Prevent division by zero
+	}
+	return bytesPerTimeout
 }
 
 func (c *Conn) Read(b []byte) (count int, e error) {
 	if c.ReadTimeout != 0 {
-		err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1)))
+		// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
+		// Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
+		// After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s
+		bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout)
+		timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1)
+		err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier))
 		if err != nil {
 			return 0, err
 		}
@@ -58,8 +86,42 @@ func (c *Conn) Read(b []byte) (count int, e error) {
 
 func (c *Conn) Write(b []byte) (count int, e error) {
 	if c.WriteTimeout != 0 {
-		// minimum 4KB/s
-		err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(c.bytesWritten/40000+1)))
+		now := time.Now()
+		// Calculate timeout with two components:
+		// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
+		// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
+
+		// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
+		// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
+		// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
+		bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout)
+		timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1)
+		baseTimeout := c.WriteTimeout * timeoutMultiplier
+
+		// If it's been a while since the last write, add grace time for server-side chunk fetches,
+		// but cap it to avoid keeping slow clients connected indefinitely.
+		//
+		// The comparison intentionally uses the unscaled WriteTimeout: grace kicks in once the
+		// idle time exceeds the configured timeout, independent of throughput scaling.
+		if !c.lastWrite.IsZero() {
+			timeSinceLastWrite := now.Sub(c.lastWrite)
+			if timeSinceLastWrite > c.WriteTimeout {
+				// Add grace time capped at graceTimeCapMultiplier * scaled timeout.
+				// This allows a total deadline of up to 4x the scaled timeout for server-side delays.
+				//
+				// Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s
+				// A 50s gap while fetching chunks gives graceTime = 50s, final deadline = 270s + 50s = 320s
+				// A 1000s gap exceeds the cap of 270s*3 = 810s, so graceTime = 810s and the final
+				// deadline is 270s + 810s = 1080s (~18min) to accommodate slow server-side storage
+				graceTime := timeSinceLastWrite
+				if graceTime > baseTimeout*graceTimeCapMultiplier {
+					graceTime = baseTimeout * graceTimeCapMultiplier
+				}
+				baseTimeout += graceTime
+			}
+		}
+
+		err := c.Conn.SetWriteDeadline(now.Add(baseTimeout))
 		if err != nil {
 			return 0, err
 		}
@@ -68,6 +130,7 @@
 	if e == nil {
 		stats.BytesOut(int64(count))
 		c.bytesWritten += int64(count)
+		c.lastWrite = time.Now()
 	}
 	return
 }
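To make the adaptive-deadline arithmetic in this patch easier to verify, here is a minimal, self-contained Go sketch of the same calculation. The writeDeadline helper below is illustrative only and is not part of the patch; it assumes the same 4KB/s minimum throughput and 3x grace cap defined in net_timeout.go.

package main

import (
	"fmt"
	"time"
)

// Constants mirroring the patch: minimum expected throughput and the grace cap.
const (
	minThroughputBytesPerSecond = 4000
	graceTimeCapMultiplier      = 3
)

// writeDeadline reproduces the patch's arithmetic: scale the base timeout by the
// cumulative bytes written, then add a capped grace period when the gap since the
// last write exceeds the configured timeout.
func writeDeadline(writeTimeout time.Duration, bytesWritten int64, sinceLastWrite time.Duration) time.Duration {
	bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * writeTimeout.Seconds())
	if bytesPerTimeout <= 0 {
		bytesPerTimeout = 1
	}
	base := writeTimeout * time.Duration(bytesWritten/bytesPerTimeout+1)
	if sinceLastWrite > writeTimeout {
		grace := sinceLastWrite
		if grace > base*graceTimeCapMultiplier {
			grace = base * graceTimeCapMultiplier
		}
		base += grace
	}
	return base
}

func main() {
	// 1MB written with a 30s timeout: 1,000,000/120,000 + 1 = 9, so the base is 270s.
	fmt.Println(writeDeadline(30*time.Second, 1_000_000, 0))
	// A 50s gap adds 50s of grace: 270s + 50s = 320s.
	fmt.Println(writeDeadline(30*time.Second, 1_000_000, 50*time.Second))
	// A very long gap is capped at 3x the scaled base: 270s + 810s = 1080s.
	fmt.Println(writeDeadline(30*time.Second, 1_000_000, 1000*time.Second))
}

Running the sketch prints 4m30s, 5m20s, and 18m0s for the three cases, matching the worked examples in the comments above.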
From 5810aba763cc269440483d12dcc6e026ec010fe6 Mon Sep 17 00:00:00 2001
From: Chris Lu
Date: Thu, 30 Oct 2025 20:18:21 -0700
Subject: [PATCH 2/2] Filer: fallback to check master (#7411)

* fallback to check master
* clean up
* parsing
* refactor
* handle parse error
* return error
* avoid dup lookup
* use batch key
* dedup lookup logic
* address comments
* errors.Join(lookupErrors...)
* add a comment
---
 weed/server/filer_grpc_server.go |  44 ++++----
 weed/wdclient/masterclient.go    | 183 +++++++++++++++++++++++++++----
 2 files changed, 184 insertions(+), 43 deletions(-)

diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index a18c55bb1..02eceebde 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -5,7 +5,6 @@ import (
 	"fmt"
 	"os"
 	"path/filepath"
-	"strconv"
 	"time"
 
 	"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -17,6 +16,7 @@ import (
 	"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
 	"github.com/seaweedfs/seaweedfs/weed/storage/needle"
 	"github.com/seaweedfs/seaweedfs/weed/util"
+	"github.com/seaweedfs/seaweedfs/weed/wdclient"
 )
 
 func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
@@ -94,31 +94,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
 		LocationsMap: make(map[string]*filer_pb.Locations),
 	}
 
-	for _, vidString := range req.VolumeIds {
-		vid, err := strconv.Atoi(vidString)
-		if err != nil {
-			glog.V(1).InfofCtx(ctx, "Unknown volume id %d", vid)
-			return nil, err
-		}
-		var locs []*filer_pb.Location
-		locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
-		if !found {
-			continue
-		}
-		for _, loc := range locations {
-			locs = append(locs, &filer_pb.Location{
-				Url:        loc.Url,
-				PublicUrl:  loc.PublicUrl,
-				GrpcPort:   uint32(loc.GrpcPort),
-				DataCenter: loc.DataCenter,
-			})
-		}
+	// Use master client's lookup with fallback - it handles cache and master query
+	vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)
+
+	// Convert wdclient.Location to filer_pb.Location
+	// Return partial results even if there was an error
+	for vidString, locations := range vidLocations {
 		resp.LocationsMap[vidString] = &filer_pb.Locations{
-			Locations: locs,
+			Locations: wdclientLocationsToPb(locations),
 		}
 	}
-	return resp, nil
+	return resp, err
+}
+
+func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
+	locs := make([]*filer_pb.Location, 0, len(locations))
+	for _, loc := range locations {
+		locs = append(locs, &filer_pb.Location{
+			Url:        loc.Url,
+			PublicUrl:  loc.PublicUrl,
+			GrpcPort:   uint32(loc.GrpcPort),
+			DataCenter: loc.DataCenter,
+		})
+	}
+	return locs
 }
 
 func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
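The behavioral change worth calling out in filer_grpc_server.go is that LookupVolume now returns whatever locations it could resolve together with an aggregated error, instead of aborting on the first unknown volume id. The sketch below is a hypothetical caller, not code from this PR; it only assumes the LookupVolumeIdsWithFallback signature introduced in masterclient.go.

package example

import (
	"context"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// lookupWithPartialResults is a hypothetical helper: it keeps whatever locations were
// resolved and treats the joined error as a per-volume report rather than a hard failure.
func lookupWithPartialResults(ctx context.Context, mc *wdclient.MasterClient, volumeIds []string) map[string][]wdclient.Location {
	locationsMap, err := mc.LookupVolumeIdsWithFallback(ctx, volumeIds)
	if err != nil {
		// err aggregates per-volume failures via errors.Join; volumes that failed are
		// simply absent from the map, so the ones that resolved remain usable.
		log.Printf("partial volume lookup: %v", err)
	}
	return locationsMap
}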
diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go
index 11b58d861..f3950bc37 100644
--- a/weed/wdclient/masterclient.go
+++ b/weed/wdclient/masterclient.go
@@ -2,11 +2,17 @@ package wdclient
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"math/rand"
+	"sort"
+	"strconv"
+	"strings"
 	"sync"
 	"time"
 
+	"golang.org/x/sync/singleflight"
+
 	"github.com/seaweedfs/seaweedfs/weed/util/version"
 
 	"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -29,10 +35,16 @@ type MasterClient struct {
 	masters        pb.ServerDiscovery
 	grpcDialOption grpc.DialOption
+	// TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently
+	// This embedded *vidMap should be changed to a private field protected by sync.RWMutex
+	// See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER]
 	*vidMap
 	vidMapCacheSize  int
 	OnPeerUpdate     func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
 	OnPeerUpdateLock sync.RWMutex
+
+	// Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
+	vidLookupGroup singleflight.Group
 }
 
 func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
@@ -59,39 +71,168 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
 }
 
 func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
+	// Try cache first using the fast path
 	fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
 	if err == nil && len(fullUrls) > 0 {
 		return
 	}
-	err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
-		resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
-			VolumeOrFileIds: []string{fileId},
-		})
+
+	// Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
+	parts := strings.Split(fileId, ",")
+	if len(parts) != 2 {
+		return nil, fmt.Errorf("invalid fileId %s", fileId)
+	}
+	volumeId := parts[0]
+
+	// Use shared lookup logic with batching and singleflight
+	vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
+	if err != nil {
+		return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
+	}
+
+	locations, found := vidLocations[volumeId]
+	if !found || len(locations) == 0 {
+		return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
+	}
+
+	// Build HTTP URLs from locations, preferring same data center
+	var sameDcUrls, otherDcUrls []string
+	for _, loc := range locations {
+		httpUrl := "http://" + loc.Url + "/" + fileId
+		if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
+			sameDcUrls = append(sameDcUrls, httpUrl)
+		} else {
+			otherDcUrls = append(otherDcUrls, httpUrl)
+		}
+	}
+
+	// Prefer same data center
+	fullUrls = append(sameDcUrls, otherDcUrls...)
+	return fullUrls, nil
+}
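For context on how the rewritten LookupFileIdWithFallback is used, here is a hedged usage sketch (an assumed caller, not code from this PR). It relies only on the method signature above and on the SeaweedFS file id format "volumeId,needle_id_cookie", e.g. "3,01637037d6"; because same-data-center URLs come first in the returned slice, trying them in order preserves the DC preference.

package example

import (
	"context"
	"fmt"

	"github.com/seaweedfs/seaweedfs/weed/wdclient"
)

// readUrlsForFileId resolves a file id such as "3,01637037d6" to candidate read URLs.
// On a cache miss the call goes through the batched volume lookup, so concurrent readers
// of files on the same volume share a single master query.
func readUrlsForFileId(ctx context.Context, mc *wdclient.MasterClient, fileId string) ([]string, error) {
	urls, err := mc.LookupFileIdWithFallback(ctx, fileId)
	if err != nil {
		return nil, fmt.Errorf("resolve %s: %w", fileId, err)
	}
	// Same-data-center URLs come first; try them in order.
	return urls, nil
}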
+
+// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
+// Uses singleflight to coalesce concurrent requests for the same batch of volumes
+func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+	result := make(map[string][]Location)
+	var needsLookup []string
+	var lookupErrors []error
+
+	// Check cache first and parse volume IDs once
+	vidStringToUint := make(map[string]uint32, len(volumeIds))
+	for _, vidString := range volumeIds {
+		vid, err := strconv.ParseUint(vidString, 10, 32)
 		if err != nil {
-			return fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
+			return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
+		}
+		vidStringToUint[vidString] = uint32(vid)
+
+		locations, found := mc.GetLocations(uint32(vid))
+		if found && len(locations) > 0 {
+			result[vidString] = locations
+		} else {
+			needsLookup = append(needsLookup, vidString)
+		}
+	}
+
+	if len(needsLookup) == 0 {
+		return result, nil
+	}
+
+	// Batch query all missing volumes using singleflight on the batch key
+	// Sort for stable key to coalesce identical batches
+	sort.Strings(needsLookup)
+	batchKey := strings.Join(needsLookup, ",")
+
+	sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
+		// Double-check cache for volumes that might have been populated while waiting
+		stillNeedLookup := make([]string, 0, len(needsLookup))
+		batchResult := make(map[string][]Location)
+
+		for _, vidString := range needsLookup {
+			vid := vidStringToUint[vidString] // Use pre-parsed value
+			if locations, found := mc.GetLocations(vid); found && len(locations) > 0 {
+				batchResult[vidString] = locations
+			} else {
+				stillNeedLookup = append(stillNeedLookup, vidString)
+			}
+		}
+
+		if len(stillNeedLookup) == 0 {
+			return batchResult, nil
 		}
-		for vid, vidLocation := range resp.VolumeIdLocations {
-			for _, vidLoc := range vidLocation.Locations {
-				loc := Location{
-					Url:        vidLoc.Url,
-					PublicUrl:  vidLoc.PublicUrl,
-					GrpcPort:   int(vidLoc.GrpcPort),
-					DataCenter: vidLoc.DataCenter,
+
+		// Query master with batched volume IDs
+		glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
+
+		err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
+			resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
+				VolumeOrFileIds: stillNeedLookup,
+			})
+			if err != nil {
+				return fmt.Errorf("master lookup failed: %v", err)
+			}
+
+			for _, vidLoc := range resp.VolumeIdLocations {
+				if vidLoc.Error != "" {
+					glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
+					continue
 				}
-				mc.vidMap.addLocation(uint32(vid), loc)
-				httpUrl := "http://" + loc.Url + "/" + fileId
-				// Prefer same data center
-				if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
-					fullUrls = append([]string{httpUrl}, fullUrls...)
-				} else {
-					fullUrls = append(fullUrls, httpUrl)
+
+				// Parse volume ID from response
+				parts := strings.Split(vidLoc.VolumeOrFileId, ",")
+				vidOnly := parts[0]
+				vid, err := strconv.ParseUint(vidOnly, 10, 32)
+				if err != nil {
+					glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
+					continue
+				}
+
+				var locations []Location
+				for _, masterLoc := range vidLoc.Locations {
+					loc := Location{
+						Url:        masterLoc.Url,
+						PublicUrl:  masterLoc.PublicUrl,
+						GrpcPort:   int(masterLoc.GrpcPort),
+						DataCenter: masterLoc.DataCenter,
+					}
+					mc.vidMap.addLocation(uint32(vid), loc)
+					locations = append(locations, loc)
+				}
+
+				if len(locations) > 0 {
+					batchResult[vidOnly] = locations
 				}
 			}
+			return nil
+		})
+
+		if err != nil {
+			return batchResult, err
 		}
-		return nil
+		return batchResult, nil
 	})
-	return
+
+	if err != nil {
+		lookupErrors = append(lookupErrors, err)
+	}
+
+	// Merge singleflight batch results
+	if batchLocations, ok := sfResult.(map[string][]Location); ok {
+		for vid, locs := range batchLocations {
+			result[vid] = locs
+		}
+	}
+
+	// Check for volumes that still weren't found
+	for _, vidString := range needsLookup {
+		if _, found := result[vidString]; !found {
+			lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
+		}
+	}
+
+	// Return aggregated errors using errors.Join to preserve error types
+	return result, errors.Join(lookupErrors...)
 }
 
 func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
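Finally, the coalescing pattern at the heart of LookupVolumeIdsWithFallback (sort the missing ids into a stable batch key, run the batch through singleflight.Group.Do, and re-check the cache inside the flight) is easier to see in isolation. The sketch below is a generic illustration that assumes placeholder cacheGet and fetchBatch functions; it is not the MasterClient code itself.

package example

import (
	"sort"
	"strings"

	"golang.org/x/sync/singleflight"
)

// batchLookup coalesces concurrent lookups for the same (sorted) set of keys into one fetch.
// cacheGet and fetchBatch are placeholders, not SeaweedFS APIs.
func batchLookup(
	group *singleflight.Group,
	keys []string,
	cacheGet func(string) (string, bool),
	fetchBatch func([]string) (map[string]string, error),
) (map[string]string, error) {
	result := make(map[string]string)
	var missing []string
	for _, k := range keys {
		if v, ok := cacheGet(k); ok {
			result[k] = v
		} else {
			missing = append(missing, k)
		}
	}
	if len(missing) == 0 {
		return result, nil
	}

	sort.Strings(missing)                  // stable key so identical batches coalesce
	batchKey := strings.Join(missing, ",") // e.g. "12,34,56"

	shared, err, _ := group.Do(batchKey, func() (interface{}, error) {
		batch := make(map[string]string)
		still := make([]string, 0, len(missing))
		for _, k := range missing {
			if v, ok := cacheGet(k); ok { // double-check: another flight may have filled the cache
				batch[k] = v
			} else {
				still = append(still, k)
			}
		}
		if len(still) == 0 {
			return batch, nil
		}
		fetched, fetchErr := fetchBatch(still)
		for k, v := range fetched {
			batch[k] = v
		}
		return batch, fetchErr
	})

	// Merge whatever the shared flight produced, even if it also reports an error.
	if batch, ok := shared.(map[string]string); ok {
		for k, v := range batch {
			result[k] = v
		}
	}
	return result, err
}

This mirrors the batch-key and double-check steps of the patch while leaving out the vidMap caching and the errors.Join aggregation.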