Browse Source

Implement SRV lookups for filer (#4767)

pull/4787/head
Nico D'Cotta 1 year ago
committed by GitHub
parent
commit
796b7508f3
No known key found for this signature in database GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      weed/command/benchmark.go
  2. 6
      weed/command/filer.go
  3. 4
      weed/command/server.go
  4. 3
      weed/filer/filer.go
  5. 7
      weed/filer/leveldb/leveldb_store_test.go
  6. 5
      weed/filer/leveldb2/leveldb2_store_test.go
  7. 5
      weed/filer/leveldb3/leveldb3_store_test.go
  8. 6
      weed/filer/rocksdb/rocksdb_store_test.go
  9. 2
      weed/iamapi/iamapi_server.go
  10. 2
      weed/mq/broker/broker_server.go
  11. 37
      weed/pb/server_address.go
  12. 36
      weed/pb/server_address_test.go
  13. 62
      weed/pb/server_discovery.go
  14. 2
      weed/server/filer_grpc_server_admin.go
  15. 10
      weed/server/filer_server.go
  16. 2
      weed/server/master_server.go
  17. 2
      weed/shell/commands.go
  18. 1
      weed/stats/metrics.go
  19. 15
      weed/wdclient/masterclient.go
  20. 5
      weed/wdclient/vid_map_test.go

2
weed/command/benchmark.go

@ -127,7 +127,7 @@ func runBenchmark(cmd *Command, args []string) bool {
defer pprof.StopCPUProfile() defer pprof.StopCPUProfile()
} }
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", pb.ServerAddresses(*b.masters).ToAddressMap())
b.masterClient = wdclient.NewMasterClient(b.grpcDialOption, "", "client", "", "", "", *pb.ServerAddresses(*b.masters).ToServiceDiscovery())
go b.masterClient.KeepConnectedToMaster() go b.masterClient.KeepConnectedToMaster()
b.masterClient.WaitUntilConnected() b.masterClient.WaitUntilConnected()

6
weed/command/filer.go

