You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

57 lines
1.7 KiB

  1. package weed_server
  2. import (
  3. "github.com/chrislusf/seaweedfs/weed/glog"
  4. "github.com/chrislusf/seaweedfs/weed/pb"
  5. "github.com/chrislusf/seaweedfs/weed/storage"
  6. "github.com/chrislusf/seaweedfs/weed/topology"
  7. )
  8. func (ms MasterServer) SendHeartbeat(stream pb.Seaweed_SendHeartbeatServer) error {
  9. var dn *topology.DataNode
  10. t := ms.Topo
  11. for {
  12. heartbeat, err := stream.Recv()
  13. if err == nil {
  14. if dn == nil {
  15. t.Sequence.SetMax(heartbeat.MaxFileKey)
  16. dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
  17. dc := t.GetOrCreateDataCenter(dcName)
  18. rack := dc.GetOrCreateRack(rackName)
  19. dn = rack.GetOrCreateDataNode(heartbeat.Ip,
  20. int(heartbeat.Port), heartbeat.PublicUrl,
  21. int(heartbeat.MaxVolumeCount))
  22. glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
  23. if err := stream.Send(&pb.HeartbeatResponse{
  24. VolumeSizeLimit: uint64(ms.volumeSizeLimitMB) * 1024 * 1024,
  25. SecretKey: string(ms.guard.SecretKey),
  26. }); err != nil {
  27. return err
  28. }
  29. }
  30. var volumeInfos []storage.VolumeInfo
  31. for _, v := range heartbeat.Volumes {
  32. if vi, err := storage.NewVolumeInfo(v); err == nil {
  33. volumeInfos = append(volumeInfos, vi)
  34. } else {
  35. glog.V(0).Infof("Fail to convert joined volume information: %v", err)
  36. }
  37. }
  38. deletedVolumes := dn.UpdateVolumes(volumeInfos)
  39. for _, v := range volumeInfos {
  40. t.RegisterVolumeLayout(v, dn)
  41. }
  42. for _, v := range deletedVolumes {
  43. t.UnRegisterVolumeLayout(v, dn)
  44. }
  45. } else {
  46. glog.V(0).Infof("lost volume server %s:%d", dn.Ip, dn.Port)
  47. if dn != nil {
  48. t.UnRegisterDataNode(dn)
  49. }
  50. return err
  51. }
  52. }
  53. }