From 796b7508f33916f7fa734e3df2ceea9a80415ade Mon Sep 17 00:00:00 2001 From: Nico D'Cotta <45274424+Cottand@users.noreply.github.com> Date: Thu, 24 Aug 2023 16:08:56 +0200 Subject: [PATCH] Implement SRV lookups for filer (#4767) --- weed/command/benchmark.go | 2 +- weed/command/filer.go | 6 +-- weed/command/server.go | 4 +- weed/filer/filer.go | 3 +- weed/filer/leveldb/leveldb_store_test.go | 7 +-- weed/filer/leveldb2/leveldb2_store_test.go | 5 +- weed/filer/leveldb3/leveldb3_store_test.go | 5 +- weed/filer/rocksdb/rocksdb_store_test.go | 6 +-- weed/iamapi/iamapi_server.go | 2 +- weed/mq/broker/broker_server.go | 2 +- weed/pb/server_address.go | 37 +++++++++++++ weed/pb/server_address_test.go | 36 +++++++++++++ weed/pb/server_discovery.go | 62 ++++++++++++++++++++++ weed/server/filer_grpc_server_admin.go | 2 +- weed/server/filer_server.go | 10 ++-- weed/server/master_server.go | 2 +- weed/shell/commands.go | 2 +- weed/stats/metrics.go | 1 - weed/wdclient/masterclient.go | 15 +++--- weed/wdclient/vid_map_test.go | 5 +- 20 files changed, 177 insertions(+), 37 deletions(-) create mode 100644 weed/pb/server_address_test.go create mode 100644 weed/pb/server_discovery.go diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go index 7f132892e..7f9a23cf8 100644 --- a/weed/command/benchmark.go +++ b/weed/command/benchmark.go @@ -127,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool { defer pprof.StopCPUProfile() } - b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap()) + b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery()) go b.masterClient.KeepConnectedToMaster() b.masterClient.WaitUntilConnected() diff --git a/weed/command/filer.go b/weed/command/filer.go index 83e2abdac..7e636974f 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -33,7 +33,7 @@ var ( ) type FilerOptions struct { - masters map[string]pb.ServerAddress + masters *pb.ServerDiscovery mastersString *string ip *string bindIp *string @@ -65,7 +65,7 @@ type FilerOptions struct { func init() { cmdFiler.Run = runFiler // break init cycle - f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers") + f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers or a single DNS SRV record of at least 1 master server, prepended with dnssrv+") f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") @@ -208,7 +208,7 @@ func runFiler(cmd *Command, args []string) bool { }(startDelay) } - f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap() + f.masters = pb.ServerAddresses(*f.mastersString).ToServiceDiscovery() f.startFiler() diff --git a/weed/command/server.go b/weed/command/server.go index fecb1cad6..7fbb59676 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -203,7 +203,7 @@ func runServer(cmd *Command, args []string) bool { // ip address masterOptions.ip = serverIp masterOptions.ipBind = serverBindIp - filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap() + filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToServiceDiscovery() filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp s3Options.bindIp = serverBindIp @@ -216,7 +216,7 @@ func runServer(cmd *Command, args []string) bool { serverOptions.v.dataCenter = serverDataCenter serverOptions.v.rack = serverRack mqBrokerOptions.ip = serverIp - mqBrokerOptions.masters = filerOptions.masters + mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap() mqBrokerOptions.filerGroup = filerOptions.filerGroup // serverOptions.v.pulseSeconds = pulseSeconds diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 8570faa7a..fdc425f07 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -52,8 +52,7 @@ type Filer struct { Dlm *lock_manager.DistributedLockManager } -func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, - filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { +func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters), fileIdDeletionQueue: util.NewUnboundedQueue(), diff --git a/weed/filer/leveldb/leveldb_store_test.go b/weed/filer/leveldb/leveldb_store_test.go index 7013f67a7..c8e71a003 100644 --- a/weed/filer/leveldb/leveldb_store_test.go +++ b/weed/filer/leveldb/leveldb_store_test.go @@ -3,6 +3,7 @@ package leveldb import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "os" "testing" "time" @@ -12,7 +13,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) @@ -65,7 +66,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDBStore{} store.initialize(dir) @@ -87,7 +88,7 @@ func TestEmptyRoot(t *testing.T) { } func BenchmarkInsertEntry(b *testing.B) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := b.TempDir() store := &LevelDBStore{} store.initialize(dir) diff --git a/weed/filer/leveldb2/leveldb2_store_test.go b/weed/filer/leveldb2/leveldb2_store_test.go index f7ec99e06..b25dcc7b8 100644 --- a/weed/filer/leveldb2/leveldb2_store_test.go +++ b/weed/filer/leveldb2/leveldb2_store_test.go @@ -2,6 +2,7 @@ package leveldb import ( "context" + "github.com/seaweedfs/seaweedfs/weed/pb" "testing" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -9,7 +10,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) @@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB2Store{} store.initialize(dir, 2) diff --git a/weed/filer/leveldb3/leveldb3_store_test.go b/weed/filer/leveldb3/leveldb3_store_test.go index e2e4d5099..a2d8dd8a3 100644 --- a/weed/filer/leveldb3/leveldb3_store_test.go +++ b/weed/filer/leveldb3/leveldb3_store_test.go @@ -2,6 +2,7 @@ package leveldb import ( "context" + "github.com/seaweedfs/seaweedfs/weed/pb" "testing" "github.com/seaweedfs/seaweedfs/weed/filer" @@ -9,7 +10,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) @@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil) dir := t.TempDir() store := &LevelDB3Store{} store.initialize(dir) diff --git a/weed/filer/rocksdb/rocksdb_store_test.go b/weed/filer/rocksdb/rocksdb_store_test.go index e89327baa..e24274d2a 100644 --- a/weed/filer/rocksdb/rocksdb_store_test.go +++ b/weed/filer/rocksdb/rocksdb_store_test.go @@ -15,7 +15,7 @@ import ( ) func TestCreateAndFind(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil) dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) @@ -68,7 +68,7 @@ func TestCreateAndFind(t *testing.T) { } func TestEmptyRoot(t *testing.T) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil) dir := t.TempDir() store := &RocksDBStore{} store.initialize(dir) @@ -90,7 +90,7 @@ func TestEmptyRoot(t *testing.T) { } func BenchmarkInsertEntry(b *testing.B) { - testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil) + testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil) dir := b.TempDir() store := &RocksDBStore{} store.initialize(dir) diff --git a/weed/iamapi/iamapi_server.go b/weed/iamapi/iamapi_server.go index 223bcb296..63d2e7a75 100644 --- a/weed/iamapi/iamapi_server.go +++ b/weed/iamapi/iamapi_server.go @@ -50,7 +50,7 @@ var s3ApiConfigure IamS3ApiConfig func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) { s3ApiConfigure = IamS3ApiConfigure{ option: option, - masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", option.Masters), + masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)), } s3Option := s3api.S3ApiServerOption{Filer: option.Filer} iamApiServer = &IamApiServer{ diff --git a/weed/mq/broker/broker_server.go b/weed/mq/broker/broker_server.go index 4f5b3c28d..da1284c80 100644 --- a/weed/mq/broker/broker_server.go +++ b/weed/mq/broker/broker_server.go @@ -41,7 +41,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial mqBroker = &MessageQueueBroker{ option: option, grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)), filers: make(map[pb.ServerAddress]struct{}), localTopicManager: topic.NewLocalTopicManager(), } diff --git a/weed/pb/server_address.go b/weed/pb/server_address.go index 56d0dba24..a0aa79ae4 100644 --- a/weed/pb/server_address.go +++ b/weed/pb/server_address.go @@ -11,6 +11,7 @@ import ( type ServerAddress string type ServerAddresses string +type ServerSrvAddress string func NewServerAddress(host string, port int, grpcPort int) ServerAddress { if grpcPort == 0 || grpcPort == port+10000 { @@ -76,6 +77,42 @@ func (sa ServerAddress) ToGrpcAddress() string { return ServerToGrpcAddress(string(sa)) } +// LookUp may return an error for some records along with successful lookups - make sure you do not +// discard `addresses` even if `err == nil` +func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) { + _, records, lookupErr := net.LookupSRV("", "", string(r)) + if lookupErr != nil { + err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr) + } + for _, srv := range records { + address := fmt.Sprintf("%s:%d", srv.Target, srv.Port) + addresses = append(addresses, ServerAddress(address)) + } + return +} + +// ToServiceDiscovery expects one of: a comma-separated list of ip:port, like +// +// 10.0.0.1:9999,10.0.0.2:24:9999 +// +// OR an SRV Record prepended with 'dnssrv+', like: +// +// dnssrv+_grpc._tcp.master.consul +// dnssrv+_grpc._tcp.headless.default.svc.cluster.local +// dnssrv+seaweed-master.master.consul +func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) { + sd = &ServerDiscovery{} + prefix := "dnssrv+" + if strings.HasPrefix(string(sa), prefix) { + trimmed := strings.TrimPrefix(string(sa), prefix) + srv := ServerSrvAddress(trimmed) + sd.srvRecord = &srv + } else { + sd.list = sa.ToAddresses() + } + return +} + func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) { parts := strings.Split(string(sa), ",") for _, address := range parts { diff --git a/weed/pb/server_address_test.go b/weed/pb/server_address_test.go new file mode 100644 index 000000000..f5a12427a --- /dev/null +++ b/weed/pb/server_address_test.go @@ -0,0 +1,36 @@ +package pb + +import ( + "reflect" + "testing" +) + +func TestServerAddresses_ToAddressMapOrSrv_shouldRemovePrefix(t *testing.T) { + str := ServerAddresses("dnssrv+hello.srv.consul") + + d := str.ToServiceDiscovery() + + expected := ServerSrvAddress("hello.srv.consul") + if *d.srvRecord != expected { + t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected %s`, *d.srvRecord, expected) + } +} + +func TestServerAddresses_ToAddressMapOrSrv_shouldHandleIPPortList(t *testing.T) { + str := ServerAddresses("10.0.0.1:23,10.0.0.2:24") + + d := str.ToServiceDiscovery() + + if d.srvRecord != nil { + t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected nil`, *d.srvRecord) + } + + expected := []ServerAddress{ + ServerAddress("10.0.0.1:23"), + ServerAddress("10.0.0.2:24"), + } + + if !reflect.DeepEqual(d.list, expected) { + t.Fatalf(`Expected %q, got %q`, expected, d.list) + } +} diff --git a/weed/pb/server_discovery.go b/weed/pb/server_discovery.go new file mode 100644 index 000000000..25c0360c5 --- /dev/null +++ b/weed/pb/server_discovery.go @@ -0,0 +1,62 @@ +package pb + +import ( + "github.com/seaweedfs/seaweedfs/weed/glog" + "reflect" +) + +// ServerDiscovery encodes a way to find at least 1 instance of a service, +// and provides utility functions to refresh the instance list +type ServerDiscovery struct { + list []ServerAddress + srvRecord *ServerSrvAddress +} + +func NewServiceDiscoveryFromMap(m map[string]ServerAddress) (sd *ServerDiscovery) { + sd = &ServerDiscovery{} + for _, s := range m { + sd.list = append(sd.list, s) + } + return sd +} + +// RefreshBySrvIfAvailable performs a DNS SRV lookup and updates list with the results +// of the lookup +func (sd *ServerDiscovery) RefreshBySrvIfAvailable() { + if sd.srvRecord == nil { + return + } + newList, err := sd.srvRecord.LookUp() + if err != nil { + glog.V(0).Infof("failed to lookup SRV for %s: %v", *sd.srvRecord, err) + } + if newList == nil || len(newList) == 0 { + glog.V(0).Infof("looked up SRV for %s, but found no well-formed names", *sd.srvRecord) + return + } + if !reflect.DeepEqual(sd.list, newList) { + sd.list = newList + } +} + +// GetInstances returns a copy of the latest known list of addresses +// call RefreshBySrvIfAvailable prior to this in order to get a more up-to-date view +func (sd *ServerDiscovery) GetInstances() (addresses []ServerAddress) { + for _, a := range sd.list { + addresses = append(addresses, a) + } + return addresses +} +func (sd *ServerDiscovery) GetInstancesAsStrings() (addresses []string) { + for _, i := range sd.list { + addresses = append(addresses, string(i)) + } + return addresses +} +func (sd *ServerDiscovery) GetInstancesAsMap() (addresses map[string]ServerAddress) { + addresses = make(map[string]ServerAddress) + for _, i := range sd.list { + addresses[string(i)] = i + } + return addresses +} diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go index 58215a927..8a58e287c 100644 --- a/weed/server/filer_grpc_server_admin.go +++ b/weed/server/filer_grpc_server_admin.go @@ -87,7 +87,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) t := &filer_pb.GetFilerConfigurationResponse{ - Masters: pb.ToAddressStringsFromMap(fs.option.Masters), + Masters: fs.option.Masters.GetInstancesAsStrings(), Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 8e40b2145..98784bce3 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -50,7 +50,7 @@ import ( ) type FilerOption struct { - Masters map[string]pb.ServerAddress + Masters *pb.ServerDiscovery FilerGroup string Collection string DefaultReplication string @@ -118,11 +118,12 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) } fs.listenersCond = sync.NewCond(&fs.listenersLock) - if len(option.Masters) == 0 { + option.Masters.RefreshBySrvIfAvailable() + if len(option.Masters.GetInstances()) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { + fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher @@ -195,7 +196,8 @@ func (fs *FilerServer) checkWithMaster() { isConnected := false for !isConnected { - for _, master := range fs.option.Masters { + fs.option.Masters.RefreshBySrvIfAvailable() + for _, master := range fs.option.Masters.GetInstances() { readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 2489aaefd..9a5313a10 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -110,7 +110,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)), adminLocks: NewAdminLocks(), Cluster: cluster.NewCluster(), } diff --git a/weed/shell/commands.go b/weed/shell/commands.go index b1722edfb..e6e582376 100644 --- a/weed/shell/commands.go +++ b/weed/shell/commands.go @@ -51,7 +51,7 @@ var ( func NewCommandEnv(options *ShellOptions) *CommandEnv { ce := &CommandEnv{ env: make(map[string]string), - MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()), + MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()), option: options, } ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell") diff --git a/weed/stats/metrics.go b/weed/stats/metrics.go index dda4d95e5..3dda42423 100644 --- a/weed/stats/metrics.go +++ b/weed/stats/metrics.go @@ -299,7 +299,6 @@ func JoinHostPort(host string, port int) string { return net.JoinHostPort(host, portStr) } - func StartMetricsServer(ip string, port int) { if port == 0 { return diff --git a/weed/wdclient/masterclient.go b/weed/wdclient/masterclient.go index c693df582..a6ddf22f3 100644 --- a/weed/wdclient/masterclient.go +++ b/weed/wdclient/masterclient.go @@ -24,8 +24,8 @@ type MasterClient struct { rack string currentMaster pb.ServerAddress currentMasterLock sync.RWMutex - masters map[string]pb.ServerAddress - grpcDialOption grpc.DialOption + masters pb.ServerDiscovery + grpcDialOption grpc.DialOption *vidMap vidMapCacheSize int @@ -33,7 +33,7 @@ type MasterClient struct { OnPeerUpdateLock sync.RWMutex } -func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient { +func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient { return &MasterClient{ FilerGroup: filerGroup, clientType: clientType, @@ -108,9 +108,9 @@ func (mc *MasterClient) GetMaster() pb.ServerAddress { return mc.getCurrentMaster() } -func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress { +func (mc *MasterClient) GetMasters() []pb.ServerAddress { mc.WaitUntilConnected() - return mc.masters + return mc.masters.GetInstances() } func (mc *MasterClient) WaitUntilConnected() { @@ -132,7 +132,7 @@ func (mc *MasterClient) KeepConnectedToMaster() { } func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) { - for _, master := range mc.masters { + for _, master := range mc.masters.GetInstances() { if master == myMasterAddress { continue } @@ -159,7 +159,8 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres func (mc *MasterClient) tryAllMasters() { var nextHintedLeader pb.ServerAddress - for _, master := range mc.masters { + mc.masters.RefreshBySrvIfAvailable() + for _, master := range mc.masters.GetInstances() { nextHintedLeader = mc.tryConnectToMaster(master) for nextHintedLeader != "" { nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) diff --git a/weed/wdclient/vid_map_test.go b/weed/wdclient/vid_map_test.go index 980e5bd8c..a734c6b0c 100644 --- a/weed/wdclient/vid_map_test.go +++ b/weed/wdclient/vid_map_test.go @@ -3,6 +3,7 @@ package wdclient import ( "context" "fmt" + "github.com/seaweedfs/seaweedfs/weed/pb" "google.golang.org/grpc" "strconv" "sync" @@ -65,7 +66,7 @@ func TestLocationIndex(t *testing.T) { } func TestLookupFileId(t *testing.T) { - mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil) + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{}) length := 5 //Construct a cache linked list of length 5 @@ -135,7 +136,7 @@ func TestLookupFileId(t *testing.T) { } func TestConcurrentGetLocations(t *testing.T) { - mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil) + mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{}) location := Location{Url: "TestDataRacing"} mc.addLocation(1, location)