From 01bcc89803b5caefe6d1809d4a85bc8a1d19918e Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Sat, 28 Jul 2018 02:10:32 -0700 Subject: [PATCH] refactor into MasterClient --- weed/filer2/filer.go | 16 +++- .../masterclient.go} | 42 ++++++--- .../topolisenter/client_grpc_to_master.go | 85 ------------------- .../wdclient/topolisenter/cluster_listener.go | 56 ------------ weed/wdclient/wdclient.go | 37 ++------ 5 files changed, 48 insertions(+), 188 deletions(-) rename weed/{filer2/filer_master.go => wdclient/masterclient.go} (61%) delete mode 100644 weed/wdclient/topolisenter/client_grpc_to_master.go delete mode 100644 weed/wdclient/topolisenter/cluster_listener.go diff --git a/weed/filer2/filer.go b/weed/filer2/filer.go index 2deb8ffd5..f5c2849fe 100644 --- a/weed/filer2/filer.go +++ b/weed/filer2/filer.go @@ -11,20 +11,20 @@ import ( "path/filepath" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/wdclient" + "context" ) type Filer struct { - masters []string store FilerStore directoryCache *ccache.Cache - - currentMaster string + masterClient *wdclient.MasterClient } func NewFiler(masters []string) *Filer { return &Filer{ - masters: masters, directoryCache: ccache.New(ccache.Configure().MaxSize(1000).ItemsToPrune(100)), + masterClient: wdclient.NewMasterClient(context.Background(), "filer", masters), } } @@ -36,6 +36,14 @@ func (f *Filer) DisableDirectoryCache() { f.directoryCache = nil } +func (fs *Filer) GetMaster() string { + return fs.masterClient.GetMaster() +} + +func (fs *Filer) KeepConnectedToMaster() { + fs.masterClient.KeepConnectedToMaster() +} + func (f *Filer) CreateEntry(entry *Entry) error { dirParts := strings.Split(string(entry.FullPath), "/") diff --git a/weed/filer2/filer_master.go b/weed/wdclient/masterclient.go similarity index 61% rename from weed/filer2/filer_master.go rename to weed/wdclient/masterclient.go index bbac17940..fb634d0f0 100644 --- a/weed/filer2/filer_master.go +++ b/weed/wdclient/masterclient.go @@ -1,28 +1,44 @@ -package filer2 +package wdclient import ( "context" + "time" "fmt" - "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" - "time" + "github.com/chrislusf/seaweedfs/weed/glog" ) -func (fs *Filer) GetMaster() string { - return fs.currentMaster +type MasterClient struct { + ctx context.Context + name string + currentMaster string + masters []string +} + +func NewMasterClient(ctx context.Context, clientName string, masters []string) *MasterClient { + return &MasterClient{ + ctx: ctx, + name: clientName, + masters: masters, + } +} + +func (mc *MasterClient) GetMaster() string { + return mc.currentMaster } -func (fs *Filer) KeepConnectedToMaster() { - glog.V(0).Infof("Filer bootstraps with masters %v", fs.masters) +func (mc *MasterClient) KeepConnectedToMaster() { + glog.V(0).Infof("%s bootstraps with masters %v", mc.name, mc.masters) for { - fs.tryAllMasters() + mc.tryAllMasters() time.Sleep(time.Second) } } -func (fs *Filer) tryAllMasters() { - for _, master := range fs.masters { +func (mc *MasterClient) tryAllMasters() { + for _, master := range mc.masters { glog.V(0).Infof("Connecting to %v", master) withMasterClient(master, func(client master_pb.SeaweedClient) error { stream, err := client.KeepConnected(context.Background()) @@ -32,9 +48,9 @@ func (fs *Filer) tryAllMasters() { } glog.V(0).Infof("Connected to %v", master) - fs.currentMaster = master + mc.currentMaster = master - if err = stream.Send(&master_pb.ClientListenRequest{Name: "filer"}); err != nil { + if err = stream.Send(&master_pb.ClientListenRequest{Name: mc.name}); err != nil { glog.V(0).Infof("failed to send to %s: %v", master, err) return err } @@ -48,7 +64,7 @@ func (fs *Filer) tryAllMasters() { } } }) - fs.currentMaster = "" + mc.currentMaster = "" } } diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go deleted file mode 100644 index 863f79a1d..000000000 --- a/weed/wdclient/topolisenter/client_grpc_to_master.go +++ /dev/null @@ -1,85 +0,0 @@ -package clusterlistener - -import ( - "context" - "fmt" - "io" - - "google.golang.org/grpc" - - "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" - "github.com/golang/glog" -) - -func (clusterListener *ClusterListener) establishConnectionWithMaster( - master string, msgChan chan *pb.ClusterStatusMessage) error { - grpcConnection, err := grpc.Dial(master, grpc.WithInsecure()) - if err != nil { - return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err) - } - defer func() { _ = grpcConnection.Close() }() - - masterClient := pb.NewAlpineMasterClient(grpcConnection) - - stream, err := masterClient.RegisterClient(context.Background()) - if err != nil { - return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err) - } - - // TODO possible goroutine leaks if retry happens - go func() { - for keyspace := range clusterListener.clusters { - // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter) - if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil { - // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err) - return - } - } - - for { - msg := <-clusterListener.keyspaceFollowMessageChan - if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil { - if msg.isUnfollow { - glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err) - } else { - glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err) - } - return - } - } - - }() - - // glog.V(2).Infof("Reporting allocated %v", as.allocatedResource) - - // glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master) - - for { - msg, err := stream.Recv() - if err == io.EOF { - // read done. - return nil - } - if err != nil { - return fmt.Errorf("client receive topology : %v", err) - } - msgChan <- msg - // glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg) - } - -} - -func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error { - clientHeartbeat := &pb.ClientHeartbeat{ - ClientName: clientName, - ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{ - Keyspace: keyspace, - IsUnfollow: isUnfollow, - }, - } - - if err := stream.Send(clientHeartbeat); err != nil { - return fmt.Errorf("%s client send heartbeat: %v", clientName, err) - } - return nil -} diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go deleted file mode 100644 index 91ca6fb6f..000000000 --- a/weed/wdclient/topolisenter/cluster_listener.go +++ /dev/null @@ -1,56 +0,0 @@ -package clusterlistener - -import ( - "context" - "sync" - "time" - - "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" - "code.uber.internal/fraud/alpine/server/util" - "github.com/chrislusf/seaweedfs/weed/storage" -) - -type Location struct { - Url string - PublicUrl string -} - -type ClusterListener struct { - sync.RWMutex - vid2locations map[storage.VolumeId][]*Location - clientName string -} - -func NewClusterListener(clientName string) *ClusterListener { - return &ClusterListener{ - vid2locations: make(map[storage.VolumeId][]*Location), - clientName: clientName, - } -} - -// StartListener keeps the listener connected to the master. -func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) { - - clusterUpdatesChan := make(chan *pb.ClusterStatusMessage) - - go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error { - return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan) - }, 2*time.Second) - - go func() { - for { - select { - case msg := <-clusterUpdatesChan: - clusterListener.processClusterStatusMessage(msg) - } - } - }() - - // println("client is connected to master", master, "data center", dataCenter) - - return - -} - -func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) { -} diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go index cbe03359f..b16e239fb 100644 --- a/weed/wdclient/wdclient.go +++ b/weed/wdclient/wdclient.go @@ -2,41 +2,18 @@ package wdclient import ( "context" - "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" ) type SeaweedClient struct { - ctx context.Context - Master string - ClientName string - ClusterListener *clusterlistener.ClusterListener + ctx context.Context + Master string + ClientName string } -// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes -func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient { - c := &SeaweedClient{ - ctx: ctx, - Master: master, - ClusterListener: clusterlistener.NewClusterListener(clientName), - ClientName: clientName, - } - c.ClusterListener.StartListener(ctx, c.Master) +func NewSeaweedClient(ctx context.Context, clientName string, masters []string) *SeaweedClient { + return &SeaweedClient{ + ctx: ctx, + ClientName: clientName, - conn, err := grpc.Dial(c.Master, grpc.WithInsecure()) - if err != nil { - glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err) } - c.MasterClient = pb.NewAlpineMasterClient(conn) - - return c -} - -// NewClusterClient create a lightweight client to access a specific cluster -// TODO The call will block if the keyspace is not created in this data center. -func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) { - - return &ClusterClient{ - keyspace: keyspace, - } - }