Browse Source

dynamically connect to a filer

pull/3379/head
chrislu 2 years ago
parent
commit
11d79615c8
  1. 34
      weed/cluster/master_client.go
  2. 30
      weed/command/mq_broker.go
  3. 1
      weed/command/server.go
  4. 23
      weed/filer/filer.go
  5. 72
      weed/mq/broker/broker_grpc_server_discovery.go
  6. 40
      weed/mq/broker/broker_server.go

34
weed/cluster/master_client.go

@ -0,0 +1,34 @@
package cluster
import (
"context"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"google.golang.org/grpc"
)
func ListExistingPeerUpdates(master pb.ServerAddress, grpcDialOption grpc.DialOption, filerGroup string, clientType string) (existingNodes []*master_pb.ClusterNodeUpdate) {
if grpcErr := pb.WithMasterClient(false, master, grpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: clientType,
FilerGroup: filerGroup,
})
glog.V(0).Infof("the cluster has %d %s\n", len(resp.ClusterNodes), clientType)
for _, node := range resp.ClusterNodes {
existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
NodeType: FilerType,
Address: node.Address,
IsLeader: node.IsLeader,
IsAdd: true,
CreatedAtNs: node.CreatedAtNs,
})
}
return err
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", master, grpcErr)
}
return
}

30
weed/command/mq_broker.go