@ -33,7 +33,7 @@ var (
) )
type FilerOptions struct { type FilerOptions struct {
masters map[string]pb.ServerAddress
masters *pb.ServerDiscovery
mastersString *string mastersString *string
ip *string ip *string
bindIp *string bindIp *string
@ -65,7 +65,7 @@ type FilerOptions struct {
func init() { func init() {
cmdFiler.Run = runFiler // break init cycle cmdFiler.Run = runFiler // break init cycle
f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers")
f.mastersString = cmdFiler.Flag.String("master", "localhost:9333", "comma-separated master servers or a single DNS SRV record of at least 1 master server, prepended with dnssrv+")
f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") f.filerGroup = cmdFiler.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection") f.collection = cmdFiler.Flag.String("collection", "", "all data will be stored in this default collection")
f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address") f.ip = cmdFiler.Flag.String("ip", util.DetectedHostAddress(), "filer server http listen ip address")
@ -208,7 +208,7 @@ func runFiler(cmd *Command, args []string) bool {
}(startDelay) }(startDelay)
} }
f.masters = pb.ServerAddresses(*f.mastersString).ToAddressMap()
f.masters = pb.ServerAddresses(*f.mastersString).ToServiceDiscovery()
f.startFiler() f.startFiler()

4
weed/command/server.go

@ -203,7 +203,7 @@ func runServer(cmd *Command, args []string) bool {
// ip address // ip address
masterOptions.ip = serverIp masterOptions.ip = serverIp
masterOptions.ipBind = serverBindIp masterOptions.ipBind = serverBindIp
filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddressMap()
filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToServiceDiscovery()
filerOptions.ip = serverIp filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp filerOptions.bindIp = serverBindIp
s3Options.bindIp = serverBindIp s3Options.bindIp = serverBindIp
@ -216,7 +216,7 @@ func runServer(cmd *Command, args []string) bool {
serverOptions.v.dataCenter = serverDataCenter serverOptions.v.dataCenter = serverDataCenter
serverOptions.v.rack = serverRack serverOptions.v.rack = serverRack
mqBrokerOptions.ip = serverIp mqBrokerOptions.ip = serverIp
mqBrokerOptions.masters = filerOptions.masters
mqBrokerOptions.masters = filerOptions.masters.GetInstancesAsMap()
mqBrokerOptions.filerGroup = filerOptions.filerGroup mqBrokerOptions.filerGroup = filerOptions.filerGroup
// serverOptions.v.pulseSeconds = pulseSeconds // serverOptions.v.pulseSeconds = pulseSeconds

3
weed/filer/filer.go

@ -52,8 +52,7 @@ type Filer struct {
Dlm *lock_manager.DistributedLockManager Dlm *lock_manager.DistributedLockManager
} }
func NewFiler(masters map[string]pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress,
filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
func NewFiler(masters pb.ServerDiscovery, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, filerGroup string, collection string, replication string, dataCenter string, notifyFn func()) *Filer {
f := &Filer{ f := &Filer{
MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters), MasterClient: wdclient.NewMasterClient(grpcDialOption, filerGroup, cluster.FilerType, filerHost, dataCenter, "", masters),
fileIdDeletionQueue: util.NewUnboundedQueue(), fileIdDeletionQueue: util.NewUnboundedQueue(),

7
weed/filer/leveldb/leveldb_store_test.go

@ -3,6 +3,7 @@ package leveldb
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"os" "os"
"testing" "testing"
"time" "time"
@ -12,7 +13,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &LevelDBStore{} store := &LevelDBStore{}
store.initialize(dir) store.initialize(dir)
@ -65,7 +66,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestEmptyRoot(t *testing.T) { func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &LevelDBStore{} store := &LevelDBStore{}
store.initialize(dir) store.initialize(dir)
@ -87,7 +88,7 @@ func TestEmptyRoot(t *testing.T) {
} }
func BenchmarkInsertEntry(b *testing.B) { func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := b.TempDir() dir := b.TempDir()
store := &LevelDBStore{} store := &LevelDBStore{}
store.initialize(dir) store.initialize(dir)

5
weed/filer/leveldb2/leveldb2_store_test.go

@ -2,6 +2,7 @@ package leveldb
import ( import (
"context" "context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"testing" "testing"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
@ -9,7 +10,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &LevelDB2Store{} store := &LevelDB2Store{}
store.initialize(dir, 2) store.initialize(dir, 2)
@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestEmptyRoot(t *testing.T) { func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &LevelDB2Store{} store := &LevelDB2Store{}
store.initialize(dir, 2) store.initialize(dir, 2)

5
weed/filer/leveldb3/leveldb3_store_test.go

@ -2,6 +2,7 @@ package leveldb
import ( import (
"context" "context"
"github.com/seaweedfs/seaweedfs/weed/pb"
"testing" "testing"
"github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/filer"
@ -9,7 +10,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &LevelDB3Store{} store := &LevelDB3Store{}
store.initialize(dir) store.initialize(dir)
@ -62,7 +63,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestEmptyRoot(t *testing.T) { func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", "", "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", "", "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &LevelDB3Store{} store := &LevelDB3Store{}
store.initialize(dir) store.initialize(dir)

6
weed/filer/rocksdb/rocksdb_store_test.go

@ -15,7 +15,7 @@ import (
) )
func TestCreateAndFind(t *testing.T) { func TestCreateAndFind(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &RocksDBStore{} store := &RocksDBStore{}
store.initialize(dir) store.initialize(dir)
@ -68,7 +68,7 @@ func TestCreateAndFind(t *testing.T) {
} }
func TestEmptyRoot(t *testing.T) { func TestEmptyRoot(t *testing.T) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil)
dir := t.TempDir() dir := t.TempDir()
store := &RocksDBStore{} store := &RocksDBStore{}
store.initialize(dir) store.initialize(dir)
@ -90,7 +90,7 @@ func TestEmptyRoot(t *testing.T) {
} }
func BenchmarkInsertEntry(b *testing.B) { func BenchmarkInsertEntry(b *testing.B) {
testFiler := filer.NewFiler(nil, nil, "", 0, "", "", "", nil)
testFiler := filer.NewFiler(pb.ServerDiscovery{}, nil, "", 0, "", "", "", nil)
dir := b.TempDir() dir := b.TempDir()
store := &RocksDBStore{} store := &RocksDBStore{}
store.initialize(dir) store.initialize(dir)

2
weed/iamapi/iamapi_server.go

@ -50,7 +50,7 @@ var s3ApiConfigure IamS3ApiConfig
func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) { func NewIamApiServer(router *mux.Router, option *IamServerOption) (iamApiServer *IamApiServer, err error) {
s3ApiConfigure = IamS3ApiConfigure{ s3ApiConfigure = IamS3ApiConfigure{
option: option, option: option,
masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", option.Masters),
masterClient: wdclient.NewMasterClient(option.GrpcDialOption, "", "iam", "", "", "", *pb.NewServiceDiscoveryFromMap(option.Masters)),
} }
s3Option := s3api.S3ApiServerOption{Filer: option.Filer} s3Option := s3api.S3ApiServerOption{Filer: option.Filer}
iamApiServer = &IamApiServer{ iamApiServer = &IamApiServer{

2
weed/mq/broker/broker_server.go

@ -41,7 +41,7 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
mqBroker = &MessageQueueBroker{ mqBroker = &MessageQueueBroker{
option: option, option: option,
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, *pb.NewServiceDiscoveryFromMap(option.Masters)),
filers: make(map[pb.ServerAddress]struct{}), filers: make(map[pb.ServerAddress]struct{}),
localTopicManager: topic.NewLocalTopicManager(), localTopicManager: topic.NewLocalTopicManager(),
} }

37
weed/pb/server_address.go

@ -11,6 +11,7 @@ import (
type ServerAddress string type ServerAddress string
type ServerAddresses string type ServerAddresses string
type ServerSrvAddress string
func NewServerAddress(host string, port int, grpcPort int) ServerAddress { func NewServerAddress(host string, port int, grpcPort int) ServerAddress {
if grpcPort == 0 || grpcPort == port+10000 { if grpcPort == 0 || grpcPort == port+10000 {
@ -76,6 +77,42 @@ func (sa ServerAddress) ToGrpcAddress() string {
return ServerToGrpcAddress(string(sa)) return ServerToGrpcAddress(string(sa))
} }
// LookUp may return an error for some records along with successful lookups - make sure you do not
// discard `addresses` even if `err == nil`
func (r ServerSrvAddress) LookUp() (addresses []ServerAddress, err error) {
_, records, lookupErr := net.LookupSRV("", "", string(r))
if lookupErr != nil {
err = fmt.Errorf("lookup SRV address %s: %v", r, lookupErr)
}
for _, srv := range records {
address := fmt.Sprintf("%s:%d", srv.Target, srv.Port)
addresses = append(addresses, ServerAddress(address))
}
return
}
// ToServiceDiscovery expects one of: a comma-separated list of ip:port, like
//
// 10.0.0.1:9999,10.0.0.2:24:9999
//
// OR an SRV Record prepended with 'dnssrv+', like:
//
// dnssrv+_grpc._tcp.master.consul
// dnssrv+_grpc._tcp.headless.default.svc.cluster.local
// dnssrv+seaweed-master.master.consul
func (sa ServerAddresses) ToServiceDiscovery() (sd *ServerDiscovery) {
sd = &ServerDiscovery{}
prefix := "dnssrv+"
if strings.HasPrefix(string(sa), prefix) {
trimmed := strings.TrimPrefix(string(sa), prefix)
srv := ServerSrvAddress(trimmed)
sd.srvRecord = &srv
} else {
sd.list = sa.ToAddresses()
}
return
}
func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) { func (sa ServerAddresses) ToAddresses() (addresses []ServerAddress) {
parts := strings.Split(string(sa), ",") parts := strings.Split(string(sa), ",")
for _, address := range parts { for _, address := range parts {

36
weed/pb/server_address_test.go

@ -0,0 +1,36 @@
package pb
import (
"reflect"
"testing"
)
func TestServerAddresses_ToAddressMapOrSrv_shouldRemovePrefix(t *testing.T) {
str := ServerAddresses("dnssrv+hello.srv.consul")
d := str.ToServiceDiscovery()
expected := ServerSrvAddress("hello.srv.consul")
if *d.srvRecord != expected {
t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected %s`, *d.srvRecord, expected)
}
}
func TestServerAddresses_ToAddressMapOrSrv_shouldHandleIPPortList(t *testing.T) {
str := ServerAddresses("10.0.0.1:23,10.0.0.2:24")
d := str.ToServiceDiscovery()
if d.srvRecord != nil {
t.Fatalf(`ServerAddresses("dnssrv+hello.srv.consul") = %s, expected nil`, *d.srvRecord)
}
expected := []ServerAddress{
ServerAddress("10.0.0.1:23"),
ServerAddress("10.0.0.2:24"),
}
if !reflect.DeepEqual(d.list, expected) {
t.Fatalf(`Expected %q, got %q`, expected, d.list)
}
}

62
weed/pb/server_discovery.go

@ -0,0 +1,62 @@
package pb
import (
"github.com/seaweedfs/seaweedfs/weed/glog"
"reflect"
)
// ServerDiscovery encodes a way to find at least 1 instance of a service,
// and provides utility functions to refresh the instance list
type ServerDiscovery struct {
list []ServerAddress
srvRecord *ServerSrvAddress
}
func NewServiceDiscoveryFromMap(m map[string]ServerAddress) (sd *ServerDiscovery) {
sd = &ServerDiscovery{}
for _, s := range m {
sd.list = append(sd.list, s)
}
return sd
}
// RefreshBySrvIfAvailable performs a DNS SRV lookup and updates list with the results
// of the lookup
func (sd *ServerDiscovery) RefreshBySrvIfAvailable() {
if sd.srvRecord == nil {
return
}
newList, err := sd.srvRecord.LookUp()
if err != nil {
glog.V(0).Infof("failed to lookup SRV for %s: %v", *sd.srvRecord, err)
}
if newList == nil || len(newList) == 0 {
glog.V(0).Infof("looked up SRV for %s, but found no well-formed names", *sd.srvRecord)
return
}
if !reflect.DeepEqual(sd.list, newList) {
sd.list = newList
}
}
// GetInstances returns a copy of the latest known list of addresses
// call RefreshBySrvIfAvailable prior to this in order to get a more up-to-date view
func (sd *ServerDiscovery) GetInstances() (addresses []ServerAddress) {
for _, a := range sd.list {
addresses = append(addresses, a)
}
return addresses
}
func (sd *ServerDiscovery) GetInstancesAsStrings() (addresses []string) {
for _, i := range sd.list {
addresses = append(addresses, string(i))
}
return addresses
}
func (sd *ServerDiscovery) GetInstancesAsMap() (addresses map[string]ServerAddress) {
addresses = make(map[string]ServerAddress)
for _, i := range sd.list {
addresses[string(i)] = i
}
return addresses
}

2
weed/server/filer_grpc_server_admin.go

@ -87,7 +87,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
t := &filer_pb.GetFilerConfigurationResponse{ t := &filer_pb.GetFilerConfigurationResponse{
Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
Masters: fs.option.Masters.GetInstancesAsStrings(),
Collection: fs.option.Collection, Collection: fs.option.Collection,
Replication: fs.option.DefaultReplication, Replication: fs.option.DefaultReplication,
MaxMb: uint32(fs.option.MaxMB), MaxMb: uint32(fs.option.MaxMB),

10
weed/server/filer_server.go

@ -50,7 +50,7 @@ import (
) )
type FilerOption struct { type FilerOption struct {
Masters map[string]pb.ServerAddress
Masters *pb.ServerDiscovery
FilerGroup string FilerGroup string
Collection string Collection string
DefaultReplication string DefaultReplication string
@ -118,11 +118,12 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
} }
fs.listenersCond = sync.NewCond(&fs.listenersLock) fs.listenersCond = sync.NewCond(&fs.listenersLock)
if len(option.Masters) == 0 {
option.Masters.RefreshBySrvIfAvailable()
if len(option.Masters.GetInstances()) == 0 {
glog.Fatal("master list is required!") glog.Fatal("master list is required!")
} }
fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.filer = filer.NewFiler(*option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast() fs.listenersCond.Broadcast()
}) })
fs.filer.Cipher = option.Cipher fs.filer.Cipher = option.Cipher
@ -195,7 +196,8 @@ func (fs *FilerServer) checkWithMaster() {
isConnected := false isConnected := false
for !isConnected { for !isConnected {
for _, master := range fs.option.Masters {
fs.option.Masters.RefreshBySrvIfAvailable()
for _, master := range fs.option.Masters.GetInstances() {
readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil { if err != nil {

2
weed/server/master_server.go

@ -110,7 +110,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.Se
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", peers),
MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", "", *pb.NewServiceDiscoveryFromMap(peers)),
adminLocks: NewAdminLocks(), adminLocks: NewAdminLocks(),
Cluster: cluster.NewCluster(), Cluster: cluster.NewCluster(),
} }

2
weed/shell/commands.go

@ -51,7 +51,7 @@ var (
func NewCommandEnv(options *ShellOptions) *CommandEnv { func NewCommandEnv(options *ShellOptions) *CommandEnv {
ce := &CommandEnv{ ce := &CommandEnv{
env: make(map[string]string), env: make(map[string]string),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", pb.ServerAddresses(*options.Masters).ToAddressMap()),
MasterClient: wdclient.NewMasterClient(options.GrpcDialOption, *options.FilerGroup, pb.AdminShellClient, "", "", "", *pb.ServerAddresses(*options.Masters).ToServiceDiscovery()),
option: options, option: options,
} }
ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell") ce.locker = exclusive_locks.NewExclusiveLocker(ce.MasterClient, "shell")

1
weed/stats/metrics.go

@ -299,7 +299,6 @@ func JoinHostPort(host string, port int) string {
return net.JoinHostPort(host, portStr) return net.JoinHostPort(host, portStr)
} }
func StartMetricsServer(ip string, port int) { func StartMetricsServer(ip string, port int) {
if port == 0 { if port == 0 {
return return

15
weed/wdclient/masterclient.go

@ -24,8 +24,8 @@ type MasterClient struct {
rack string rack string
currentMaster pb.ServerAddress currentMaster pb.ServerAddress
currentMasterLock sync.RWMutex currentMasterLock sync.RWMutex
masters map[string]pb.ServerAddress
grpcDialOption grpc.DialOption
masters pb.ServerDiscovery
grpcDialOption grpc.DialOption
*vidMap *vidMap
vidMapCacheSize int vidMapCacheSize int
@ -33,7 +33,7 @@ type MasterClient struct {
OnPeerUpdateLock sync.RWMutex OnPeerUpdateLock sync.RWMutex
} }
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters map[string]pb.ServerAddress) *MasterClient {
func NewMasterClient(grpcDialOption grpc.DialOption, filerGroup string, clientType string, clientHost pb.ServerAddress, clientDataCenter string, rack string, masters pb.ServerDiscovery) *MasterClient {
return &MasterClient{ return &MasterClient{
FilerGroup: filerGroup, FilerGroup: filerGroup,
clientType: clientType, clientType: clientType,
@ -108,9 +108,9 @@ func (mc *MasterClient) GetMaster() pb.ServerAddress {
return mc.getCurrentMaster() return mc.getCurrentMaster()
} }
func (mc *MasterClient) GetMasters() map[string]pb.ServerAddress {
func (mc *MasterClient) GetMasters() []pb.ServerAddress {
mc.WaitUntilConnected() mc.WaitUntilConnected()
return mc.masters
return mc.masters.GetInstances()
} }
func (mc *MasterClient) WaitUntilConnected() { func (mc *MasterClient) WaitUntilConnected() {
@ -132,7 +132,7 @@ func (mc *MasterClient) KeepConnectedToMaster() {
} }
func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) { func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddress) (leader string) {
for _, master := range mc.masters {
for _, master := range mc.masters.GetInstances() {
if master == myMasterAddress { if master == myMasterAddress {
continue continue
} }
@ -159,7 +159,8 @@ func (mc *MasterClient) FindLeaderFromOtherPeers(myMasterAddress pb.ServerAddres
func (mc *MasterClient) tryAllMasters() { func (mc *MasterClient) tryAllMasters() {
var nextHintedLeader pb.ServerAddress var nextHintedLeader pb.ServerAddress
for _, master := range mc.masters {
mc.masters.RefreshBySrvIfAvailable()
for _, master := range mc.masters.GetInstances() {
nextHintedLeader = mc.tryConnectToMaster(master) nextHintedLeader = mc.tryConnectToMaster(master)
for nextHintedLeader != "" { for nextHintedLeader != "" {
nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader) nextHintedLeader = mc.tryConnectToMaster(nextHintedLeader)

5
weed/wdclient/vid_map_test.go

@ -3,6 +3,7 @@ package wdclient
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/seaweedfs/seaweedfs/weed/pb"
"google.golang.org/grpc" "google.golang.org/grpc"
"strconv" "strconv"
"sync" "sync"
@ -65,7 +66,7 @@ func TestLocationIndex(t *testing.T) {
} }
func TestLookupFileId(t *testing.T) { func TestLookupFileId(t *testing.T) {
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil)
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
length := 5 length := 5
//Construct a cache linked list of length 5 //Construct a cache linked list of length 5
@ -135,7 +136,7 @@ func TestLookupFileId(t *testing.T) {
} }
func TestConcurrentGetLocations(t *testing.T) { func TestConcurrentGetLocations(t *testing.T) {
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", nil)
mc := NewMasterClient(grpc.EmptyDialOption{}, "", "", "", "", "", pb.ServerDiscovery{})
location := Location{Url: "TestDataRacing"} location := Location{Url: "TestDataRacing"}
mc.addLocation(1, location) mc.addLocation(1, location)

Loading…
Cancel
Save