You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
4.4 KiB

7 years ago
7 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "net"
  5. "strings"
  6. "github.com/chrislusf/raft"
  7. "github.com/chrislusf/seaweedfs/weed/glog"
  8. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  9. "github.com/chrislusf/seaweedfs/weed/topology"
  10. "google.golang.org/grpc/peer"
  11. )
  12. func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
  13. var dn *topology.DataNode
  14. t := ms.Topo
  15. defer func() {
  16. if dn != nil {
  17. glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
  18. t.UnRegisterDataNode(dn)
  19. message := &master_pb.VolumeLocation{
  20. Url: dn.Url(),
  21. PublicUrl: dn.PublicUrl,
  22. }
  23. for _, v := range dn.GetVolumes() {
  24. message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
  25. }
  26. if len(message.DeletedVids) > 0 {
  27. ms.clientChansLock.RLock()
  28. for _, ch := range ms.clientChans {
  29. ch <- message
  30. }
  31. ms.clientChansLock.RUnlock()
  32. }
  33. }
  34. }()
  35. for {
  36. heartbeat, err := stream.Recv()
  37. if err == nil {
  38. if dn == nil {
  39. t.Sequence.SetMax(heartbeat.MaxFileKey)
  40. if heartbeat.Ip == "" {
  41. if pr, ok := peer.FromContext(stream.Context()); ok {
  42. if pr.Addr != net.Addr(nil) {
  43. heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
  44. glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
  45. }
  46. }
  47. }
  48. dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
  49. dc := t.GetOrCreateDataCenter(dcName)
  50. rack := dc.GetOrCreateRack(rackName)
  51. dn = rack.GetOrCreateDataNode(heartbeat.Ip,
  52. int(heartbeat.Port), heartbeat.PublicUrl,
  53. int(heartbeat.MaxVolumeCount))
  54. glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
  55. if err := stream.Send(&master_pb.HeartbeatResponse{
  56. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  57. SecretKey: string(ms.guard.SecretKey),
  58. }); err != nil {
  59. return err
  60. }
  61. }
  62. newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
  63. message := &master_pb.VolumeLocation{
  64. Url: dn.Url(),
  65. PublicUrl: dn.PublicUrl,
  66. }
  67. for _, v := range newVolumes {
  68. message.NewVids = append(message.NewVids, uint32(v.Id))
  69. }
  70. for _, v := range deletedVolumes {
  71. message.DeletedVids = append(message.DeletedVids, uint32(v.Id))
  72. }
  73. if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
  74. ms.clientChansLock.RLock()
  75. for _, ch := range ms.clientChans {
  76. ch <- message
  77. }
  78. ms.clientChansLock.RUnlock()
  79. }
  80. } else {
  81. return err
  82. }
  83. // tell the volume servers about the leader
  84. newLeader, err := t.Leader()
  85. if err == nil {
  86. if err := stream.Send(&master_pb.HeartbeatResponse{
  87. Leader: newLeader,
  88. }); err != nil {
  89. return err
  90. }
  91. }
  92. }
  93. }
  94. // KeepConnected keep a stream gRPC call to the master. Used by clients to know the master is up.
  95. // And clients gets the up-to-date list of volume locations
  96. func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
  97. req, err := stream.Recv()
  98. if err != nil {
  99. return err
  100. }
  101. if !ms.Topo.IsLeader() {
  102. return raft.NotLeaderError
  103. }
  104. // remember client address
  105. ctx := stream.Context()
  106. // fmt.Printf("FromContext %+v\n", ctx)
  107. pr, ok := peer.FromContext(ctx)
  108. if !ok {
  109. glog.Error("failed to get peer from ctx")
  110. return fmt.Errorf("failed to get peer from ctx")
  111. }
  112. if pr.Addr == net.Addr(nil) {
  113. glog.Error("failed to get peer address")
  114. return fmt.Errorf("failed to get peer address")
  115. }
  116. clientName := req.Name + pr.Addr.String()
  117. glog.V(0).Infof("+ client %v", clientName)
  118. messageChan := make(chan *master_pb.VolumeLocation)
  119. stopChan := make(chan bool)
  120. ms.clientChansLock.Lock()
  121. ms.clientChans[clientName] = messageChan
  122. ms.clientChansLock.Unlock()
  123. defer func() {
  124. glog.V(0).Infof("- client %v", clientName)
  125. ms.clientChansLock.Lock()
  126. delete(ms.clientChans, clientName)
  127. ms.clientChansLock.Unlock()
  128. }()
  129. for _, message := range ms.Topo.ToVolumeLocations() {
  130. if err := stream.Send(message); err != nil {
  131. return err
  132. }
  133. }
  134. go func() {
  135. for {
  136. _, err := stream.Recv()
  137. if err != nil {
  138. glog.V(2).Infof("- client %v: %v", clientName, err)
  139. stopChan <- true
  140. break
  141. }
  142. }
  143. }()
  144. for {
  145. select {
  146. case message := <-messageChan:
  147. if err := stream.Send(message); err != nil {
  148. return err
  149. }
  150. case <-stopChan:
  151. return nil
  152. }
  153. }
  154. return nil
  155. }