@ -1,10 +1,6 @@
package command package command
import ( import (
"context"
"fmt"
"time"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
"github.com/chrislusf/seaweedfs/weed/util/grace" "github.com/chrislusf/seaweedfs/weed/util/grace"
@ -12,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/mq/broker" "github.com/chrislusf/seaweedfs/weed/mq/broker"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util"
@ -26,7 +21,6 @@ type MessageQueueBrokerOptions struct {
masters map[string]pb.ServerAddress masters map[string]pb.ServerAddress
mastersString *string mastersString *string
filerGroup *string filerGroup *string
filer *string
ip *string ip *string
port *int port *int
dataCenter *string dataCenter *string
@ -38,7 +32,6 @@ type MessageQueueBrokerOptions struct {
func init() { func init() {
cmdMqBroker.Run = runMqBroker // break init cycle cmdMqBroker.Run = runMqBroker // break init cycle
mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers") mqBrokerStandaloneOptions.mastersString = cmdMqBroker.Flag.String("master", "localhost:9333", "comma-separated master servers")
mqBrokerStandaloneOptions.filer = cmdMqBroker.Flag.String("filer", "localhost:8888", "filer server address")
mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup") mqBrokerStandaloneOptions.filerGroup = cmdMqBroker.Flag.String("filerGroup", "", "share metadata with other filers in the same filerGroup")
mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address") mqBrokerStandaloneOptions.ip = cmdMqBroker.Flag.String("ip", util.DetectedHostAddress(), "broker host address")
mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port") mqBrokerStandaloneOptions.port = cmdMqBroker.Flag.Int("port", 17777, "broker gRPC listen port")
@ -73,40 +66,17 @@ func (mqBrokerOpt *MessageQueueBrokerOptions) startQueueServer() bool {
grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile) grace.SetupProfiling(*mqBrokerStandaloneOptions.cpuprofile, *mqBrokerStandaloneOptions.memprofile)
filerAddress := pb.ServerAddress(*mqBrokerOpt.filer)
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker") grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.msg_broker")
cipher := false
for {
err := pb.WithGrpcFilerClient(false, filerAddress, grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return fmt.Errorf("get filer %s configuration: %v", filerAddress, err)
}
cipher = resp.Cipher
return nil
})
if err != nil {
glog.V(0).Infof("wait to connect to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
time.Sleep(time.Second)
} else {
glog.V(0).Infof("connected to filer %s grpc address %s", *mqBrokerOpt.filer, filerAddress.ToGrpcAddress())
break
}
}
qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{ qs, err := broker.NewMessageBroker(&broker.MessageQueueBrokerOption{
Masters: mqBrokerOpt.masters, Masters: mqBrokerOpt.masters,
FilerGroup: *mqBrokerOpt.filerGroup, FilerGroup: *mqBrokerOpt.filerGroup,
DataCenter: *mqBrokerOpt.dataCenter, DataCenter: *mqBrokerOpt.dataCenter,
Rack: *mqBrokerOpt.rack, Rack: *mqBrokerOpt.rack,
Filers: []pb.ServerAddress{filerAddress},
DefaultReplication: "", DefaultReplication: "",
MaxMB: 0, MaxMB: 0,
Ip: *mqBrokerOpt.ip, Ip: *mqBrokerOpt.ip,
Port: *mqBrokerOpt.port, Port: *mqBrokerOpt.port,
Cipher: cipher,
}, grpcDialOption) }, grpcDialOption)
// start grpc listener // start grpc listener

1
weed/command/server.go

@ -228,7 +228,6 @@ func runServer(cmd *Command, args []string) bool {
s3Options.filer = &filerAddress s3Options.filer = &filerAddress
iamOptions.filer = &filerAddress iamOptions.filer = &filerAddress
webdavOptions.filer = &filerAddress webdavOptions.filer = &filerAddress
mqBrokerOptions.filer = &filerAddress
mqBrokerOptions.filerGroup = filerOptions.filerGroup mqBrokerOptions.filerGroup = filerOptions.filerGroup
go stats_collect.StartMetricsServer(*serverMetricsHttpPort) go stats_collect.StartMetricsServer(*serverMetricsHttpPort)

23
weed/filer/filer.go

@ -99,28 +99,7 @@ func (f *Filer) AggregateFromPeers(self pb.ServerAddress, existingNodes []*maste
} }
func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) { func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNodeUpdate) {
if grpcErr := pb.WithMasterClient(false, f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
FilerGroup: f.MasterClient.FilerGroup,
})
glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes))
for _, node := range resp.ClusterNodes {
existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{
NodeType: cluster.FilerType,
Address: node.Address,
IsLeader: node.IsLeader,
IsAdd: true,
CreatedAtNs: node.CreatedAtNs,
})
}
return err
}); grpcErr != nil {
glog.V(0).Infof("connect to %s: %v", f.MasterClient.GetMaster(), grpcErr)
}
return
return cluster.ListExistingPeerUpdates(f.GetMaster(), f.GrpcDialOption, f.MasterClient.FilerGroup, cluster.FilerType)
} }
func (f *Filer) SetStore(store FilerStore) (isFresh bool) { func (f *Filer) SetStore(store FilerStore) (isFresh bool) {

72
weed/mq/broker/broker_grpc_server_discovery.go

@ -1,72 +0,0 @@
package broker
import (
"context"
"github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/pb"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
func (broker *MessageQueueBroker) checkFilers() {
// contact a filer about masters
var masters []pb.ServerAddress
found := false
for !found {
for _, filer := range broker.option.Filers {
err := broker.withFilerClient(false, filer, func(client filer_pb.SeaweedFilerClient) error {
resp, err := client.GetFilerConfiguration(context.Background(), &filer_pb.GetFilerConfigurationRequest{})
if err != nil {
return err
}
for _, m := range resp.Masters {
masters = append(masters, pb.ServerAddress(m))
}
return nil
})
if err == nil {
found = true
break
}
glog.V(0).Infof("failed to read masters from %+v: %v", broker.option.Filers, err)
time.Sleep(time.Second)
}
}
glog.V(0).Infof("received master list: %s", masters)
// contact each masters for filers
var filers []pb.ServerAddress
found = false
for !found {
for _, master := range masters {
err := broker.withMasterClient(false, master, func(client master_pb.SeaweedClient) error {
resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{
ClientType: cluster.FilerType,
})
if err != nil {
return err
}
for _, clusterNode := range resp.ClusterNodes {
filers = append(filers, pb.ServerAddress(clusterNode.Address))
}
return nil
})
if err == nil {
found = true
break
}
glog.V(0).Infof("failed to list filers: %v", err)
time.Sleep(time.Second)
}
}
glog.V(0).Infof("received filer list: %s", filers)
broker.option.Filers = filers
}

40
weed/mq/broker/broker_server.go

@ -5,6 +5,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb/mq_pb" "github.com/chrislusf/seaweedfs/weed/pb/mq_pb"
"github.com/chrislusf/seaweedfs/weed/wdclient" "github.com/chrislusf/seaweedfs/weed/wdclient"
"google.golang.org/grpc" "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@ -16,7 +17,6 @@ type MessageQueueBrokerOption struct {
FilerGroup string FilerGroup string
DataCenter string DataCenter string
Rack string Rack string
Filers []pb.ServerAddress
DefaultReplication string DefaultReplication string
MaxMB int MaxMB int
Ip string Ip string
@ -29,6 +29,8 @@ type MessageQueueBroker struct {
option *MessageQueueBrokerOption option *MessageQueueBrokerOption
grpcDialOption grpc.DialOption grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient MasterClient *wdclient.MasterClient
filers map[pb.ServerAddress]struct{}
currentFiler pb.ServerAddress
} }
func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) { func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.DialOption) (mqBroker *MessageQueueBroker, err error) {
@ -37,15 +39,47 @@ func NewMessageBroker(option *MessageQueueBrokerOption, grpcDialOption grpc.Dial
option: option, option: option,
grpcDialOption: grpcDialOption, grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters), MasterClient: wdclient.NewMasterClient(grpcDialOption, option.FilerGroup, cluster.BrokerType, pb.NewServerAddress(option.Ip, option.Port, 0), option.DataCenter, option.Rack, option.Masters),
filers: make(map[pb.ServerAddress]struct{}),
} }
mqBroker.checkFilers()
mqBroker.MasterClient.OnPeerUpdate = mqBroker.OnBrokerUpdate
go mqBroker.MasterClient.KeepConnectedToMaster() go mqBroker.MasterClient.KeepConnectedToMaster()
existingNodes := cluster.ListExistingPeerUpdates(mqBroker.MasterClient.GetMaster(), grpcDialOption, option.FilerGroup, cluster.FilerType)
for _, newNode := range existingNodes {
mqBroker.OnBrokerUpdate(newNode, time.Now())
}
return mqBroker, nil return mqBroker, nil
} }
func (broker *MessageQueueBroker) OnBrokerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
if update.NodeType != cluster.FilerType {
return
}
address := pb.ServerAddress(update.Address)
if update.IsAdd {
broker.filers[address] = struct{}{}
if broker.currentFiler == "" {
broker.currentFiler = address
}
} else {
delete(broker.filers, address)
if broker.currentFiler == address {
for filer, _ := range broker.filers {
broker.currentFiler = filer
break
}
}
}
}
func (broker *MessageQueueBroker) GetFiler() pb.ServerAddress {
return broker.currentFiler
}
func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error { func (broker *MessageQueueBroker) withFilerClient(streamingMode bool, filer pb.ServerAddress, fn func(filer_pb.SeaweedFilerClient) error) error {
return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn) return pb.WithFilerClient(streamingMode, filer, broker.grpcDialOption, fn)

Loading…
Cancel
Save