You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

106 lines
2.5 KiB

  1. package topology
  2. import (
  3. "fmt"
  4. "strconv"
  5. "github.com/chrislusf/weed-fs/go/glog"
  6. "github.com/chrislusf/weed-fs/go/storage"
  7. )
  8. type DataNode struct {
  9. NodeImpl
  10. volumes map[storage.VolumeId]storage.VolumeInfo
  11. Ip string
  12. Port int
  13. AdminPort int
  14. PublicUrl string
  15. LastSeen int64 // unix time in seconds
  16. Dead bool
  17. }
  18. func NewDataNode(id string) *DataNode {
  19. s := &DataNode{}
  20. s.id = NodeId(id)
  21. s.nodeType = "DataNode"
  22. s.volumes = make(map[storage.VolumeId]storage.VolumeInfo)
  23. s.NodeImpl.value = s
  24. return s
  25. }
  26. func (dn *DataNode) String() string {
  27. return fmt.Sprintf("Node:%s, volumes:%v, Ip:%s, Port:%d, PublicUrl:%s, Dead:%v", dn.NodeImpl.String(), dn.volumes, dn.Ip, dn.Port, dn.PublicUrl, dn.Dead)
  28. }
  29. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) {
  30. if _, ok := dn.volumes[v.Id]; !ok {
  31. dn.volumes[v.Id] = v
  32. dn.UpAdjustVolumeCountDelta(1)
  33. if !v.ReadOnly {
  34. dn.UpAdjustActiveVolumeCountDelta(1)
  35. }
  36. dn.UpAdjustMaxVolumeId(v.Id)
  37. } else {
  38. dn.volumes[v.Id] = v
  39. }
  40. }
  41. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (deletedVolumes []storage.VolumeInfo) {
  42. actualVolumeMap := make(map[storage.VolumeId]storage.VolumeInfo)
  43. for _, v := range actualVolumes {
  44. actualVolumeMap[v.Id] = v
  45. }
  46. for vid, v := range dn.volumes {
  47. if _, ok := actualVolumeMap[vid]; !ok {
  48. glog.V(0).Infoln("Deleting volume id:", vid)
  49. delete(dn.volumes, vid)
  50. deletedVolumes = append(deletedVolumes, v)
  51. dn.UpAdjustVolumeCountDelta(-1)
  52. dn.UpAdjustActiveVolumeCountDelta(-1)
  53. }
  54. } //TODO: adjust max volume id, if need to reclaim volume ids
  55. for _, v := range actualVolumes {
  56. dn.AddOrUpdateVolume(v)
  57. }
  58. return
  59. }
  60. func (dn *DataNode) GetDataCenter() *DataCenter {
  61. return dn.Parent().Parent().(*NodeImpl).value.(*DataCenter)
  62. }
  63. func (dn *DataNode) GetRack() *Rack {
  64. return dn.Parent().(*NodeImpl).value.(*Rack)
  65. }
  66. func (dn *DataNode) GetTopology() *Topology {
  67. p := dn.Parent()
  68. for p.Parent() != nil {
  69. p = p.Parent()
  70. }
  71. t := p.(*Topology)
  72. return t
  73. }
  74. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  75. return dn.Ip == ip && dn.Port == port
  76. }
  77. func (dn *DataNode) Url() string {
  78. return dn.Ip + ":" + strconv.Itoa(dn.Port)
  79. }
  80. func (dn *DataNode) AdminUrl() string {
  81. return dn.Ip + ":" + strconv.Itoa(dn.AdminPort)
  82. }
  83. func (dn *DataNode) ToMap() interface{} {
  84. ret := make(map[string]interface{})
  85. ret["Url"] = dn.Url()
  86. ret["Volumes"] = dn.GetVolumeCount()
  87. ret["Max"] = dn.GetMaxVolumeCount()
  88. ret["Free"] = dn.FreeSpace()
  89. ret["PublicUrl"] = dn.PublicUrl
  90. ret["AdminUrl"] = dn.AdminUrl()
  91. return ret
  92. }