You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

96 lines
2.6 KiB

  1. package weed_server
  2. import (
  3. "net"
  4. "strings"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/storage"
  8. "github.com/chrislusf/seaweedfs/weed/topology"
  9. "google.golang.org/grpc/peer"
  10. )
  11. func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
  12. var dn *topology.DataNode
  13. t := ms.Topo
  14. defer func() {
  15. if dn != nil {
  16. glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
  17. t.UnRegisterDataNode(dn)
  18. }
  19. }()
  20. for {
  21. heartbeat, err := stream.Recv()
  22. if err == nil {
  23. if dn == nil {
  24. t.Sequence.SetMax(heartbeat.MaxFileKey)
  25. if heartbeat.Ip == "" {
  26. if pr, ok := peer.FromContext(stream.Context()); ok {
  27. if pr.Addr != net.Addr(nil) {
  28. heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")]
  29. glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip)
  30. }
  31. }
  32. }
  33. dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
  34. dc := t.GetOrCreateDataCenter(dcName)
  35. rack := dc.GetOrCreateRack(rackName)
  36. dn = rack.GetOrCreateDataNode(heartbeat.Ip,
  37. int(heartbeat.Port), heartbeat.PublicUrl,
  38. int(heartbeat.MaxVolumeCount))
  39. glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
  40. if err := stream.Send(&master_pb.HeartbeatResponse{
  41. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  42. SecretKey: string(ms.guard.SecretKey),
  43. }); err != nil {
  44. return err
  45. }
  46. }
  47. var volumeInfos []storage.VolumeInfo
  48. for _, v := range heartbeat.Volumes {
  49. if vi, err := storage.NewVolumeInfo(v); err == nil {
  50. volumeInfos = append(volumeInfos, vi)
  51. } else {
  52. glog.V(0).Infof("Fail to convert joined volume information: %v", err)
  53. }
  54. }
  55. deletedVolumes := dn.UpdateVolumes(volumeInfos)
  56. for _, v := range volumeInfos {
  57. t.RegisterVolumeLayout(v, dn)
  58. }
  59. for _, v := range deletedVolumes {
  60. t.UnRegisterVolumeLayout(v, dn)
  61. }
  62. } else {
  63. return err
  64. }
  65. // tell the volume servers about the leader
  66. newLeader, err := t.Leader()
  67. if err == nil {
  68. if err := stream.Send(&master_pb.HeartbeatResponse{
  69. Leader: newLeader,
  70. }); err != nil {
  71. return err
  72. }
  73. }
  74. }
  75. }
  76. // KeepConnected keep a stream gRPC call to the master. Used by filer to know the master is up.
  77. func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServer) error {
  78. for {
  79. _, err := stream.Recv()
  80. if err != nil {
  81. return err
  82. }
  83. if err := stream.Send(&master_pb.Empty{}); err != nil {
  84. return err
  85. }
  86. }
  87. }