
Merge branch 'master' into fix-ipv6-brackets-default-port

pull/7414/head
Chris Lu 1 month ago
parent commit ba2d05da2c
  1. 14   k8s/charts/seaweedfs/templates/filer/filer-statefulset.yaml
  2. 8    k8s/charts/seaweedfs/templates/master/master-statefulset.yaml
  3. 5    weed/server/common.go
  4. 44   weed/server/filer_grpc_server.go
  5. 71   weed/util/net_timeout.go
  6. 6    weed/util/network.go
  7. 183  weed/wdclient/masterclient.go

14
k8s/charts/seaweedfs/templates/filer/filer-statefulset.yaml

@@ -392,10 +392,12 @@ spec:
nodeSelector:
{{ tpl .Values.filer.nodeSelector . | indent 8 | trim }}
{{- end }}
{{- if and (.Values.filer.enablePVC) (eq .Values.filer.data.type "persistentVolumeClaim") }}
{{- if and (.Values.filer.enablePVC) (not .Values.filer.data) }}
# DEPRECATION: deprecated in favor of the filer.data section below
volumeClaimTemplates:
- metadata:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: data-filer
spec:
accessModes:
@@ -411,7 +413,9 @@ spec:
{{- if $pvc_exists }}
volumeClaimTemplates:
{{- if eq .Values.filer.data.type "persistentVolumeClaim" }}
- metadata:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: data-filer
{{- with .Values.filer.data.annotations }}
annotations:
@@ -425,7 +429,9 @@ spec:
storage: {{ .Values.filer.data.size }}
{{- end }}
{{- if eq .Values.filer.logs.type "persistentVolumeClaim" }}
- metadata:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: seaweedfs-filer-log-volume
{{- with .Values.filer.logs.annotations }}
annotations:

8
k8s/charts/seaweedfs/templates/master/master-statefulset.yaml

@@ -327,7 +327,9 @@ spec:
{{- if $pvc_exists }}
volumeClaimTemplates:
{{- if eq .Values.master.data.type "persistentVolumeClaim"}}
- metadata:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: data-{{ .Release.Namespace }}
{{- with .Values.master.data.annotations }}
annotations:
@@ -341,7 +343,9 @@ spec:
storage: {{ .Values.master.data.size }}
{{- end }}
{{- if eq .Values.master.logs.type "persistentVolumeClaim"}}
- metadata:
- apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: seaweedfs-master-log-volume
{{- with .Values.master.logs.annotations }}
annotations:

5
weed/server/common.go

@@ -369,8 +369,7 @@ func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
err = writeFn(bufferedWriter)
if err != nil {
glog.Errorf("ProcessRangeRequest range[0]: %+v err: %v", w.Header(), err)
w.Header().Del("Content-Length")
http.Error(w, err.Error(), http.StatusInternalServerError)
// Cannot call http.Error() here because WriteHeader was already called
return fmt.Errorf("ProcessRangeRequest range[0]: %w", err)
}
return nil
@@ -424,7 +423,7 @@ func ProcessRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.WriteHeader(http.StatusPartialContent)
if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil {
glog.Errorf("ProcessRangeRequest err: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
// Cannot call http.Error() here because WriteHeader was already called
return fmt.Errorf("ProcessRangeRequest err: %w", err)
}
return nil
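
The rationale for dropping http.Error here can be seen in isolation. A minimal sketch, not part of this commit, with hypothetical handler names: once WriteHeader has sent the status line, a later http.Error cannot change the status code; it only appends its message to the already-started body and Go logs a "superfluous response.WriteHeader call".

```go
package main

import (
	"fmt"
	"log"
	"net/http"
)

// brokenHandler mirrors the removed pattern: the 206 status is already on the
// wire when the error happens, so http.Error cannot turn it into a 500; it
// only appends its text to the truncated body.
func brokenHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusPartialContent)
	fmt.Fprint(w, "partial data ...")
	err := fmt.Errorf("simulated mid-stream failure")
	http.Error(w, err.Error(), http.StatusInternalServerError) // too late
}

// fixedHandler mirrors the new pattern: log, stop writing, and surface the
// error to the caller; the client detects the failure from the short body.
func fixedHandler(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusPartialContent)
	fmt.Fprint(w, "partial data ...")
	if err := fmt.Errorf("simulated mid-stream failure"); err != nil {
		log.Printf("range request failed mid-stream: %v", err)
		return
	}
}

func main() {
	http.HandleFunc("/broken", brokenHandler)
	http.HandleFunc("/fixed", fixedHandler)
	log.Fatal(http.ListenAndServe(":8080", nil))
}
```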

44
weed/server/filer_grpc_server.go

@@ -5,7 +5,6 @@ import (
"fmt"
"os"
"path/filepath"
"strconv"
"time"
"github.com/seaweedfs/seaweedfs/weed/cluster"
@@ -17,6 +16,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
)
func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) {
@@ -94,31 +94,31 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
LocationsMap: make(map[string]*filer_pb.Locations),
}
for _, vidString := range req.VolumeIds {
vid, err := strconv.Atoi(vidString)
if err != nil {
glog.V(1).InfofCtx(ctx, "Unknown volume id %d", vid)
return nil, err
}
var locs []*filer_pb.Location
locations, found := fs.filer.MasterClient.GetLocations(uint32(vid))
if !found {
continue
}
for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
GrpcPort: uint32(loc.GrpcPort),
DataCenter: loc.DataCenter,
})
}
// Use master client's lookup with fallback - it handles cache and master query
vidLocations, err := fs.filer.MasterClient.LookupVolumeIdsWithFallback(ctx, req.VolumeIds)
// Convert wdclient.Location to filer_pb.Location
// Return partial results even if there was an error
for vidString, locations := range vidLocations {
resp.LocationsMap[vidString] = &filer_pb.Locations{
Locations: locs,
Locations: wdclientLocationsToPb(locations),
}
}
return resp, nil
return resp, err
}
func wdclientLocationsToPb(locations []wdclient.Location) []*filer_pb.Location {
locs := make([]*filer_pb.Location, 0, len(locations))
for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
GrpcPort: uint32(loc.GrpcPort),
DataCenter: loc.DataCenter,
})
}
return locs
}
func (fs *FilerServer) lookupFileId(ctx context.Context, fileId string) (targetUrls []string, err error) {
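
Since LookupVolume now returns partial results alongside any lookup error, callers can consume whatever was resolved before deciding how to handle the failure. A hedged sketch of such a caller, assuming an already-dialed filer_pb.SeaweedFilerClient; the function name is illustrative:

```go
package lookupexample

import (
	"context"
	"fmt"
	"log"

	"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)

// printVolumeLocations shows how a caller can use partial LookupVolume results:
// the locations map is consulted first, and the error (if any) covers only the
// volumes that could not be resolved.
func printVolumeLocations(ctx context.Context, client filer_pb.SeaweedFilerClient, volumeIds []string) {
	resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{VolumeIds: volumeIds})
	// GetLocationsMap is nil-safe, so this loop also works on a transport failure.
	for vid, locs := range resp.GetLocationsMap() {
		for _, loc := range locs.GetLocations() {
			fmt.Printf("volume %s -> %s (grpc %d, dc %s)\n", vid, loc.Url, loc.GrpcPort, loc.DataCenter)
		}
	}
	if err != nil {
		log.Printf("some volumes could not be resolved: %v", err)
	}
}
```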

71
weed/util/net_timeout.go

@@ -1,13 +1,24 @@
package util
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"net"
"time"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/stats"
)
const (
// minThroughputBytesPerSecond defines the minimum expected throughput (4KB/s)
// Used to calculate timeout scaling based on data transferred
minThroughputBytesPerSecond = 4000
// graceTimeCapMultiplier caps the grace period for slow clients at 3x base timeout
// This prevents indefinite connections while allowing time for server-side chunk fetches
graceTimeCapMultiplier = 3
)
// Listener wraps a net.Listener, and gives a place to store the timeout
// parameters. On Accept, it will wrap the net.Conn with our own Conn for us.
type Listener struct {
@@ -39,11 +50,28 @@ type Conn struct {
isClosed bool
bytesRead int64
bytesWritten int64
lastWrite time.Time
}
// calculateBytesPerTimeout calculates the expected number of bytes that should
// be transferred during one timeout period, based on the minimum throughput.
// Returns at least 1 to prevent division by zero.
func calculateBytesPerTimeout(timeout time.Duration) int64 {
bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * timeout.Seconds())
if bytesPerTimeout <= 0 {
return 1 // Prevent division by zero
}
return bytesPerTimeout
}
func (c *Conn) Read(b []byte) (count int, e error) {
if c.ReadTimeout != 0 {
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * time.Duration(c.bytesRead/40000+1)))
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
// Example: with ReadTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
// After reading 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, deadline = 30s * 9 = 270s
bytesPerTimeout := calculateBytesPerTimeout(c.ReadTimeout)
timeoutMultiplier := time.Duration(c.bytesRead/bytesPerTimeout + 1)
err := c.Conn.SetReadDeadline(time.Now().Add(c.ReadTimeout * timeoutMultiplier))
if err != nil {
return 0, err
}
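
The deadline arithmetic in the comment above can be reproduced standalone; this sketch uses the same constant as the patch and the 30s / 1MB numbers from the example:

```go
package main

import (
	"fmt"
	"time"
)

const minThroughputBytesPerSecond = 4000 // same 4KB/s floor as the patch

func main() {
	readTimeout := 30 * time.Second
	bytesRead := int64(1_000_000) // 1MB already read on this connection

	// bytesPerTimeout = 4000 * 30 = 120_000 bytes per timeout period
	bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * readTimeout.Seconds())

	// integer division: 1_000_000/120_000 + 1 = 8 + 1 = 9
	multiplier := time.Duration(bytesRead/bytesPerTimeout + 1)

	// next read deadline is pushed out by 30s * 9 = 270s
	fmt.Println(readTimeout * multiplier) // 4m30s
}
```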
@@ -58,8 +86,42 @@ func (c *Conn) Read(b []byte) (count int, e error) {
func (c *Conn) Write(b []byte) (count int, e error) {
if c.WriteTimeout != 0 {
// minimum 4KB/s
err := c.Conn.SetWriteDeadline(time.Now().Add(c.WriteTimeout * time.Duration(c.bytesWritten/40000+1)))
now := time.Now()
// Calculate timeout with two components:
// 1. Base timeout scaled by cumulative data (minimum throughput of 4KB/s)
// 2. Additional grace period if there was a gap since last write (for chunk fetch delays)
// Calculate expected bytes per timeout period based on minimum throughput (4KB/s)
// Example: with WriteTimeout=30s, bytesPerTimeout = 4000 * 30 = 120KB
// After writing 1MB: multiplier = 1,000,000/120,000 + 1 ≈ 9, baseTimeout = 30s * 9 = 270s
bytesPerTimeout := calculateBytesPerTimeout(c.WriteTimeout)
timeoutMultiplier := time.Duration(c.bytesWritten/bytesPerTimeout + 1)
baseTimeout := c.WriteTimeout * timeoutMultiplier
// If it's been a while since last write, add grace time for server-side chunk fetches
// But cap it to avoid keeping slow clients connected indefinitely
//
// The comparison intentionally uses the unscaled WriteTimeout: grace kicks in once the
// idle time exceeds the configured timeout, independent of throughput scaling.
if !c.lastWrite.IsZero() {
timeSinceLastWrite := now.Sub(c.lastWrite)
if timeSinceLastWrite > c.WriteTimeout {
// Add grace time capped at graceTimeCapMultiplier * scaled timeout.
// This allows total deadline up to 4x scaled timeout for server-side delays.
//
// Example: WriteTimeout=30s, 1MB written (multiplier≈9), baseTimeout=270s
// A 400s gap while fetching chunks: graceTime = 400s (under the 270s*3 = 810s cap),
// final deadline = 270s + 400s = 670s. Gaps beyond 810s are capped, so the maximum
// deadline is 270s + 810s = 1080s (~18min) to accommodate slow storage.
// A 50s gap: graceTime = 50s, final deadline = 270s + 50s = 320s
graceTime := timeSinceLastWrite
if graceTime > baseTimeout*graceTimeCapMultiplier {
graceTime = baseTimeout * graceTimeCapMultiplier
}
baseTimeout += graceTime
}
}
err := c.Conn.SetWriteDeadline(now.Add(baseTimeout))
if err != nil {
return 0, err
}
@@ -68,6 +130,7 @@ func (c *Conn) Write(b []byte) (count int, e error) {
if e == nil {
stats.BytesOut(int64(count))
c.bytesWritten += int64(count)
c.lastWrite = time.Now()
}
return
}
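
The write-side grace period can be checked the same way. A standalone sketch with the patch's constants, using a 1MB / 400s-gap scenario (numbers chosen for illustration):

```go
package main

import (
	"fmt"
	"time"
)

// Same constants as the patch.
const (
	minThroughputBytesPerSecond = 4000
	graceTimeCapMultiplier      = 3
)

func main() {
	writeTimeout := 30 * time.Second
	bytesWritten := int64(1_000_000)        // 1MB already written
	timeSinceLastWrite := 400 * time.Second // long gap, e.g. a slow chunk fetch

	bytesPerTimeout := int64(float64(minThroughputBytesPerSecond) * writeTimeout.Seconds())
	baseTimeout := writeTimeout * time.Duration(bytesWritten/bytesPerTimeout+1) // 270s

	// Grace only applies when the idle gap exceeds the unscaled WriteTimeout.
	if timeSinceLastWrite > writeTimeout {
		graceTime := timeSinceLastWrite
		if graceTime > baseTimeout*graceTimeCapMultiplier {
			graceTime = baseTimeout * graceTimeCapMultiplier // cap at 3*270s = 810s
		}
		baseTimeout += graceTime
	}
	fmt.Println(baseTimeout) // 270s + 400s = 670s (11m10s); a gap beyond 810s would cap at 1080s
}
```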

6
weed/util/network.go

@@ -44,7 +44,11 @@ func selectIpV4(netInterfaces []net.Interface, isIpV4 bool) string {
}
} else {
if ipNet.IP.To16() != nil {
return ipNet.IP.String()
// Filter out link-local IPv6 addresses (fe80::/10)
// They require zone identifiers and are not suitable for server binding
if !ipNet.IP.IsLinkLocalUnicast() {
return ipNet.IP.String()
}
}
}
}
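
A quick standalone check of the predicate being added; the sample addresses are illustrative:

```go
package main

import (
	"fmt"
	"net"
)

func main() {
	for _, s := range []string{
		"fe80::1",     // link-local unicast: needs a zone identifier, now skipped
		"2001:db8::1", // global unicast (documentation prefix): still returned
		"fd00::1",     // unique local: still returned by this filter
	} {
		ip := net.ParseIP(s)
		fmt.Printf("%-12s IsLinkLocalUnicast=%v\n", s, ip.IsLinkLocalUnicast())
	}
}
```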

183
weed/wdclient/masterclient.go

@@ -2,11 +2,17 @@ package wdclient
import (
"context"
"errors"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/sync/singleflight"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/stats"
@@ -29,10 +35,16 @@ type MasterClient struct {
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
// TODO: CRITICAL - Data race: resetVidMap() writes to vidMap while other methods read concurrently
// This embedded *vidMap should be changed to a private field protected by sync.RWMutex
// See: https://github.com/seaweedfs/seaweedfs/issues/[ISSUE_NUMBER]
*vidMap
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
// Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
vidLookupGroup singleflight.Group
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
@@ -59,39 +71,168 @@ func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
}
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
// Try cache first using the fast path
fullUrls, err = mc.vidMap.LookupFileId(ctx, fileId)
if err == nil && len(fullUrls) > 0 {
return
}
err = pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: []string{fileId},
})
// Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid fileId %s", fileId)
}
volumeId := parts[0]
// Use shared lookup logic with batching and singleflight
vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
if err != nil {
return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
}
locations, found := vidLocations[volumeId]
if !found || len(locations) == 0 {
return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
}
// Build HTTP URLs from locations, preferring same data center
var sameDcUrls, otherDcUrls []string
for _, loc := range locations {
httpUrl := "http://" + loc.Url + "/" + fileId
if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
sameDcUrls = append(sameDcUrls, httpUrl)
} else {
otherDcUrls = append(otherDcUrls, httpUrl)
}
}
// Prefer same data center
fullUrls = append(sameDcUrls, otherDcUrls...)
return fullUrls, nil
}
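
The same-data-center preference now collects URLs into two groups instead of prepending one at a time, which keeps the relative order of replicas. A small self-contained sketch with made-up locations and a hypothetical fileId:

```go
package main

import "fmt"

type location struct {
	Url        string
	DataCenter string
}

// orderByDataCenter mirrors the grouping above: same-DC replicas first,
// everything else after, each group in its original order.
func orderByDataCenter(fileId, localDC string, locations []location) []string {
	var sameDcUrls, otherDcUrls []string
	for _, loc := range locations {
		httpUrl := "http://" + loc.Url + "/" + fileId
		if localDC != "" && localDC == loc.DataCenter {
			sameDcUrls = append(sameDcUrls, httpUrl)
		} else {
			otherDcUrls = append(otherDcUrls, httpUrl)
		}
	}
	return append(sameDcUrls, otherDcUrls...)
}

func main() {
	urls := orderByDataCenter("3,01637037d6", "dc1", []location{
		{Url: "10.0.2.5:8080", DataCenter: "dc2"},
		{Url: "10.0.1.5:8080", DataCenter: "dc1"},
	})
	fmt.Println(urls) // dc1 replica listed before dc2
}
```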
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
// Uses singleflight to coalesce concurrent requests for the same batch of volumes
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
var needsLookup []string
var lookupErrors []error
// Check cache first and parse volume IDs once
vidStringToUint := make(map[string]uint32, len(volumeIds))
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
if err != nil {
return fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
}
vidStringToUint[vidString] = uint32(vid)
locations, found := mc.GetLocations(uint32(vid))
if found && len(locations) > 0 {
result[vidString] = locations
} else {
needsLookup = append(needsLookup, vidString)
}
}
if len(needsLookup) == 0 {
return result, nil
}
// Batch query all missing volumes using singleflight on the batch key
// Sort for stable key to coalesce identical batches
sort.Strings(needsLookup)
batchKey := strings.Join(needsLookup, ",")
sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
// Double-check cache for volumes that might have been populated while waiting
stillNeedLookup := make([]string, 0, len(needsLookup))
batchResult := make(map[string][]Location)
for _, vidString := range needsLookup {
vid := vidStringToUint[vidString] // Use pre-parsed value
if locations, found := mc.GetLocations(vid); found && len(locations) > 0 {
batchResult[vidString] = locations
} else {
stillNeedLookup = append(stillNeedLookup, vidString)
}
}
if len(stillNeedLookup) == 0 {
return batchResult, nil
}
for vid, vidLocation := range resp.VolumeIdLocations {
for _, vidLoc := range vidLocation.Locations {
loc := Location{
Url: vidLoc.Url,
PublicUrl: vidLoc.PublicUrl,
GrpcPort: int(vidLoc.GrpcPort),
DataCenter: vidLoc.DataCenter,
// Query master with batched volume IDs
glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: stillNeedLookup,
})
if err != nil {
return fmt.Errorf("master lookup failed: %v", err)
}
for _, vidLoc := range resp.VolumeIdLocations {
if vidLoc.Error != "" {
glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
mc.vidMap.addLocation(uint32(vid), loc)
httpUrl := "http://" + loc.Url + "/" + fileId
// Prefer same data center
if mc.DataCenter != "" && mc.DataCenter == loc.DataCenter {
fullUrls = append([]string{httpUrl}, fullUrls...)
} else {
fullUrls = append(fullUrls, httpUrl)
// Parse volume ID from response
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
continue
}
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
mc.vidMap.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
if len(locations) > 0 {
batchResult[vidOnly] = locations
}
}
return nil
})
if err != nil {
return batchResult, err
}
return nil
return batchResult, nil
})
return
if err != nil {
lookupErrors = append(lookupErrors, err)
}
// Merge singleflight batch results
if batchLocations, ok := sfResult.(map[string][]Location); ok {
for vid, locs := range batchLocations {
result[vid] = locs
}
}
// Check for volumes that still weren't found
for _, vidString := range needsLookup {
if _, found := result[vidString]; !found {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
}
}
// Return aggregated errors using errors.Join to preserve error types
return result, errors.Join(lookupErrors...)
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
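
The batch-key singleflight used above can be demonstrated in isolation: overlapping callers asking for the same sorted set of volume ids share a single in-flight lookup. The fetch function below is a stand-in for the master RPC, and the location value is made up:

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync"

	"golang.org/x/sync/singleflight"
)

var lookupGroup singleflight.Group

// lookupBatch coalesces concurrent lookups for the same set of volume ids.
// Sorting the ids first makes identical batches produce identical keys.
func lookupBatch(volumeIds []string) (map[string]string, error) {
	sort.Strings(volumeIds)
	key := strings.Join(volumeIds, ",")
	v, err, shared := lookupGroup.Do(key, func() (interface{}, error) {
		fmt.Println("querying master once for batch:", key) // stand-in for the LookupVolume RPC
		out := make(map[string]string, len(volumeIds))
		for _, vid := range volumeIds {
			out[vid] = "10.0.0.1:8080" // hypothetical location
		}
		return out, nil
	})
	_ = shared // true for callers that piggybacked on another caller's lookup
	if err != nil {
		return nil, err
	}
	return v.(map[string]string), nil
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			lookupBatch([]string{"7", "3"}) // identical batch from three goroutines
		}()
	}
	wg.Wait()
}
```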
