From 0d989491993a98753bf34569f74791c87cecb900 Mon Sep 17 00:00:00 2001 From: Chris Lu Date: Fri, 27 Jul 2018 01:54:45 -0700 Subject: [PATCH] tmp commit --- weed/pb/seaweed.proto | 8 ++ weed/util/http_util.go | 1 + .../topolisenter/client_grpc_to_master.go | 85 +++++++++++++++++++ .../wdclient/topolisenter/cluster_listener.go | 56 ++++++++++++ weed/wdclient/wdclient.go | 42 +++++++++ 5 files changed, 192 insertions(+) create mode 100644 weed/wdclient/topolisenter/client_grpc_to_master.go create mode 100644 weed/wdclient/topolisenter/cluster_listener.go create mode 100644 weed/wdclient/wdclient.go diff --git a/weed/pb/seaweed.proto b/weed/pb/seaweed.proto index 8f0fed72c..6ed580b00 100644 --- a/weed/pb/seaweed.proto +++ b/weed/pb/seaweed.proto @@ -9,6 +9,8 @@ service Seaweed { } rpc KeepConnected (stream Empty) returns (stream Empty) { } + rpc ListenForTopoChange (stream Empty) returns (stream VolumeLocation) { + } } ////////////////////////////////////////////////// @@ -55,3 +57,9 @@ message SuperBlockExtra { } ErasureCoding erasure_coding = 1; } + +message VolumeLocation { + string url = 1; + string public_url = 2; + repeated uint32 vid = 3; +} diff --git a/weed/util/http_util.go b/weed/util/http_util.go index 51bedcdfd..7b28d3d91 100644 --- a/weed/util/http_util.go +++ b/weed/util/http_util.go @@ -83,6 +83,7 @@ func Head(url string) (http.Header, error) { if err != nil { return nil, err } + defer r.Body.Close() if r.StatusCode >= 400 { return nil, fmt.Errorf("%s: %s", url, r.Status) } diff --git a/weed/wdclient/topolisenter/client_grpc_to_master.go b/weed/wdclient/topolisenter/client_grpc_to_master.go new file mode 100644 index 000000000..863f79a1d --- /dev/null +++ b/weed/wdclient/topolisenter/client_grpc_to_master.go @@ -0,0 +1,85 @@ +package clusterlistener + +import ( + "context" + "fmt" + "io" + + "google.golang.org/grpc" + + "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" + "github.com/golang/glog" +) + +func (clusterListener *ClusterListener) establishConnectionWithMaster( + master string, msgChan chan *pb.ClusterStatusMessage) error { + grpcConnection, err := grpc.Dial(master, grpc.WithInsecure()) + if err != nil { + return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err) + } + defer func() { _ = grpcConnection.Close() }() + + masterClient := pb.NewAlpineMasterClient(grpcConnection) + + stream, err := masterClient.RegisterClient(context.Background()) + if err != nil { + return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err) + } + + // TODO possible goroutine leaks if retry happens + go func() { + for keyspace := range clusterListener.clusters { + // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter) + if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil { + // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err) + return + } + } + + for { + msg := <-clusterListener.keyspaceFollowMessageChan + if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil { + if msg.isUnfollow { + glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err) + } else { + glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err) + } + return + } + } + + }() + + // glog.V(2).Infof("Reporting allocated %v", as.allocatedResource) + + // glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master) + + for { + msg, err := stream.Recv() + if err == io.EOF { + // read done. + return nil + } + if err != nil { + return fmt.Errorf("client receive topology : %v", err) + } + msgChan <- msg + // glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg) + } + +} + +func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error { + clientHeartbeat := &pb.ClientHeartbeat{ + ClientName: clientName, + ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{ + Keyspace: keyspace, + IsUnfollow: isUnfollow, + }, + } + + if err := stream.Send(clientHeartbeat); err != nil { + return fmt.Errorf("%s client send heartbeat: %v", clientName, err) + } + return nil +} diff --git a/weed/wdclient/topolisenter/cluster_listener.go b/weed/wdclient/topolisenter/cluster_listener.go new file mode 100644 index 000000000..91ca6fb6f --- /dev/null +++ b/weed/wdclient/topolisenter/cluster_listener.go @@ -0,0 +1,56 @@ +package clusterlistener + +import ( + "context" + "sync" + "time" + + "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" + "code.uber.internal/fraud/alpine/server/util" + "github.com/chrislusf/seaweedfs/weed/storage" +) + +type Location struct { + Url string + PublicUrl string +} + +type ClusterListener struct { + sync.RWMutex + vid2locations map[storage.VolumeId][]*Location + clientName string +} + +func NewClusterListener(clientName string) *ClusterListener { + return &ClusterListener{ + vid2locations: make(map[storage.VolumeId][]*Location), + clientName: clientName, + } +} + +// StartListener keeps the listener connected to the master. +func (clusterListener *ClusterListener) StartListener(ctx context.Context, master string) { + + clusterUpdatesChan := make(chan *pb.ClusterStatusMessage) + + go util.RetryForever(ctx, clusterListener.clientName+" cluster listener", func() error { + return clusterListener.establishConnectionWithMaster(master, clusterUpdatesChan) + }, 2*time.Second) + + go func() { + for { + select { + case msg := <-clusterUpdatesChan: + clusterListener.processClusterStatusMessage(msg) + } + } + }() + + // println("client is connected to master", master, "data center", dataCenter) + + return + +} + +func (clusterListener *ClusterListener) processClusterStatusMessage(msg *pb.ClusterStatusMessage) { +} diff --git a/weed/wdclient/wdclient.go b/weed/wdclient/wdclient.go new file mode 100644 index 000000000..cbe03359f --- /dev/null +++ b/weed/wdclient/wdclient.go @@ -0,0 +1,42 @@ +package wdclient + +import ( + "context" + "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine" +) + +type SeaweedClient struct { + ctx context.Context + Master string + ClientName string + ClusterListener *clusterlistener.ClusterListener +} + +// NewSeaweedClient creates a SeaweedFS client which contains a listener for the Seaweed system topology changes +func NewSeaweedClient(ctx context.Context, clientName, master string) *SeaweedClient { + c := &SeaweedClient{ + ctx: ctx, + Master: master, + ClusterListener: clusterlistener.NewClusterListener(clientName), + ClientName: clientName, + } + c.ClusterListener.StartListener(ctx, c.Master) + + conn, err := grpc.Dial(c.Master, grpc.WithInsecure()) + if err != nil { + glog.Fatalf("%s fail to dial %v: %v", c.ClientName, c.Master, err) + } + c.MasterClient = pb.NewAlpineMasterClient(conn) + + return c +} + +// NewClusterClient create a lightweight client to access a specific cluster +// TODO The call will block if the keyspace is not created in this data center. +func (c *SeaweedClient) NewClusterClient(keyspace string) (clusterClient *ClusterClient) { + + return &ClusterClient{ + keyspace: keyspace, + } + +}