From 4729a57cc01ca360e7aa70c43b2290bb2e8bcd2d Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Mon, 8 Nov 2021 00:09:11 -0800 Subject: [PATCH] use constants --- weed/cluster/cluster.go | 17 +++++++++++------ weed/filer/filer.go | 7 ++++--- weed/filer/meta_aggregator.go | 3 ++- .../broker/broker_grpc_server_discovery.go | 3 ++- weed/server/master_server.go | 2 +- weed/shell/command_cluster_ps.go | 3 ++- weed/shell/shell_liner.go | 3 ++- 7 files changed, 24 insertions(+), 14 deletions(-) diff --git a/weed/cluster/cluster.go b/weed/cluster/cluster.go index 11187169e..348646f04 100644 --- a/weed/cluster/cluster.go +++ b/weed/cluster/cluster.go @@ -8,6 +8,11 @@ import ( "time" ) +const ( + MasterType = "master" + FilerType = "filer" +) + type ClusterNode struct { Address pb.ServerAddress Version string @@ -34,7 +39,7 @@ func NewCluster() *Cluster { func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress, version string) []*master_pb.KeepConnectedResponse { switch nodeType { - case "filer": + case FilerType: cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; found { @@ -48,14 +53,14 @@ func (cluster *Cluster) AddClusterNode(nodeType string, address pb.ServerAddress createdTs: time.Now(), } return cluster.ensureLeader(true, nodeType, address) - case "master": + case MasterType: } return nil } func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddress) []*master_pb.KeepConnectedResponse { switch nodeType { - case "filer": + case FilerType: cluster.nodesLock.Lock() defer cluster.nodesLock.Unlock() if existingNode, found := cluster.nodes[address]; !found { @@ -67,20 +72,20 @@ func (cluster *Cluster) RemoveClusterNode(nodeType string, address pb.ServerAddr return cluster.ensureLeader(false, nodeType, address) } } - case "master": + case MasterType: } return nil } func (cluster *Cluster) ListClusterNode(nodeType string) (nodes []*ClusterNode) { switch nodeType { - case "filer": + case FilerType: cluster.nodesLock.RLock() defer cluster.nodesLock.RUnlock() for _, node := range cluster.nodes { nodes = append(nodes, node) } - case "master": + case MasterType: } return } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index 6dca3321f..9eabbc337 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "os" @@ -51,7 +52,7 @@ type Filer struct { func NewFiler(masters []pb.ServerAddress, grpcDialOption grpc.DialOption, filerHost pb.ServerAddress, collection string, replication string, dataCenter string, notifyFn func()) *Filer { f := &Filer{ - MasterClient: wdclient.NewMasterClient(grpcDialOption, "filer", filerHost, dataCenter, masters), + MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.FilerType, filerHost, dataCenter, masters), fileIdDeletionQueue: util.NewUnboundedQueue(), GrpcDialOption: grpcDialOption, FilerConf: NewFilerConf(), @@ -82,13 +83,13 @@ func (f *Filer) ListExistingPeerUpdates() (existingNodes []*master_pb.ClusterNod if grpcErr := pb.WithMasterClient(f.MasterClient.GetMaster(), f.GrpcDialOption, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: "filer", + ClientType: cluster.FilerType, }) glog.V(0).Infof("the cluster has %d filers\n", len(resp.ClusterNodes)) for _, node := range resp.ClusterNodes { existingNodes = append(existingNodes, &master_pb.ClusterNodeUpdate{ - NodeType: "filer", + NodeType: cluster.FilerType, Address: node.Address, IsLeader: node.IsLeader, IsAdd: true, diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 6e42b1902..bb2c947e5 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -3,6 +3,7 @@ package filer import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "io" @@ -48,7 +49,7 @@ func NewMetaAggregator(filer *Filer, self pb.ServerAddress, grpcDialOption grpc. } func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate) { - if update.NodeType != "filer" { + if update.NodeType != cluster.FilerType { return } diff --git a/weed/messaging/broker/broker_grpc_server_discovery.go b/weed/messaging/broker/broker_grpc_server_discovery.go index 3ff5c11d1..66821d404 100644 --- a/weed/messaging/broker/broker_grpc_server_discovery.go +++ b/weed/messaging/broker/broker_grpc_server_discovery.go @@ -3,6 +3,7 @@ package broker import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb" "time" @@ -94,7 +95,7 @@ func (broker *MessageBroker) checkFilers() { for _, master := range masters { err := broker.withMasterClient(master, func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: "filer", + ClientType: cluster.FilerType, }) if err != nil { return err diff --git a/weed/server/master_server.go b/weed/server/master_server.go index a73e8384e..d2286dfbf 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -104,7 +104,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []pb.ServerAddre vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Master, "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, cluster.MasterType, option.Master, "", peers), adminLocks: NewAdminLocks(), Cluster: cluster.NewCluster(), } diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index ff92064db..5ed1677c8 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -4,6 +4,7 @@ import ( "context" "flag" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "io" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -37,7 +38,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W err = commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: "filer", + ClientType: cluster.FilerType, }) fmt.Fprintf(writer, "the cluster has %d filers\n", len(resp.ClusterNodes)) diff --git a/weed/shell/shell_liner.go b/weed/shell/shell_liner.go index fed62aba1..caf8da859 100644 --- a/weed/shell/shell_liner.go +++ b/weed/shell/shell_liner.go @@ -3,6 +3,7 @@ package shell import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -54,7 +55,7 @@ func RunShell(options ShellOptions) { var filers []pb.ServerAddress commandEnv.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { resp, err := client.ListClusterNodes(context.Background(), &master_pb.ListClusterNodesRequest{ - ClientType: "filer", + ClientType: cluster.FilerType, }) if err != nil { return err