You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

85 lines
2.6 KiB

7 years ago
  1. package clusterlistener
  2. import (
  3. "context"
  4. "fmt"
  5. "io"
  6. "google.golang.org/grpc"
  7. "code.uber.internal/fraud/alpine/.gen/proto/go/fraud/alpine"
  8. "github.com/golang/glog"
  9. )
  10. func (clusterListener *ClusterListener) establishConnectionWithMaster(
  11. master string, msgChan chan *pb.ClusterStatusMessage) error {
  12. grpcConnection, err := grpc.Dial(master, grpc.WithInsecure())
  13. if err != nil {
  14. return fmt.Errorf("%s fail to dial %s: %v", clusterListener.clientName, master, err)
  15. }
  16. defer func() { _ = grpcConnection.Close() }()
  17. masterClient := pb.NewAlpineMasterClient(grpcConnection)
  18. stream, err := masterClient.RegisterClient(context.Background())
  19. if err != nil {
  20. return fmt.Errorf("%s register client on master %v: %v", clusterListener.clientName, master, err)
  21. }
  22. // TODO possible goroutine leaks if retry happens
  23. go func() {
  24. for keyspace := range clusterListener.clusters {
  25. // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v)", clusterListener.clientName, keyspace, dataCenter)
  26. if err := registerForClusterAtMaster(stream, string(keyspace), false, clusterListener.clientName); err != nil {
  27. // glog.V(2).Infof("%s register cluster keyspace(%v) datacenter(%v): %v", clusterListener.clientName, keyspace, dataCenter, err)
  28. return
  29. }
  30. }
  31. for {
  32. msg := <-clusterListener.keyspaceFollowMessageChan
  33. if err := registerForClusterAtMaster(stream, string(msg.keyspace), msg.isUnfollow, clusterListener.clientName); err != nil {
  34. if msg.isUnfollow {
  35. glog.V(2).Infof("%s unfollow cluster keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
  36. } else {
  37. glog.V(2).Infof("%s register cluster new keyspace(%v): %v", clusterListener.clientName, msg.keyspace, err)
  38. }
  39. return
  40. }
  41. }
  42. }()
  43. // glog.V(2).Infof("Reporting allocated %v", as.allocatedResource)
  44. // glog.V(2).Infof("%s from %s register client to master %s", clusterListener.clientName, dataCenter, master)
  45. for {
  46. msg, err := stream.Recv()
  47. if err == io.EOF {
  48. // read done.
  49. return nil
  50. }
  51. if err != nil {
  52. return fmt.Errorf("client receive topology : %v", err)
  53. }
  54. msgChan <- msg
  55. // glog.V(2).Infof("%s client received message %v", clusterListener.clientName, msg)
  56. }
  57. }
  58. func registerForClusterAtMaster(stream pb.AlpineMaster_RegisterClientClient, keyspace string, isUnfollow bool, clientName string) error {
  59. clientHeartbeat := &pb.ClientHeartbeat{
  60. ClientName: clientName,
  61. ClusterFollow: &pb.ClientHeartbeat_ClusterFollowMessage{
  62. Keyspace: keyspace,
  63. IsUnfollow: isUnfollow,
  64. },
  65. }
  66. if err := stream.Send(clientHeartbeat); err != nil {
  67. return fmt.Errorf("%s client send heartbeat: %v", clientName, err)
  68. }
  69. return nil
  70. }