You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

117 lines
3.1 KiB

7 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "time"
  5. "github.com/chrislusf/seaweedfs/weed/glog"
  6. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  7. "github.com/chrislusf/seaweedfs/weed/security"
  8. "github.com/chrislusf/seaweedfs/weed/util"
  9. "golang.org/x/net/context"
  10. )
  11. func (vs *VolumeServer) GetMaster() string {
  12. return vs.currentMaster
  13. }
  14. func (vs *VolumeServer) heartbeat() {
  15. glog.V(0).Infof("Volume server start with masters: %v", vs.MasterNodes)
  16. vs.store.SetDataCenter(vs.dataCenter)
  17. vs.store.SetRack(vs.rack)
  18. var err error
  19. var newLeader string
  20. for {
  21. for _, master := range vs.MasterNodes {
  22. if newLeader != "" {
  23. master = newLeader
  24. }
  25. newLeader, err = vs.doHeartbeat(master, time.Duration(vs.pulseSeconds)*time.Second)
  26. if err != nil {
  27. glog.V(0).Infof("heartbeat error: %v", err)
  28. time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
  29. }
  30. }
  31. }
  32. }
  33. func (vs *VolumeServer) doHeartbeat(masterNode string, sleepInterval time.Duration) (newLeader string, err error) {
  34. grpcConection, err := util.GrpcDial(masterNode)
  35. if err != nil {
  36. return "", fmt.Errorf("fail to dial: %v", err)
  37. }
  38. defer grpcConection.Close()
  39. client := master_pb.NewSeaweedClient(grpcConection)
  40. stream, err := client.SendHeartbeat(context.Background())
  41. if err != nil {
  42. glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
  43. return "", err
  44. }
  45. glog.V(0).Infof("Heartbeat to: %v", masterNode)
  46. vs.currentMaster = masterNode
  47. vs.store.Client = stream
  48. defer func() { vs.store.Client = nil }()
  49. doneChan := make(chan error, 1)
  50. go func() {
  51. for {
  52. in, err := stream.Recv()
  53. if err != nil {
  54. doneChan <- err
  55. return
  56. }
  57. if in.GetVolumeSizeLimit() != 0 {
  58. vs.store.VolumeSizeLimit = in.GetVolumeSizeLimit()
  59. }
  60. if in.GetSecretKey() != "" {
  61. vs.guard.SecretKey = security.Secret(in.GetSecretKey())
  62. }
  63. if in.GetLeader() != "" && masterNode != in.GetLeader() {
  64. glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
  65. newLeader = in.GetLeader()
  66. doneChan <- nil
  67. return
  68. }
  69. }
  70. }()
  71. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  72. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
  73. return "", err
  74. }
  75. tickChan := time.Tick(sleepInterval)
  76. for {
  77. select {
  78. case vid := <-vs.store.NewVolumeIdChan:
  79. deltaBeat := &master_pb.Heartbeat{
  80. NewVids: []uint32{uint32(vid)},
  81. }
  82. if err = stream.Send(deltaBeat); err != nil {
  83. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
  84. return "", err
  85. }
  86. case vid := <-vs.store.DeletedVolumeIdChan:
  87. deltaBeat := &master_pb.Heartbeat{
  88. DeletedVids: []uint32{uint32(vid)},
  89. }
  90. if err = stream.Send(deltaBeat); err != nil {
  91. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
  92. return "", err
  93. }
  94. case <-tickChan:
  95. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  96. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
  97. return "", err
  98. }
  99. case <-doneChan:
  100. return
  101. }
  102. }
  103. }