
Merge branch 'master' into add_error_list_each_entry_func

pull/7485/head
tam-i13 2 months ago
committed by GitHub
commit f6f88d0bb8
Changed files (lines changed):
  1. test/s3/iam/s3_iam_framework.go (12)
  2. weed/cluster/cluster.go (5)
  3. weed/command/iam.go (5)
  4. weed/filer/filer.go (4)
  5. weed/filer/reader_at.go (47)
  6. weed/iamapi/iamapi_server.go (43)
  7. weed/mount/weedfs.go (27)
  8. weed/s3api/s3_constants/header.go (5)
  9. weed/s3api/s3api_bucket_handlers_object_lock_config.go (6)
  10. weed/s3api/s3api_object_handlers.go (33)
  11. weed/s3api/s3api_object_retention.go (39)
  12. weed/s3api/s3api_object_retention_test.go (76)
  13. weed/s3api/s3api_server.go (9)
  14. weed/server/filer_server.go (2)
  15. weed/wdclient/filer_client.go (404)
  16. weed/wdclient/masterclient.go (546)
  17. weed/wdclient/vidmap_client.go (347)

test/s3/iam/s3_iam_framework.go (12)

@ -705,12 +705,22 @@ func (f *S3IAMTestFramework) CreateBucketWithCleanup(s3Client *s3.S3, bucketName
f.t.Logf("Warning: Failed to delete existing bucket %s: %v", bucketName, deleteErr)
}
// Add a small delay to allow deletion to propagate
time.Sleep(100 * time.Millisecond)
// Now create it fresh
_, err = s3Client.CreateBucket(&s3.CreateBucketInput{
Bucket: aws.String(bucketName),
})
if err != nil {
return fmt.Errorf("failed to recreate bucket after cleanup: %v", err)
// If it still says bucket exists after cleanup, it might be in an inconsistent state
// In this case, just use the existing bucket since we emptied it
if awsErr, ok := err.(awserr.Error); ok && (awsErr.Code() == "BucketAlreadyExists" || awsErr.Code() == "BucketAlreadyOwnedByYou") {
f.t.Logf("Bucket %s still exists after cleanup, reusing it", bucketName)
// Bucket exists and is empty, so we can proceed
} else {
return fmt.Errorf("failed to recreate bucket after cleanup: %v", err)
}
}
} else {
return err

weed/cluster/cluster.go (5)

@ -1,10 +1,11 @@
package cluster
import (
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"sync"
"time"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
const (

weed/command/iam.go (5)

@ -76,7 +76,7 @@ func (iamopt *IamOptions) startIamServer() bool {
masters := pb.ServerAddresses(*iamopt.masters).ToAddressMap()
router := mux.NewRouter().SkipClean(true)
_, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
iamApiServer, iamApiServer_err := iamapi.NewIamApiServer(router, &iamapi.IamServerOption{
Masters: masters,
Filer: filerAddress,
Port: *iamopt.port,
@ -86,6 +86,9 @@ func (iamopt *IamOptions) startIamServer() bool {
if iamApiServer_err != nil {
glog.Fatalf("IAM API Server startup error: %v", iamApiServer_err)
}
// Ensure cleanup on shutdown
defer iamApiServer.Shutdown()
listenAddress := fmt.Sprintf(":%d", *iamopt.port)
iamApiListener, iamApiLocalListener, err := util.NewIpAndLocalListeners(*iamopt.ip, *iamopt.port, time.Duration(10)*time.Second)
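The Shutdown doc comment in iamapi_server.go notes that the defer above only covers normal returns, and that proper SIGTERM/SIGINT handling would need to be added at the command layer. A minimal sketch of what that wiring could look like (hypothetical, not part of this commit; assumes imports of os, os/signal, and syscall):

    // Hypothetical signal handling to complement the defer above:
    sigCh := make(chan os.Signal, 1)
    signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
    go func() {
        <-sigCh
        iamApiServer.Shutdown() // cancels the master client context
        os.Exit(0)
    }()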

weed/filer/filer.go (4)

@ -175,10 +175,6 @@ func (fs *Filer) GetMaster(ctx context.Context) pb.ServerAddress {
return fs.MasterClient.GetMaster(ctx)
}
func (fs *Filer) KeepMasterClientConnected(ctx context.Context) {
fs.MasterClient.KeepConnectedToMaster(ctx)
}
func (f *Filer) BeginTransaction(ctx context.Context) (context.Context, error) {
return f.Store.BeginTransaction(ctx)
}

weed/filer/reader_at.go (47)

@ -26,15 +26,42 @@ type ChunkReadAt struct {
var _ = io.ReaderAt(&ChunkReadAt{})
var _ = io.Closer(&ChunkReadAt{})
// LookupFn creates a basic volume location lookup function with simple caching.
//
// Deprecated: Use wdclient.FilerClient instead. This function has several limitations compared to wdclient.FilerClient:
// - Simple bounded cache (10k entries, no eviction policy or TTL for stale entries)
// - No singleflight deduplication (concurrent requests for same volume will duplicate work)
// - No cache history for volume moves (no fallback chain when volumes migrate)
// - No high availability (single filer address, no automatic failover)
//
// For NEW code, especially mount operations, use wdclient.FilerClient instead:
// filerClient := wdclient.NewFilerClient(filerAddresses, grpcDialOption, dataCenter, opts)
// lookupFn := filerClient.GetLookupFileIdFunction()
//
// This provides:
// - Bounded cache with configurable size
// - Singleflight deduplication of concurrent lookups
// - Cache history when volumes move
// - Battle-tested vidMap with cache chain
//
// This function is kept for backward compatibility with existing code paths
// (shell commands, streaming, etc.) but should be avoided in long-running processes
// or multi-tenant deployments where unbounded memory growth is a concern.
//
// Maximum recommended cache entries: ~10,000 volumes per process.
// Beyond this, consider migrating to wdclient.FilerClient.
func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionType {
vidCache := make(map[string]*filer_pb.Locations)
var vicCacheLock sync.RWMutex
var vidCacheLock sync.RWMutex
cacheSize := 0
const maxCacheSize = 10000 // Simple bound to prevent unbounded growth
return func(ctx context.Context, fileId string) (targetUrls []string, err error) {
vid := VolumeId(fileId)
vicCacheLock.RLock()
vidCacheLock.RLock()
locations, found := vidCache[vid]
vicCacheLock.RUnlock()
vidCacheLock.RUnlock()
if !found {
util.Retry("lookup volume "+vid, func() error {
@ -51,9 +78,17 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
glog.V(0).InfofCtx(ctx, "failed to locate %s", fileId)
return fmt.Errorf("failed to locate %s", fileId)
}
vicCacheLock.Lock()
vidCache[vid] = locations
vicCacheLock.Unlock()
vidCacheLock.Lock()
// Simple size limit to prevent unbounded growth
// For proper cache management, use wdclient.FilerClient instead
if cacheSize < maxCacheSize {
vidCache[vid] = locations
cacheSize++
} else if cacheSize == maxCacheSize {
glog.Warningf("filer.LookupFn cache reached limit of %d volumes, not caching new entries. Consider migrating to wdclient.FilerClient for bounded cache management.", maxCacheSize)
cacheSize++ // Only log once
}
vidCacheLock.Unlock()
return nil
})
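To make the migration path from the deprecation notice concrete, here is a sketch of swapping LookupFn for the new client in a long-running process (the filer address, dial option, and existing filerClient variable are placeholders):

    // Old path: single filer, simple 10k-entry cache, no deduplication
    lookupFn := filer.LookupFn(filerClient)

    // New path: bounded vidMap cache, singleflight, multi-filer failover
    fc := wdclient.NewFilerClient(
        []pb.ServerAddress{"localhost:8888"}, // placeholder filer address
        grpcDialOption,                       // assumed grpc.DialOption from the caller
        "",                                   // data center, empty when unknown
    )
    lookupFn = fc.GetLookupFileIdFunction()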

weed/iamapi/iamapi_server.go (43)

@ -12,6 +12,7 @@ import (
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/credential"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/iam_pb"
@ -45,8 +46,11 @@ type IamServerOption struct {
}
type IamApiServer struct {
s3ApiConfig IamS3ApiConfig
iam *s3api.IdentityAccessManagement
s3ApiConfig IamS3ApiConfig
iam *s3api.IdentityAccessManagement
shutdownContext context.Context
shutdownCancel context.CancelFunc
masterClient *wdclient.MasterClient
}
var s3ApiConfigure IamS3ApiConfig
@ -56,9 +60,21 @@ func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer
}
func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, explicitStore string) (iamApiServer *IamApiServer, err error) {
masterClient := wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters))
// Create a cancellable context for the master client connection
// This allows graceful shutdown via Shutdown() method
shutdownCtx, shutdownCancel := context.WithCancel(context.Background())
// Start KeepConnectedToMaster for volume location lookups
// IAM config files are typically small and inline, but if they ever have chunks,
// ReadEntry→StreamContent needs masterClient for volume lookups
glog.V(0).Infof("IAM API starting master client connection for volume location lookups")
go masterClient.KeepConnectedToMaster(shutdownCtx)
configure := &IamS3ApiConfigure{
option: option,
masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)),
masterClient: masterClient,
}
s3ApiConfigure = configure
@ -72,8 +88,11 @@ func NewIamApiServerWithStore(router *mux.Router, option *IamServerOption, expli
configure.credentialManager = iam.GetCredentialManager()
iamApiServer = &IamApiServer{
s3ApiConfig: s3ApiConfigure,
iam: iam,
s3ApiConfig: s3ApiConfigure,
iam: iam,
shutdownContext: shutdownCtx,
shutdownCancel: shutdownCancel,
masterClient: masterClient,
}
iamApiServer.registerRouter(router)
@ -93,6 +112,20 @@ func (iama *IamApiServer) registerRouter(router *mux.Router) {
apiRouter.NotFoundHandler = http.HandlerFunc(s3err.NotFoundHandler)
}
// Shutdown gracefully stops the IAM API server and releases resources.
// It cancels the master client connection goroutine and closes gRPC connections.
// This method is safe to call multiple times.
//
// Note: This method is called via defer in weed/command/iam.go for best-effort cleanup.
// For proper graceful shutdown on SIGTERM/SIGINT, signal handling should be added to
// the command layer to call this method before process exit.
func (iama *IamApiServer) Shutdown() {
if iama.shutdownCancel != nil {
glog.V(0).Infof("IAM API server shutting down, stopping master client connection")
iama.shutdownCancel()
}
}
func (iama *IamS3ApiConfigure) GetS3ApiConfiguration(s3cfg *iam_pb.S3ApiConfiguration) (err error) {
return iama.GetS3ApiConfigurationFromCredentialManager(s3cfg)
}

weed/mount/weedfs.go (27)

@ -97,9 +97,32 @@ type WFS struct {
fhLockTable *util.LockTable[FileHandleId]
rdmaClient *RDMAMountClient
FilerConf *filer.FilerConf
filerClient *wdclient.FilerClient // Cached volume location client
}
func NewSeaweedFileSystem(option *Option) *WFS {
// Only create FilerClient for direct volume access modes
// When VolumeServerAccess == "filerProxy", all reads go through filer, so no volume lookup needed
var filerClient *wdclient.FilerClient
if option.VolumeServerAccess != "filerProxy" {
// Create FilerClient for efficient volume location caching
// Pass all filer addresses for high availability with automatic failover
// Configure URL preference based on VolumeServerAccess option
var opts *wdclient.FilerClientOption
if option.VolumeServerAccess == "publicUrl" {
opts = &wdclient.FilerClientOption{
UrlPreference: wdclient.PreferPublicUrl,
}
}
filerClient = wdclient.NewFilerClient(
option.FilerAddresses, // Pass all filer addresses for HA
option.GrpcDialOption,
option.DataCenter,
opts,
)
}
wfs := &WFS{
RawFileSystem: fuse.NewDefaultRawFileSystem(),
option: option,
@ -107,6 +130,7 @@ func NewSeaweedFileSystem(option *Option) *WFS {
inodeToPath: NewInodeToPath(util.FullPath(option.FilerMountRootPath), option.CacheMetaTTlSec),
fhMap: NewFileHandleToInode(),
dhMap: NewDirectoryHandleToInode(),
filerClient: filerClient, // nil for proxy mode, initialized for direct access
fhLockTable: util.NewLockTable[FileHandleId](),
}
@ -253,7 +277,8 @@ func (wfs *WFS) LookupFn() wdclient.LookupFileIdFunctionType {
return []string{"http://" + wfs.getCurrentFiler().ToHttpAddress() + "/?proxyChunkId=" + fileId}, nil
}
}
return filer.LookupFn(wfs)
// Use the cached FilerClient for efficient lookups with singleflight and cache history
return wfs.filerClient.GetLookupFileIdFunction()
}
func (wfs *WFS) getCurrentFiler() pb.ServerAddress {
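Condensed, the constructor above maps the VolumeServerAccess modes to lookup strategies as follows (a sketch of the same decision, not additional code from the commit):

    switch option.VolumeServerAccess {
    case "filerProxy":
        // filerClient stays nil; every chunk read is proxied through the filer
    case "publicUrl":
        // direct reads, preferring each volume server's PublicUrl
        opts := &wdclient.FilerClientOption{UrlPreference: wdclient.PreferPublicUrl}
        filerClient = wdclient.NewFilerClient(option.FilerAddresses, option.GrpcDialOption, option.DataCenter, opts)
    default:
        // direct reads using private URLs
        filerClient = wdclient.NewFilerClient(option.FilerAddresses, option.GrpcDialOption, option.DataCenter)
    }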

weed/s3api/s3_constants/header.go (5)

@ -23,6 +23,11 @@ import (
"github.com/gorilla/mux"
)
// S3 XML namespace
const (
S3Namespace = "http://s3.amazonaws.com/doc/2006-03-01/"
)
// Standard S3 HTTP request constants
const (
// S3 storage class

weed/s3api/s3api_bucket_handlers_object_lock_config.go (6)

@ -86,6 +86,9 @@ func (s3a *S3ApiServer) GetObjectLockConfigurationHandler(w http.ResponseWriter,
// Check if we have cached Object Lock configuration
if bucketConfig.ObjectLockConfig != nil {
// Set namespace for S3 compatibility
bucketConfig.ObjectLockConfig.XMLNS = s3_constants.S3Namespace
// Use cached configuration and marshal it to XML for response
marshaledXML, err := xml.Marshal(bucketConfig.ObjectLockConfig)
if err != nil {
@ -139,6 +142,9 @@ func (s3a *S3ApiServer) GetObjectLockConfigurationHandler(w http.ResponseWriter,
// not just ObjectLockConfig, before resetting the TTL
s3a.updateBucketConfigCacheFromEntry(freshEntry)
// Set namespace for S3 compatibility
objectLockConfig.XMLNS = s3_constants.S3Namespace
// Marshal and return the configuration
marshaledXML, err := xml.Marshal(objectLockConfig)
if err != nil {

weed/s3api/s3api_object_handlers.go (33)

@ -24,7 +24,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"github.com/seaweedfs/seaweedfs/weed/util/mem"
"github.com/seaweedfs/seaweedfs/weed/glog"
@ -994,36 +993,10 @@ var volumeServerHTTPClient = &http.Client{
}
// createLookupFileIdFunction creates a reusable lookup function for resolving volume URLs
// Uses FilerClient's vidMap cache to eliminate per-chunk gRPC overhead
func (s3a *S3ApiServer) createLookupFileIdFunction() func(context.Context, string) ([]string, error) {
return func(ctx context.Context, fileId string) ([]string, error) {
var urls []string
err := s3a.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
vid := filer.VolumeId(fileId)
resp, err := client.LookupVolume(ctx, &filer_pb.LookupVolumeRequest{
VolumeIds: []string{vid},
})
if err != nil {
return err
}
if locs, found := resp.LocationsMap[vid]; found {
for _, loc := range locs.Locations {
// Build complete URL with volume server address and fileId
// The fileId parameter contains the full "volumeId,fileKey" identifier (e.g., "3,01637037d6")
// This constructs URLs like: http://127.0.0.1:8080/3,01637037d6 (or https:// if configured)
// NormalizeUrl ensures the proper scheme (http:// or https://) is used based on configuration
normalizedUrl, err := util_http.NormalizeUrl(loc.Url)
if err != nil {
glog.Warningf("Failed to normalize URL for %s: %v", loc.Url, err)
continue
}
urls = append(urls, normalizedUrl+"/"+fileId)
}
}
return nil
})
glog.V(3).Infof("createLookupFileIdFunction: fileId=%s, resolved urls=%v", fileId, urls)
return urls, err
}
// Return the FilerClient's lookup function which uses the battle-tested vidMap cache
return s3a.filerClient.GetLookupFileIdFunction()
}
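Callers of the returned function are unchanged; a usage sketch, reusing the example fileId from the removed comment:

    lookup := s3a.createLookupFileIdFunction()
    urls, err := lookup(ctx, "3,01637037d6") // fileId format: "volumeId,fileKey"
    if err != nil {
        return err
    }
    // urls now come from the vidMap cache in the common case,
    // instead of one LookupVolume gRPC round-trip per chunk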
// streamFromVolumeServersWithSSE handles streaming with inline SSE decryption

weed/s3api/s3api_object_retention.go (39)

@ -57,37 +57,40 @@ const (
// ObjectRetention represents S3 Object Retention configuration
type ObjectRetention struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Retention"`
Mode string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Mode,omitempty"`
RetainUntilDate *time.Time `xml:"http://s3.amazonaws.com/doc/2006-03-01/ RetainUntilDate,omitempty"`
XMLNS string `xml:"xmlns,attr,omitempty"`
XMLName xml.Name `xml:"Retention"`
Mode string `xml:"Mode,omitempty"`
RetainUntilDate *time.Time `xml:"RetainUntilDate,omitempty"`
}
// ObjectLegalHold represents S3 Object Legal Hold configuration
type ObjectLegalHold struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ LegalHold"`
Status string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Status,omitempty"`
XMLNS string `xml:"xmlns,attr,omitempty"`
XMLName xml.Name `xml:"LegalHold"`
Status string `xml:"Status,omitempty"`
}
// ObjectLockConfiguration represents S3 Object Lock Configuration
type ObjectLockConfiguration struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockConfiguration"`
ObjectLockEnabled string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ ObjectLockEnabled,omitempty"`
Rule *ObjectLockRule `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Rule,omitempty"`
XMLNS string `xml:"xmlns,attr,omitempty"`
XMLName xml.Name `xml:"ObjectLockConfiguration"`
ObjectLockEnabled string `xml:"ObjectLockEnabled,omitempty"`
Rule *ObjectLockRule `xml:"Rule,omitempty"`
}
// ObjectLockRule represents an Object Lock Rule
type ObjectLockRule struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Rule"`
DefaultRetention *DefaultRetention `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DefaultRetention,omitempty"`
XMLName xml.Name `xml:"Rule"`
DefaultRetention *DefaultRetention `xml:"DefaultRetention,omitempty"`
}
// DefaultRetention represents default retention settings
// Implements custom XML unmarshal to track if Days/Years were present in XML
type DefaultRetention struct {
XMLName xml.Name `xml:"http://s3.amazonaws.com/doc/2006-03-01/ DefaultRetention"`
Mode string `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Mode,omitempty"`
Days int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Days,omitempty"`
Years int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Years,omitempty"`
XMLName xml.Name `xml:"DefaultRetention"`
Mode string `xml:"Mode,omitempty"`
Days int `xml:"Days,omitempty"`
Years int `xml:"Years,omitempty"`
DaysSet bool `xml:"-"`
YearsSet bool `xml:"-"`
}
@ -102,8 +105,8 @@ func (dr *DefaultRetention) UnmarshalXML(d *xml.Decoder, start xml.StartElement)
type Alias DefaultRetention
aux := &struct {
*Alias
Days *int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Days,omitempty"`
Years *int `xml:"http://s3.amazonaws.com/doc/2006-03-01/ Years,omitempty"`
Days *int `xml:"Days,omitempty"`
Years *int `xml:"Years,omitempty"`
}{Alias: (*Alias)(dr)}
if err := d.DecodeElement(aux, &start); err != nil {
glog.V(2).Infof("DefaultRetention.UnmarshalXML: decode error: %v", err)
@ -245,6 +248,8 @@ func (s3a *S3ApiServer) getObjectRetention(bucket, object, versionId string) (*O
return nil, ErrNoRetentionConfiguration
}
// Set namespace for S3 compatibility
retention.XMLNS = s3_constants.S3Namespace
return retention, nil
}
@ -386,6 +391,8 @@ func (s3a *S3ApiServer) getObjectLegalHold(bucket, object, versionId string) (*O
return nil, ErrNoLegalHoldConfiguration
}
// Set namespace for S3 compatibility
legalHold.XMLNS = s3_constants.S3Namespace
return legalHold, nil
}
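The effect of dropping the per-field namespaces: Go's encoding/xml matches elements by local name when a struct tag carries no namespace, so unmarshal now accepts both namespaced and bare input (the Veeam case), while responses still carry the namespace via the XMLNS attribute. A small illustration (a sketch with errors elided, not test code from the commit):

    var r ObjectRetention
    // Both decode successfully now:
    xml.Unmarshal([]byte(`<Retention xmlns="http://s3.amazonaws.com/doc/2006-03-01/"><Mode>GOVERNANCE</Mode></Retention>`), &r)
    xml.Unmarshal([]byte(`<Retention><Mode>GOVERNANCE</Mode></Retention>`), &r)

    // Responses set the namespace explicitly before encoding:
    r.XMLNS = s3_constants.S3Namespace
    out, _ := xml.Marshal(&r) // <Retention xmlns="http://s3.amazonaws.com/doc/2006-03-01/">...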

weed/s3api/s3api_object_retention_test.go (76)

@ -201,6 +201,30 @@ func TestParseObjectRetention(t *testing.T) {
RetainUntilDate: timePtr(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)),
},
},
{
name: "Valid retention XML without namespace (Veeam compatibility)",
xmlBody: `<Retention>
<Mode>GOVERNANCE</Mode>
<RetainUntilDate>2024-12-31T23:59:59Z</RetainUntilDate>
</Retention>`,
expectError: false,
expectedResult: &ObjectRetention{
Mode: "GOVERNANCE",
RetainUntilDate: timePtr(time.Date(2024, 12, 31, 23, 59, 59, 0, time.UTC)),
},
},
{
name: "Valid compliance retention XML without namespace (Veeam compatibility)",
xmlBody: `<Retention>
<Mode>COMPLIANCE</Mode>
<RetainUntilDate>2025-01-01T00:00:00Z</RetainUntilDate>
</Retention>`,
expectError: false,
expectedResult: &ObjectRetention{
Mode: "COMPLIANCE",
RetainUntilDate: timePtr(time.Date(2025, 1, 1, 0, 0, 0, 0, time.UTC)),
},
},
{
name: "Empty XML body",
xmlBody: "",
@ -311,6 +335,26 @@ func TestParseObjectLegalHold(t *testing.T) {
Status: "OFF",
},
},
{
name: "Valid legal hold ON without namespace",
xmlBody: `<LegalHold>
<Status>ON</Status>
</LegalHold>`,
expectError: false,
expectedResult: &ObjectLegalHold{
Status: "ON",
},
},
{
name: "Valid legal hold OFF without namespace",
xmlBody: `<LegalHold>
<Status>OFF</Status>
</LegalHold>`,
expectError: false,
expectedResult: &ObjectLegalHold{
Status: "OFF",
},
},
{
name: "Empty XML body",
xmlBody: "",
@ -405,6 +449,38 @@ func TestParseObjectLockConfiguration(t *testing.T) {
},
},
},
{
name: "Valid object lock configuration without namespace",
xmlBody: `<ObjectLockConfiguration>
<ObjectLockEnabled>Enabled</ObjectLockEnabled>
</ObjectLockConfiguration>`,
expectError: false,
expectedResult: &ObjectLockConfiguration{
ObjectLockEnabled: "Enabled",
},
},
{
name: "Valid object lock configuration with rule without namespace",
xmlBody: `<ObjectLockConfiguration>
<ObjectLockEnabled>Enabled</ObjectLockEnabled>
<Rule>
<DefaultRetention>
<Mode>GOVERNANCE</Mode>
<Days>30</Days>
</DefaultRetention>
</Rule>
</ObjectLockConfiguration>`,
expectError: false,
expectedResult: &ObjectLockConfiguration{
ObjectLockEnabled: "Enabled",
Rule: &ObjectLockRule{
DefaultRetention: &DefaultRetention{
Mode: "GOVERNANCE",
Days: 30,
},
},
},
},
{
name: "Empty XML body",
xmlBody: "",

weed/s3api/s3api_server.go (9)

@ -19,6 +19,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/iam/sts"
"github.com/seaweedfs/seaweedfs/weed/pb/s3_pb"
"github.com/seaweedfs/seaweedfs/weed/util/grace"
"github.com/seaweedfs/seaweedfs/weed/wdclient"
"github.com/gorilla/mux"
"github.com/seaweedfs/seaweedfs/weed/pb"
@ -55,6 +56,7 @@ type S3ApiServer struct {
cb *CircuitBreaker
randomClientId int32
filerGuard *security.Guard
filerClient *wdclient.FilerClient
client util_http_client.HTTPClientInterface
bucketRegistry *BucketRegistry
credentialManager *credential.CredentialManager
@ -91,11 +93,18 @@ func NewS3ApiServerWithStore(router *mux.Router, option *S3ApiServerOption, expl
// Initialize bucket policy engine first
policyEngine := NewBucketPolicyEngine()
// Initialize FilerClient for volume location caching
// Uses the battle-tested vidMap with filer-based lookups
// S3 API typically connects to a single filer, but wrap in slice for consistency
filerClient := wdclient.NewFilerClient([]pb.ServerAddress{option.Filer}, option.GrpcDialOption, option.DataCenter)
glog.V(0).Infof("S3 API initialized FilerClient for volume location caching")
s3ApiServer = &S3ApiServer{
option: option,
iam: iam,
randomClientId: util.RandomInt32(),
filerGuard: security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec),
filerClient: filerClient,
cb: NewCircuitBreaker(option),
credentialManager: iam.credentialManager,
bucketConfigCache: NewBucketConfigCache(60 * time.Minute), // Increased TTL since cache is now event-driven

weed/server/filer_server.go (2)

@ -178,7 +178,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.checkWithMaster()
go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepMasterClientConnected(context.Background())
go fs.filer.MasterClient.KeepConnectedToMaster(context.Background())
fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete")
v.SetDefault("filer.options.buckets_folder", "/buckets")

weed/wdclient/filer_client.go (404)

@ -0,0 +1,404 @@
package wdclient
import (
"context"
"fmt"
"math/rand"
"strings"
"sync/atomic"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
)
// UrlPreference controls which URL to use for volume access
type UrlPreference string
const (
PreferUrl UrlPreference = "url" // Use private URL (default)
PreferPublicUrl UrlPreference = "publicUrl" // Use public URL
)
// filerHealth tracks the health status of a filer
type filerHealth struct {
failureCount int32 // atomic: consecutive failures
lastFailureTimeNs int64 // atomic: last failure time in Unix nanoseconds
}
// FilerClient provides volume location services by querying a filer
// It uses the shared vidMap cache for efficient lookups
// Supports multiple filer addresses with automatic failover for high availability
// Tracks filer health to avoid repeatedly trying known-unhealthy filers
type FilerClient struct {
*vidMapClient
filerAddresses []pb.ServerAddress
filerIndex int32 // atomic: current filer index for round-robin
filerHealth []*filerHealth // health status per filer (same order as filerAddresses)
grpcDialOption grpc.DialOption
urlPreference UrlPreference
grpcTimeout time.Duration
cacheSize int // Number of historical vidMap snapshots to keep
clientId int32 // Unique client identifier for gRPC metadata
failureThreshold int32 // Circuit breaker: consecutive failures before circuit opens
resetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer
maxRetries int // Retry: maximum retry attempts for transient failures
initialRetryWait time.Duration // Retry: initial wait time before first retry
retryBackoffFactor float64 // Retry: backoff multiplier for wait time
}
// filerVolumeProvider implements VolumeLocationProvider by querying filer
// Supports multiple filer addresses with automatic failover
type filerVolumeProvider struct {
filerClient *FilerClient
}
// FilerClientOption holds optional configuration for FilerClient
type FilerClientOption struct {
GrpcTimeout time.Duration
UrlPreference UrlPreference
CacheSize int // Number of historical vidMap snapshots (0 = use default)
FailureThreshold int32 // Circuit breaker: consecutive failures before skipping filer (0 = use default of 3)
ResetTimeout time.Duration // Circuit breaker: time before re-checking unhealthy filer (0 = use default of 30s)
MaxRetries int // Retry: maximum retry attempts for transient failures (0 = use default of 3)
InitialRetryWait time.Duration // Retry: initial wait time before first retry (0 = use default of 1s)
RetryBackoffFactor float64 // Retry: backoff multiplier for wait time (0 = use default of 1.5)
}
// NewFilerClient creates a new client that queries filer(s) for volume locations
// Supports multiple filer addresses for high availability with automatic failover
// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize
func NewFilerClient(filerAddresses []pb.ServerAddress, grpcDialOption grpc.DialOption, dataCenter string, opts ...*FilerClientOption) *FilerClient {
if len(filerAddresses) == 0 {
glog.Fatal("NewFilerClient requires at least one filer address")
}
// Apply defaults
grpcTimeout := 5 * time.Second
urlPref := PreferUrl
cacheSize := DefaultVidMapCacheSize
failureThreshold := int32(3) // Default: 3 consecutive failures before circuit opens
resetTimeout := 30 * time.Second // Default: 30 seconds before re-checking unhealthy filer
maxRetries := 3 // Default: 3 retry attempts for transient failures
initialRetryWait := time.Second // Default: 1 second initial retry wait
retryBackoffFactor := 1.5 // Default: 1.5x backoff multiplier
// Override with provided options
if len(opts) > 0 && opts[0] != nil {
opt := opts[0]
if opt.GrpcTimeout > 0 {
grpcTimeout = opt.GrpcTimeout
}
if opt.UrlPreference != "" {
urlPref = opt.UrlPreference
}
if opt.CacheSize > 0 {
cacheSize = opt.CacheSize
}
if opt.FailureThreshold > 0 {
failureThreshold = opt.FailureThreshold
}
if opt.ResetTimeout > 0 {
resetTimeout = opt.ResetTimeout
}
if opt.MaxRetries > 0 {
maxRetries = opt.MaxRetries
}
if opt.InitialRetryWait > 0 {
initialRetryWait = opt.InitialRetryWait
}
if opt.RetryBackoffFactor > 0 {
retryBackoffFactor = opt.RetryBackoffFactor
}
}
// Initialize health tracking for each filer
health := make([]*filerHealth, len(filerAddresses))
for i := range health {
health[i] = &filerHealth{}
}
fc := &FilerClient{
filerAddresses: filerAddresses,
filerIndex: 0,
filerHealth: health,
grpcDialOption: grpcDialOption,
urlPreference: urlPref,
grpcTimeout: grpcTimeout,
cacheSize: cacheSize,
clientId: rand.Int31(), // Random client ID for gRPC metadata tracking
failureThreshold: failureThreshold,
resetTimeout: resetTimeout,
maxRetries: maxRetries,
initialRetryWait: initialRetryWait,
retryBackoffFactor: retryBackoffFactor,
}
// Create provider that references this FilerClient for failover support
provider := &filerVolumeProvider{
filerClient: fc,
}
fc.vidMapClient = newVidMapClient(provider, dataCenter, cacheSize)
return fc
}
// GetLookupFileIdFunction returns a lookup function with URL preference handling
func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
if fc.urlPreference == PreferUrl {
// Use the default implementation from vidMapClient
return fc.vidMapClient.GetLookupFileIdFunction()
}
// Custom implementation that prefers PublicUrl
return func(ctx context.Context, fileId string) (fullUrls []string, err error) {
// Parse file ID to extract volume ID
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid fileId format: %s", fileId)
}
volumeIdStr := parts[0]
// First try the cache using LookupVolumeIdsWithFallback
vidLocations, err := fc.LookupVolumeIdsWithFallback(ctx, []string{volumeIdStr})
// Check for partial results first (important for multi-volume batched lookups)
locations, found := vidLocations[volumeIdStr]
if !found || len(locations) == 0 {
// Volume not found - return specific error with context from lookup if available
if err != nil {
return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeIdStr, fileId, err)
}
return nil, fmt.Errorf("volume %s not found for fileId %s", volumeIdStr, fileId)
}
// Volume found successfully - ignore any errors about other volumes
// (not relevant for single-volume lookup, but defensive for future batching)
// Build URLs with publicUrl preference, and also prefer same DC
var sameDcUrls, otherDcUrls []string
dataCenter := fc.GetDataCenter()
for _, loc := range locations {
url := loc.PublicUrl
if url == "" {
url = loc.Url
}
httpUrl := "http://" + url + "/" + fileId
if dataCenter != "" && dataCenter == loc.DataCenter {
sameDcUrls = append(sameDcUrls, httpUrl)
} else {
otherDcUrls = append(otherDcUrls, httpUrl)
}
}
// Shuffle to distribute load across volume servers
rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })
// Prefer same data center
fullUrls = append(sameDcUrls, otherDcUrls...)
return fullUrls, nil
}
}
// isRetryableGrpcError checks if a gRPC error is transient and should be retried
//
// Note on codes.Aborted: While Aborted can indicate application-level conflicts
// (e.g., transaction failures), in the context of volume location lookups (which
// are simple read-only operations with no transactions), Aborted is more likely
// to indicate transient server issues during restart/recovery. We include it here
// for volume lookups but log it for visibility in case misclassification occurs.
func isRetryableGrpcError(err error) bool {
if err == nil {
return false
}
// Check gRPC status code
st, ok := status.FromError(err)
if ok {
switch st.Code() {
case codes.Unavailable: // Server unavailable (temporary)
return true
case codes.DeadlineExceeded: // Request timeout
return true
case codes.ResourceExhausted: // Rate limited or overloaded
return true
case codes.Aborted:
// Aborted during read-only volume lookups is likely transient
// (e.g., filer restarting), but log for visibility
glog.V(1).Infof("Treating Aborted as retryable for volume lookup: %v", err)
return true
}
}
// Fallback to string matching for non-gRPC errors (e.g., network errors)
errStr := err.Error()
return strings.Contains(errStr, "transport") ||
strings.Contains(errStr, "connection") ||
strings.Contains(errStr, "timeout") ||
strings.Contains(errStr, "unavailable")
}
// shouldSkipUnhealthyFiler checks if we should skip a filer based on recent failures
// Circuit breaker pattern: skip filers with multiple recent consecutive failures
func (fc *FilerClient) shouldSkipUnhealthyFiler(index int32) bool {
health := fc.filerHealth[index]
failureCount := atomic.LoadInt32(&health.failureCount)
// Check if failure count exceeds threshold
if failureCount < fc.failureThreshold {
return false
}
// Re-check unhealthy filers after reset timeout
lastFailureNs := atomic.LoadInt64(&health.lastFailureTimeNs)
if lastFailureNs == 0 {
return false // Never failed, shouldn't skip
}
lastFailureTime := time.Unix(0, lastFailureNs)
if time.Since(lastFailureTime) > fc.resetTimeout {
return false // Time to re-check
}
return true // Skip this unhealthy filer
}
// recordFilerSuccess resets failure tracking for a successful filer
func (fc *FilerClient) recordFilerSuccess(index int32) {
health := fc.filerHealth[index]
atomic.StoreInt32(&health.failureCount, 0)
}
// recordFilerFailure increments failure count for an unhealthy filer
func (fc *FilerClient) recordFilerFailure(index int32) {
health := fc.filerHealth[index]
atomic.AddInt32(&health.failureCount, 1)
atomic.StoreInt64(&health.lastFailureTimeNs, time.Now().UnixNano())
}
// LookupVolumeIds queries the filer for volume locations with automatic failover
// Tries all configured filer addresses until one succeeds (high availability)
// Retries transient gRPC errors (Unavailable, DeadlineExceeded, etc.) with exponential backoff
// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
// an Error field. This implementation handles the current structure while being prepared
// for future error reporting enhancements.
func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
fc := p.filerClient
result := make(map[string][]Location)
// Retry transient failures with configurable backoff
var lastErr error
waitTime := fc.initialRetryWait
maxRetries := fc.maxRetries
for retry := 0; retry < maxRetries; retry++ {
// Try all filer addresses with round-robin starting from current index
// Skip known-unhealthy filers (circuit breaker pattern)
i := atomic.LoadInt32(&fc.filerIndex)
n := int32(len(fc.filerAddresses))
for x := int32(0); x < n; x++ {
// Circuit breaker: skip unhealthy filers
if fc.shouldSkipUnhealthyFiler(i) {
glog.V(2).Infof("FilerClient: skipping unhealthy filer %s (consecutive failures: %d)",
fc.filerAddresses[i], atomic.LoadInt32(&fc.filerHealth[i].failureCount))
i++
if i >= n {
i = 0
}
continue
}
filerAddress := fc.filerAddresses[i]
// Use anonymous function to ensure defer cancel() is called per iteration, not accumulated
err := func() error {
// Create a fresh timeout context for each filer attempt
// This ensures each retry gets the full grpcTimeout, not a diminishing deadline
timeoutCtx, cancel := context.WithTimeout(ctx, fc.grpcTimeout)
defer cancel() // Always clean up context, even on panic or early return
return pb.WithGrpcFilerClient(false, fc.clientId, filerAddress, fc.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.LookupVolume(timeoutCtx, &filer_pb.LookupVolumeRequest{
VolumeIds: volumeIds,
})
if err != nil {
return fmt.Errorf("filer.LookupVolume failed: %w", err)
}
// Process each volume in the response
for vid, locs := range resp.LocationsMap {
// Convert locations from protobuf to internal format
var locations []Location
for _, loc := range locs.Locations {
locations = append(locations, Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
DataCenter: loc.DataCenter,
GrpcPort: int(loc.GrpcPort),
})
}
// Only add to result if we have locations
// Empty locations with no gRPC error means "not found" (volume doesn't exist)
if len(locations) > 0 {
result[vid] = locations
glog.V(4).Infof("FilerClient: volume %s found with %d location(s)", vid, len(locations))
} else {
glog.V(2).Infof("FilerClient: volume %s not found (no locations in response)", vid)
}
}
// Check for volumes that weren't in the response at all
// This could indicate a problem with the filer
for _, vid := range volumeIds {
if _, found := resp.LocationsMap[vid]; !found {
glog.V(1).Infof("FilerClient: volume %s missing from filer response", vid)
}
}
return nil
})
}()
if err != nil {
glog.V(1).Infof("FilerClient: filer %s lookup failed (attempt %d/%d, retry %d/%d): %v", filerAddress, x+1, n, retry+1, maxRetries, err)
fc.recordFilerFailure(i)
lastErr = err
i++
if i >= n {
i = 0
}
continue
}
// Success - update the preferred filer index and reset health tracking
atomic.StoreInt32(&fc.filerIndex, i)
fc.recordFilerSuccess(i)
glog.V(3).Infof("FilerClient: looked up %d volumes on %s, found %d", len(volumeIds), filerAddress, len(result))
return result, nil
}
// All filers failed on this attempt
// Check if the error is retryable (transient gRPC error)
if !isRetryableGrpcError(lastErr) {
// Non-retryable error (e.g., NotFound, PermissionDenied) - fail immediately
return nil, fmt.Errorf("all %d filer(s) failed with non-retryable error: %w", n, lastErr)
}
// Transient error - retry if we have attempts left
if retry < maxRetries-1 {
glog.V(1).Infof("FilerClient: all %d filer(s) failed with retryable error (attempt %d/%d), retrying in %v: %v",
n, retry+1, maxRetries, waitTime, lastErr)
time.Sleep(waitTime)
waitTime = time.Duration(float64(waitTime) * fc.retryBackoffFactor)
}
}
// All retries exhausted
return nil, fmt.Errorf("all %d filer(s) failed after %d attempts, last error: %w", len(fc.filerAddresses), maxRetries, lastErr)
}
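A usage sketch of the new client with the circuit-breaker and retry knobs set explicitly (addresses and data center are placeholders; zero-valued fields fall back to the defaults documented on FilerClientOption):

    fc := wdclient.NewFilerClient(
        []pb.ServerAddress{"filer1:8888", "filer2:8888"}, // placeholder HA pair
        grpcDialOption, // assumed grpc.DialOption from the caller
        "dc1",
        &wdclient.FilerClientOption{
            GrpcTimeout:      10 * time.Second,
            FailureThreshold: 5,           // circuit opens after 5 consecutive failures
            ResetTimeout:     time.Minute, // re-probe an unhealthy filer after 1 minute
            MaxRetries:       3,           // with the 1s initial wait and 1.5x factor, retries sleep 1s then 1.5s
        },
    )
    lookupFn := fc.GetLookupFileIdFunction()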

weed/wdclient/masterclient.go (546)

@ -5,328 +5,143 @@ import (
"errors"
"fmt"
"math/rand"
"sort"
"strconv"
"strings"
"sync"
"time"
"golang.org/x/sync/singleflight"
"github.com/seaweedfs/seaweedfs/weed/util/version"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"google.golang.org/grpc"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/version"
)
type MasterClient struct {
FilerGroup string
clientType string
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
// vidMap stores volume location mappings
// Protected by vidMapLock to prevent race conditions during pointer swaps in resetVidMap
vidMap *vidMap
vidMapLock sync.RWMutex
vidMapCacheSize int
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
// Per-batch in-flight tracking to prevent duplicate lookups for the same set of volumes
vidLookupGroup singleflight.Group
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
return &MasterClient{
FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
rack: rack,
masters: masters,
grpcDialOption: grpcDialOption,
vidMap: newVidMap(clientDataCenter),
vidMapCacheSize: 5,
}
}
func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
mc.OnPeerUpdateLock.Lock()
mc.OnPeerUpdate = onPeerUpdate
mc.OnPeerUpdateLock.Unlock()
}
func (mc *MasterClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
return mc.LookupFileIdWithFallback
// masterVolumeProvider implements VolumeLocationProvider by querying master
// This is rarely called since master pushes updates proactively via KeepConnected stream
type masterVolumeProvider struct {
masterClient *MasterClient
}
func (mc *MasterClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
// Try cache first using the fast path - grab both vidMap and dataCenter in one lock
mc.vidMapLock.RLock()
vm := mc.vidMap
dataCenter := vm.DataCenter
mc.vidMapLock.RUnlock()
fullUrls, err = vm.LookupFileId(ctx, fileId)
if err == nil && len(fullUrls) > 0 {
return
}
// Extract volume ID from file ID (format: "volumeId,needle_id_cookie")
parts := strings.Split(fileId, ",")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid fileId %s", fileId)
}
volumeId := parts[0]
// Use shared lookup logic with batching and singleflight
vidLocations, err := mc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})
if err != nil {
return nil, fmt.Errorf("LookupVolume %s failed: %v", fileId, err)
}
locations, found := vidLocations[volumeId]
if !found || len(locations) == 0 {
return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
}
// Build HTTP URLs from locations, preferring same data center
var sameDcUrls, otherDcUrls []string
for _, loc := range locations {
httpUrl := "http://" + loc.Url + "/" + fileId
if dataCenter != "" && dataCenter == loc.DataCenter {
sameDcUrls = append(sameDcUrls, httpUrl)
} else {
otherDcUrls = append(otherDcUrls, httpUrl)
}
}
// Prefer same data center
fullUrls = append(sameDcUrls, otherDcUrls...)
return fullUrls, nil
}
// LookupVolumeIdsWithFallback looks up volume locations, querying master if not in cache
// Uses singleflight to coalesce concurrent requests for the same batch of volumes
func (mc *MasterClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
// LookupVolumeIds queries the master for volume locations (fallback when cache misses)
// Returns partial results with aggregated errors for volumes that failed
func (p *masterVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
result := make(map[string][]Location)
var needsLookup []string
var lookupErrors []error
// Check cache first and parse volume IDs once
vidStringToUint := make(map[string]uint32, len(volumeIds))
glog.V(2).Infof("Looking up %d volumes from master: %v", len(volumeIds), volumeIds)
// Get stable pointer to vidMap with minimal lock hold time
vm := mc.getStableVidMap()
// Use a timeout for the master lookup to prevent indefinite blocking
timeoutCtx, cancel := context.WithTimeout(ctx, p.masterClient.grpcTimeout)
defer cancel()
for _, vidString := range volumeIds {
vid, err := strconv.ParseUint(vidString, 10, 32)
err := pb.WithMasterClient(false, p.masterClient.GetMaster(ctx), p.masterClient.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(timeoutCtx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: volumeIds,
})
if err != nil {
return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
}
vidStringToUint[vidString] = uint32(vid)
locations, found := vm.GetLocations(uint32(vid))
if found && len(locations) > 0 {
result[vidString] = locations
} else {
needsLookup = append(needsLookup, vidString)
return fmt.Errorf("master lookup failed: %v", err)
}
}
if len(needsLookup) == 0 {
return result, nil
}
// Batch query all missing volumes using singleflight on the batch key
// Sort for stable key to coalesce identical batches
sort.Strings(needsLookup)
batchKey := strings.Join(needsLookup, ",")
sfResult, err, _ := mc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
// Double-check cache for volumes that might have been populated while waiting
stillNeedLookup := make([]string, 0, len(needsLookup))
batchResult := make(map[string][]Location)
// Get stable pointer with minimal lock hold time
vm := mc.getStableVidMap()
for _, vidString := range needsLookup {
vid := vidStringToUint[vidString] // Use pre-parsed value
if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
batchResult[vidString] = locations
} else {
stillNeedLookup = append(stillNeedLookup, vidString)
for _, vidLoc := range resp.VolumeIdLocations {
// Preserve per-volume errors from master response
// These could indicate misconfiguration, volume deletion, etc.
if vidLoc.Error != "" {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: %s", vidLoc.VolumeOrFileId, vidLoc.Error))
glog.V(1).Infof("volume %s lookup error from master: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
}
if len(stillNeedLookup) == 0 {
return batchResult, nil
}
// Query master with batched volume IDs
glog.V(2).Infof("Looking up %d volumes from master: %v", len(stillNeedLookup), stillNeedLookup)
err := pb.WithMasterClient(false, mc.GetMaster(ctx), mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
resp, err := client.LookupVolume(ctx, &master_pb.LookupVolumeRequest{
VolumeOrFileIds: stillNeedLookup,
})
// Parse volume ID from response
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
return fmt.Errorf("master lookup failed: %v", err)
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s: invalid volume ID format: %w", vidLoc.VolumeOrFileId, err))
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
continue
}
for _, vidLoc := range resp.VolumeIdLocations {
if vidLoc.Error != "" {
glog.V(0).Infof("volume %s lookup error: %s", vidLoc.VolumeOrFileId, vidLoc.Error)
continue
}
// Parse volume ID from response
parts := strings.Split(vidLoc.VolumeOrFileId, ",")
vidOnly := parts[0]
vid, err := strconv.ParseUint(vidOnly, 10, 32)
if err != nil {
glog.Warningf("Failed to parse volume id '%s' from master response '%s': %v", vidOnly, vidLoc.VolumeOrFileId, err)
continue
}
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
mc.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
if len(locations) > 0 {
batchResult[vidOnly] = locations
var locations []Location
for _, masterLoc := range vidLoc.Locations {
loc := Location{
Url: masterLoc.Url,
PublicUrl: masterLoc.PublicUrl,
GrpcPort: int(masterLoc.GrpcPort),
DataCenter: masterLoc.DataCenter,
}
// Update cache with the location
p.masterClient.addLocation(uint32(vid), loc)
locations = append(locations, loc)
}
return nil
})
if err != nil {
return batchResult, err
if len(locations) > 0 {
result[vidOnly] = locations
}
}
return batchResult, nil
return nil
})
if err != nil {
lookupErrors = append(lookupErrors, err)
return nil, err
}
// Merge singleflight batch results
if batchLocations, ok := sfResult.(map[string][]Location); ok {
for vid, locs := range batchLocations {
result[vid] = locs
}
}
// Check for volumes that still weren't found
for _, vidString := range needsLookup {
if _, found := result[vidString]; !found {
lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
}
// Return partial results with detailed errors
// Callers should check both result map and error
if len(lookupErrors) > 0 {
glog.V(2).Infof("MasterClient: looked up %d volumes, found %d, %d errors", len(volumeIds), len(result), len(lookupErrors))
return result, fmt.Errorf("master volume lookup errors: %w", errors.Join(lookupErrors...))
}
// Return aggregated errors using errors.Join to preserve error types
return result, errors.Join(lookupErrors...)
glog.V(3).Infof("MasterClient: looked up %d volumes, found %d", len(volumeIds), len(result))
return result, nil
}
func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
mc.currentMasterLock.RLock()
defer mc.currentMasterLock.RUnlock()
return mc.currentMaster
}
// MasterClient connects to master servers and maintains volume location cache
// It receives real-time updates via KeepConnected streaming and uses vidMapClient for caching
type MasterClient struct {
*vidMapClient // Embedded cache with shared logic
func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
mc.currentMasterLock.Lock()
mc.currentMaster = master
mc.currentMasterLock.Unlock()
FilerGroup string
clientType string
clientHost pb.ServerAddress
rack string
currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
grpcTimeout time.Duration // Timeout for gRPC calls to master
OnPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)
OnPeerUpdateLock sync.RWMutex
}
func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.getCurrentMaster()
}
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
mc := &MasterClient{
FilerGroup: filerGroup,
clientType: clientType,
clientHost: clientHost,
rack: rack,
masters: masters,
grpcDialOption: grpcDialOption,
grpcTimeout: 5 * time.Second, // Default: 5 seconds for gRPC calls to master
}
func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
mc.WaitUntilConnected(ctx)
return mc.masters.GetInstances()
}
// Create provider that references this MasterClient
provider := &masterVolumeProvider{masterClient: mc}
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
attempts := 0
for {
select {
case <-ctx.Done():
return
default:
currentMaster := mc.getCurrentMaster()
if currentMaster != "" {
return
}
attempts++
if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
}
time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
}
}
}
// Initialize embedded vidMapClient with the provider and default cache size
mc.vidMapClient = newVidMapClient(provider, clientDataCenter, DefaultVidMapCacheSize)
func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
for {
select {
case <-ctx.Done():
glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
return
default:
mc.tryAllMasters(ctx)
time.Sleep(time.Second)
}
}
return mc
}
func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
for _, master := range mc.masters.GetInstances() {
if master == myMasterAddress {
continue
}
if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
defer cancel()
resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return err
}
leader = resp.Leader
return nil
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", master, grpcErr)
}
if leader != "" {
glog.V(0).Infof("existing leader is %s", leader)
return
}
}
glog.V(0).Infof("No existing leader found!")
return
func (mc *MasterClient) SetOnPeerUpdateFn(onPeerUpdate func(update *master_pb.ClusterNodeUpdate, startFrom time.Time)) {
mc.OnPeerUpdateLock.Lock()
mc.OnPeerUpdate = onPeerUpdate
mc.OnPeerUpdateLock.Unlock()
}
func (mc *MasterClient) tryAllMasters(ctx context.Context) {
@ -393,6 +208,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
mc.resetVidMap()
mc.updateVidMap(resp)
} else {
// First message from master is not VolumeLocation (e.g., ClusterNodeUpdate)
// Still need to reset cache to ensure we don't use stale data from previous master
mc.resetVidMap()
}
mc.setCurrentMaster(master)
@ -406,7 +223,8 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
if resp.VolumeLocation != nil {
// maybe the leader is changed
// Check for leader change during the stream
// If master announces a new leader, reconnect to it
if resp.VolumeLocation.Leader != "" && string(mc.GetMaster(ctx)) != resp.VolumeLocation.Leader {
glog.V(0).Infof("currentMaster %v redirected to leader %v", mc.GetMaster(ctx), resp.VolumeLocation.Leader)
nextHintedLeader = pb.ServerAddress(resp.VolumeLocation.Leader)
@ -415,7 +233,6 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
}
mc.updateVidMap(resp)
}
if resp.ClusterNodeUpdate != nil {
update := resp.ClusterNodeUpdate
mc.OnPeerUpdateLock.RLock()
@ -442,7 +259,7 @@ func (mc *MasterClient) tryConnectToMaster(ctx context.Context, master pb.Server
stats.MasterClientConnectCounter.WithLabelValues(stats.Failed).Inc()
glog.V(1).Infof("%s.%s masterClient failed to connect with master %v: %v", mc.FilerGroup, mc.clientType, master, gprcErr)
}
return
return nextHintedLeader
}
func (mc *MasterClient) updateVidMap(resp *master_pb.KeepConnectedResponse) {
@ -494,110 +311,103 @@ func (mc *MasterClient) WithClientCustomGetMaster(getMasterF func() pb.ServerAdd
})
}
// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
// This is safe for read operations as the returned pointer is a stable snapshot,
// and the underlying vidMap methods have their own internal locking.
func (mc *MasterClient) getStableVidMap() *vidMap {
mc.vidMapLock.RLock()
vm := mc.vidMap
mc.vidMapLock.RUnlock()
return vm
}
// withCurrentVidMap executes a function with the current vidMap under a read lock.
// This is for methods that modify vidMap's internal state, ensuring the pointer
// is not swapped by resetVidMap during the operation. The actual map mutations
// are protected by vidMap's internal mutex.
func (mc *MasterClient) withCurrentVidMap(f func(vm *vidMap)) {
mc.vidMapLock.RLock()
defer mc.vidMapLock.RUnlock()
f(mc.vidMap)
}
// Public methods for external packages to access vidMap safely
// GetLocations safely retrieves volume locations
func (mc *MasterClient) GetLocations(vid uint32) (locations []Location, found bool) {
return mc.getStableVidMap().GetLocations(vid)
}
// GetLocationsClone safely retrieves a clone of volume locations
func (mc *MasterClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
return mc.getStableVidMap().GetLocationsClone(vid)
}
// GetVidLocations safely retrieves volume locations by string ID
func (mc *MasterClient) GetVidLocations(vid string) (locations []Location, err error) {
return mc.getStableVidMap().GetVidLocations(vid)
}
// LookupFileId safely looks up URLs for a file ID
func (mc *MasterClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
	return mc.getStableVidMap().LookupFileId(ctx, fileId)
}

// LookupVolumeServerUrl safely looks up volume server URLs
func (mc *MasterClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
	return mc.getStableVidMap().LookupVolumeServerUrl(vid)
}

// GetDataCenter safely retrieves the data center
func (mc *MasterClient) GetDataCenter() string {
	return mc.getStableVidMap().DataCenter
}

func (mc *MasterClient) getCurrentMaster() pb.ServerAddress {
	mc.currentMasterLock.RLock()
	defer mc.currentMasterLock.RUnlock()
	return mc.currentMaster
}

func (mc *MasterClient) setCurrentMaster(master pb.ServerAddress) {
	mc.currentMasterLock.Lock()
	mc.currentMaster = master
	mc.currentMasterLock.Unlock()
}

// GetMaster returns the current master address, blocking until connected.
//
// IMPORTANT: This method blocks until KeepConnectedToMaster successfully establishes
// a connection to a master server. If KeepConnectedToMaster hasn't been started in a
// background goroutine, this will block indefinitely (or until ctx is canceled).
//
// Typical initialization pattern:
//
//	mc := wdclient.NewMasterClient(...)
//	go mc.KeepConnectedToMaster(ctx) // Start connection management
//	// ... later ...
//	master := mc.GetMaster(ctx) // Will block until connected
//
// If called before KeepConnectedToMaster establishes a connection, this may cause
// unexpected timeouts in LookupVolumeIds and other operations that depend on it.
func (mc *MasterClient) GetMaster(ctx context.Context) pb.ServerAddress {
	mc.WaitUntilConnected(ctx)
	return mc.getCurrentMaster()
}

// GetMasters returns all configured master addresses, blocking until connected.
// See GetMaster() for important initialization contract details.
func (mc *MasterClient) GetMasters(ctx context.Context) []pb.ServerAddress {
	mc.WaitUntilConnected(ctx)
	return mc.masters.GetInstances()
}

// WaitUntilConnected blocks until a master connection is established or ctx is canceled.
// This does NOT initiate connections - it only waits for KeepConnectedToMaster to succeed.
func (mc *MasterClient) WaitUntilConnected(ctx context.Context) {
	attempts := 0
	for {
		select {
		case <-ctx.Done():
			return
		default:
			currentMaster := mc.getCurrentMaster()
			if currentMaster != "" {
				return
			}
			attempts++
			if attempts%100 == 0 { // Log every 100 attempts (roughly every 20 seconds)
				glog.V(0).Infof("%s.%s WaitUntilConnected still waiting for master connection (attempt %d)...", mc.FilerGroup, mc.clientType, attempts)
			}
			time.Sleep(time.Duration(rand.Int31n(200)) * time.Millisecond)
		}
	}
}

func (mc *MasterClient) KeepConnectedToMaster(ctx context.Context) {
	glog.V(1).Infof("%s.%s masterClient bootstraps with masters %v", mc.FilerGroup, mc.clientType, mc.masters)
	for {
		select {
		case <-ctx.Done():
			glog.V(0).Infof("Connection to masters stopped: %v", ctx.Err())
			return
		default:
			mc.tryAllMasters(ctx)
			time.Sleep(time.Second)
		}
	}
}

// Thread-safe helpers for vidMap operations

// addLocation adds a volume location
func (mc *MasterClient) addLocation(vid uint32, location Location) {
	mc.withCurrentVidMap(func(vm *vidMap) {
		vm.addLocation(vid, location)
	})
}

// deleteLocation removes a volume location
func (mc *MasterClient) deleteLocation(vid uint32, location Location) {
	mc.withCurrentVidMap(func(vm *vidMap) {
		vm.deleteLocation(vid, location)
	})
}

// addEcLocation adds an EC volume location
func (mc *MasterClient) addEcLocation(vid uint32, location Location) {
	mc.withCurrentVidMap(func(vm *vidMap) {
		vm.addEcLocation(vid, location)
	})
}

// deleteEcLocation removes an EC volume location
func (mc *MasterClient) deleteEcLocation(vid uint32, location Location) {
	mc.withCurrentVidMap(func(vm *vidMap) {
		vm.deleteEcLocation(vid, location)
	})
}

func (mc *MasterClient) resetVidMap() {
	mc.vidMapLock.Lock()
	defer mc.vidMapLock.Unlock()

	// Preserve the existing vidMap in the cache chain
	// No need to clone - the existing vidMap has its own mutex for thread safety
	tail := mc.vidMap

	nvm := newVidMap(tail.DataCenter)
	nvm.cache.Store(tail)
	mc.vidMap = nvm

	// Trim cache chain to vidMapCacheSize by traversing to the last node
	// that should remain and cutting the chain after it
	node := tail
	for i := 0; i < mc.vidMapCacheSize-1; i++ {
		if node.cache.Load() == nil {
			return
		}
		node = node.cache.Load()
	}
	if node != nil {
		node.cache.Store(nil)
	}
}

func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
	for _, master := range mc.masters.GetInstances() {
		if master == myMasterAddress {
			continue
		}
		if grpcErr := pb.WithMasterClient(false, master, mc.grpcDialOption, false, func(client master_pb.SeaweedClient) error {
			ctx, cancel := context.WithTimeout(context.Background(), 120*time.Millisecond)
			defer cancel()
			resp, err := client.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{})
			if err != nil {
				return err
			}
			leader = resp.Leader
			return nil
		}); grpcErr != nil {
			glog.V(0).Infof("connect to %s: %v", master, grpcErr)
		}
		if leader != "" {
			glog.V(0).Infof("existing leader is %s", leader)
			return
		}
	}
	glog.V(0).Infof("No existing leader found!")
	return
}
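
// Illustrative sketch (not part of this file): a hypothetical caller on the
// master side could consult its peers before campaigning for leadership;
// myAddress is a placeholder name, not a variable in this package.
//
//	if leader := mc.FindLeaderFromOtherPeers(myAddress); leader != "" {
//		glog.V(0).Infof("deferring to existing leader %s", leader)
//	}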

347
weed/wdclient/vidmap_client.go

@ -0,0 +1,347 @@
package wdclient

import (
	"context"
	"errors"
	"fmt"
	"math/rand"
	"sort"
	"strconv"
	"strings"
	"sync"

	"golang.org/x/sync/singleflight"

	"github.com/seaweedfs/seaweedfs/weed/glog"
)

// VolumeLocationProvider is the interface for looking up volume locations
// This allows different implementations (master subscription, filer queries, etc.)
type VolumeLocationProvider interface {
	// LookupVolumeIds looks up volume locations for the given volume IDs
	// Returns a map of volume ID to locations
	LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error)
}
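
// Illustrative sketch (not part of this file): a minimal in-memory provider
// showing the contract - unknown volume IDs are simply omitted from the
// returned map rather than reported as errors. staticProvider is a
// hypothetical name.
//
//	type staticProvider struct{ locations map[string][]Location }
//
//	func (p staticProvider) LookupVolumeIds(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
//		out := make(map[string][]Location, len(volumeIds))
//		for _, vid := range volumeIds {
//			if locs, ok := p.locations[vid]; ok {
//				out[vid] = locs
//			}
//		}
//		return out, nil
//	}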
// vidMapClient provides volume location caching with pluggable lookup
// It wraps the battle-tested vidMap with customizable volume lookup strategies
type vidMapClient struct {
	vidMap          *vidMap
	vidMapLock      sync.RWMutex
	vidMapCacheSize int
	provider        VolumeLocationProvider
	vidLookupGroup  singleflight.Group
}

const (
	// DefaultVidMapCacheSize is the default number of historical vidMap snapshots to keep
	// This provides cache history when volumes move between servers
	DefaultVidMapCacheSize = 5
)

// newVidMapClient creates a new client with the given provider and data center
func newVidMapClient(provider VolumeLocationProvider, dataCenter string, cacheSize int) *vidMapClient {
	if cacheSize <= 0 {
		cacheSize = DefaultVidMapCacheSize
	}
	return &vidMapClient{
		vidMap:          newVidMap(dataCenter),
		vidMapCacheSize: cacheSize,
		provider:        provider,
	}
}
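
// Illustrative sketch (not part of this file): filerProvider is a hypothetical
// VolumeLocationProvider; a non-positive cache size falls back to
// DefaultVidMapCacheSize.
//
//	vc := newVidMapClient(filerProvider, "dc1", 0) // keeps 5 historical snapshots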
// GetLookupFileIdFunction returns a function that can be used to lookup file IDs
func (vc *vidMapClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
	return vc.LookupFileIdWithFallback
}

// LookupFileIdWithFallback looks up a file ID, checking cache first, then using provider
func (vc *vidMapClient) LookupFileIdWithFallback(ctx context.Context, fileId string) (fullUrls []string, err error) {
	// Try cache first - hold read lock during entire vidMap access to prevent swap during operation
	vc.vidMapLock.RLock()
	vm := vc.vidMap
	dataCenter := vm.DataCenter
	fullUrls, err = vm.LookupFileId(ctx, fileId)
	vc.vidMapLock.RUnlock()

	// Cache hit - return immediately
	if err == nil && len(fullUrls) > 0 {
		return
	}

	// Cache miss - extract volume ID from file ID (format: "volumeId,needle_id_cookie")
	parts := strings.Split(fileId, ",")
	if len(parts) != 2 {
		return nil, fmt.Errorf("invalid fileId %s", fileId)
	}
	volumeId := parts[0]

	// Use shared lookup logic with batching and singleflight
	vidLocations, err := vc.LookupVolumeIdsWithFallback(ctx, []string{volumeId})

	// Check for partial results first (important for multi-volume batched lookups)
	locations, found := vidLocations[volumeId]
	if !found || len(locations) == 0 {
		// Volume not found - return specific error with context from lookup if available
		if err != nil {
			return nil, fmt.Errorf("volume %s not found for fileId %s: %w", volumeId, fileId, err)
		}
		return nil, fmt.Errorf("volume %s not found for fileId %s", volumeId, fileId)
	}

	// Volume found successfully - ignore any errors about other volumes
	// (not relevant for single-volume lookup, but defensive for future batching)

	// Build HTTP URLs from locations, preferring same data center
	var sameDcUrls, otherDcUrls []string
	for _, loc := range locations {
		httpUrl := "http://" + loc.Url + "/" + fileId
		if dataCenter != "" && dataCenter == loc.DataCenter {
			sameDcUrls = append(sameDcUrls, httpUrl)
		} else {
			otherDcUrls = append(otherDcUrls, httpUrl)
		}
	}

	// Shuffle to distribute load across volume servers
	rand.Shuffle(len(sameDcUrls), func(i, j int) { sameDcUrls[i], sameDcUrls[j] = sameDcUrls[j], sameDcUrls[i] })
	rand.Shuffle(len(otherDcUrls), func(i, j int) { otherDcUrls[i], otherDcUrls[j] = otherDcUrls[j], otherDcUrls[i] })

	// Prefer same data center
	fullUrls = append(sameDcUrls, otherDcUrls...)
	return fullUrls, nil
}
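
// Illustrative sketch (not part of this file): the sample fileId and URLs are
// made up; replicas in the client's data center sort ahead of the others.
//
//	urls, err := vc.LookupFileIdWithFallback(ctx, "3,01637037d6")
//	// e.g. urls == ["http://10.0.0.1:8080/3,01637037d6", "http://10.1.0.9:8080/3,01637037d6"]
//	// where 10.0.0.1 shares the client's data center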
// LookupVolumeIdsWithFallback looks up volume locations, querying provider if not in cache.
// Uses singleflight to coalesce concurrent requests for the same batch of volumes.
//
// IMPORTANT: This function may return PARTIAL results with a non-nil error.
// The result map contains successfully looked up volumes, while the error aggregates
// failures for volumes that couldn't be found or had lookup errors.
//
// Callers MUST check both the result map AND the error:
//   - result != nil && err == nil: All volumes found successfully
//   - result != nil && err != nil: Some volumes found, some failed (check both)
//   - result == nil && err != nil: Complete failure (connection error, etc.)
//
// Example usage:
//
//	locs, err := mc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2", "999"})
//	if len(locs) > 0 {
//		// Process successfully found volumes
//	}
//	if err != nil {
//		// Log/handle failed volumes
//	}
func (vc *vidMapClient) LookupVolumeIdsWithFallback(ctx context.Context, volumeIds []string) (map[string][]Location, error) {
	result := make(map[string][]Location)
	var needsLookup []string
	var lookupErrors []error

	// Check cache first and parse volume IDs once
	vidStringToUint := make(map[string]uint32, len(volumeIds))

	// Get stable pointer to vidMap with minimal lock hold time
	vm := vc.getStableVidMap()
	for _, vidString := range volumeIds {
		vid, err := strconv.ParseUint(vidString, 10, 32)
		if err != nil {
			return nil, fmt.Errorf("invalid volume id %s: %v", vidString, err)
		}
		vidStringToUint[vidString] = uint32(vid)
		locations, found := vm.GetLocations(uint32(vid))
		if found && len(locations) > 0 {
			result[vidString] = locations
		} else {
			needsLookup = append(needsLookup, vidString)
		}
	}

	if len(needsLookup) == 0 {
		return result, nil
	}

	// Batch query all missing volumes using singleflight on the batch key
	// Sort for stable key to coalesce identical batches
	sort.Strings(needsLookup)
	batchKey := strings.Join(needsLookup, ",")

	sfResult, err, _ := vc.vidLookupGroup.Do(batchKey, func() (interface{}, error) {
		// Double-check cache for volumes that might have been populated while waiting
		stillNeedLookup := make([]string, 0, len(needsLookup))
		batchResult := make(map[string][]Location)

		// Get stable pointer with minimal lock hold time
		vm := vc.getStableVidMap()
		for _, vidString := range needsLookup {
			vid := vidStringToUint[vidString] // Use pre-parsed value
			if locations, found := vm.GetLocations(vid); found && len(locations) > 0 {
				batchResult[vidString] = locations
			} else {
				stillNeedLookup = append(stillNeedLookup, vidString)
			}
		}

		if len(stillNeedLookup) == 0 {
			return batchResult, nil
		}

		// Query provider with batched volume IDs
		glog.V(2).Infof("Looking up %d volumes from provider: %v", len(stillNeedLookup), stillNeedLookup)
		providerResults, err := vc.provider.LookupVolumeIds(ctx, stillNeedLookup)
		if err != nil {
			return batchResult, fmt.Errorf("provider lookup failed: %v", err)
		}

		// Update cache with results
		for vidString, locations := range providerResults {
			vid, err := strconv.ParseUint(vidString, 10, 32)
			if err != nil {
				glog.Warningf("Failed to parse volume id '%s': %v", vidString, err)
				continue
			}
			for _, loc := range locations {
				vc.addLocation(uint32(vid), loc)
			}
			if len(locations) > 0 {
				batchResult[vidString] = locations
			}
		}
		return batchResult, nil
	})
	if err != nil {
		lookupErrors = append(lookupErrors, err)
	}

	// Merge singleflight batch results
	if batchLocations, ok := sfResult.(map[string][]Location); ok {
		for vid, locs := range batchLocations {
			result[vid] = locs
		}
	}

	// Check for volumes that still weren't found
	for _, vidString := range needsLookup {
		if _, found := result[vidString]; !found {
			lookupErrors = append(lookupErrors, fmt.Errorf("volume %s not found", vidString))
		}
	}

	// Return aggregated errors
	return result, errors.Join(lookupErrors...)
}
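
// Illustrative sketch (not part of this file): because the batch key is the
// sorted, comma-joined volume ID list, concurrent callers asking for the same
// set of missing volumes share a single provider call:
//
//	go vc.LookupVolumeIdsWithFallback(ctx, []string{"1", "2"}) // batch key "1,2"
//	go vc.LookupVolumeIdsWithFallback(ctx, []string{"2", "1"}) // batch key "1,2" - coalesced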
// getStableVidMap gets a stable pointer to the vidMap, releasing the lock immediately.
// WARNING: Use with caution. The returned vidMap pointer is stable (won't be garbage collected
// due to cache chain), but the vidMapClient.vidMap field may be swapped by resetVidMap().
// For operations that must use the current vidMap atomically, use withCurrentVidMap() instead.
func (vc *vidMapClient) getStableVidMap() *vidMap {
	vc.vidMapLock.RLock()
	vm := vc.vidMap
	vc.vidMapLock.RUnlock()
	return vm
}

// withCurrentVidMap executes a function with the current vidMap under a read lock.
// This guarantees the vidMap instance cannot be swapped during the function execution.
// Use this when you need atomic access to the current vidMap for multiple operations.
func (vc *vidMapClient) withCurrentVidMap(f func(vm *vidMap)) {
	vc.vidMapLock.RLock()
	defer vc.vidMapLock.RUnlock()
	f(vc.vidMap)
}
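
// Illustrative sketch (not part of this file): reads that only need a recent
// snapshot can take the stable pointer, while mutations go through
// withCurrentVidMap so they always land in the map that is current at call
// time; the volume ID and Location below are made up.
//
//	vm := vc.getStableVidMap() // may be superseded by a concurrent resetVidMap()
//	locs, found := vm.GetLocations(3)
//	_, _ = locs, found
//
//	vc.withCurrentVidMap(func(vm *vidMap) { // cannot be swapped mid-call
//		vm.addLocation(3, Location{Url: "10.0.0.1:8080"})
//	})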
// Public methods for external access

// GetLocations safely retrieves volume locations
func (vc *vidMapClient) GetLocations(vid uint32) (locations []Location, found bool) {
	return vc.getStableVidMap().GetLocations(vid)
}

// GetLocationsClone safely retrieves a clone of volume locations
func (vc *vidMapClient) GetLocationsClone(vid uint32) (locations []Location, found bool) {
	return vc.getStableVidMap().GetLocationsClone(vid)
}

// GetVidLocations safely retrieves volume locations by string ID
func (vc *vidMapClient) GetVidLocations(vid string) (locations []Location, err error) {
	return vc.getStableVidMap().GetVidLocations(vid)
}

// LookupFileId safely looks up URLs for a file ID
func (vc *vidMapClient) LookupFileId(ctx context.Context, fileId string) (fullUrls []string, err error) {
	return vc.getStableVidMap().LookupFileId(ctx, fileId)
}

// LookupVolumeServerUrl safely looks up volume server URLs
func (vc *vidMapClient) LookupVolumeServerUrl(vid string) (serverUrls []string, err error) {
	return vc.getStableVidMap().LookupVolumeServerUrl(vid)
}

// GetDataCenter safely retrieves the data center
func (vc *vidMapClient) GetDataCenter() string {
	return vc.getStableVidMap().DataCenter
}

// Thread-safe helpers for vidMap operations

// addLocation adds a volume location
func (vc *vidMapClient) addLocation(vid uint32, location Location) {
	vc.withCurrentVidMap(func(vm *vidMap) {
		vm.addLocation(vid, location)
	})
}

// deleteLocation removes a volume location
func (vc *vidMapClient) deleteLocation(vid uint32, location Location) {
	vc.withCurrentVidMap(func(vm *vidMap) {
		vm.deleteLocation(vid, location)
	})
}

// addEcLocation adds an EC volume location
func (vc *vidMapClient) addEcLocation(vid uint32, location Location) {
	vc.withCurrentVidMap(func(vm *vidMap) {
		vm.addEcLocation(vid, location)
	})
}

// deleteEcLocation removes an EC volume location
func (vc *vidMapClient) deleteEcLocation(vid uint32, location Location) {
	vc.withCurrentVidMap(func(vm *vidMap) {
		vm.deleteEcLocation(vid, location)
	})
}

// resetVidMap resets the volume ID map
func (vc *vidMapClient) resetVidMap() {
	vc.vidMapLock.Lock()
	defer vc.vidMapLock.Unlock()

	// Preserve the existing vidMap in the cache chain
	tail := vc.vidMap

	nvm := newVidMap(tail.DataCenter)
	nvm.cache.Store(tail)
	vc.vidMap = nvm

	// Trim cache chain to vidMapCacheSize
	node := tail
	for i := 0; i < vc.vidMapCacheSize-1; i++ {
		if node.cache.Load() == nil {
			return
		}
		node = node.cache.Load()
	}
	// node is guaranteed to be non-nil after the loop
	node.cache.Store(nil)
}
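
// Illustrative sketch (not part of this file): with vidMapCacheSize = 3,
// successive resets evolve the snapshot chain as follows, so lookups that
// race with a reset can still hit recently cached locations:
//
//	reset 1: new1 -> old
//	reset 2: new2 -> new1 -> old
//	reset 3: new3 -> new2 -> new1   (oldest snapshot trimmed from the chain)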