diff --git a/test/s3/iam/s3_iam_framework.go b/test/s3/iam/s3_iam_framework.go index 178ae0763..c155b7358 100644 --- a/test/s3/iam/s3_iam_framework.go +++ b/test/s3/iam/s3_iam_framework.go @@ -705,12 +705,22 @@ func (f *S3IAMTestFramework) CreateBucketWithCleanup(s3Client *s3.S3, bucketName f.t.Logf("Warning: Failed to delete existing bucket %s: %v", bucketName, deleteErr) } + // Add a small delay to allow deletion to propagate + time.Sleep(100 * time.Millisecond) + // Now create it fresh _, err = s3Client.CreateBucket(&s3.CreateBucketInput{ Bucket: aws.String(bucketName), }) if err != nil { - return fmt.Errorf("failed to recreate bucket after cleanup: %v", err) + // If it still says bucket exists after cleanup, it might be in an inconsistent state + // In this case, just use the existing bucket since we emptied it + if awsErr, ok := err.(awserr.Error); ok && (awsErr.Code() == "BucketAlreadyExists" || awsErr.Code() == "BucketAlreadyOwnedByYou") { + f.t.Logf("Bucket %s still exists after cleanup, reusing it", bucketName) + // Bucket exists and is empty, so we can proceed + } else { + return fmt.Errorf("failed to recreate bucket after cleanup: %v", err) + } } } else { return err diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 52d32f697..638553b04 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -1,10 +1,11 @@ package cluster import ( - "github.com/seaweedfs/seaweedfs/weed/pb" - "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" "sync" "time" + + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) const ( diff --git a/weed/command/iam.go b/weed/command/iam.go index c484ed18d..8f4ac878d 100644 --- a/weed/command/iam.go +++ b/weed/command/iam.go @@ -76,7 +76,7 @@ func (iamopt *IamOptions) startIamServer() bool { masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap() router := mux.NewRouter().SkipClean(true) - _, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ + iamApiServer, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{ Masters: masters, Filer: filerAddress, Port: *iamopt.port, @@ -86,6 +86,9 @@ func (iamopt *IamOptions) startIamServer() bool { if iamApiServer_err != nil { glog.Fatalf("IAM API Server startup error: %v", iamApiServer_err) } + + // Ensure cleanup on shutdown + defer iamApiServer.Shutdown() listenAddress := fmt.Sprintf(":%d", *iamopt.port) iamApiListener, iamApiLocalListener, err := util.NewIpAndLocalListeners(*iamopt.ip, *iamopt.port, time.Duration(10)*time.Second) diff --git a/weed/filer/filer.go b/weed/filer/filer.go index d3d2de948..b68004a8b 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -175,10 +175,6 @@ func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress { return fs.MasterClient.GetMaster(ctx) } -func (fs *Filer) KeepMasterClientConnected(ctx context.Context) { - fs.MasterClient.KeepConnectedToMaster(ctx) -} - func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) { return f.Store.BeginTransaction(ctx) } diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 27d773f49..aeac9b34a 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -26,15 +26,42 @@ type ChunkReadAt struct { var _ = io.ReaderAt(&ChunkReadAt{}) var _ = io.Closer(&ChunkReadAt{}) +// LookupFn creates a basic volume location lookup function with simple caching. +// +// Deprecated: Use wdclient.FilerClient instead. 
This function has several limitations compared to wdclient.FilerClient: +// - Simple bounded cache (10k entries, no eviction policy or TTL for stale entries) +// - No singleflight deduplication (concurrent requests for same volume will duplicate work) +// - No cache history for volume moves (no fallback chain when volumes migrate) +// - No high availability (single filer address, no automatic failover) +// +// For NEW code, especially mount operations, use wdclient.FilerClient instead: +// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts) +// lookupFn := filerClient.GetLookupFileIdFunction() +// +// This provides: +// - Bounded cache with configurable size +// - Singleflight deduplication of concurrent lookups +// - Cache history when volumes move +// - Battle-tested vidMap with cache chain +// +// This function is kept for backward compatibility with existing code paths +// (shell commands, streaming, etc.) but should be avoided in long-running processes +// or multi-tenant deployments where unbounded memory growth is a concern. +// +// Maximum recommended cache entries: ~10,000 volumes per process. +// Beyond this, consider migrating to wdclient.FilerClient. func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType { vidCache := make(map[string]*filer_pb.Locations) - var vicCacheLock sync.RWMutex + var vidCacheLock sync.RWMutex + cacheSize := 0 + const maxCacheSize = 10000 // Simple bound to prevent unbounded growth + return func(ctx context.Context, fileId string) (targetUrls []string, err error) { vid := VolumeId(fileId) - vicCacheLock.RLock() + vidCacheLock.RLock() locations, found := vidCache[vid] - vicCacheLock.RUnlock() + vidCacheLock.RUnlock() if !found { util.Retry("lookup volume "+vid, func() error { @@ -51,9 +78,17 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp glog.V(0).InfofCtx(ctx, "failed to locate %s", fileId) return fmt.Errorf("failed to locate %s", fileId) } - vicCacheLock.Lock() - vidCache[vid] = locations - vicCacheLock.Unlock() + vidCacheLock.Lock() + // Simple size limit to prevent unbounded growth + // For proper cache management, use wdclient.FilerClient instead + if cacheSize < maxCacheSize { + vidCache[vid] = locations + cacheSize++ + } else if cacheSize == maxCacheSize { + glog.Warningf("filer.LookupFn cache reached limit of %d volumes, not caching new entries. 
Consider migrating to wdclient.FilerClient for bounded cache management.", maxCacheSize) + cacheSize++ // Only log once + } + vidCacheLock.Unlock() return nil }) diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index cf507ee82..361d9bec9 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -12,6 +12,7 @@ import ( "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/credential" "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/iam_pb" @@ -45,8 +46,11 @@ type IamServerOption struct { } type IamApiServer struct { - s3ApiConfig IamS3ApiConfig - iam *s3api.IdentityAccessManagement + s3ApiConfig IamS3ApiConfig + iam *s3api.IdentityAccessManagement + shutdownContext context.Context + shutdownCancel context.CancelFunc + masterClient *wdclient.MasterClient } var s3ApiConfigure IamS3ApiConfig @@ -56,9 +60,21 @@ func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer } func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) { + masterClient := wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)) + + // Create a cancellable context for the master client connection + // This allows graceful shutdown via Shutdown() method + shutdownCtx, shutdownCancel := context.WithCancel(context.Background()) + + // Start KeepConnectedToMaster for volume location lookups + // IAM config files are typically small and inline, but if they ever have chunks, + // ReadEntry→StreamContent needs masterClient for volume lookups + glog.V(0).Infof("IAM API starting master client connection for volume location lookups") + go masterClient.KeepConnectedToMaster(shutdownCtx) + configure := &IamS3ApiConfigure{ option: option, - masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)), + masterClient: masterClient, } s3ApiConfigure = configure @@ -72,8 +88,11 @@ func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, expli configure.credentialManager = iam.GetCredentialManager() iamApiServer = &IamApiServer{ - s3ApiConfig: s3ApiConfigure, - iam: iam, + s3ApiConfig: s3ApiConfigure, + iam: iam, + shutdownContext: shutdownCtx, + shutdownCancel: shutdownCancel, + masterClient: masterClient, } iamApiServer.registerRouter(router) @@ -93,6 +112,20 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) { apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler) } +// Shutdown gracefully stops the IAM API server and releases resources. +// It cancels the master client connection goroutine and closes gRPC connections. +// This method is safe to call multiple times. +// +// Note: This method is called via defer in weed/command/iam.go for best-effort cleanup. +// For proper graceful shutdown on SIGTERM/SIGINT, signal handling should be added to +// the command layer to call this method before process exit. 
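+//
+// A minimal sketch of that command-layer wiring (illustrative only; assumes the
+// existing weed/util/grace.OnInterrupt helper is available in the command package):
+//
+//	grace.OnInterrupt(func() {
+//		iamApiServer.Shutdown()
+//	})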
+func (iama *IamApiServer) Shutdown() { + if iama.shutdownCancel != nil { + glog.V(0).Infof("IAM API server shutting down, stopping master client connection") + iama.shutdownCancel() + } +} + func (iama *IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) { return iama.GetS3ApiConfigurationFromCredentialManager(s3cfg) } diff --git a/weed/mount/weedfs.go b/weed/mount/weedfs.go index 95864ef00..21c54841a 100644 --- a/weed/mount/weedfs.go +++ b/weed/mount/weedfs.go @@ -97,9 +97,32 @@ type WFS struct { fhLockTable *util.LockTable[FileHandleId] rdmaClient *RDMAMountClient FilerConf *filer.FilerConf + filerClient *wdclient.FilerClient // Cached volume location client } func NewSeaweedFileSystem(option *Option) *WFS { + // Only create FilerClient for direct volume access modes + // When VolumeServerAccess == "filerProxy", all reads go through filer, so no volume lookup needed + var filerClient *wdclient.FilerClient + if option.VolumeServerAccess != "filerProxy" { + // Create FilerClient for efficient volume location caching + // Pass all filer addresses for high availability with automatic failover + // Configure URL preference based on VolumeServerAccess option + var opts *wdclient.FilerClientOption + if option.VolumeServerAccess == "publicUrl" { + opts = &wdclient.FilerClientOption{ + UrlPreference: wdclient.PreferPublicUrl, + } + } + + filerClient = wdclient.NewFilerClient( + option.FilerAddresses, // Pass all filer addresses for HA + option.GrpcDialOption, + option.DataCenter, + opts, + ) + } + wfs := &WFS{ RawFileSystem: fuse.NewDefaultRawFileSystem(), option: option, @@ -107,6 +130,7 @@ func NewSeaweedFileSystem(option *Option) *WFS { inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec), fhMap: NewFileHandleToInode(), dhMap: NewDirectoryHandleToInode(), + filerClient: filerClient, // nil for proxy mode, initialized for direct access fhLockTable: util.NewLockTable[FileHandleId](), } @@ -253,7 +277,8 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType { return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil } } - return filer.LookupFn(wfs) + // Use the cached FilerClient for efficient lookups with singleflight and cache history + return wfs.filerClient.GetLookupFileIdFunction() } func (wfs *WFS) getCurrentFiler() pb.ServerAddress { diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index dee5f60c8..cd0e82421 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -24,7 +24,6 @@ import ( "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants" "github.com/seaweedfs/seaweedfs/weed/s3api/s3err" - util_http "github.com/seaweedfs/seaweedfs/weed/util/http" "github.com/seaweedfs/seaweedfs/weed/util/mem" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -994,36 +993,10 @@ var volumeServerHTTPClient = &http.Client{ } // createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs +// Uses FilerClient's vidMap cache to eliminate per-chunk gRPC overhead func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) { - return func(ctx context.Context, fileId string) ([]string, error) { - var urls []string - err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - vid := filer.VolumeId(fileId) - resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{ - VolumeIds: []string{vid}, - }) - if err != nil { 
- return err - } - if locs, found := resp.LocationsMap[vid]; found { - for _, loc := range locs.Locations { - // Build complete URL with volume server address and fileId - // The fileId parameter contains the full "volumeId,fileKey" identifier (e.g., "3,01637037d6") - // This constructs URLs like: http://127.0.0.1:8080/3,01637037d6 (or https:// if configured) - // NormalizeUrl ensures the proper scheme (http:// or https://) is used based on configuration - normalizedUrl, err := util_http.NormalizeUrl(loc.Url) - if err != nil { - glog.Warningf("Failed to normalize URL for %s: %v", loc.Url, err) - continue - } - urls = append(urls, normalizedUrl+"/"+fileId) - } - } - return nil - }) - glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls) - return urls, err - } + // Return the FilerClient's lookup function which uses the battle-tested vidMap cache + return s3a.filerClient.GetLookupFileIdFunction() } // streamFromVolumeServersWithSSE handles streaming with inline SSE decryption diff --git a/weed/s3api/s3api_server.go b/weed/s3api/s3api_server.go index b9c4eb3fc..992027fda 100644 --- a/weed/s3api/s3api_server.go +++ b/weed/s3api/s3api_server.go @@ -19,6 +19,7 @@ import ( "github.com/seaweedfs/seaweedfs/weed/iam/sts" "github.com/seaweedfs/seaweedfs/weed/pb/s3_pb" "github.com/seaweedfs/seaweedfs/weed/util/grace" + "github.com/seaweedfs/seaweedfs/weed/wdclient" "github.com/gorilla/mux" "github.com/seaweedfs/seaweedfs/weed/pb" @@ -55,6 +56,7 @@ type S3ApiServer struct { cb *CircuitBreaker randomClientId int32 filerGuard *security.Guard + filerClient *wdclient.FilerClient client util_http_client.HTTPClientInterface bucketRegistry *BucketRegistry credentialManager *credential.CredentialManager @@ -91,11 +93,18 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl // Initialize bucket policy engine first policyEngine := NewBucketPolicyEngine() + // Initialize FilerClient for volume location caching + // Uses the battle-tested vidMap with filer-based lookups + // S3 API typically connects to a single filer, but wrap in slice for consistency + filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter) + glog.V(0).Infof("S3 API initialized FilerClient for volume location caching") + s3ApiServer = &S3ApiServer{ option: option, iam: iam, randomClientId: util.RandomInt32(), filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec), + filerClient: filerClient, cb: NewCircuitBreaker(option), credentialManager: iam.credentialManager, bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 79fb90742..3d08c0980 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -178,7 +178,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.checkWithMaster() go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) - go fs.filer.KeepMasterClientConnected(context.Background()) + go fs.filer.MasterClient.KeepConnectedToMaster(context.Background()) fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") v.SetDefault("filer.options.buckets_folder", "/buckets") diff --git a/weed/wdclient/filer_client.go b/weed/wdclient/filer_client.go new file mode 100644 index 000000000..f0dd5f2e6 --- /dev/null +++ 
b/weed/wdclient/filer_client.go @@ -0,0 +1,404 @@ +package wdclient + +import ( + "context" + "fmt" + "math/rand" + "strings" + "sync/atomic" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" +) + +// UrlPreference controls which URL to use for volume access +type UrlPreference string + +const ( + PreferUrl UrlPreference = "url" // Use private URL (default) + PreferPublicUrl UrlPreference = "publicUrl" // Use public URL +) + +// filerHealth tracks the health status of a filer +type filerHealth struct { + failureCount int32 // atomic: consecutive failures + lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds +} + +// FilerClient provides volume location services by querying a filer +// It uses the shared vidMap cache for efficient lookups +// Supports multiple filer addresses with automatic failover for high availability +// Tracks filer health to avoid repeatedly trying known-unhealthy filers +type FilerClient struct { + *vidMapClient + filerAddresses []pb.ServerAddress + filerIndex int32 // atomic: current filer index for round-robin + filerHealth []*filerHealth // health status per filer (same order as filerAddresses) + grpcDialOption grpc.DialOption + urlPreference UrlPreference + grpcTimeout time.Duration + cacheSize int // Number of historical vidMap snapshots to keep + clientId int32 // Unique client identifier for gRPC metadata + failureThreshold int32 // Circuit breaker: consecutive failures before circuit opens + resetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer + maxRetries int // Retry: maximum retry attempts for transient failures + initialRetryWait time.Duration // Retry: initial wait time before first retry + retryBackoffFactor float64 // Retry: backoff multiplier for wait time +} + +// filerVolumeProvider implements VolumeLocationProvider by querying filer +// Supports multiple filer addresses with automatic failover +type filerVolumeProvider struct { + filerClient *FilerClient +} + +// FilerClientOption holds optional configuration for FilerClient +type FilerClientOption struct { + GrpcTimeout time.Duration + UrlPreference UrlPreference + CacheSize int // Number of historical vidMap snapshots (0 = use default) + FailureThreshold int32 // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3) + ResetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s) + MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3) + InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s) + RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5) +} + +// NewFilerClient creates a new client that queries filer(s) for volume locations +// Supports multiple filer addresses for high availability with automatic failover +// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize +func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient { + if len(filerAddresses) == 0 { + glog.Fatal("NewFilerClient requires at least one filer address") + } + + // Apply defaults + grpcTimeout := 5 * time.Second + urlPref := PreferUrl + cacheSize := 
DefaultVidMapCacheSize + failureThreshold := int32(3) // Default: 3 consecutive failures before circuit opens + resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer + maxRetries := 3 // Default: 3 retry attempts for transient failures + initialRetryWait := time.Second // Default: 1 second initial retry wait + retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier + + // Override with provided options + if len(opts) > 0 && opts[0] != nil { + opt := opts[0] + if opt.GrpcTimeout > 0 { + grpcTimeout = opt.GrpcTimeout + } + if opt.UrlPreference != "" { + urlPref = opt.UrlPreference + } + if opt.CacheSize > 0 { + cacheSize = opt.CacheSize + } + if opt.FailureThreshold > 0 { + failureThreshold = opt.FailureThreshold + } + if opt.ResetTimeout > 0 { + resetTimeout = opt.ResetTimeout + } + if opt.MaxRetries > 0 { + maxRetries = opt.MaxRetries + } + if opt.InitialRetryWait > 0 { + initialRetryWait = opt.InitialRetryWait + } + if opt.RetryBackoffFactor > 0 { + retryBackoffFactor = opt.RetryBackoffFactor + } + } + + // Initialize health tracking for each filer + health := make([]*filerHealth, len(filerAddresses)) + for i := range health { + health[i] = &filerHealth{} + } + + fc := &FilerClient{ + filerAddresses: filerAddresses, + filerIndex: 0, + filerHealth: health, + grpcDialOption: grpcDialOption, + urlPreference: urlPref, + grpcTimeout: grpcTimeout, + cacheSize: cacheSize, + clientId: rand.Int31(), // Random client ID for gRPC metadata tracking + failureThreshold: failureThreshold, + resetTimeout: resetTimeout, + maxRetries: maxRetries, + initialRetryWait: initialRetryWait, + retryBackoffFactor: retryBackoffFactor, + } + + // Create provider that references this FilerClient for failover support + provider := &filerVolumeProvider{ + filerClient: fc, + } + + fc.vidMapClient = newVidMapClient(provider, dataCenter, cacheSize) + + return fc +} + +// GetLookupFileIdFunction returns a lookup function with URL preference handling +func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType { + if fc.urlPreference == PreferUrl { + // Use the default implementation from vidMapClient + return fc.vidMapClient.GetLookupFileIdFunction() + } + + // Custom implementation that prefers PublicUrl + return func(ctx context.Context, fileId string) (fullUrls []string, err error) { + // Parse file ID to extract volume ID + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid fileId format: %s", fileId) + } + volumeIdStr := parts[0] + + // First try the cache using LookupVolumeIdsWithFallback + vidLocations, err := fc.LookupVolumeIdsWithFallback(ctx, []string{volumeIdStr}) + + // Check for partial results first (important for multi-volume batched lookups) + locations, found := vidLocations[volumeIdStr] + if !found || len(locations) == 0 { + // Volume not found - return specific error with context from lookup if available + if err != nil { + return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeIdStr, fileId, err) + } + return nil, fmt.Errorf("volume %s not found for fileId %s", volumeIdStr, fileId) + } + + // Volume found successfully - ignore any errors about other volumes + // (not relevant for single-volume lookup, but defensive for future batching) + + // Build URLs with publicUrl preference, and also prefer same DC + var sameDcUrls, otherDcUrls []string + dataCenter := fc.GetDataCenter() + for _, loc := range locations { + url := loc.PublicUrl + if url == "" { + url = loc.Url + } + httpUrl := "http://" + 
url + "/" + fileId + if dataCenter != "" && dataCenter == loc.DataCenter { + sameDcUrls = append(sameDcUrls, httpUrl) + } else { + otherDcUrls = append(otherDcUrls, httpUrl) + } + } + // Shuffle to distribute load across volume servers + rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] }) + rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] }) + // Prefer same data center + fullUrls = append(sameDcUrls, otherDcUrls...) + return fullUrls, nil + } +} + +// isRetryableGrpcError checks if a gRPC error is transient and should be retried +// +// Note on codes.Aborted: While Aborted can indicate application-level conflicts +// (e.g., transaction failures), in the context of volume location lookups (which +// are simple read-only operations with no transactions), Aborted is more likely +// to indicate transient server issues during restart/recovery. We include it here +// for volume lookups but log it for visibility in case misclassification occurs. +func isRetryableGrpcError(err error) bool { + if err == nil { + return false + } + + // Check gRPC status code + st, ok := status.FromError(err) + if ok { + switch st.Code() { + case codes.Unavailable: // Server unavailable (temporary) + return true + case codes.DeadlineExceeded: // Request timeout + return true + case codes.ResourceExhausted: // Rate limited or overloaded + return true + case codes.Aborted: + // Aborted during read-only volume lookups is likely transient + // (e.g., filer restarting), but log for visibility + glog.V(1).Infof("Treating Aborted as retryable for volume lookup: %v", err) + return true + } + } + + // Fallback to string matching for non-gRPC errors (e.g., network errors) + errStr := err.Error() + return strings.Contains(errStr, "transport") || + strings.Contains(errStr, "connection") || + strings.Contains(errStr, "timeout") || + strings.Contains(errStr, "unavailable") +} + +// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures +// Circuit breaker pattern: skip filers with multiple recent consecutive failures +func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool { + health := fc.filerHealth[index] + failureCount := atomic.LoadInt32(&health.failureCount) + + // Check if failure count exceeds threshold + if failureCount < fc.failureThreshold { + return false + } + + // Re-check unhealthy filers after reset timeout + lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs) + if lastFailureNs == 0 { + return false // Never failed, shouldn't skip + } + lastFailureTime := time.Unix(0, lastFailureNs) + if time.Since(lastFailureTime) > fc.resetTimeout { + return false // Time to re-check + } + + return true // Skip this unhealthy filer +} + +// recordFilerSuccess resets failure tracking for a successful filer +func (fc *FilerClient) recordFilerSuccess(index int32) { + health := fc.filerHealth[index] + atomic.StoreInt32(&health.failureCount, 0) +} + +// recordFilerFailure increments failure count for an unhealthy filer +func (fc *FilerClient) recordFilerFailure(index int32) { + health := fc.filerHealth[index] + atomic.AddInt32(&health.failureCount, 1) + atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano()) +} + +// LookupVolumeIds queries the filer for volume locations with automatic failover +// Tries all configured filer addresses until one succeeds (high availability) +// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) 
with exponential backoff +// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have +// an Error field. This implementation handles the current structure while being prepared +// for future error reporting enhancements. +func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { + fc := p.filerClient + result := make(map[string][]Location) + + // Retry transient failures with configurable backoff + var lastErr error + waitTime := fc.initialRetryWait + maxRetries := fc.maxRetries + + for retry := 0; retry < maxRetries; retry++ { + // Try all filer addresses with round-robin starting from current index + // Skip known-unhealthy filers (circuit breaker pattern) + i := atomic.LoadInt32(&fc.filerIndex) + n := int32(len(fc.filerAddresses)) + + for x := int32(0); x < n; x++ { + // Circuit breaker: skip unhealthy filers + if fc.shouldSkipUnhealthyFiler(i) { + glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)", + fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount)) + i++ + if i >= n { + i = 0 + } + continue + } + + filerAddress := fc.filerAddresses[i] + + // Use anonymous function to ensure defer cancel() is called per iteration, not accumulated + err := func() error { + // Create a fresh timeout context for each filer attempt + // This ensures each retry gets the full grpcTimeout, not a diminishing deadline + timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout) + defer cancel() // Always clean up context, even on panic or early return + + return pb.WithGrpcFilerClient(false, fc.clientId, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{ + VolumeIds: volumeIds, + }) + if err != nil { + return fmt.Errorf("filer.LookupVolume failed: %w", err) + } + + // Process each volume in the response + for vid, locs := range resp.LocationsMap { + // Convert locations from protobuf to internal format + var locations []Location + for _, loc := range locs.Locations { + locations = append(locations, Location{ + Url: loc.Url, + PublicUrl: loc.PublicUrl, + DataCenter: loc.DataCenter, + GrpcPort: int(loc.GrpcPort), + }) + } + + // Only add to result if we have locations + // Empty locations with no gRPC error means "not found" (volume doesn't exist) + if len(locations) > 0 { + result[vid] = locations + glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations)) + } else { + glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid) + } + } + + // Check for volumes that weren't in the response at all + // This could indicate a problem with the filer + for _, vid := range volumeIds { + if _, found := resp.LocationsMap[vid]; !found { + glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid) + } + } + + return nil + }) + }() + + if err != nil { + glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err) + fc.recordFilerFailure(i) + lastErr = err + i++ + if i >= n { + i = 0 + } + continue + } + + // Success - update the preferred filer index and reset health tracking + atomic.StoreInt32(&fc.filerIndex, i) + fc.recordFilerSuccess(i) + glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result)) + return result, nil + } + + // All filers failed on this attempt + 
// Check if the error is retryable (transient gRPC error) + if !isRetryableGrpcError(lastErr) { + // Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately + return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr) + } + + // Transient error - retry if we have attempts left + if retry < maxRetries-1 { + glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v", + n, retry+1, maxRetries, waitTime, lastErr) + time.Sleep(waitTime) + waitTime = time.Duration(float64(waitTime) * fc.retryBackoffFactor) + } + } + + // All retries exhausted + return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr) +} diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index 320156294..89218a8c7 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -5,328 +5,143 @@ import ( "errors" "fmt" "math/rand" - "sort" "strconv" "strings" "sync" "time" - "golang.org/x/sync/singleflight" - - "github.com/seaweedfs/seaweedfs/weed/util/version" - - "github.com/seaweedfs/seaweedfs/weed/stats" - - "github.com/seaweedfs/seaweedfs/weed/util" "google.golang.org/grpc" "github.com/seaweedfs/seaweedfs/weed/glog" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" + "github.com/seaweedfs/seaweedfs/weed/stats" + "github.com/seaweedfs/seaweedfs/weed/util" + "github.com/seaweedfs/seaweedfs/weed/util/version" ) -type MasterClient struct { - FilerGroup string - clientType string - clientHost pb.ServerAddress - rack string - currentMaster pb.ServerAddress - currentMasterLock sync.RWMutex - masters pb.ServerDiscovery - grpcDialOption grpc.DialOption - - // vidMap stores volume location mappings - // Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap - vidMap *vidMap - vidMapLock sync.RWMutex - vidMapCacheSize int - OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) - OnPeerUpdateLock sync.RWMutex - - // Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes - vidLookupGroup singleflight.Group -} - -func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { - return &MasterClient{ - FilerGroup: filerGroup, - clientType: clientType, - clientHost: clientHost, - rack: rack, - masters: masters, - grpcDialOption: grpcDialOption, - vidMap: newVidMap(clientDataCenter), - vidMapCacheSize: 5, - } -} - -func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) { - mc.OnPeerUpdateLock.Lock() - mc.OnPeerUpdate = onPeerUpdate - mc.OnPeerUpdateLock.Unlock() -} - -func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType { - return mc.LookupFileIdWithFallback +// masterVolumeProvider implements VolumeLocationProvider by querying master +// This is rarely called since master pushes updates proactively via KeepConnected stream +type masterVolumeProvider struct { + masterClient *MasterClient } -func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { - // Try cache first using the fast path - grab both vidMap and dataCenter in one lock - mc.vidMapLock.RLock() - vm := mc.vidMap - dataCenter := vm.DataCenter - mc.vidMapLock.RUnlock() - 
- fullUrls, err = vm.LookupFileId(ctx, fileId) - if err == nil && len(fullUrls) > 0 { - return - } - - // Extract volume ID from file ID (format: "volumeId,needle_id_cookie") - parts := strings.Split(fileId, ",") - if len(parts) != 2 { - return nil, fmt.Errorf("invalid fileId %s", fileId) - } - volumeId := parts[0] - - // Use shared lookup logic with batching and singleflight - vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId}) - if err != nil { - return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err) - } - - locations, found := vidLocations[volumeId] - if !found || len(locations) == 0 { - return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId) - } - - // Build HTTP URLs from locations, preferring same data center - var sameDcUrls, otherDcUrls []string - for _, loc := range locations { - httpUrl := "http://" + loc.Url + "/" + fileId - if dataCenter != "" && dataCenter == loc.DataCenter { - sameDcUrls = append(sameDcUrls, httpUrl) - } else { - otherDcUrls = append(otherDcUrls, httpUrl) - } - } - - // Prefer same data center - fullUrls = append(sameDcUrls, otherDcUrls...) - return fullUrls, nil -} - -// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache -// Uses singleflight to coalesce concurrent requests for the same batch of volumes -func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) { +// LookupVolumeIds queries the master for volume locations (fallback when cache misses) +// Returns partial results with aggregated errors for volumes that failed +func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) { result := make(map[string][]Location) - var needsLookup []string var lookupErrors []error - // Check cache first and parse volume IDs once - vidStringToUint := make(map[string]uint32, len(volumeIds)) + glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds) - // Get stable pointer to vidMap with minimal lock hold time - vm := mc.getStableVidMap() + // Use a timeout for the master lookup to prevent indefinite blocking + timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout) + defer cancel() - for _, vidString := range volumeIds { - vid, err := strconv.ParseUint(vidString, 10, 32) + err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{ + VolumeOrFileIds: volumeIds, + }) if err != nil { - return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err) - } - vidStringToUint[vidString] = uint32(vid) - - locations, found := vm.GetLocations(uint32(vid)) - if found && len(locations) > 0 { - result[vidString] = locations - } else { - needsLookup = append(needsLookup, vidString) + return fmt.Errorf("master lookup failed: %v", err) } - } - - if len(needsLookup) == 0 { - return result, nil - } - // Batch query all missing volumes using singleflight on the batch key - // Sort for stable key to coalesce identical batches - sort.Strings(needsLookup) - batchKey := strings.Join(needsLookup, ",") - - sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) { - // Double-check cache for volumes that might have been populated while waiting - stillNeedLookup := make([]string, 0, len(needsLookup)) - batchResult := make(map[string][]Location) 
- - // Get stable pointer with minimal lock hold time - vm := mc.getStableVidMap() - - for _, vidString := range needsLookup { - vid := vidStringToUint[vidString] // Use pre-parsed value - if locations, found := vm.GetLocations(vid); found && len(locations) > 0 { - batchResult[vidString] = locations - } else { - stillNeedLookup = append(stillNeedLookup, vidString) + for _, vidLoc := range resp.VolumeIdLocations { + // Preserve per-volume errors from master response + // These could indicate misconfiguration, volume deletion, etc. + if vidLoc.Error != "" { + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error)) + glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error) + continue } - } - - if len(stillNeedLookup) == 0 { - return batchResult, nil - } - - // Query master with batched volume IDs - glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup) - err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{ - VolumeOrFileIds: stillNeedLookup, - }) + // Parse volume ID from response + parts := strings.Split(vidLoc.VolumeOrFileId, ",") + vidOnly := parts[0] + vid, err := strconv.ParseUint(vidOnly, 10, 32) if err != nil { - return fmt.Errorf("master lookup failed: %v", err) + lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err)) + glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) + continue } - for _, vidLoc := range resp.VolumeIdLocations { - if vidLoc.Error != "" { - glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error) - continue - } - - // Parse volume ID from response - parts := strings.Split(vidLoc.VolumeOrFileId, ",") - vidOnly := parts[0] - vid, err := strconv.ParseUint(vidOnly, 10, 32) - if err != nil { - glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err) - continue - } - - var locations []Location - for _, masterLoc := range vidLoc.Locations { - loc := Location{ - Url: masterLoc.Url, - PublicUrl: masterLoc.PublicUrl, - GrpcPort: int(masterLoc.GrpcPort), - DataCenter: masterLoc.DataCenter, - } - mc.addLocation(uint32(vid), loc) - locations = append(locations, loc) - } - - if len(locations) > 0 { - batchResult[vidOnly] = locations + var locations []Location + for _, masterLoc := range vidLoc.Locations { + loc := Location{ + Url: masterLoc.Url, + PublicUrl: masterLoc.PublicUrl, + GrpcPort: int(masterLoc.GrpcPort), + DataCenter: masterLoc.DataCenter, } + // Update cache with the location + p.masterClient.addLocation(uint32(vid), loc) + locations = append(locations, loc) } - return nil - }) - if err != nil { - return batchResult, err + if len(locations) > 0 { + result[vidOnly] = locations + } } - return batchResult, nil + return nil }) if err != nil { - lookupErrors = append(lookupErrors, err) + return nil, err } - // Merge singleflight batch results - if batchLocations, ok := sfResult.(map[string][]Location); ok { - for vid, locs := range batchLocations { - result[vid] = locs - } - } - - // Check for volumes that still weren't found - for _, vidString := range needsLookup { - if _, found := result[vidString]; !found { - lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString)) - } + // 
Return partial results with detailed errors + // Callers should check both result map and error + if len(lookupErrors) > 0 { + glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors)) + return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...)) } - // Return aggregated errors using errors.Join to preserve error types - return result, errors.Join(lookupErrors...) + glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result)) + return result, nil } -func (mc *MasterClient) getCurrentMaster() pb.ServerAddress { - mc.currentMasterLock.RLock() - defer mc.currentMasterLock.RUnlock() - return mc.currentMaster -} +// MasterClient connects to master servers and maintains volume location cache +// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching +type MasterClient struct { + *vidMapClient // Embedded cache with shared logic -func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { - mc.currentMasterLock.Lock() - mc.currentMaster = master - mc.currentMasterLock.Unlock() + FilerGroup string + clientType string + clientHost pb.ServerAddress + rack string + currentMaster pb.ServerAddress + currentMasterLock sync.RWMutex + masters pb.ServerDiscovery + grpcDialOption grpc.DialOption + grpcTimeout time.Duration // Timeout for gRPC calls to master + OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time) + OnPeerUpdateLock sync.RWMutex } -func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress { - mc.WaitUntilConnected(ctx) - return mc.getCurrentMaster() -} +func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { + mc := &MasterClient{ + FilerGroup: filerGroup, + clientType: clientType, + clientHost: clientHost, + rack: rack, + masters: masters, + grpcDialOption: grpcDialOption, + grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master + } -func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { - mc.WaitUntilConnected(ctx) - return mc.masters.GetInstances() -} + // Create provider that references this MasterClient + provider := &masterVolumeProvider{masterClient: mc} -func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { - attempts := 0 - for { - select { - case <-ctx.Done(): - return - default: - currentMaster := mc.getCurrentMaster() - if currentMaster != "" { - return - } - attempts++ - if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds) - glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts) - } - time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) - } - } -} + // Initialize embedded vidMapClient with the provider and default cache size + mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize) -func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) { - glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters) - for { - select { - case <-ctx.Done(): - glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) - return - default: - mc.tryAllMasters(ctx) - time.Sleep(time.Second) - } - } + return mc } -func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) 
(leader string) { - for _, master := range mc.masters.GetInstances() { - if master == myMasterAddress { - continue - } - if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { - ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) - defer cancel() - resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) - if err != nil { - return err - } - leader = resp.Leader - return nil - }); grpcErr != nil { - glog.V(0).Infof("connect to %s: %v", master, grpcErr) - } - if leader != "" { - glog.V(0).Infof("existing leader is %s", leader) - return - } - } - glog.V(0).Infof("No existing leader found!") - return +func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) { + mc.OnPeerUpdateLock.Lock() + mc.OnPeerUpdate = onPeerUpdate + mc.OnPeerUpdateLock.Unlock() } func (mc *MasterClient) tryAllMasters(ctx context.Context) { @@ -393,6 +208,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server mc.resetVidMap() mc.updateVidMap(resp) } else { + // First message from master is not VolumeLocation (e.g., ClusterNodeUpdate) + // Still need to reset cache to ensure we don't use stale data from previous master mc.resetVidMap() } mc.setCurrentMaster(master) @@ -406,7 +223,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server } if resp.VolumeLocation != nil { - // maybe the leader is changed + // Check for leader change during the stream + // If master announces a new leader, reconnect to it if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader { glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader) nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader) @@ -415,7 +233,6 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server } mc.updateVidMap(resp) } - if resp.ClusterNodeUpdate != nil { update := resp.ClusterNodeUpdate mc.OnPeerUpdateLock.RLock() @@ -442,7 +259,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc() glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr) } - return + return nextHintedLeader } func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) { @@ -494,110 +311,103 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd }) } -// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately. -// This is safe for read operations as the returned pointer is a stable snapshot, -// and the underlying vidMap methods have their own internal locking. -func (mc *MasterClient) getStableVidMap() *vidMap { - mc.vidMapLock.RLock() - vm := mc.vidMap - mc.vidMapLock.RUnlock() - return vm -} - -// withCurrentVidMap executes a function with the current vidMap under a read lock. -// This is for methods that modify vidMap's internal state, ensuring the pointer -// is not swapped by resetVidMap during the operation. The actual map mutations -// are protected by vidMap's internal mutex. 
-func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) { - mc.vidMapLock.RLock() - defer mc.vidMapLock.RUnlock() - f(mc.vidMap) -} - -// Public methods for external packages to access vidMap safely - -// GetLocations safely retrieves volume locations -func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) { - return mc.getStableVidMap().GetLocations(vid) -} - -// GetLocationsClone safely retrieves a clone of volume locations -func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) { - return mc.getStableVidMap().GetLocationsClone(vid) -} - -// GetVidLocations safely retrieves volume locations by string ID -func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) { - return mc.getStableVidMap().GetVidLocations(vid) -} - -// LookupFileId safely looks up URLs for a file ID -func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) { - return mc.getStableVidMap().LookupFileId(ctx, fileId) -} - -// LookupVolumeServerUrl safely looks up volume server URLs -func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) { - return mc.getStableVidMap().LookupVolumeServerUrl(vid) +func (mc *MasterClient) getCurrentMaster() pb.ServerAddress { + mc.currentMasterLock.RLock() + defer mc.currentMasterLock.RUnlock() + return mc.currentMaster } -// GetDataCenter safely retrieves the data center -func (mc *MasterClient) GetDataCenter() string { - return mc.getStableVidMap().DataCenter +func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) { + mc.currentMasterLock.Lock() + mc.currentMaster = master + mc.currentMasterLock.Unlock() } -// Thread-safe helpers for vidMap operations - -// addLocation adds a volume location -func (mc *MasterClient) addLocation(vid uint32, location Location) { - mc.withCurrentVidMap(func(vm *vidMap) { - vm.addLocation(vid, location) - }) +// GetMaster returns the current master address, blocking until connected. +// +// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes +// a connection to a master server. If KeepConnectedToMaster hasn't been started in a +// background goroutine, this will block indefinitely (or until ctx is canceled). +// +// Typical initialization pattern: +// +// mc := wdclient.NewMasterClient(...) +// go mc.KeepConnectedToMaster(ctx) // Start connection management +// // ... later ... +// master := mc.GetMaster(ctx) // Will block until connected +// +// If called before KeepConnectedToMaster establishes a connection, this may cause +// unexpected timeouts in LookupVolumeIds and other operations that depend on it. +func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress { + mc.WaitUntilConnected(ctx) + return mc.getCurrentMaster() } -// deleteLocation removes a volume location -func (mc *MasterClient) deleteLocation(vid uint32, location Location) { - mc.withCurrentVidMap(func(vm *vidMap) { - vm.deleteLocation(vid, location) - }) +// GetMasters returns all configured master addresses, blocking until connected. +// See GetMaster() for important initialization contract details. 
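+// Unlike GetMaster, which returns the currently connected leader, this returns the
+// full configured/discovered master set from ServerDiscovery.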
+func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress { + mc.WaitUntilConnected(ctx) + return mc.masters.GetInstances() } -// addEcLocation adds an EC volume location -func (mc *MasterClient) addEcLocation(vid uint32, location Location) { - mc.withCurrentVidMap(func(vm *vidMap) { - vm.addEcLocation(vid, location) - }) +// WaitUntilConnected blocks until a master connection is established or ctx is canceled. +// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed. +func (mc *MasterClient) WaitUntilConnected(ctx context.Context) { + attempts := 0 + for { + select { + case <-ctx.Done(): + return + default: + currentMaster := mc.getCurrentMaster() + if currentMaster != "" { + return + } + attempts++ + if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds) + glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts) + } + time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond) + } + } } -// deleteEcLocation removes an EC volume location -func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) { - mc.withCurrentVidMap(func(vm *vidMap) { - vm.deleteEcLocation(vid, location) - }) +func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) { + glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters) + for { + select { + case <-ctx.Done(): + glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err()) + return + default: + mc.tryAllMasters(ctx) + time.Sleep(time.Second) + } + } } -func (mc *MasterClient) resetVidMap() { - mc.vidMapLock.Lock() - defer mc.vidMapLock.Unlock() - - // Preserve the existing vidMap in the cache chain - // No need to clone - the existing vidMap has its own mutex for thread safety - tail := mc.vidMap - - nvm := newVidMap(tail.DataCenter) - nvm.cache.Store(tail) - mc.vidMap = nvm - - // Trim cache chain to vidMapCacheSize by traversing to the last node - // that should remain and cutting the chain after it - node := tail - for i := 0; i < mc.vidMapCacheSize-1; i++ { - if node.cache.Load() == nil { +func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) { + for _, master := range mc.masters.GetInstances() { + if master == myMasterAddress { + continue + } + if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error { + ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond) + defer cancel() + resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return err + } + leader = resp.Leader + return nil + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", master, grpcErr) + } + if leader != "" { + glog.V(0).Infof("existing leader is %s", leader) return } - node = node.cache.Load() - } - if node != nil { - node.cache.Store(nil) } + glog.V(0).Infof("No existing leader found!") + return } diff --git a/weed/wdclient/vidmap_client.go b/weed/wdclient/vidmap_client.go new file mode 100644 index 000000000..402eaf8c4 --- /dev/null +++ b/weed/wdclient/vidmap_client.go @@ -0,0 +1,347 @@ +package wdclient + +import ( + "context" + "errors" + "fmt" + "math/rand" + "sort" + "strconv" + "strings" + "sync" + + "golang.org/x/sync/singleflight" + + "github.com/seaweedfs/seaweedfs/weed/glog" +) + +// VolumeLocationProvider is the interface for looking up volume locations +// This 
allows different implementations (master subscription, filer queries, etc.) +type VolumeLocationProvider interface { + // LookupVolumeIds looks up volume locations for the given volume IDs + // Returns a map of volume ID to locations + LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) +} + +// vidMapClient provides volume location caching with pluggable lookup +// It wraps the battle-tested vidMap with customizable volume lookup strategies +type vidMapClient struct { + vidMap *vidMap + vidMapLock sync.RWMutex + vidMapCacheSize int + provider VolumeLocationProvider + vidLookupGroup singleflight.Group +} + +const ( + // DefaultVidMapCacheSize is the default number of historical vidMap snapshots to keep + // This provides cache history when volumes move between servers + DefaultVidMapCacheSize = 5 +) + +// newVidMapClient creates a new client with the given provider and data center +func newVidMapClient(provider VolumeLocationProvider, dataCenter string, cacheSize int) *vidMapClient { + if cacheSize <= 0 { + cacheSize = DefaultVidMapCacheSize + } + return &vidMapClient{ + vidMap: newVidMap(dataCenter), + vidMapCacheSize: cacheSize, + provider: provider, + } +} + +// GetLookupFileIdFunction returns a function that can be used to lookup file IDs +func (vc *vidMapClient) GetLookupFileIdFunction() LookupFileIdFunctionType { + return vc.LookupFileIdWithFallback +} + +// LookupFileIdWithFallback looks up a file ID, checking cache first, then using provider +func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) { + // Try cache first - hold read lock during entire vidMap access to prevent swap during operation + vc.vidMapLock.RLock() + vm := vc.vidMap + dataCenter := vm.DataCenter + fullUrls, err = vm.LookupFileId(ctx, fileId) + vc.vidMapLock.RUnlock() + + // Cache hit - return immediately + if err == nil && len(fullUrls) > 0 { + return + } + + // Cache miss - extract volume ID from file ID (format: "volumeId,needle_id_cookie") + parts := strings.Split(fileId, ",") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid fileId %s", fileId) + } + volumeId := parts[0] + + // Use shared lookup logic with batching and singleflight + vidLocations, err := vc.LookupVolumeIdsWithFallback(ctx, []string{volumeId}) + + // Check for partial results first (important for multi-volume batched lookups) + locations, found := vidLocations[volumeId] + if !found || len(locations) == 0 { + // Volume not found - return specific error with context from lookup if available + if err != nil { + return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeId, fileId, err) + } + return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId) + } + + // Volume found successfully - ignore any errors about other volumes + // (not relevant for single-volume lookup, but defensive for future batching) + + // Build HTTP URLs from locations, preferring same data center + var sameDcUrls, otherDcUrls []string + for _, loc := range locations { + httpUrl := "http://" + loc.Url + "/" + fileId + if dataCenter != "" && dataCenter == loc.DataCenter { + sameDcUrls = append(sameDcUrls, httpUrl) + } else { + otherDcUrls = append(otherDcUrls, httpUrl) + } + } + + // Shuffle to distribute load across volume servers + rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] }) + rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], 
+
+// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache.
+// Uses singleflight to coalesce concurrent requests for the same batch of volumes.
+//
+// IMPORTANT: This function may return PARTIAL results with a non-nil error.
+// The result map contains successfully looked up volumes, while the error aggregates
+// failures for volumes that couldn't be found or had lookup errors.
+//
+// Callers MUST check both the result map AND the error:
+//   - result != nil && err == nil: All volumes found successfully
+//   - result != nil && err != nil: Some volumes found, some failed (check both)
+//   - result == nil && err != nil: Complete failure (connection error, etc.)
+//
+// Example usage:
+//
+//	locs, err := vc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"})
+//	if len(locs) > 0 {
+//		// Process successfully found volumes
+//	}
+//	if err != nil {
+//		// Log/handle failed volumes
+//	}
+func (vc *vidMapClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
+	result := make(map[string][]Location)
+	var needsLookup []string
+	var lookupErrors []error
+
+	// Check cache first and parse volume IDs once
+	vidStringToUint := make(map[string]uint32, len(volumeIds))
+
+	// Get stable pointer to vidMap with minimal lock hold time
+	vm := vc.getStableVidMap()
+
+	for _, vidString := range volumeIds {
+		vid, err := strconv.ParseUint(vidString, 10, 32)
+		if err != nil {
+			return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
+		}
+		vidStringToUint[vidString] = uint32(vid)
+
+		locations, found := vm.GetLocations(uint32(vid))
+		if found && len(locations) > 0 {
+			result[vidString] = locations
+		} else {
+			needsLookup = append(needsLookup, vidString)
+		}
+	}
+
+	if len(needsLookup) == 0 {
+		return result, nil
+	}
+
+	// Batch query all missing volumes using singleflight on the batch key
+	// Sort for stable key to coalesce identical batches
+	sort.Strings(needsLookup)
+	batchKey := strings.Join(needsLookup, ",")
+
+	sfResult, err, _ := vc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
+		// Double-check cache for volumes that might have been populated while waiting
+		stillNeedLookup := make([]string, 0, len(needsLookup))
+		batchResult := make(map[string][]Location)
+
+		// Get stable pointer with minimal lock hold time
+		vm := vc.getStableVidMap()
+
+		for _, vidString := range needsLookup {
+			vid := vidStringToUint[vidString] // Use pre-parsed value
+			if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
+				batchResult[vidString] = locations
+			} else {
+				stillNeedLookup = append(stillNeedLookup, vidString)
+			}
+		}
+
+		if len(stillNeedLookup) == 0 {
+			return batchResult, nil
+		}
+
+		// Query provider with batched volume IDs
+		glog.V(2).Infof("Looking up %d volumes from provider: %v", len(stillNeedLookup), stillNeedLookup)
+
+		providerResults, err := vc.provider.LookupVolumeIds(ctx, stillNeedLookup)
+		if err != nil {
+			return batchResult, fmt.Errorf("provider lookup failed: %v", err)
+		}
+
+		// Update cache with results
+		for vidString, locations := range providerResults {
+			vid, err := strconv.ParseUint(vidString, 10, 32)
+			if err != nil {
+				glog.Warningf("Failed to parse volume id '%s': %v", vidString, err)
+				continue
+			}
+
+			for _, loc := range locations {
+				vc.addLocation(uint32(vid), loc)
+			}
+
+			if len(locations) > 0 {
+				batchResult[vidString] = locations
+			}
+		}
+
+		return batchResult, nil
+	})
+
+	if err != nil {
+		lookupErrors = append(lookupErrors, err)
+	}
+
+	// Merge singleflight batch results
+	if batchLocations, ok := sfResult.(map[string][]Location); ok {
+		for vid, locs := range batchLocations {
+			result[vid] = locs
+		}
+	}
+
+	// Check for volumes that still weren't found
+	for _, vidString := range needsLookup {
+		if _, found := result[vidString]; !found {
+			lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
+		}
+	}
+
+	// Return aggregated errors
+	return result, errors.Join(lookupErrors...)
+}
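The partial-result contract documented above means a caller can use whatever was resolved even when err is non-nil. A short in-package sketch of that pattern, assuming an existing *vidMapClient named vc; the volume IDs are invented for illustration:

func lookupBatch(ctx context.Context, vc *vidMapClient) {
	locs, err := vc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"})

	// Entries in locs are valid even when err != nil (partial success).
	for vid, locations := range locs {
		glog.V(1).Infof("volume %s: %d locations", vid, len(locations))
	}

	// err aggregates, via errors.Join, the volumes that could not be resolved;
	// it does not invalidate the entries already present in locs.
	if err != nil {
		glog.Warningf("unresolved volumes: %v", err)
	}
}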
+
+// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
+// WARNING: Use with caution. The returned vidMap pointer is stable (won't be garbage collected
+// due to cache chain), but the vidMapClient.vidMap field may be swapped by resetVidMap().
+// For operations that must use the current vidMap atomically, use withCurrentVidMap() instead.
+func (vc *vidMapClient) getStableVidMap() *vidMap {
+	vc.vidMapLock.RLock()
+	vm := vc.vidMap
+	vc.vidMapLock.RUnlock()
+	return vm
+}
+
+// withCurrentVidMap executes a function with the current vidMap under a read lock.
+// This guarantees the vidMap instance cannot be swapped during the function execution.
+// Use this when you need atomic access to the current vidMap for multiple operations.
+func (vc *vidMapClient) withCurrentVidMap(f func(vm *vidMap)) {
+	vc.vidMapLock.RLock()
+	defer vc.vidMapLock.RUnlock()
+	f(vc.vidMap)
+}
+
+// Public methods for external access
+
+// GetLocations safely retrieves volume locations
+func (vc *vidMapClient) GetLocations(vid uint32) (locations []Location, found bool) {
+	return vc.getStableVidMap().GetLocations(vid)
+}
+
+// GetLocationsClone safely retrieves a clone of volume locations
+func (vc *vidMapClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
+	return vc.getStableVidMap().GetLocationsClone(vid)
+}
+
+// GetVidLocations safely retrieves volume locations by string ID
+func (vc *vidMapClient) GetVidLocations(vid string) (locations []Location, err error) {
+	return vc.getStableVidMap().GetVidLocations(vid)
+}
+
+// LookupFileId safely looks up URLs for a file ID
+func (vc *vidMapClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
+	return vc.getStableVidMap().LookupFileId(ctx, fileId)
+}
+
+// LookupVolumeServerUrl safely looks up volume server URLs
+func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
+	return vc.getStableVidMap().LookupVolumeServerUrl(vid)
+}
+
+// GetDataCenter safely retrieves the data center
+func (vc *vidMapClient) GetDataCenter() string {
+	return vc.getStableVidMap().DataCenter
+}
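The accessors above take a stable snapshot pointer per call; when two reads must observe the same vidMap instance, withCurrentVidMap is the safer choice. An in-package sketch under that assumption, with the volume ID invented for illustration:

// readAtomically reads the data center and one volume's locations from the
// same vidMap instance, even if resetVidMap runs concurrently. Two separate
// calls through the stable-pointer accessors would not guarantee that.
func readAtomically(vc *vidMapClient) (dataCenter string, locations []Location) {
	vc.withCurrentVidMap(func(vm *vidMap) {
		dataCenter = vm.DataCenter
		locations, _ = vm.GetLocations(42)
	})
	return dataCenter, locations
}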
+
+// Thread-safe helpers for vidMap operations
+
+// addLocation adds a volume location
+func (vc *vidMapClient) addLocation(vid uint32, location Location) {
+	vc.withCurrentVidMap(func(vm *vidMap) {
+		vm.addLocation(vid, location)
+	})
+}
+
+// deleteLocation removes a volume location
+func (vc *vidMapClient) deleteLocation(vid uint32, location Location) {
+	vc.withCurrentVidMap(func(vm *vidMap) {
+		vm.deleteLocation(vid, location)
+	})
+}
+
+// addEcLocation adds an EC volume location
+func (vc *vidMapClient) addEcLocation(vid uint32, location Location) {
+	vc.withCurrentVidMap(func(vm *vidMap) {
+		vm.addEcLocation(vid, location)
+	})
+}
+
+// deleteEcLocation removes an EC volume location
+func (vc *vidMapClient) deleteEcLocation(vid uint32, location Location) {
+	vc.withCurrentVidMap(func(vm *vidMap) {
+		vm.deleteEcLocation(vid, location)
+	})
+}
+
+// resetVidMap resets the volume ID map
+func (vc *vidMapClient) resetVidMap() {
+	vc.vidMapLock.Lock()
+	defer vc.vidMapLock.Unlock()
+
+	// Preserve the existing vidMap in the cache chain
+	tail := vc.vidMap
+
+	nvm := newVidMap(tail.DataCenter)
+	nvm.cache.Store(tail)
+	vc.vidMap = nvm
+
+	// Trim cache chain to vidMapCacheSize
+	node := tail
+	for i := 0; i < vc.vidMapCacheSize-1; i++ {
+		if node.cache.Load() == nil {
+			return
+		}
+		node = node.cache.Load()
+	}
+	// node is guaranteed to be non-nil after the loop
+	node.cache.Store(nil)
+}
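resetVidMap keeps the previous map reachable through the cache chain and trims the chain so memory stays bounded while older snapshots remain available as lookup fallbacks. A rough in-package sketch of inspecting that chain; it assumes only the cache.Load() behavior used above (Load returning a nil-able *vidMap), and the exact retained length (on the order of vidMapCacheSize) is an implementation detail:

// cacheChainLength walks the vidMap cache chain starting from the current map.
// After repeated resetVidMap calls the returned length should stay bounded,
// roughly by vidMapCacheSize plus the current map.
func cacheChainLength(vc *vidMapClient) int {
	length := 0
	vc.withCurrentVidMap(func(vm *vidMap) {
		for node := vm; node != nil; node = node.cache.Load() {
			length++
		}
	})
	return length
}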