You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

242 lines
7.9 KiB

5 years ago
3 years ago
3 years ago
4 years ago
4 years ago
  1. package weed_server
  2. import (
  3. "fmt"
  4. "os"
  5. "syscall"
  6. "time"
  7. "github.com/chrislusf/seaweedfs/weed/operation"
  8. "google.golang.org/grpc"
  9. "github.com/chrislusf/seaweedfs/weed/pb"
  10. "github.com/chrislusf/seaweedfs/weed/security"
  11. "github.com/chrislusf/seaweedfs/weed/storage/backend"
  12. "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
  13. "golang.org/x/net/context"
  14. "github.com/chrislusf/seaweedfs/weed/glog"
  15. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  16. "github.com/chrislusf/seaweedfs/weed/util"
  17. )
  18. func (vs *VolumeServer) GetMaster() pb.ServerAddress {
  19. return vs.currentMaster
  20. }
  21. func (vs *VolumeServer) checkWithMaster() (err error) {
  22. for {
  23. for _, master := range vs.SeedMasterNodes {
  24. err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
  25. resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
  26. if err != nil {
  27. return fmt.Errorf("get master %s configuration: %v", master, err)
  28. }
  29. vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
  30. backend.LoadFromPbStorageBackends(resp.StorageBackends)
  31. return nil
  32. })
  33. if err == nil {
  34. return
  35. } else {
  36. glog.V(0).Infof("checkWithMaster %s: %v", master, err)
  37. }
  38. }
  39. time.Sleep(1790 * time.Millisecond)
  40. }
  41. }
  42. func (vs *VolumeServer) heartbeat() {
  43. glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
  44. vs.store.SetDataCenter(vs.dataCenter)
  45. vs.store.SetRack(vs.rack)
  46. grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
  47. var err error
  48. var newLeader pb.ServerAddress
  49. for vs.isHeartbeating {
  50. for _, master := range vs.SeedMasterNodes {
  51. if newLeader != "" {
  52. // the new leader may actually is the same master
  53. // need to wait a bit before adding itself
  54. time.Sleep(3 * time.Second)
  55. master = newLeader
  56. }
  57. vs.store.MasterAddress = master
  58. newLeader, err = vs.doHeartbeat(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
  59. if err != nil {
  60. glog.V(0).Infof("heartbeat error: %v", err)
  61. time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
  62. newLeader = ""
  63. vs.store.MasterAddress = ""
  64. }
  65. if !vs.isHeartbeating {
  66. break
  67. }
  68. }
  69. }
  70. }
  71. func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
  72. if !vs.isHeartbeating {
  73. return true
  74. }
  75. vs.isHeartbeating = false
  76. close(vs.stopChan)
  77. return false
  78. }
  79. func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
  80. ctx, cancel := context.WithCancel(context.Background())
  81. defer cancel()
  82. grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption)
  83. if err != nil {
  84. return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
  85. }
  86. defer grpcConection.Close()
  87. client := master_pb.NewSeaweedClient(grpcConection)
  88. stream, err := client.SendHeartbeat(ctx)
  89. if err != nil {
  90. glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
  91. return "", err
  92. }
  93. glog.V(0).Infof("Heartbeat to: %v", masterAddress)
  94. vs.currentMaster = masterAddress
  95. doneChan := make(chan error, 1)
  96. go func() {
  97. for {
  98. in, err := stream.Recv()
  99. if err != nil {
  100. doneChan <- err
  101. return
  102. }
  103. if in.HasDuplicatedDirectory {
  104. glog.Error("Shut Down Volume Server due to duplicated volume directory")
  105. glog.V(0).Infof("send SIGINT to Volume Server")
  106. p, _ := os.FindProcess(vs.pid)
  107. p.Signal(syscall.SIGINT)
  108. }
  109. if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
  110. vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
  111. if vs.store.MaybeAdjustVolumeMax() {
  112. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  113. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
  114. return
  115. }
  116. }
  117. }
  118. if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
  119. glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
  120. newLeader = pb.ServerAddress(in.GetLeader())
  121. doneChan <- nil
  122. return
  123. }
  124. }
  125. }()
  126. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  127. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  128. return "", err
  129. }
  130. if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
  131. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  132. return "", err
  133. }
  134. volumeTickChan := time.Tick(sleepInterval)
  135. ecShardTickChan := time.Tick(17 * sleepInterval)
  136. for {
  137. select {
  138. case volumeMessage := <-vs.store.NewVolumesChan:
  139. deltaBeat := &master_pb.Heartbeat{
  140. NewVolumes: []*master_pb.VolumeShortInformationMessage{
  141. &volumeMessage,
  142. },
  143. }
  144. glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  145. if err = stream.Send(deltaBeat); err != nil {
  146. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  147. return "", err
  148. }
  149. case ecShardMessage := <-vs.store.NewEcShardsChan:
  150. deltaBeat := &master_pb.Heartbeat{
  151. NewEcShards: []*master_pb.VolumeEcShardInformationMessage{
  152. &ecShardMessage,
  153. },
  154. }
  155. glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
  156. erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  157. if err = stream.Send(deltaBeat); err != nil {
  158. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  159. return "", err
  160. }
  161. case volumeMessage := <-vs.store.DeletedVolumesChan:
  162. deltaBeat := &master_pb.Heartbeat{
  163. DeletedVolumes: []*master_pb.VolumeShortInformationMessage{
  164. &volumeMessage,
  165. },
  166. }
  167. glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
  168. if err = stream.Send(deltaBeat); err != nil {
  169. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  170. return "", err
  171. }
  172. case ecShardMessage := <-vs.store.DeletedEcShardsChan:
  173. deltaBeat := &master_pb.Heartbeat{
  174. DeletedEcShards: []*master_pb.VolumeEcShardInformationMessage{
  175. &ecShardMessage,
  176. },
  177. }
  178. glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
  179. erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
  180. if err = stream.Send(deltaBeat); err != nil {
  181. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  182. return "", err
  183. }
  184. case <-volumeTickChan:
  185. glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
  186. vs.store.MaybeAdjustVolumeMax()
  187. if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
  188. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  189. return "", err
  190. }
  191. case <-ecShardTickChan:
  192. glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
  193. if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
  194. glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
  195. return "", err
  196. }
  197. case err = <-doneChan:
  198. return
  199. case <-vs.stopChan:
  200. var volumeMessages []*master_pb.VolumeInformationMessage
  201. emptyBeat := &master_pb.Heartbeat{
  202. Ip: vs.store.Ip,
  203. Port: uint32(vs.store.Port),
  204. PublicUrl: vs.store.PublicUrl,
  205. MaxFileKey: uint64(0),
  206. DataCenter: vs.store.GetDataCenter(),
  207. Rack: vs.store.GetRack(),
  208. Volumes: volumeMessages,
  209. HasNoVolumes: len(volumeMessages) == 0,
  210. }
  211. glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
  212. if err = stream.Send(emptyBeat); err != nil {
  213. glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
  214. return "", err
  215. }
  216. return
  217. }
  218. }
  219. }