You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

118 lines
3.2 KiB

7 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "golang.org/x/net/context"
  9. )
  10. func (vs *VolumeServer) GetMaster() string {
  11. return vs.currentMaster
  12. }
  13. func (vs *VolumeServer) heartbeat() {
  14. glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes)
  15. vs.store.SetDataCenter(vs.dataCenter)
  16. vs.store.SetRack(vs.rack)
  17. var err error
  18. var newLeader string
  19. for {
  20. for _, master := range vs.MasterNodes {
  21. if newLeader != "" {
  22. master = newLeader
  23. }
  24. masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master, 0)
  25. if parseErr != nil {
  26. glog.V(0).Infof("failed to parse master grpc %v", masterGrpcAddress)
  27. continue
  28. }
  29. newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, time.Duration(vs.pulseSeconds)*time.Second)
  30. if err != nil {
  31. glog.V(0).Infof("heartbeat error: %v", err)
  32. time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
  33. }
  34. }
  35. }
  36. }
  37. func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, sleepInterval time.Duration) (newLeader string, err error) {
  38. grpcConection, err := util.GrpcDial(masterGrpcAddress)
  39. if err != nil {
  40. return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
  41. }
  42. defer grpcConection.Close()
  43. client := master_pb.NewSeaweedClient(grpcConection)
  44. stream, err := client.SendHeartbeat(context.Background())
  45. if err != nil {
  46. glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
  47. return "", err
  48. }
  49. glog.V(0).Infof("Heartbeat to: %v", masterNode)
  50. vs.currentMaster = masterNode
  51. vs.store.Client = stream
  52. defer func() { vs.store.Client = nil }()
  53. doneChan := make(chan error, 1)
  54. go func() {
  55. for {
  56. in, err := stream.Recv()
  57. if err != nil {
  58. doneChan <- err
  59. return
  60. }
  61. if in.GetVolumeSizeLimit() != 0 {
  62. vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
  63. }
  64. if in.GetLeader() != "" && masterNode != in.GetLeader() {
  65. glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
  66. newLeader = in.GetLeader()
  67. doneChan <- nil
  68. return
  69. }
  70. }
  71. }()
  72. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  73. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
  74. return "", err
  75. }
  76. tickChan := time.Tick(sleepInterval)
  77. for {
  78. select {
  79. case vid := <-vs.store.NewVolumeIdChan:
  80. deltaBeat := &master_pb.Heartbeat{
  81. NewVids: []uint32{uint32(vid)},
  82. }
  83. if err = stream.Send(deltaBeat); err != nil {
  84. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
  85. return "", err
  86. }
  87. case vid := <-vs.store.DeletedVolumeIdChan:
  88. deltaBeat := &master_pb.Heartbeat{
  89. DeletedVids: []uint32{uint32(vid)},
  90. }
  91. if err = stream.Send(deltaBeat); err != nil {
  92. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
  93. return "", err
  94. }
  95. case <-tickChan:
  96. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  97. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
  98. return "", err
  99. }
  100. case err = <-doneChan:
  101. return
  102. }
  103. }
  104. }