You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

273 lines
6.7 KiB

6 years ago
6 years ago
9 years ago
9 years ago
4 years ago
6 years ago
9 years ago
9 years ago
  1. package topology
  2. import (
  3. "fmt"
  4. "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
  5. "github.com/chrislusf/seaweedfs/weed/storage/needle"
  6. "github.com/chrislusf/seaweedfs/weed/storage/types"
  7. "github.com/chrislusf/seaweedfs/weed/util"
  8. "strconv"
  9. "github.com/chrislusf/seaweedfs/weed/glog"
  10. "github.com/chrislusf/seaweedfs/weed/storage"
  11. )
  12. type DataNode struct {
  13. NodeImpl
  14. Ip string
  15. Port int
  16. PublicUrl string
  17. LastSeen int64 // unix time in seconds
  18. Counter int // in race condition, the previous dataNode was not dead
  19. }
  20. func NewDataNode(id string) *DataNode {
  21. dn := &DataNode{}
  22. dn.id = NodeId(id)
  23. dn.nodeType = "DataNode"
  24. dn.diskUsages = newDiskUsages()
  25. dn.children = make(map[NodeId]Node)
  26. dn.NodeImpl.value = dn
  27. return dn
  28. }
  29. func (dn *DataNode) String() string {
  30. dn.RLock()
  31. defer dn.RUnlock()
  32. return fmt.Sprintf("Node:%s, Ip:%s, Port:%d, PublicUrl:%s", dn.NodeImpl.String(), dn.Ip, dn.Port, dn.PublicUrl)
  33. }
  34. func (dn *DataNode) AddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  35. dn.Lock()
  36. defer dn.Unlock()
  37. return dn.doAddOrUpdateVolume(v)
  38. }
  39. func (dn *DataNode) getOrCreateDisk(diskType string) *Disk {
  40. c, found := dn.children[NodeId(diskType)]
  41. if !found {
  42. c = NewDisk(diskType)
  43. dn.doLinkChildNode(c)
  44. }
  45. disk := c.(*Disk)
  46. return disk
  47. }
  48. func (dn *DataNode) doAddOrUpdateVolume(v storage.VolumeInfo) (isNew, isChangedRO bool) {
  49. disk := dn.getOrCreateDisk(v.DiskType)
  50. return disk.AddOrUpdateVolume(v)
  51. }
  52. // UpdateVolumes detects new/deleted/changed volumes on a volume server
  53. // used in master to notify master clients of these changes.
  54. func (dn *DataNode) UpdateVolumes(actualVolumes []storage.VolumeInfo) (newVolumes, deletedVolumes, changeRO []storage.VolumeInfo) {
  55. actualVolumeMap := make(map[needle.VolumeId]storage.VolumeInfo)
  56. for _, v := range actualVolumes {
  57. actualVolumeMap[v.Id] = v
  58. }
  59. dn.Lock()
  60. defer dn.Unlock()
  61. existingVolumes := dn.getVolumes()
  62. for _, v := range existingVolumes {
  63. vid := v.Id
  64. if _, ok := actualVolumeMap[vid]; !ok {
  65. glog.V(0).Infoln("Deleting volume id:", vid)
  66. disk := dn.getOrCreateDisk(v.DiskType)
  67. delete(disk.volumes, vid)
  68. deletedVolumes = append(deletedVolumes, v)
  69. deltaDiskUsages := newDiskUsages()
  70. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  71. deltaDiskUsage.volumeCount = -1
  72. if v.IsRemote() {
  73. deltaDiskUsage.remoteVolumeCount = -1
  74. }
  75. if !v.ReadOnly {
  76. deltaDiskUsage.activeVolumeCount = -1
  77. }
  78. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  79. }
  80. }
  81. for _, v := range actualVolumes {
  82. isNew, isChangedRO := dn.doAddOrUpdateVolume(v)
  83. if isNew {
  84. newVolumes = append(newVolumes, v)
  85. }
  86. if isChangedRO {
  87. changeRO = append(changeRO, v)
  88. }
  89. }
  90. return
  91. }
  92. func (dn *DataNode) DeltaUpdateVolumes(newVolumes, deletedVolumes []storage.VolumeInfo) {
  93. dn.Lock()
  94. defer dn.Unlock()
  95. for _, v := range deletedVolumes {
  96. disk := dn.getOrCreateDisk(v.DiskType)
  97. delete(disk.volumes, v.Id)
  98. deltaDiskUsages := newDiskUsages()
  99. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(types.ToDiskType(v.DiskType))
  100. deltaDiskUsage.volumeCount = -1
  101. if v.IsRemote() {
  102. deltaDiskUsage.remoteVolumeCount = -1
  103. }
  104. if !v.ReadOnly {
  105. deltaDiskUsage.activeVolumeCount = -1
  106. }
  107. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  108. }
  109. for _, v := range newVolumes {
  110. dn.doAddOrUpdateVolume(v)
  111. }
  112. return
  113. }
  114. func (dn *DataNode) AdjustMaxVolumeCounts(maxVolumeCounts map[string]uint32) {
  115. deltaDiskUsages := newDiskUsages()
  116. for diskType, maxVolumeCount := range maxVolumeCounts {
  117. if maxVolumeCount == 0 {
  118. // the volume server may have set the max to zero
  119. continue
  120. }
  121. dt := types.ToDiskType(diskType)
  122. currentDiskUsage := dn.diskUsages.getOrCreateDisk(dt)
  123. if currentDiskUsage.maxVolumeCount == int64(maxVolumeCount) {
  124. continue
  125. }
  126. disk := dn.getOrCreateDisk(dt.String())
  127. deltaDiskUsage := deltaDiskUsages.getOrCreateDisk(dt)
  128. deltaDiskUsage.maxVolumeCount = int64(maxVolumeCount) - currentDiskUsage.maxVolumeCount
  129. disk.UpAdjustDiskUsageDelta(deltaDiskUsages)
  130. }
  131. }
  132. func (dn *DataNode) GetVolumes() (ret []storage.VolumeInfo) {
  133. dn.RLock()
  134. for _, c := range dn.children {
  135. disk := c.(*Disk)
  136. ret = append(ret, disk.GetVolumes()...)
  137. }
  138. dn.RUnlock()
  139. return ret
  140. }
  141. func (dn *DataNode) GetVolumesById(id needle.VolumeId) (vInfo storage.VolumeInfo, err error) {
  142. dn.RLock()
  143. defer dn.RUnlock()
  144. found := false
  145. for _, c := range dn.children {
  146. disk := c.(*Disk)
  147. vInfo, found = disk.volumes[id]
  148. if found {
  149. break
  150. }
  151. }
  152. if found {
  153. return vInfo, nil
  154. } else {
  155. return storage.VolumeInfo{}, fmt.Errorf("volumeInfo not found")
  156. }
  157. }
  158. func (dn *DataNode) GetDataCenter() *DataCenter {
  159. rack := dn.Parent()
  160. if rack == nil {
  161. return nil
  162. }
  163. dcNode := rack.Parent()
  164. if dcNode == nil {
  165. return nil
  166. }
  167. dcValue := dcNode.GetValue()
  168. return dcValue.(*DataCenter)
  169. }
  170. func (dn *DataNode) GetRack() *Rack {
  171. return dn.Parent().(*NodeImpl).value.(*Rack)
  172. }
  173. func (dn *DataNode) GetTopology() *Topology {
  174. p := dn.Parent()
  175. for p.Parent() != nil {
  176. p = p.Parent()
  177. }
  178. t := p.(*Topology)
  179. return t
  180. }
  181. func (dn *DataNode) MatchLocation(ip string, port int) bool {
  182. return dn.Ip == ip && dn.Port == port
  183. }
  184. func (dn *DataNode) Url() string {
  185. return dn.Ip + ":" + strconv.Itoa(dn.Port)
  186. }
  187. func (dn *DataNode) ToMap() interface{} {
  188. ret := make(map[string]interface{})
  189. ret["Url"] = dn.Url()
  190. ret["PublicUrl"] = dn.PublicUrl
  191. // aggregated volume info
  192. var volumeCount, ecShardCount, maxVolumeCount int64
  193. var volumeIds string
  194. for _, diskUsage := range dn.diskUsages.usages {
  195. volumeCount += diskUsage.volumeCount
  196. ecShardCount += diskUsage.ecShardCount
  197. maxVolumeCount += diskUsage.maxVolumeCount
  198. }
  199. for _, disk := range dn.Children() {
  200. d := disk.(*Disk)
  201. volumeIds += " " + d.GetVolumeIds()
  202. }
  203. ret["Volumes"] = volumeCount
  204. ret["EcShards"] = ecShardCount
  205. ret["Max"] = maxVolumeCount
  206. ret["VolumeIds"] = volumeIds
  207. return ret
  208. }
  209. func (dn *DataNode) ToDataNodeInfo() *master_pb.DataNodeInfo {
  210. m := &master_pb.DataNodeInfo{
  211. Id: string(dn.Id()),
  212. DiskInfos: make(map[string]*master_pb.DiskInfo),
  213. }
  214. for _, c := range dn.Children() {
  215. disk := c.(*Disk)
  216. m.DiskInfos[string(disk.Id())] = disk.ToDiskInfo()
  217. }
  218. return m
  219. }
  220. // GetVolumeIds returns the human readable volume ids limited to count of max 100.
  221. func (dn *DataNode) GetVolumeIds() string {
  222. dn.RLock()
  223. defer dn.RUnlock()
  224. existingVolumes := dn.getVolumes()
  225. ids := make([]int, 0, len(existingVolumes))
  226. for k := range existingVolumes {
  227. ids = append(ids, int(k))
  228. }
  229. return util.HumanReadableIntsMax(100, ids...)
  230. }
  231. func (dn *DataNode) getVolumes() []storage.VolumeInfo {
  232. var existingVolumes []storage.VolumeInfo
  233. for _, c := range dn.children {
  234. disk := c.(*Disk)
  235. existingVolumes = append(existingVolumes, disk.GetVolumes()...)
  236. }
  237. return existingVolumes
  238. }