@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"
"google.golang.org/grpc"
@ -11,6 +12,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
)
// UrlPreference controls which URL to use for volume access
@ -23,9 +25,11 @@ const (
// FilerClient provides volume location services by querying a filer
// It uses the shared vidMap cache for efficient lookups
// Supports multiple filer addresses with automatic failover for high availability
type FilerClient struct {
* vidMapClient
filerAddress pb . ServerAddress
filerAddresses [ ] pb . ServerAddress
filerIndex int32 // atomic: current filer index for round-robin
grpcDialOption grpc . DialOption
urlPreference UrlPreference
grpcTimeout time . Duration
@ -33,10 +37,9 @@ type FilerClient struct {
}
// filerVolumeProvider implements VolumeLocationProvider by querying filer
// Supports multiple filer addresses with automatic failover
type filerVolumeProvider struct {
filerAddress pb . ServerAddress
grpcDialOption grpc . DialOption
grpcTimeout time . Duration
filerClient * FilerClient
}
// FilerClientOption holds optional configuration for FilerClient
@ -46,9 +49,14 @@ type FilerClientOption struct {
CacheSize int // Number of historical vidMap snapshots (0 = use default)
}
// NewFilerClient creates a new client that queries filer for volume locations
// NewFilerClient creates a new client that queries filer(s) for volume locations
// Supports multiple filer addresses for high availability with automatic failover
// Uses sensible defaults: 5-second gRPC timeout, PreferUrl, DefaultVidMapCacheSize
func NewFilerClient ( filerAddress pb . ServerAddress , grpcDialOption grpc . DialOption , dataCenter string , opts ... * FilerClientOption ) * FilerClient {
func NewFilerClient ( filerAddresses [ ] pb . ServerAddress , grpcDialOption grpc . DialOption , dataCenter string , opts ... * FilerClientOption ) * FilerClient {
if len ( filerAddresses ) == 0 {
glog . Fatal ( "NewFilerClient requires at least one filer address" )
}
// Apply defaults
grpcTimeout := 5 * time . Second
urlPref := PreferUrl
@ -68,20 +76,23 @@ func NewFilerClient(filerAddress pb.ServerAddress, grpcDialOption grpc.DialOptio
}
}
provider := & filerVolumeProvider {
filerAddress : filerAddress ,
grpcDialOption : grpcDialOption ,
grpcTimeout : grpcTimeout ,
}
return & FilerClient {
vidMapClient : newVidMapClient ( provider , dataCenter , cacheSize ) ,
filerAddress : filerAddress ,
fc := & FilerClient {
filerAddresses : filerAddresses ,
filerIndex : 0 ,
grpcDialOption : grpcDialOption ,
urlPreference : urlPref ,
grpcTimeout : grpcTimeout ,
cacheSize : cacheSize ,
}
// Create provider that references this FilerClient for failover support
provider := & filerVolumeProvider {
filerClient : fc ,
}
fc . vidMapClient = newVidMapClient ( provider , dataCenter , cacheSize )
return fc
}
// GetLookupFileIdFunction returns a lookup function with URL preference handling
@ -123,21 +134,32 @@ func (fc *FilerClient) GetLookupFileIdFunction() LookupFileIdFunctionType {
}
}
// LookupVolumeIds queries the filer for volume locations
// LookupVolumeIds queries the filer for volume locations with automatic failover
// Tries all configured filer addresses until one succeeds (high availability)
// Note: Unlike master's VolumeIdLocation, filer's Locations message doesn't currently have
// an Error field. This implementation handles the current structure while being prepared
// for future error reporting enhancements.
func ( p * filerVolumeProvider ) LookupVolumeIds ( ctx context . Context , volumeIds [ ] string ) ( map [ string ] [ ] Location , error ) {
fc := p . filerClient
result := make ( map [ string ] [ ] Location )
// Create a timeout context for the gRPC call
timeoutCtx , cancel := context . WithTimeout ( ctx , p . grpcTimeout )
timeoutCtx , cancel := context . WithTimeout ( ctx , fc . grpcTimeout )
defer cancel ( )
// Convert grpcTimeout to milliseconds for the signature parameter
timeoutMs := int32 ( p . grpcTimeout . Milliseconds ( ) )
timeoutMs := int32 ( fc . grpcTimeout . Milliseconds ( ) )
// Try all filer addresses with round-robin starting from current index
var lastErr error
err := util . Retry ( "filer volume lookup" , func ( ) error {
i := atomic . LoadInt32 ( & fc . filerIndex )
n := int32 ( len ( fc . filerAddresses ) )
for x := int32 ( 0 ) ; x < n ; x ++ {
filerAddress := fc . filerAddresses [ i ]
err := pb . WithGrpcFilerClient ( false , timeoutMs , p . filerAddress , p . grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
err := pb . WithGrpcFilerClient ( false , timeoutMs , filerAddress , fc . grpcDialOption , func ( client filer_pb . SeaweedFilerClient ) error {
resp , err := client . LookupVolume ( timeoutCtx , & filer_pb . LookupVolumeRequest {
VolumeIds : volumeIds ,
} )
@ -180,10 +202,28 @@ func (p *filerVolumeProvider) LookupVolumeIds(ctx context.Context, volumeIds []s
} )
if err != nil {
// gRPC error - this is a communication or server failure
glog . V ( 1 ) . Infof ( "FilerClient: filer %s lookup failed (attempt %d/%d): %v" , filerAddress , x + 1 , n , err )
lastErr = err
i ++
if i >= n {
i = 0
}
continue
}
// Success - update the preferred filer index for next time
atomic . StoreInt32 ( & fc . filerIndex , i )
glog . V ( 3 ) . Infof ( "FilerClient: looked up %d volumes on %s, found %d" , len ( volumeIds ) , filerAddress , len ( result ) )
return nil
}
// All filers failed
return fmt . Errorf ( "all %d filer(s) failed, last error: %w" , n , lastErr )
} )
if err != nil {
return nil , fmt . Errorf ( "filer volume lookup failed for %d volume(s): %w" , len ( volumeIds ) , err )
}
glog . V ( 3 ) . Infof ( "FilerClient: looked up %d volumes, found %d" , len ( volumeIds ) , len ( result ) )
return result , nil
}